Pulsar 函数

Spring for Apache Pulsar 为 Pulsar IO(连接器)和 Pulsar Functions 提供基本支持,允许用户定义由 、 和 组成的流处理管道。 和 由 Pulsar IO(连接器)建模,而 则由 Pulsar Functions 表示。sourcesprocessorssinkssourcessinksprocessorsspring-doc.cn

因为连接器只是特殊功能,为了简单起见,我们将 source、sink 和 functions 统称为“Pulsar 函数”。
先决条件

熟悉度 - 观众应该对 Pulsar IOPulsar 函数有所熟悉。 如果不是这种情况,查看他们的入门指南可能会有所帮助。spring-doc.cn

功能已启用 - 要使用这些功能,必须启用并配置 Apache Pulsar 中支持的函数(默认情况下是禁用的)。 内置连接器可能还需要安装在 Pulsar 集群上。spring-doc.cn

有关更多详细信息,请参阅 Pulsar IOPulsar Functions 文档。spring-doc.cn

1. Pulsar 函数管理

该框架提供了管理 Pulsar 函数的组件。 当您使用 Pulsar Spring Boot Starters时,您将获得自动配置。PulsarFunctionAdministrationPulsarFunctionAdministrationspring-doc.cn

默认情况下,应用程序会尝试连接到 位于 的本地 Pulsar 实例。 但是,由于它利用了已配置的 ,请参阅 Pulsar Admin Client 以获取可用的客户端选项(包括身份验证)。 spring.pulsar.function.* 应用程序属性提供了其他配置选项。localhost:8080PulsarAdministrationspring-doc.cn

2. 自动功能管理

在应用程序启动时,框架会在应用程序上下文中查找所有 、 和 bean。 对于每个 bean,都会创建或更新相应的 Pulsar 函数。 根据函数类型、函数配置以及函数是否已存在来调用适当的 API。PulsarFunctionPulsarSinkPulsarSourcespring-doc.cn

、 和 bean 分别是 Apache Pulsar 配置对象 、 、 和 的简单包装器。 由于支持的连接器数量众多(及其不同的配置),框架不会尝试创建配置属性层次结构来镜像不同的 Apache Pulsar 连接器。 相反,用户有责任提供完整的 config 对象,然后框架使用提供的 config 处理 Management (create/update)。PulsarFunctionPulsarSinkPulsarSourceFunctionConfigSinkConfigSourceConfig

在应用程序关闭时,应用程序启动期间处理的所有函数都会强制执行其停止策略,并且要么单独保留、停止或从 Pulsar 服务器中删除。spring-doc.cn

3. 限制

3.1. 没有 Magic Pulsar 函数

Pulsar 函数和自定义连接器由自定义应用程序代码(例如 a )表示。 没有自动注册自定义代码的神奇支持。 虽然这将是惊人的,但它存在一些技术挑战并且尚未实现。 因此,用户有责任确保函数(或自定义连接器)在函数配置中指定的位置可用。 例如,如果函数 config 的值为 ,则函数 jar 文件必须存在于指定的路径中。java.util.Functionjar./some/path/MyFunction.jarspring-doc.cn

3.2. 名称标识符

函数 config 中的属性用作标识符,以确定函数是否已经存在,以便决定是否执行 update 或 create 操作。 因此,如果需要更新函数,则不应修改名称。namespring-doc.cn

4. 配置

4.1. Pulsar 函数归档

每个 Pulsar 函数都由一个实际的存档(eg. jar 文件)表示。 存档的路径是通过 sources 和 sink 的属性以及 functions 的属性指定的。archivejarspring-doc.cn

以下规则确定 path 的 “type” :spring-doc.cn

  • 该路径以 w/ 开头时为 URL(file|http|https|function|sink|source)://spring-doc.cn

  • 该路径在开始时是内置的 w/ (指向提供的开箱即用连接器之一)builtin://spring-doc.cn

  • 否则,路径是本地路径。spring-doc.cn

在创建/更新操作期间发生的操作取决于路径 “type”,如下所示:spring-doc.cn

  • 当路径为 URL 时,内容由服务器下载spring-doc.cn

  • 路径内置时,内容已在服务器上可用spring-doc.cn

  • 当路径为本地路径时,内容将上传到服务器spring-doc.cn

4.2. 内置 source 和 sink

Apache Pulsar 提供了许多开箱即用的 source 和 sink 连接器,也就是内置连接器。要使用内置连接器,只需将 设置为 (例如 )。archivebuiltin://<connector-type>builtin://rabbitspring-doc.cn

5. 自定义函数

有关如何开发和打包自定义函数的详细信息,请参阅 Pulsar 文档。 但是,在高级别上,要求如下:spring-doc.cn

构建和打包函数后,有几种方法可以使其可用于函数注册。spring-doc.cn

5.1. file://

jar 文件可以上传到服务器,然后通过函数 config 的属性引用file://jarspring-doc.cn

5.2. 本地

jar 文件可以保持在本地,然后通过函数 config 属性中的本地路径进行引用。jarspring-doc.cn

5.3. http://

jar 文件可以通过 HTTP 服务器提供,然后通过函数 config 的属性引用http(s)://jarspring-doc.cn

5.4. function://

jar 文件可以上传到 Pulsar 包管理器,然后通过函数 config 的属性中引用function://jarspring-doc.cn

6. 示例

以下是一些示例,展示了如何配置 bean,从而自动创建支持式 Pulsar 源连接器。PulsarSourcePulsarFunctionAdministrationspring-doc.cn

使用内置 Rabbit 连接器的 PulsarSource
@Bean
PulsarSource rabbitSource() {
    Map<String, Object> configs = new HashMap<>();
    configs.put("host", "my.rabbit.host");
    configs.put("port", 5672);
    configs.put("virtualHost", "/");
    configs.put("username", "guest");
    configs.put("password", "guest");
    configs.put("queueName", "test_rabbit");
    configs.put("connectionName", "test-connection");
    SourceConfig sourceConfig = SourceConfig.builder()
            .tenant("public")
            .namespace("default")
            .name("rabbit-test-source")
            .archive("builtin://rabbitmq")
            .topicName("incoming_rabbit")
            .configs(configs).build();
    return new PulsarSource(sourceConfig, null);
}

下一个示例与上一个示例相同,只是它使用自动配置的 Spring Boot 来减轻配置负担。这当然要求应用程序在启用 Rabbit 自动配置的情况下使用 Spring Boot。RabbitPropertiesspring-doc.cn

使用内置 Rabbit 连接器和 Spring Boot RabbitProperties 的 PulsarSource
@Bean
PulsarSource rabbitSourceWithBootProps(RabbitProperties props) {
    Map<String, Object> configs = new HashMap<>();
    configs.put("host", props.determineHost());
    configs.put("port", props.determinePort());
    configs.put("virtualHost", props.determineVirtualHost());
    configs.put("username", props.determineUsername());
    configs.put("password", props.determinePassword());
    configs.put("queueName", "test_rabbit");
    configs.put("connectionName", "test-connection");
    SourceConfig sourceConfig = SourceConfig.builder()
            .tenant("public")
            .namespace("default")
            .name("rabbit-test-source")
            .archive("builtin://rabbitmq")
            .topicName("incoming_rabbit")
            .configs(configs).build();
    return new PulsarSource(sourceConfig, null);
}
有关更详细的示例,请参阅使用 Pulsar Functions 的示例流管道示例应用程序