Pulsar 函数
Spring for Apache Pulsar 为 Pulsar IO(连接器)和 Pulsar Functions 提供基本支持,允许用户定义由sources
,processors
和sinks
.
这sources
和sinks
由 Pulsar IO(连接器)建模,而processors
由 Pulsar Functions 表示。
因为连接器只是特殊功能,为了简单起见,我们将 source、sink 和 functions 统称为“Pulsar 函数”。 |
1. Pulsar 函数管理
该框架提供了PulsarFunctionAdministration
组件来管理 Pulsar 函数。
当您使用 Pulsar Spring Boot Starters时,您将获得PulsarFunctionAdministration
auto-configured的。
默认情况下,应用程序会尝试连接到位于localhost:8080
.
但是,由于它利用了已配置的PulsarAdministration
,请参阅 Pulsar Admin Client 了解可用的客户端选项(包括身份验证)。
其他配置选项可通过spring.pulsar.function.*
应用程序属性。
2. 自动功能管理
在应用程序启动时,框架会查找所有PulsarFunction
,PulsarSink
和PulsarSource
bean 中的对象。
对于每个 bean,都会创建或更新相应的 Pulsar 函数。
根据函数类型、函数配置以及函数是否已存在来调用适当的 API。
这PulsarFunction ,PulsarSink 和PulsarSource bean 是 Apache Pulsar 配置对象的简单包装器FunctionConfig ,SinkConfig 和SourceConfig 分别。
由于支持的连接器数量众多(及其不同的配置),框架不会尝试创建配置属性层次结构来镜像不同的 Apache Pulsar 连接器。
相反,用户有责任提供完整的 config 对象,然后框架使用提供的 config 处理 Management (create/update)。 |
在应用程序关闭时,应用程序启动期间处理的所有函数都会强制执行其停止策略,并且要么单独保留、停止或从 Pulsar 服务器中删除。
3. 限制
4. 配置
4.1. Pulsar 函数归档
每个 Pulsar 函数都由一个实际的存档(eg. jar 文件)表示。
存档的路径是通过archive
属性,以及jar
函数的属性。
以下规则确定 path 的 “type” :
-
该路径以 w/ 开头时为 URL
(file|http|https|function|sink|source)://
-
该路径在开始时是内置的 w/
builtin://
(指向提供的现成连接器之一) -
否则,路径是本地路径。
在创建/更新作期间发生的作取决于路径 “type”,如下所示:
-
当路径为 URL 时,内容由服务器下载
-
当路径内置时,内容已在服务器上可用
-
当路径为本地路径时,内容将上传到服务器
5. 自定义函数
有关如何开发和打包自定义函数的详细信息,请参阅 Pulsar 文档。 但是,在高级别上,要求如下:
-
代码使用 Java8
-
代码实现
java.util.Function
或org.apache.pulsar.functions.api.Function
-
打包为 uber jar
构建和打包函数后,有几种方法可以使其可用于函数注册。
6. 示例
以下是一些示例,演示如何配置PulsarSource
bean 的 Bean 生成PulsarFunctionAdministration
自动创建支持的 Pulsar 源连接器。
@Bean
PulsarSource rabbitSource() {
Map<String, Object> configs = new HashMap<>();
configs.put("host", "my.rabbit.host");
configs.put("port", 5672);
configs.put("virtualHost", "/");
configs.put("username", "guest");
configs.put("password", "guest");
configs.put("queueName", "test_rabbit");
configs.put("connectionName", "test-connection");
SourceConfig sourceConfig = SourceConfig.builder()
.tenant("public")
.namespace("default")
.name("rabbit-test-source")
.archive("builtin://rabbitmq")
.topicName("incoming_rabbit")
.configs(configs).build();
return new PulsarSource(sourceConfig, null);
}
下一个示例与上一个示例相同,不同之处在于它使用自动配置的 Spring BootRabbitProperties
以减轻配置负担。这当然要求应用程序在启用 Rabbit 自动配置的情况下使用 Spring Boot。
@Bean
PulsarSource rabbitSourceWithBootProps(RabbitProperties props) {
Map<String, Object> configs = new HashMap<>();
configs.put("host", props.determineHost());
configs.put("port", props.determinePort());
configs.put("virtualHost", props.determineVirtualHost());
configs.put("username", props.determineUsername());
configs.put("password", props.determinePassword());
configs.put("queueName", "test_rabbit");
configs.put("connectionName", "test-connection");
SourceConfig sourceConfig = SourceConfig.builder()
.tenant("public")
.namespace("default")
.name("rabbit-test-source")
.archive("builtin://rabbitmq")
.topicName("incoming_rabbit")
.configs(configs).build();
return new PulsarSource(sourceConfig, null);
}
有关更详细的示例,请参阅使用 Pulsar Functions 的示例流管道示例应用程序 |