Spring for Apache Pulsar 为 Pulsar IO(连接器)和 Pulsar Functions 提供基本支持,允许用户定义由 、 和 组成的流处理管道。
和 由 Pulsar IO(连接器)建模,而 则由 Pulsar Functions 表示。sources
processors
sinks
sources
sinks
processors
因为连接器只是特殊功能,为了简单起见,我们将 source、sink 和 functions 统称为“Pulsar 函数”。 |
因为连接器只是特殊功能,为了简单起见,我们将 source、sink 和 functions 统称为“Pulsar 函数”。 |
1. Pulsar 函数管理
该框架提供了管理 Pulsar 函数的组件。
当您使用 Pulsar Spring Boot Starters时,您将获得自动配置。PulsarFunctionAdministration
PulsarFunctionAdministration
默认情况下,应用程序会尝试连接到 位于 的本地 Pulsar 实例。
但是,由于它利用了已配置的 ,请参阅 Pulsar Admin Client 以获取可用的客户端选项(包括身份验证)。
spring.pulsar.function.*
应用程序属性提供了其他配置选项。localhost:8080
PulsarAdministration
2. 自动功能管理
在应用程序启动时,框架会在应用程序上下文中查找所有 、 和 bean。
对于每个 bean,都会创建或更新相应的 Pulsar 函数。
根据函数类型、函数配置以及函数是否已存在来调用适当的 API。PulsarFunction
PulsarSink
PulsarSource
、 和 bean 分别是 Apache Pulsar 配置对象 、 、 和 的简单包装器。
由于支持的连接器数量众多(及其不同的配置),框架不会尝试创建配置属性层次结构来镜像不同的 Apache Pulsar 连接器。
相反,用户有责任提供完整的 config 对象,然后框架使用提供的 config 处理 Management (create/update)。PulsarFunction PulsarSink PulsarSource FunctionConfig SinkConfig SourceConfig |
在应用程序关闭时,应用程序启动期间处理的所有函数都会强制执行其停止策略,并且要么单独保留、停止或从 Pulsar 服务器中删除。
、 和 bean 分别是 Apache Pulsar 配置对象 、 、 和 的简单包装器。
由于支持的连接器数量众多(及其不同的配置),框架不会尝试创建配置属性层次结构来镜像不同的 Apache Pulsar 连接器。
相反,用户有责任提供完整的 config 对象,然后框架使用提供的 config 处理 Management (create/update)。PulsarFunction PulsarSink PulsarSource FunctionConfig SinkConfig SourceConfig |
3. 限制
4. 配置
4.1. Pulsar 函数归档
每个 Pulsar 函数都由一个实际的存档(eg. jar 文件)表示。
存档的路径是通过 sources 和 sink 的属性以及 functions 的属性指定的。archive
jar
以下规则确定 path 的 “type” :
-
该路径以 w/ 开头时为 URL
(file|http|https|function|sink|source)://
-
该路径在开始时是内置的 w/ (指向提供的开箱即用连接器之一)
builtin://
-
否则,路径是本地路径。
在创建/更新操作期间发生的操作取决于路径 “type”,如下所示:
-
当路径为 URL 时,内容由服务器下载
-
当路径内置时,内容已在服务器上可用
-
当路径为本地路径时,内容将上传到服务器
5. 自定义函数
有关如何开发和打包自定义函数的详细信息,请参阅 Pulsar 文档。 但是,在高级别上,要求如下:
-
代码使用 Java8
-
代码实现 或
java.util.Function
org.apache.pulsar.functions.api.Function
-
打包为 uber jar
构建和打包函数后,有几种方法可以使其可用于函数注册。
6. 示例
以下是一些示例,展示了如何配置 bean,从而自动创建支持式 Pulsar 源连接器。PulsarSource
PulsarFunctionAdministration
@Bean
PulsarSource rabbitSource() {
Map<String, Object> configs = new HashMap<>();
configs.put("host", "my.rabbit.host");
configs.put("port", 5672);
configs.put("virtualHost", "/");
configs.put("username", "guest");
configs.put("password", "guest");
configs.put("queueName", "test_rabbit");
configs.put("connectionName", "test-connection");
SourceConfig sourceConfig = SourceConfig.builder()
.tenant("public")
.namespace("default")
.name("rabbit-test-source")
.archive("builtin://rabbitmq")
.topicName("incoming_rabbit")
.configs(configs).build();
return new PulsarSource(sourceConfig, null);
}
下一个示例与上一个示例相同,只是它使用自动配置的 Spring Boot 来减轻配置负担。这当然要求应用程序在启用 Rabbit 自动配置的情况下使用 Spring Boot。RabbitProperties
@Bean
PulsarSource rabbitSourceWithBootProps(RabbitProperties props) {
Map<String, Object> configs = new HashMap<>();
configs.put("host", props.determineHost());
configs.put("port", props.determinePort());
configs.put("virtualHost", props.determineVirtualHost());
configs.put("username", props.determineUsername());
configs.put("password", props.determinePassword());
configs.put("queueName", "test_rabbit");
configs.put("connectionName", "test-connection");
SourceConfig sourceConfig = SourceConfig.builder()
.tenant("public")
.namespace("default")
.name("rabbit-test-source")
.archive("builtin://rabbitmq")
.topicName("incoming_rabbit")
.configs(configs).build();
return new PulsarSource(sourceConfig, null);
}
有关更详细的示例,请参阅使用 Pulsar Functions 的示例流管道示例应用程序 |
有关更详细的示例,请参阅使用 Pulsar Functions 的示例流管道示例应用程序 |