Pulsar 函数

Spring for Apache Pulsar 为 Pulsar IO(连接器)和 Pulsar Functions 提供基本支持,允许用户定义由sources,processorssinks. 这sourcessinksPulsar IO(连接器)建模,而processorsPulsar Functions 表示。spring-doc.cadn.net.cn

因为连接器只是特殊功能,为了简单起见,我们将 source、sink 和 functions 统称为“Pulsar 函数”。
先决条件

熟悉度 - 观众应该对 Pulsar IOPulsar 函数有所熟悉。 如果不是这种情况,查看他们的入门指南可能会有所帮助。spring-doc.cadn.net.cn

功能已启用 - 要使用这些功能,必须启用并配置 Apache Pulsar 中支持的函数(默认情况下是禁用的)。 内置连接器可能还需要安装在 Pulsar 集群上。spring-doc.cadn.net.cn

有关更多详细信息,请参阅 Pulsar IOPulsar Functions 文档。spring-doc.cadn.net.cn

1. Pulsar 函数管理

该框架提供了PulsarFunctionAdministration组件来管理 Pulsar 函数。 当您使用 Pulsar Spring Boot Starters时,您将获得PulsarFunctionAdministrationauto-configured的。spring-doc.cadn.net.cn

默认情况下,应用程序会尝试连接到位于localhost:8080. 但是,由于它利用了已配置的PulsarAdministration,请参阅 Pulsar Admin Client 了解可用的客户端选项(包括身份验证)。 其他配置选项可通过spring.pulsar.function.*应用程序属性。spring-doc.cadn.net.cn

2. 自动功能管理

在应用程序启动时,框架会查找所有PulsarFunction,PulsarSinkPulsarSourcebean 中的对象。 对于每个 bean,都会创建或更新相应的 Pulsar 函数。 根据函数类型、函数配置以及函数是否已存在来调用适当的 API。spring-doc.cadn.net.cn

PulsarFunction,PulsarSinkPulsarSourcebean 是 Apache Pulsar 配置对象的简单包装器FunctionConfig,SinkConfigSourceConfig分别。 由于支持的连接器数量众多(及其不同的配置),框架不会尝试创建配置属性层次结构来镜像不同的 Apache Pulsar 连接器。 相反,用户有责任提供完整的 config 对象,然后框架使用提供的 config 处理 Management (create/update)。

在应用程序关闭时,应用程序启动期间处理的所有函数都会强制执行其停止策略,并且要么单独保留、停止或从 Pulsar 服务器中删除。spring-doc.cadn.net.cn

3. 限制

3.1. 没有 Magic Pulsar 函数

Pulsar 函数和自定义连接器由自定义应用程序代码(例如java.util.Function). 没有自动注册自定义代码的神奇支持。 虽然这将是惊人的,但它存在一些技术挑战并且尚未实现。 因此,用户有责任确保函数(或自定义连接器)在函数配置中指定的位置可用。 例如,如果函数配置具有jar的值./some/path/MyFunction.jar则函数 jar 文件必须位于指定路径下。spring-doc.cadn.net.cn

3.2. 名称标识符

name属性用作标识符来确定函数是否已经存在,以便决定是否执行 UPDATE 或 CREATE作。 因此,如果需要更新函数,则不应修改名称。spring-doc.cadn.net.cn

4. 配置

4.1. Pulsar 函数归档

每个 Pulsar 函数都由一个实际的存档(eg. jar 文件)表示。 存档的路径是通过archive属性,以及jar函数的属性。spring-doc.cadn.net.cn

以下规则确定 path 的 “type” :spring-doc.cadn.net.cn

在创建/更新作期间发生的作取决于路径 “type”,如下所示:spring-doc.cadn.net.cn

4.2. 内置 source 和 sink

Apache Pulsar 提供了许多开箱即用的 source 和 sink 连接器,也就是内置连接器。要使用内置连接器,只需将archivebuiltin://<connector-type>(例如builtin://rabbit).spring-doc.cadn.net.cn

5. 自定义函数

有关如何开发和打包自定义函数的详细信息,请参阅 Pulsar 文档。 但是,在高级别上,要求如下:spring-doc.cadn.net.cn

构建和打包函数后,有几种方法可以使其可用于函数注册。spring-doc.cadn.net.cn

5.1. file://

jar 文件可以上传到服务器,然后通过file://jarfunction config 的属性spring-doc.cadn.net.cn

5.2. 本地

jar 文件可以保持在本地,然后通过jar函数 config 的属性。spring-doc.cadn.net.cn

5.3. http://

jar 文件可以通过 HTTP 服务器提供,然后通过http(s)://jarfunction config 的属性spring-doc.cadn.net.cn

5.4. function://

jar 文件可以上传到 Pulsar 包管理器,然后通过function://jarfunction config 的属性spring-doc.cadn.net.cn

6. 示例

以下是一些示例,演示如何配置PulsarSourcebean 的 Bean 生成PulsarFunctionAdministration自动创建支持的 Pulsar 源连接器。spring-doc.cadn.net.cn

使用内置 Rabbit 连接器的 PulsarSource
@Bean
PulsarSource rabbitSource() {
    Map<String, Object> configs = new HashMap<>();
    configs.put("host", "my.rabbit.host");
    configs.put("port", 5672);
    configs.put("virtualHost", "/");
    configs.put("username", "guest");
    configs.put("password", "guest");
    configs.put("queueName", "test_rabbit");
    configs.put("connectionName", "test-connection");
    SourceConfig sourceConfig = SourceConfig.builder()
            .tenant("public")
            .namespace("default")
            .name("rabbit-test-source")
            .archive("builtin://rabbitmq")
            .topicName("incoming_rabbit")
            .configs(configs).build();
    return new PulsarSource(sourceConfig, null);
}

下一个示例与上一个示例相同,不同之处在于它使用自动配置的 Spring BootRabbitProperties以减轻配置负担。这当然要求应用程序在启用 Rabbit 自动配置的情况下使用 Spring Boot。spring-doc.cadn.net.cn

使用内置 Rabbit 连接器和 Spring Boot RabbitProperties 的 PulsarSource
@Bean
PulsarSource rabbitSourceWithBootProps(RabbitProperties props) {
    Map<String, Object> configs = new HashMap<>();
    configs.put("host", props.determineHost());
    configs.put("port", props.determinePort());
    configs.put("virtualHost", props.determineVirtualHost());
    configs.put("username", props.determineUsername());
    configs.put("password", props.determinePassword());
    configs.put("queueName", "test_rabbit");
    configs.put("connectionName", "test-connection");
    SourceConfig sourceConfig = SourceConfig.builder()
            .tenant("public")
            .namespace("default")
            .name("rabbit-test-source")
            .archive("builtin://rabbitmq")
            .topicName("incoming_rabbit")
            .configs(configs).build();
    return new PulsarSource(sourceConfig, null);
}
有关更详细的示例,请参阅使用 Pulsar Functions 的示例流管道示例应用程序

APP信息