使用 Spring for Apache Pulsar
1. 前言
我们建议对基于 Apache Pulsar 的 Spring 应用程序使用 Spring-Boot-First 方法,因为这极大地简化了事情。
为此,您可以添加spring-pulsar-spring-boot-starter module 作为依赖项。 |
此参考的大部分内容都希望读者使用 starter,并在考虑这一点的情况下提供大多数配置说明。 但是,当说明特定于 Spring Boot Starters用法时,会努力调用。 |
2. 快速导览
我们将通过展示一个生成和消费的示例 Spring Boot 应用程序来快速浏览 Spring for Apache Pulsar。
这是一个完整的应用程序,不需要任何额外的配置,只要你在默认位置运行一个 Pulsar 集群 -localhost:6650
.
2.1. 依赖项
Spring Boot 应用程序只需要spring-pulsar-spring-boot-starter
Dependency。下面的清单分别显示了如何定义 Maven 和 Gradle 的依赖关系:
-
Maven
-
Gradle
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-pulsar</artifactId>
<version>3.2.12</version>
</dependency>
</dependencies>
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-pulsar:3.2.12'
}
使用
|
2.2. 应用程序代码
下面的清单显示了该示例的 Spring Boot 应用程序案例:
@SpringBootApplication
public class PulsarBootHelloWorld {
public static void main(String[] args) {
SpringApplication.run(PulsarBootHelloWorld.class, args);
}
@Bean
ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
return (args) -> pulsarTemplate.send("hello-pulsar-topic", "Hello Pulsar World!");
}
@PulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
void listen(String message) {
System.out.println("Message Received: " + message);
}
}
让我们快速了解此应用程序的更高级别详细信息。 在文档的后面部分,我们将更详细地了解这些组件。
在前面的示例中,我们严重依赖 Spring Boot 自动配置。
Spring Boot 为我们的应用程序自动配置了多个组件。
它会自动提供PulsarClient
,该请求由生成者和使用者用于应用程序。
Spring Boot 还可以自动配置PulsarTemplate
,然后将其注入应用程序并开始向 Pulsar 主题发送记录。
应用程序将消息发送到名为hello-pulsar
.
请注意,应用程序没有指定任何 schema 信息,因为 Spring for Apache Pulsar 库会自动从你发送的数据类型中推断 schema 类型。
我们使用PulsarListener
注解以从hello-pulsar
我们发布数据的主题。PulsarListener
是一个方便的注解,它将消息侦听器容器基础设施包装在 Spring for Apache Pulsar 中。
在幕后,它创建了一个消息侦听器容器来创建和管理 Pulsar 消费者。
与常规 Pulsar 消费者一样,使用PulsarListener
是Exclusive
模式。
当记录发布到hello-pulsar
topic 中,Pulsarlistener
使用它们并在控制台上打印它们。
框架还从数据类型推断所使用的架构类型,其中PulsarListner
method 用作有效负载 —String
,在本例中。
3. Pulsar 客户端
当您使用 Pulsar Spring Boot Starter 时,您将获得PulsarClient
auto-configured的。
默认情况下,应用程序会尝试连接到位于pulsar://localhost:6650
.
这可以通过设置spring.pulsar.client.service-url
属性设置为不同的值。
该值必须是有效的 Pulsar 协议 URL |
您可以通过指定任何spring.pulsar.client.*
应用程序属性。
如果您不使用Starters,则需要配置和注册PulsarClient 你自己。
有一个DefaultPulsarClientFactory 接受可用于帮助实现此目的的 Builder 定制器。 |
3.1. TLS 加密 (SSL)
默认情况下,Pulsar 客户端以纯文本形式与 Pulsar 服务通信。 以下部分介绍如何配置 Pulsar 客户端以使用 TLS 加密 (SSL)。 先决条件是 Broker 也已配置为使用 TLS 加密。
Spring Boot 自动配置当前不支持任何 TLS/SSL 配置属性。
您可以改为提供PulsarClientBuilderCustomizer
在 Pulsar 客户端构建器上设置必要的属性。
Pulsar 支持隐私增强邮件 (PEM) 和 Java 密钥库 (JKS) 证书格式。
请按照以下步骤配置 TLS:
-
调整 Pulsar 客户端服务 url 以使用
pulsar+ssl://
方案和 TLS 端口(通常为6651
). -
调整管理员客户端服务 URL 以使用
https://
方案和 TLS Web 端口(通常为8443
). -
提供客户端生成器定制器,用于在生成器上设置相关属性。
您可以在 Pulsar TLS 加密官方文档中找到有关上述内容的更多信息。
3.2. 身份验证
要连接到需要身份验证的 Pulsar 集群,您需要指定要使用的身份验证插件以及指定插件所需的任何参数。 使用 Spring Boot 自动配置时,您可以通过配置属性设置插件和插件参数(在大多数情况下)。
您需要确保在 例如,如果要为 |
对 auth 参数使用环境变量通常是有问题的,因为在转换过程中会丢失区分大小写。
例如,请考虑以下
当 Spring Boot 加载此属性时,它将使用
|
当不使用 Spring Boot 自动配置时,您可以使用org.apache.pulsar.client.api.AuthenticationFactory
创建身份验证,然后在您提供给客户端工厂的客户端定制器中的 Pulsar 客户端构建器上直接设置它。
以下列表显示了如何配置每个受支持的身份验证机制。
单击此处了解 Athenz
spring: pulsar: client: authentication: plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationAthenz param: tenantDomain: ... tenantService: ... providerDomain: ... privateKey: ... keyId: ...
这也需要 TLS 加密。 |
单击此处获取令牌
spring: pulsar: client: authentication: plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken param: token: some-token-goes-here
单击此处查看 Basic
spring: pulsar: client: authentication: plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationBasic param: userId: ... password: ...
单击此处获取 OAuth2
spring: pulsar: client: authentication: plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2 param: issuerUrl: ... privateKey: ... audience: ... scope: ...
点击这里查看 Sasl
spring: pulsar: client: authentication: plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationSasl param: saslJaasClientSectionName: ... serverType: ...
单击此处获取 mTLS (PEM)
由于此选项需要 TLS 加密,而这已经要求您提供客户端生成器定制器,因此建议直接在提供的 TLS 定制器中的客户端生成器上添加身份验证。
您可以使用org.apache.pulsar.client.api.AuthenticationFactory 帮助创建 Authentication 对象,如下所示: |
Authentication auth = AuthenticationFactory.TLS("/path/to/my-role.cert.pem", "/path/to/my-role.key-pk8.pem");
请参阅有关 mTLS (PEM) 的 Pulsar 官方文档。
单击此处获取 mTLS (JKS)
由于此选项需要 TLS 加密,而这已经要求您提供客户端生成器定制器,因此建议直接在提供的 TLS 定制器中的客户端生成器上添加身份验证。
您可以使用org.apache.pulsar.client.api.AuthenticationFactory 帮助创建 Authentication 对象,如下所示: |
Authentication auth = AuthenticationFactory.create(
"org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls",
Map.of("keyStoreType", "JKS", "keyStorePath", "/path/to/my/keystore.jks", "keyStorePassword", "clientpw"));
请参阅有关 mTLS (JKS) 的 Pulsar 官方文档。
您可以在官方 Pulsar 安全文档中找到有关每个支持插件及其所需属性的更多信息。
4. 消息制作
4.1. Pulsar 模板
在 Pulsar 生产者端,Spring Boot 自动配置提供了一个PulsarTemplate
用于发布记录。该模板实现了一个名为PulsarOperations
并提供通过其 Contract 发布记录的方法。
这些发送 API 方法分为两类:send
和sendAsync
.
这send
methods 使用 Pulsar producer 上的同步发送功能来阻止调用。
它们返回MessageId
的消息。
这sendAsync
方法调用是非阻塞的异步调用。
他们返回一个CompletableFuture
,可用于在消息发布后异步接收消息 ID。
对于不包含 topic 参数的 API 变体,将使用主题解析过程来确定目标主题。 |
4.1.1. 简单的 API
该模板为简单的发送请求提供了一些方法(前缀为 'send')。对于更复杂的发送请求,Fluent API 允许您配置更多选项。
4.1.2. Fluent API
该模板提供了一个 Fluent 构建器来处理更复杂的发送请求。
4.1.3. 消息自定义
您可以指定TypedMessageBuilderCustomizer
以配置传出消息。例如,以下代码演示如何发送键控消息:
template.newMessage(msg)
.withMessageCustomizer((mb) -> mb.key("foo-msg-key"))
.send();
4.1.4. 生产者自定义
您可以指定ProducerBuilderCustomizer
配置底层 Pulsar 生产者构建器,最终构建用于发送传出消息的生产者。
请谨慎使用,因为这会提供对 producer 构建器的完全访问权限,并调用其某些方法(例如create ) 可能会产生意想不到的副作用。 |
例如,以下代码演示如何禁用批处理和启用分块:
template.newMessage(msg)
.withProducerCustomizer((pb) -> pb.enableChunking(true).enableBatching(false))
.send();
另一个示例显示了在将记录发布到分区主题时如何使用自定义路由。
指定您的自定义MessageRouter
在Producer
构建器,例如:
template.newMessage(msg)
.withProducerCustomizer((pb) -> pb.messageRouter(messageRouter))
.send();
请注意,当使用MessageRouter ,则spring.pulsar.producer.message-routing-mode 是custom . |
这个另一个示例展示了如何添加ProducerInterceptor
这将在将消息发布到 broker 之前拦截和更改 producer 收到的消息:
template.newMessage(msg)
.withProducerCustomizer((pb) -> pb.intercept(interceptor))
.send();
定制器将仅适用于用于发送作的创建者。 如果要将定制器应用于所有生产者,则必须将它们提供给生产者工厂,如全局生产者定制中所述。
使用 Lambda 定制器时,必须遵循“Lambda 定制器注意事项”中描述的规则。 |
4.2. 指定 Schema 信息
如果您使用 Java 基元类型,框架会自动为您检测架构,并且您无需指定任何架构类型来发布数据。
对于非原始类型,如果在对PulsarTemplate
,Spring for Apache Pulsar 框架将尝试构建一个Schema.JSON
从 type.
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF 和采用 INLINE 编码的 KEY_VALUE。 |
4.2.1. 自定义 Schema 映射
作为在PulsarTemplate
对于复杂类型,可以使用类型的映射配置 Schema Resolver。
这样就无需在框架使用传出消息类型咨询解析程序时指定架构。
架构映射可以使用spring.pulsar.defaults.type-mappings
财产。
以下示例使用application.yml
要为User
和Address
复杂对象 usingAVRO
和JSON
schemas 的 Schemas:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
这message-type 是 Message 类的完全限定名称。 |
添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,则可以提供架构解析程序定制器来添加映射。
以下示例使用架构解析程序定制器为User
和Address
复杂对象 usingAVRO
和JSON
schemas 的 Schemas:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
有了这个配置,就不需要设置 specify the schema on send作。
4.3. Pulsar Producer 工厂
这PulsarTemplate
依赖于PulsarProducerFactory
实际创建底层 producer。
Spring Boot 自动配置还提供了这个生产者工厂,你可以通过指定任何spring.pulsar.producer.*
应用程序属性。
4.3.1. 全局 producer 自定义
该框架提供了ProducerBuilderCustomizer
Contract 的 Contract,它允许您配置用于构建每个生产者的底层构建器。
要自定义所有生产者,您可以将定制器列表传递到PulsarProducerFactory
构造 函数。
使用多个定制器时,将按照它们在列表中的显示顺序应用它们。
如果使用 Spring Boot 自动配置,则可以将定制器指定为 bean,它们将自动传递给PulsarProducerFactory ,根据其@Order 注解。 |
如果您只想将定制器应用于单个创建者,则可以使用 Fluent API 并在发送时指定定制器。
4.4. Pulsar 生产者缓存
每个底层 Pulsar 生产者都会消耗资源。 为了提高性能并避免持续创建 Producer,Producer Factory 会缓存它创建的 Producer。 它们以 LRU 方式缓存,并在配置的时间段内未使用时被驱逐。 缓存键由足够的信息组成,以确保在后续创建请求中向调用方返回相同的创建者。
此外,您还可以通过指定任何spring.pulsar.producer.cache.*
应用程序属性。
4.4.1. 对 Lambda 定制器的注意事项
用户提供的任何创建者定制器也包含在缓存键中。
由于缓存键依赖于equals/hashCode
,在使用 Lambda 定制器时必须小心。
统治:作为 Lambda 实现的两个定制器将在equals/hashCode 当且仅当它们使用相同的 Lambda 实例,并且不需要在其闭包之外定义任何变量时。 |
为了澄清上述规则,我们将看几个例子。
在以下示例中,定制器定义为内联 Lambda,这意味着每次调用sendUser
使用相同的 Lambda 实例。此外,它不需要在其闭包之外的变量。因此,它将作为缓存键进行匹配。
void sendUser() {
var user = randomUser();
template.newMessage(user)
.withTopic("user-topic")
.withProducerCustomizer((b) -> b.producerName("user"))
.send();
}
在下一个案例中,定制器被定义为内联 Lambda,这意味着对sendUser
使用相同的 Lambda 实例。但是,它需要一个 Variable outside its closure。因此,它不会作为缓存键进行匹配。
void sendUser() {
var user = randomUser();
var name = randomName();
template.newMessage(user)
.withTopic("user-topic")
.withProducerCustomizer((b) -> b.producerName(name))
.send();
}
在最后一个示例中,定制器被定义为内联 Lambda,这意味着每次调用sendUser
使用相同的 Lambda 实例。虽然它确实使用变量名称,但它并不源自其闭包之外,因此将作为缓存键进行匹配。
这说明了变量可以在 Lambda 闭包中使用,甚至可以调用静态方法。
void sendUser() {
var user = randomUser();
template.newMessage(user)
.withTopic("user-topic")
.withProducerCustomizer((b) -> {
var name = SomeHelper.someStaticMethod();
b.producerName(name);
})
.send();
}
统治:如果您的 Lambda 定制器未定义一次且仅定义一次(在后续调用中使用相同的实例),或者它需要在其闭包之外定义变量,则必须为定制器实现提供有效的equals/hashCode 实现。 |
如果不遵循这些规则,则创建者缓存将始终丢失,并且您的应用程序性能将受到负面影响。 |
4.5. 在 Producer 上拦截消息
添加ProducerInterceptor
允许您在将 Producer 收到的消息发布到 Broker 之前拦截和更改这些消息。
为此,您可以将拦截器列表传递到PulsarTemplate
构造 函数。
当使用多个侦听器时,它们的应用 Sequence 是它们在列表中的显示顺序。
如果使用 Spring Boot 自动配置,则可以将拦截器指定为 Beans。
它们会自动传递到PulsarTemplate
.
拦截器的排序是通过使用@Order
注解如下:
@Bean
@Order(100)
ProducerInterceptor firstInterceptor() {
...
}
@Bean
@Order(200)
ProducerInterceptor secondInterceptor() {
...
}
如果您不使用 Starter,则需要自己配置和注册上述组件。 |
5. 消息消费
5.1. Pulsar 监听器
对于 Pulsar 消费者,我们建议最终用户应用程序使用PulsarListener
注解。
要使用PulsarListener
,您需要使用@EnablePulsar
注解。
当您使用 Spring Boot 支持时,它会自动启用此 Comments 并配置所有必要的组件PulsarListener
,例如消息侦听器基础设施(负责创建 Pulsar 消费者)。PulsarMessageListenerContainer
使用PulsarConsumerFactory
创建和管理 Pulsar 消费者,即用于消费消息的底层 Pulsar 消费者。
Spring Boot 提供了这个消费者工厂,您可以通过指定spring.pulsar.consumer.*
应用程序属性。工厂上的大多数已配置属性将在侦听器中得到遵守,但以下情况除外:
这spring.pulsar.consumer.subscription.name 属性将被忽略,而是在未在 Comments 上指定时生成。 |
这spring.pulsar.consumer.subscription-type property 被忽略,而是取自 annotation 上的值。但是,您可以将subscriptionType = {} 以改用 Property 值作为默认值。 |
让我们重新审视一下PulsarListener
我们在 Quick-Tour 部分看到的代码片段:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
您可以进一步简化此方法:
@PulsarListener
public void listen(String message) {
System.out.println("Message Received: " + message);
}
在这种最基本的形式中,当subscriptionName
未在@PulsarListener
annotation 将使用自动生成的订阅名称。
同样,当topics
未直接提供,则使用主题解析过程来确定目标主题。
在PulsarListener
方法,我们接收数据为String
,但我们没有指定任何 schema 类型。
在内部,该框架依赖于 Pulsar 的 schema 机制将数据转换为所需的类型。
框架检测到您希望String
type,然后根据该信息推断 schema 类型。
然后,它将该架构提供给使用者。
对于 Java 中的所有原始类型,框架都会执行此推理。
对于任何复杂类型(例如 JSON、AVRO 等),框架无法进行此推理,用户需要使用schemaType
财产。
以下示例显示了另一个PulsarListener
方法,该方法采用Integer
:
@PulsarListener(subscriptionName = "my-subscription-1", topics = "my-topic-1")
public void listen(Integer message) {
System.out.println(message);
}
以下内容PulsarListener
method 展示了我们如何从 topic 中使用复杂类型:
@PulsarListener(subscriptionName = "my-subscription-2", topics = "my-topic-2", schemaType = SchemaType.JSON)
public void listen(Foo message) {
System.out.println(message);
}
请注意,添加了schemaType
属性PulsarListener
.
这是因为该库无法从提供的类型推断架构类型:Foo
.我们必须告诉框架要使用什么 schema。
让我们看看更多方法。
你可以直接消费 Pulsar 消息:
@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.apache.pulsar.client.api.Message<String> message) {
System.out.println(message.getValue());
}
下面的示例使用 Spring 消息传递信封来使用记录:
@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.springframework.messaging.Message<String> message) {
System.out.println(message.getPayload());
}
现在让我们看看如何批量使用记录。
以下示例使用PulsarListener
要批量使用记录作为 POJO:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Foo> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message));
}
请注意,在此示例中,我们将记录作为集合 (List
) 的对象。
此外,要在PulsarListener
level 中,您需要设置batch
属性设置为true
.
根据实际类型List
holds,则框架会尝试推断要使用的 schema。
如果List
包含复杂类型,您仍然需要提供schemaType
上PulsarListener
.
下面使用Message
envelope 的 Pulsar Java 客户端提供的 Paypal 中:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Message<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}
以下示例使用带有 Spring 消息传递信封的批处理记录Message
类型:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<org.springframework.messaging.Message<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getPayload()));
}
最后,您还可以使用Messages
holder 对象用于批处理监听器:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(org.apache.pulsar.client.api.Messages<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}
当您使用PulsarListener
,你可以直接在 Comments 本身上提供 Pulsar consumer 属性。
如果您不想使用前面提到的 Boot 配置属性或具有多个PulsarListener
方法。
以下示例直接在PulsarListener
:
@PulsarListener(properties = { "subscriptionName=subscription-1", "topicNames=foo-1", "receiverQueueSize=5000" })
void listen(String message) {
}
使用的属性是直接的 Pulsar 消费者属性,而不是spring.pulsar.consumer 应用程序配置属性 |
5.1.1. 自定义 ConsumerBuilder
您可以通过ConsumerBuilder
使用PulsarListenerConsumerBuilderCustomizer
通过提供@Bean
的类型PulsarListenerConsumerBuilderCustomizer
然后将其提供给PulsarListener
如下所示。
@PulsarListener(topics = "hello-topic", consumerCustomizer = "myCustomizer")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
@Bean
PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
return (builder) -> builder.consumerName("myConsumer");
}
如果您的应用程序只有一个@PulsarListener 和一个PulsarListenerConsumerBuilderCustomizer bean 已注册,则将自动应用定制器。 |
5.2. 指定 Schema 信息
如前所述,对于 Java 原语,Spring for Apache Pulsar 框架可以推断出要在PulsarListener
.
对于非原始类型,如果未在注解上显式指定 Schema,则 Spring for Apache Pulsar 框架将尝试构建一个Schema.JSON
从 type.
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF 和采用 INLINE 编码的 KEY_VALUE。 |
5.2.1. 自定义 Schema 映射
作为在PulsarListener
对于复杂类型,可以使用类型的映射配置 Schema Resolver。
这样就无需在侦听器上设置 schema,因为框架使用传入消息类型咨询解析程序。
架构映射可以使用spring.pulsar.defaults.type-mappings
财产。
以下示例使用application.yml
要为User
和Address
复杂对象 usingAVRO
和JSON
schemas 的 Schemas:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
这message-type 是 Message 类的完全限定名称。 |
添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,则可以提供架构解析程序定制器来添加映射。
以下示例使用架构解析程序定制器为User
和Address
复杂对象 usingAVRO
和JSON
schemas 的 Schemas:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
有了这个配置,就不需要在侦听器上设置 schema,例如:
@PulsarListener(subscriptionName = "user-sub", topics = "user-topic")
public void listen(User user) {
System.out.println(user);
}
5.3. 访问 Pulsar Consumer 对象
有时,你需要直接访问 Pulsar Consumer 对象。 以下示例显示了如何获取它:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message, org.apache.pulsar.client.api.Consumer<String> consumer) {
System.out.println("Message Received: " + message);
ConsumerStats stats = consumer.getStats();
...
}
访问Consumer 对象,请不要调用任何会通过调用任何 receive 方法来改变 Consumer 光标位置的作。
所有此类作都必须由容器完成。 |
5.4. Pulsar 消息监听器容器
现在我们已经看到了消费者端的基本交互PulsarListener
.现在让我们深入了解一下PulsarListener
与底层 Pulsar 消费者交互。
请记住,对于最终用户应用程序,在大多数情况下,我们建议使用PulsarListener
注解,以便在使用 Spring for Apache Pulsar 时直接从 Pulsar 主题中使用,因为该模型涵盖了广泛的应用程序用例。
但是,了解如何作非常重要PulsarListener
在内部工作。本节将介绍这些详细信息。
如前所述,当您使用 Spring for Apache Pulsar 时,消息侦听器容器是消息消费的核心。PulsarListener
在幕后使用消息侦听器容器基础设施来创建和管理 Pulsar 消费者。
Spring for Apache Pulsar 通过以下方式提供此消息侦听器容器的 ContractPulsarMessageListenerContainer
.
此消息侦听器容器的默认实现是通过DefaultPulsarMessageListenerContainer
.
顾名思义,PulsarMessageListenerContainer
包含消息侦听器。
容器创建 Pulsar 消费者,然后运行一个单独的线程来接收和处理数据。
数据由提供的消息侦听器实现处理。
消息侦听器容器使用使用者的batchReceive
方法。
收到数据后,数据将移交给选定的消息侦听器实现。
使用 Spring for Apache Pulsar 时,可以使用以下消息侦听器类型。
我们将在以下部分中看到有关这些不同消息侦听器的详细信息。
但是,在这样做之前,让我们仔细看看容器本身。
5.4.1. 默认 PulsarMessageListenerContainer
这是一个基于使用者的消息侦听器容器。 下面的清单显示了它的构造函数:
public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
PulsarContainerProperties pulsarContainerProperties)
}
它接收一个PulsarConsumerFactory
(用于创建使用者)和PulsarContainerProperties
对象(包含有关容器属性的信息)。PulsarContainerProperties
具有以下构造函数:
public PulsarContainerProperties(String... topics)
public PulsarContainerProperties(Pattern topicPattern)
您可以通过以下方式提供主题信息PulsarContainerProperties
或作为提供给 Consumer Factory 的 Consumer Property 的 Consumer 属性。
以下示例使用DefaultPulsarMessageListenerContainer
:
Map<String, Object> config = new HashMap<>();
config.put("topics", "my-topic");
PulsarConsumerFactory<String> pulsarConsumerFactorY = DefaultPulsarConsumerFactory<>(pulsarClient, config);
PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();
pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> {
});
DefaultPulsarMessageListenerContainer<String> pulsarListenerContainer = new DefaultPulsarMessageListenerContainer(pulsarConsumerFacotyr,
pulsarContainerProperties);
return pulsarListenerContainer;
DefaultPulsarMessageListenerContainer
仅创建一个使用者。
如果要通过多个线程管理多个使用者,则需要使用ConcurrentPulsarMessageListenerContainer
.
5.4.2. ConcurrentPulsarMessageListenerContainer
ConcurrentPulsarMessageListenerContainer
具有以下构造函数:
public ConcurrentPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
PulsarContainerProperties pulsarContainerProperties)
ConcurrentPulsarMessageListenerContainer
允许您指定concurrency
property 的 setter 实现。
并发性超过1
仅允许在非独占订阅 (failover
,shared
和key-shared
).
您只能使用默认的1
用于并发。
以下示例启用concurrency
通过PulsarListener
注解failover
订阅。
@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
subscriptionType = SubscriptionType.Failover, concurrency = "3")
void listen(String message, Consumer<String> consumer) {
...
System.out.println("Current Thread: " + Thread.currentThread().getName());
System.out.println("Current Consumer: " + consumer.getConsumerName());
}
在前面的监听器中,假设 topicmy-topic
有三个分区。
如果是非分区主题,请将并发设置为3
什么都不做。除了主要的活跃使用者之外,您还会获得两个空闲使用者。
如果主题具有三个以上的分区,则消息将在容器创建的使用者之间进行负载均衡。
如果您运行此PulsarListener
,您会看到来自不同分区的消息通过不同的使用者使用,如前面示例中的线程名称和使用者名称打印输出所暗示的那样。
当您使用Failover subscription 这样,Pulsar 保证消息排序。 |
下面的清单显示了PulsarListener
,但使用Shared
subscription 和concurrency
启用。
@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
subscriptionType = SubscriptionType.Shared, concurrency = "5")
void listen(String message) {
...
}
在前面的示例中,PulsarListener
创建五个不同的使用者(这一次,我们假设主题有 5 个分区)。
在这个版本中,没有消息排序,因为Shared 订阅不保证 Pulsar 中的任何消息排序。 |
如果您需要消息排序,但仍需要共享订阅类型,则需要使用Key_Shared
订阅类型。
5.4.3. 消息消费
让我们看看消息侦听器容器如何实现基于单记录和基于批处理的消息使用。
单记录消耗
让我们重新审视一下我们的基本PulsarListener
为了这次讨论:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
有了这个PulsarListener
方法,我们必须要求 Spring for Apache Pulsar 每次调用具有单个记录的监听器方法。
我们提到了消息侦听器容器使用batchReceive
方法。
框架检测到PulsarListener
,在本例中,接收单个记录。这意味着,在每次调用该方法时,它都需要一个 singe 记录。
尽管消息侦听器容器会批量使用记录,但它会遍历收到的批处理,并通过适配器调用PulsarRecordMessageListener
.
正如您在上一节中所看到的,PulsarRecordMessageListener
从MessageListener
由 Pulsar Java 客户端提供,并且支持基本的received
方法。
批量消耗
以下示例显示了PulsarListener
批量消费记录:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen4(List<Foo> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message));
}
当您使用这种类型的PulsarListener
,框架会检测到您处于批处理模式。
由于它已经使用 Consumer 的batchReceive
方法,它会通过适配器将整个 Batch 移交给 listener 方法PulsarBatchMessageListener
.
5.5. Pulsar 头文件
Pulsar 消息元数据可以作为 Spring 消息头使用。 可用标头的列表可以在 PulsarHeaders.java 中找到。
5.5.1. 在基于单记录的 Consumer 中访问
以下示例展示了如何在使用单记录消费模式的应用程序中访问各种 Pulsar Headers:
@PulsarListener(topics = "simpleListenerWithHeaders")
void simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@Header(PulsarHeaders.RAW_DATA) byte[] rawData,
@Header("foo") String foo) {
}
在前面的示例中,我们访问messageId
和rawData
message 元数据以及名为foo
.
Spring@Header
annotation 用于每个 Header 字段。
您还可以使用 Pulsar 的Message
作为 envelope 来承载负载。
这样做时,用户可以直接调用 Pulsar 消息上的相应方法来检索元数据。
但是,为方便起见,您也可以使用Header
注解。
请注意,您还可以使用 Spring 消息传递Message
envelope 来携带 payload,然后使用@Header
.
5.5.2. 基于批量记录的 Consumer 访问
在本节中,我们将了解如何在使用批处理 consumer 的应用程序中访问各种 Pulsar Headers:
@PulsarListener(topics = "simpleBatchListenerWithHeaders", batch = true)
void simpleBatchListenerWithHeaders(List<String> data,
@Header(PulsarHeaders.MESSAGE_ID) List<MessageId> messageIds,
@Header(PulsarHeaders.TOPIC_NAME) List<String> topicNames, @Header("foo") List<String> fooValues) {
}
在前面的示例中,我们将数据作为List<String>
.
在提取各种 header 时,我们以List<>
也。
Spring for Apache Pulsar 确保 header 列表与 data 列表相对应。
当您使用批处理侦听器并接收有效负载时,您还可以以相同的方式提取标头List<org.apache.pulsar.client.api.Message<?>
,org.apache.pulsar.client.api.Messages<?>
或org.springframework.messaging.Messsge<?>
.
5.6. 消息确认
当您使用 Spring for Apache Pulsar 时,消息确认由框架处理,除非应用程序选择退出。 在本节中,我们将详细介绍框架如何处理消息确认。
5.6.1. 消息 ACK 模式
Spring for Apache Pulsar 提供了以下确认消息的模式:
-
BATCH
-
RECORD
-
MANUAL
BATCH
确认模式是默认模式,但您可以在 Message Listener Container (消息侦听器) 容器上更改它。
在以下部分中,我们将了解在同时使用PulsarListener
以及它们如何转换为后备消息侦听器容器(并最终转换为 Pulsar 消费者)。
5.6.2. 单记录模式下的自动消息确认
让我们重新审视一下我们基本的基于PulsarListener
:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
很自然地想知道,当您使用PulsarListener
,特别是如果你熟悉直接使用 Pulsar consumer 的话。
答案归结为消息侦听器容器,因为这是 Spring for Apache Pulsar 中协调所有与消费者相关的活动的中心位置。
假设您没有覆盖默认行为,那么当您使用前面的PulsarListener
:
-
首先,侦听器容器从 Pulsar 消费者批量接收消息。
-
收到的消息将传递给
PulsarListener
一次一条消息。 -
当所有记录都传递给侦听器方法并成功处理时,容器将确认来自原始批处理的所有消息。
这是正常流程。如果原始批处理中的任何记录引发异常, Spring for Apache Pulsar 会单独跟踪这些记录。
当批处理中的所有记录都被处理完时, Spring for Apache Pulsar 会确认所有成功的消息,并否定地确认 (nack) 所有失败的消息。
换句话说,当使用PulsarRecordMessageListener
和默认的 ACK 模式BATCH
时,框架会等待从batchReceive
调用以成功处理,然后调用acknowledge
方法。
如果任何特定记录在调用处理程序方法时引发异常,Spring for Apache Pulsar 会跟踪这些记录并单独调用negativeAcknowledge
在处理整个批次之后的这些记录上。
如果应用程序希望每条记录发生确认或否定确认,则RECORD
可以启用 ACK 模式。
在这种情况下,在处理每条记录后,如果没有错误,则确认消息,如果有错误,则否定确认消息。
以下示例启用RECORD
ack 模式:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.RECORD)
public void listen(String message) {
System.out.println("Message Received: " + message);
}
5.6.3. 单记录模式下的手动消息确认
您可能并不总是希望框架发送确认,而是直接从应用程序本身发送。 Spring for Apache Pulsar 提供了几种启用手动消息确认的方法。以下示例显示了其中之一:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Acknowledgment acknowledgment) {
System.out.println("Message Received: " + message.getValue());
acknowledgment.acknowledge();
}
有几件事值得在这里解释。首先,我们通过设置ackMode
上PulsarListener
.
启用手动 ack 模式时, Spring for Apache Pulsar 允许应用程序注入Acknowledgment
对象。
框架通过选择兼容的消息侦听器容器来实现此目的:PulsarAcknowledgingMessageListener
对于基于单个记录的消费,它允许您访问Acknowledgment
对象。
这Acknowledgment
object 提供了以下 API 方法:
void acknowledge();
void acknowledge(MessageId messageId);
void acknowledge(List<MessageId> messageIds);
void nack();
void nack(MessageId messageId);
你可以注入这个Acknowledgment
object 导入到PulsarListener
使用MANUAL
ack 模式,然后调用相应的方法之一。
在前面的PulsarListener
例如,我们将无参数acknowledge
方法。
这是因为框架知道哪个Message
它目前正在 Under.
调用acknowledge()
,则无需接收带有Message
enveloper' 的 URL,而是使用 target 类型 —String
,在此示例中。
您还可以调用acknowledge
通过提供消息 ID:acknowledge.acknowledge(message.getMessageId());
当您使用acknowledge(messageId)
,您必须使用Message<?>
信封。
与可能的确认类似,Acknowledgment
API 还提供了用于否定确认的选项。
请参阅前面显示的 nack 方法。
您还可以调用acknowledge
直接在 Pulsar 消费者上:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Consumer<String> consumer) {
System.out.println("Message Received: " + message.getValue());
try {
consumer.acknowledge(message);
}
catch (Exception e) {
....
}
}
调用acknowledge
直接在底层 Consumer 上,你需要自己做错误处理。
使用Acknowledgment
不需要这样做,因为框架可以为您执行此作。
因此,您应该使用Acknowledgment
Object 方法。
使用手动确认时,请务必了解框架完全不执行任何确认。 因此,在设计应用程序时考虑正确的确认策略非常重要。 |
5.6.4. 批量消费时自动确认消息
当您批量使用记录时(请参阅“消息 ACK 模式”),并且使用默认的 ack 模式BATCH
,则当成功处理整个批处理时,将确认整个批处理。
如果任何记录引发异常,则对整个批处理进行否定确认。
请注意,这可能不是在生产者端批处理的同一批次。相反,这是从调用batchReceive
在消费者身上
请考虑以下批处理侦听器:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", batch = true)
public void batchListen(List<Foo> messages) {
for (Foo foo : messages) {
...
}
}
当传入集合中的所有邮件 (messages
在此示例中)进行处理,则框架会确认所有这些 ID。
在批处理模式下使用时,RECORD
不是允许的 ACK 模式。
这可能会导致问题,因为应用程序可能不希望再次重新交付整个批处理。
在这种情况下,您需要使用MANUAL
确认模式。
5.6.5. 批量消费时手动消息确认
如上一节所示,当MANUAL
ack 模式,框架不做任何确认,无论是肯定的还是否定的。
处理此类问题完全取决于应用程序。
什么时候MANUAL
ack 模式,则 Spring for Apache Pulsar 会选择兼容的消息侦听器容器:PulsarBatchAcknowledgingMessageListener
用于批量使用,这使您可以访问Acknowledgment
对象。
以下是Acknowledgment
应用程序接口:
void acknowledge();
void acknowledge(MessageId messageId);
void acknowledge(List<MessageId> messageIds);
void nack();
void nack(MessageId messageId);
你可以注入这个Acknowledgment
object 导入到PulsarListener
使用MANUAL
ACK 模式。
下面的清单显示了基于批处理的侦听器的基本示例:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(List<Message<String>> messgaes, Acknowlegement acknowledgment) {
for (Message<String> message : messages) {
try {
...
acknowledgment.acknowledge(message.getMessageId());
}
catch (Exception e) {
acknowledgment.nack(message.getMessageId());
}
}
}
使用批处理侦听器时,消息侦听器容器无法知道它当前正在处理哪个记录。
因此,要手动确认,您需要使用一个重载的acknowledge
方法,该方法采用MessageId
或List<MessageId>
.
您也可以使用MessageId
对于批处理侦听器。
5.7. 消息重新投递和错误处理
现在我们已经看到了两者PulsarListener
以及消息侦听器容器基础设施及其各种功能,现在让我们尝试了解消息重投和错误处理。
Apache Pulsar 提供了各种用于消息重新传递和错误处理的原生策略。我们来看看它们,看看如何通过 Spring for Apache Pulsar 使用它们。
5.7.1. 为消息重新投递指定 Acknowledgment Timeout(确认超时)
默认情况下,除非 Consumer 崩溃,否则 Pulsar Consumer 不会重新传递消息,但你可以通过在 Pulsar Consumer 上设置 ack 超时来更改此行为。 如果 ack timeout 属性的值大于零,并且 Pulsar 消费者在该超时时间内没有确认消息,则重新传递该消息。
使用 Spring for Apache Pulsar 时,可以通过消费者定制器或本机 Pulsar 设置此属性ackTimeout
属性在properties
属性@PulsarListener
:
@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
properties = {"ackTimeout=60s"})
public void listen(String s) {
...
}
当你指定 ack 超时时,如果 Consumer 在 60 秒内没有发送确认,则 Pulsar 会将消息重新投递给 Consumer。
如果要为具有不同延迟的 ack timeout 指定一些高级回退选项,可以执行以下作:
@EnablePulsar
@Configuration
class AckTimeoutRedeliveryConfig {
@PulsarListener(subscriptionName = "withAckTimeoutRedeliveryBackoffSubscription",
topics = "withAckTimeoutRedeliveryBackoff-test-topic",
ackTimeoutRedeliveryBackoff = "ackTimeoutRedeliveryBackoff",
properties = { "ackTimeout=60s" })
void listen(String msg) {
// some long-running process that may cause an ack timeout
}
@Bean
RedeliveryBackoff ackTimeoutRedeliveryBackoff() {
return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
.build();
}
}
在前面的示例中,我们为 Pulsar 的RedeliveryBackoff
最小延迟为 1 秒,最大延迟为 10 秒,回退乘数为 2。
在发生初始 ack 超时后,通过此回退 Bean 控制消息重新传递。
我们将 backoff bean 提供给PulsarListener
注解,方法是将ackTimeoutRedeliveryBackoff
property 设置为实际的 bean 名称 —ackTimeoutRedeliveryBackoff
,在本例中。
5.7.2. 指定否定确认重新传递
当否定确认时,Pulsar consumer 允许你指定应用程序希望如何重新传递消息。
默认设置是在一分钟内重新传递消息,但您可以通过消费者定制器或使用本机 Pulsar 进行更改negativeAckRedeliveryDelay
属性在properties
属性@PulsarListener
:
@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
properties = {"negativeAckRedeliveryDelay=10ms"})
public void listen(String s) {
...
}
您还可以通过提供RedeliveryBackoff
Bean 并提供 Bean 名称作为negativeAckRedeliveryBackoff
property 属性,如下所示:
@EnablePulsar
@Configuration
class NegativeAckRedeliveryConfig {
@PulsarListener(subscriptionName = "withNegRedeliveryBackoffSubscription",
topics = "withNegRedeliveryBackoff-test-topic", negativeAckRedeliveryBackoff = "redeliveryBackoff",
subscriptionType = SubscriptionType.Shared)
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@Bean
RedeliveryBackoff redeliveryBackoff() {
return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
.build();
}
}
5.7.3. 使用 Apache Pulsar 的 Dead Letter Topic 进行消息重投和错误处理
Apache Pulsar 允许应用程序在消费者上使用死信主题,并带有Shared
订阅类型。
对于Exclusive
和Failover
订阅类型,则此功能不可用。
基本思想是,如果消息重试一定次数(可能是由于 ack 超时或 nack 重新传递),一旦重试次数用完,就可以将消息发送到称为死信队列 (DLQ) 的特殊主题。
让我们通过检查一些代码片段来了解有关此功能的一些细节:
@EnablePulsar
@Configuration
class DeadLetterPolicyConfig {
@PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
subscriptionType = SubscriptionType.Shared, properties = { "ackTimeout=1s" })
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@PulsarListener(id = "dlqListener", topics = "my-dlq-topic")
void listenDlq(String msg) {
System.out.println("From DLQ: " + msg);
}
@Bean
DeadLetterPolicy deadLetterPolicy() {
return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
}
}
首先,我们有一个特殊的 beanDeadLetterPolicy
,它被命名为deadLetterPolicy
(您可以根据需要使用任何名称)。
这个 bean 指定了许多内容,例如最大交付量(在本例中为 10)和死信主题的名称 —my-dlq-topic
,在本例中。
如果未指定 DLQ 主题名称,则默认为<topicname>-<subscriptionname>-DLQ
在 Pulsar 中。
接下来,我们将此 bean 名称提供给PulsarListener
通过设置deadLetterPolicy
财产。
请注意,PulsarListener
的订阅类型为Shared
,因为 DLQ 功能仅适用于共享订阅。
此代码主要用于演示目的,因此我们提供了一个ackTimeout
值为 1 秒。
这个想法是代码抛出异常,如果 Pulsar 在 1 秒内没有收到 ack,它会重试。
如果该周期持续 10 次(因为这是我们在DeadLetterPolicy
),则 Pulsar consumer 会将消息发布到 DLQ 主题。
我们还有另一个PulsarListener
侦听 DLQ 主题以在发布到 DLQ 主题时接收数据。
5.7.4. Spring for Apache Pulsar 中的原生错误处理
正如我们之前提到的,Apache Pulsar 中的 DLQ 功能仅适用于共享订阅。
如果应用程序需要对非共享订阅使用一些类似的功能,该怎么办?
Pulsar 不支持独占订阅和故障转移订阅的 DLQ 的主要原因是因为这些订阅类型是有订单保证的。
允许重新投递、DLQ 等实际上会不按顺序接收消息。
但是,如果应用程序对此没有问题,但更重要的是,需要此 DLQ 功能用于非共享订阅,该怎么办?
为此,Spring for Apache Pulsar 提供了一个PulsarConsumerErrorHandler
,您可以在 Pulsar 中的任何订阅类型中使用它:Exclusive
,Failover
,Shared
或Key_Shared
.
当您使用PulsarConsumerErrorHandler
从 Spring for Apache Pulsar 开始,请确保不要在侦听器上设置 ack timeout 属性。
让我们通过检查一些代码片段来了解一些细节:
@EnablePulsar
@Configuration
class PulsarConsumerErrorHandlerConfig {
@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
PulsarTemplate<String> pulsarTemplate) {
return new DefaultPulsarConsumerErrorHandler<>(
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}
@PulsarListener(id = "pulsarConsumerErrorHandler-id", subscriptionName = "pulsatConsumerErrorHandler-subscription",
topics = "pulsarConsumerErrorHandler-topic",
pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler")
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@PulsarListener(id = "pceh-dltListener", topics = "my-foo-dlt")
void listenDlt(String msg) {
System.out.println("From DLT: " + msg);
}
}
考虑一下pulsarConsumerErrorHandler
豆。
这将创建一个PulsarConsumerErrorHandler
并使用 Spring for Apache Pulsar 提供的开箱即用的默认实现:DefaultPulsarConsumerErrorHandler
.DefaultPulsarConsumerErrorHandler
有一个构造函数,该构造函数采用PulsarMessageRecovererFactory
以及org.springframework.util.backoff.Backoff
.PulsarMessageRecovererFactory
是具有以下 API 的功能性接口:
@FunctionalInterface
public interface PulsarMessageRecovererFactory<T> {
/**
* Provides a message recoverer {@link PulsarMessageRecoverer}.
* @param consumer Pulsar consumer
* @return {@link PulsarMessageRecoverer}.
*/
PulsarMessageRecoverer<T> recovererForConsumer(Consumer<T> consumer);
}
这recovererForConsumer
method 接受一个 Pulsar Consumer 并返回一个PulsarMessageRecoverer
,这是另一个功能接口。
下面是 APIPulsarMessageRecoverer
:
public interface PulsarMessageRecoverer<T> {
/**
* Recover a failed message, for e.g. send the message to a DLT.
* @param message Pulsar message
* @param exception exception from failed message
*/
void recoverMessage(Message<T> message, Exception exception);
}
Spring for Apache Pulsar 为PulsarMessageRecovererFactory
叫PulsarDeadLetterPublishingRecoverer
,它提供了一个默认实现,可以通过将消息发送到死信主题 (DLT) 来恢复消息。
我们将此实现提供给前面的构造函数DefaultPulsarConsumerErrorHandler
.
作为第二个参数,我们提供了一个FixedBackOff
.
您还可以提供ExponentialBackoff
来自 Spring 的高级回退功能。
然后我们为PulsarConsumerErrorHandler
作为属性添加到PulsarListener
.
该属性称为pulsarConsumerErrorHandler
.
每次使用PulsarListener
方法失败,则会重试。
重试次数由Backoff
提供的实现值。在我们的示例中,我们执行 10 次重试(总共 11 次尝试 — 第一次,然后是 10 次重试)。
用尽所有重试后,消息将发送到 DLT 主题。
这PulsarDeadLetterPublishingRecoverer
implementation 我们使用PulsarTemplate
用于将消息发布到 DLT。
在大多数情况下,相同的自动配置PulsarTemplate
从 Spring Boot 就足够了,但需要注意分区主题。
使用分区主题并为主主题使用自定义消息路由时,您必须使用不同的PulsarTemplate
那不采用自动配置的PulsarProducerFactory
,该值填充为custompartition
为message-routing-mode
.
您可以使用PulsarConsumerErrorHandler
使用以下蓝图:
@Bean
PulsarConsumerErrorHandler<Integer> pulsarConsumerErrorHandler(PulsarClient pulsarClient) {
PulsarProducerFactory<Integer> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Map.of());
PulsarTemplate<Integer> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);
BiFunction<Consumer<?>, Message<?>, String> destinationResolver =
(c, m) -> "my-foo-dlt";
PulsarDeadLetterPublishingRecoverer<Integer> pulsarDeadLetterPublishingRecoverer =
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, destinationResolver);
return new DefaultPulsarConsumerErrorHandler<>(pulsarDeadLetterPublishingRecoverer,
new FixedBackOff(100, 5));
}
请注意,我们为PulsarDeadLetterPublishingRecoverer
作为第二个 constructor 参数。
如果未提供,则PulsarDeadLetterPublishingRecoverer
使用<subscription-name>-<topic-name>-DLT>
作为 DLT 主题名称。
使用此功能时,您应该通过设置目标解析程序而不是使用默认值来使用正确的目标名称。
当使用单记录消息侦听器时,就像我们对PulsarConsumerErrorHnadler
,如果使用手动确认,请确保在引发异常时不要否定确认消息。
相反,将异常重新引发回容器。否则,容器会认为消息是单独处理的,不会触发错误处理。
最后,我们有第二个PulsarListener
接收来自 DLT 主题的消息。
到目前为止,在本节提供的示例中,我们只看到了如何使用PulsarConsumerErrorHandler
使用单个记录消息侦听器。
接下来,我们看看如何在批处理侦听器上使用它。
5.7.5. 使用 PulsarConsumerErrorHandler 的批处理监听器
首先,让我们看一个批次PulsarListener
方法:
@PulsarListener(subscriptionName = "batch-demo-5-sub", topics = "batch-demo-4", batch = true, concurrency = "3",
subscriptionType = SubscriptionType.Failover,
pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler", ackMode = AckMode.MANUAL)
void listen(List<Message<Integer>> data, Consumer<Integer> consumer, Acknowledgment acknowledgment) {
for (Message<Integer> datum : data) {
if (datum.getValue() == 5) {
throw new PulsarBatchListenerFailedException("failed", datum);
}
acknowledgement.acknowledge(datum.getMessageId());
}
}
@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
PulsarTemplate<String> pulsarTemplate) {
return new DefaultPulsarConsumerErrorHandler<>(
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}
@PulsarListener(subscriptionName = "my-dlt-subscription", topics = "my-foo-dlt")
void dltReceiver(Message<Integer> message) {
System.out.println("DLT - RECEIVED: " + message.getValue());
}
我们再次提供pulsarConsumerErrorHandler
属性替换为PulsarConsumerErrorHandler
Bean 名称。
当您使用批处理侦听器(如前面的示例所示)并希望使用PulsarConsumerErrorHandler
从 Spring for Apache Pulsar 开始,您需要使用手动确认。
这样,您可以确认所有成功的单个消息。
对于失败的 API,您必须抛出一个PulsarBatchListenerFailedException
替换为失败的消息。
如果没有此异常,框架不知道如何处理失败。
重试时,容器会向侦听器发送一批新的消息,从失败的消息开始。
如果再次失败,则重试,直到重试次数用完,此时消息将发送到 DLT。
此时,容器会确认消息,并将原始批处理中的后续消息移交给侦听器。
5.8. PulsarListener 的消费者定制
Spring for Apache Pulsar 提供了一种便捷的方式来自定义由PulsarListener
.
应用程序可以为PulsarListenerConsumerBuilderCustomizer
.
下面是一个示例。
@Bean
public PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
return cb -> {
cb.subscriptionName("modified-subscription-name");
};
}
然后,可以将此定制器 Bean 名称作为PuslarListener
annotation 中,如下所示。
@PulsarListener(subscriptionName = "my-subscription",
topics = "my-topic", consumerCustomizer = "myCustomizer")
void listen(String message) {
}
框架通过PulsarListener
并在创建 Pulsar Consumer 之前将此定制器应用于 Consumer 构建器。
如果您有多个PulsarListener
方法,并且每个方法都有不同的自定义规则,则应创建多个定制器 bean 并在每个 bean 上附加适当的定制器PulsarListener
.
5.9. 暂停和恢复消息侦听器容器
在某些情况下,应用程序可能希望暂时暂停消息使用,然后稍后恢复。 Spring for Apache Pulsar 提供了暂停和恢复底层消息侦听器容器的能力。 当 Pulsar 消息侦听器容器暂停时,容器为从 Pulsar 消费者接收数据而进行的任何轮询都将被暂停。 同样,当容器恢复时,如果主题在暂停时添加了任何新记录,则下一次轮询将开始返回数据。
要暂停或恢复侦听器容器,请首先通过PulsarListenerEndpointRegistry
bean,然后在容器实例上调用 pause/resume API - 如下面的代码段所示:
@Autowired
private PulsarListenerEndpointRegistry registry;
void someMethod() {
PulsarMessageListenerContainer container = registry.getListenerContainer("my-listener-id");
container.pause();
}
传递给getListenerContainer 是容器 ID - 这将是@PulsarListener id 属性@PulsarListener . |
5.10. Pulsar Reader 支持
该框架支持通过PulsarReaderFactory
.
Spring Boot 提供了这个读取器工厂,您可以通过指定任何spring.pulsar.reader.*
应用程序属性。
5.10.1. PulsarReader 注解
虽然可以使用PulsarReaderFactory
直接,Spring for Apache Pulsar 提供了PulsarReader
注解,你可以用它来快速读取主题,而无需自己设置任何 Reader 工厂。
这与背后的相同想法类似PulsarListener.
下面是一个简短的示例。
@PulsarReader(id = "reader-demo-id", topics = "reader-demo-topic", startMessageId = "earliest")
void read(String message) {
//...
}
这id
属性是可选的,但最佳实践是提供对应用程序有意义的值。
如果未指定,则将使用自动生成的 ID。
另一方面,topics
和startMessageId
属性是必需的。
这topics
attribute 可以是单个主题或逗号分隔的主题列表。
这startMessageId
属性指示读者从主题中的特定消息开始。
的有效值startMessageId
是earliest
或latest.
假设您希望读者从最早或最新的可用消息以外的主题开始任意读取消息。在这种情况下,您需要使用ReaderBuilderCustomizer
要自定义ReaderBuilder
所以它知道正确的MessageId
开始。
5.10.2. 自定义 ReaderBuilder
您可以通过ReaderBuilder
使用PulsarReaderReaderBuilderCustomizer
在 Spring 中用于 Apache Pulsar。
您可以提供@Bean
的类型PulsarReaderReaderBuilderCustomizer
,然后将其提供给PulsarReader
如下。
@PulsarReader(id = "reader-customizer-demo-id", topics = "reader-customizer-demo-topic",
readerCustomizer = "myCustomizer")
void read(String message) {
//...
}
@Bean
public PulsarReaderReaderBuilderCustomizer<String> myCustomizer() {
return readerBuilder -> {
readerBuilder.startMessageId(messageId); // the first message read is after this message id.
// Any other customizations on the readerBuilder
};
}
如果您的应用程序只有一个@PulsarReader 和一个PulsarReaderReaderBuilderCustomizer bean 已注册,则将自动应用定制器。 |
6. 主题解析
在生成或使用消息时,需要目标主题。 框架会查找以下有序位置以确定主题(在第一次查找时停止):
-
用户指定
-
消息类型默认
-
全局默认值
当通过默认机制之一找到主题时,无需在 produce 或 consume API 上指定主题。
当找不到主题时,API 将相应地引发异常。
6.1. 用户指定
传递到正在使用的 API 中的主题具有最高优先级(例如。PulsarTemplate.send("my-topic", myMessage)
或@PulsarListener(topics = "my-topic"
).
6.2. 消息类型默认
当没有主题传递到 API 中时,系统会查找为正在生成或使用的消息类型配置的消息类型到主题映射。
映射可以使用spring.pulsar.defaults.type-mappings
财产。
以下示例使用application.yml
配置在消费或生成时使用的默认主题Foo
或Bar
消息:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.Foo
topic-name: foo-topic
- message-type: com.acme.Bar
topic-name: bar-topic
这message-type 是 Message 类的完全限定名称。 |
如果消息(或Publisher input) 为null ,框架将无法从中确定主题。如果您的应用程序可能会发送null 消息。 |
7. 发布和使用分区主题
在以下示例中,我们发布到名为hello-pulsar-partitioned
.
它是一个已分区的主题,对于此示例,我们假设该主题已创建具有三个分区。
@SpringBootApplication
public class PulsarBootPartitioned {
public static void main(String[] args) {
SpringApplication.run(PulsarBootPartitioned.class, "--spring.pulsar.producer.message-routing-mode=CustomPartition");
}
@Bean
public ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
pulsarTemplate.setDefaultTopicName("hello-pulsar-partitioned");
return args -> {
for (int i = 0; i < 10; i++) {
pulsarTemplate.sendAsync("hello john doe 0 ", new FooRouter());
pulsarTemplate.sendAsync("hello alice doe 1", new BarRouter());
pulsarTemplate.sendAsync("hello buzz doe 2", new BuzzRouter());
}
};
}
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
static class FooRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 0;
}
}
static class BarRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 1;
}
}
static class BuzzRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 2;
}
}
}
在前面的示例中,我们发布到分区主题,并且我们希望将一些数据段发布到特定分区。
如果将其保留为 Pulsar 的默认值,则它遵循分区分配的循环模式,我们想覆盖它。
为此,我们提供了一个消息路由器对象,其中包含send
方法。
考虑实现的 3 个消息路由器。FooRouter
始终将数据发送到分区0
,BarRouter
发送到分区1
和BuzzRouter
发送到分区2
.
另请注意,我们现在使用sendAsync
method 的PulsarTemplate
返回一个CompletableFuture
.
在运行应用程序时,我们还需要设置messageRoutingMode
在 producer 上更改为CustomPartition
(spring.pulsar.producer.message-routing-mode
).
在消费者方面,我们使用PulsarListener
具有独占订阅类型。
这意味着来自所有分区的数据最终都位于同一个使用者中,并且没有排序保证。
如果我们希望每个分区都由单个不同的使用者使用,我们该怎么办?
我们可以切换到failover
订阅模式并添加三个单独的使用者:
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen1(String foo) {
System.out.println("Message Received 1: " + foo);
}
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen2(String foo) {
System.out.println("Message Received 2: " + foo);
}
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen3(String foo) {
System.out.println("Message Received 3: " + foo);
}
当您遵循此方法时,单个分区始终由专用使用者使用。
同样,如果你想使用 Pulsar 的共享消费者类型,你可以使用shared
订阅类型。
但是,当您使用shared
模式,您将失去任何排序保证,因为单个使用者可能会在另一个使用者有机会之前从所有分区收到消息。
请考虑以下示例:
@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen1(String foo) {
System.out.println("Message Received 1: " + foo);
}
@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen2(String foo) {
System.out.println("Message Received 2: " + foo);
}