消息消费
1. @ReactivePulsarListener
对于 Pulsar 消费者,我们建议最终用户应用程序使用ReactivePulsarListener
注解。
要使用ReactivePulsarListener
,您需要使用@EnableReactivePulsar
注解。
当您使用 Spring Boot 支持时,它会自动启用此注释并配置所有必要的组件,例如消息侦听器基础设施(负责创建底层 Pulsar 消费者)。
让我们重新审视一下ReactivePulsarListener
我们在 Quick-Tour 部分看到的代码片段:
@ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
listener 方法返回一个Mono<Void> 以指示消息是否已成功处理。Mono.empty() 表示成功 (确认) 和Mono.error() 表示失败(否定确认)。 |
您还可以进一步简化此方法:
@ReactivePulsarListener
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
在这种最基本的形式中,当topics
未直接提供,则使用主题解析过程来确定目标主题。
同样,当subscriptionName
未在@ReactivePulsarListener
annotation 将使用自动生成的订阅名称。
在ReactivePulsarListener
方法,我们接收数据为String
,但我们没有指定任何 schema 类型。
在内部,该框架依赖于 Pulsar 的 schema 机制将数据转换为所需的类型。
框架检测到您希望String
type 的 Schema 类型,然后根据该信息推断 schema 类型,并将该 schema 提供给使用者。
框架对所有基元类型执行此推理。
对于所有非基元类型,默认架构假定为 JSON。
如果复杂类型使用 JSON 以外的任何内容(例如 AVRO 或 KEY_VALUE),则必须使用schemaType
财产。
此示例显示了我们如何使用 topic 中的复杂类型:
@ReactivePulsarListener(topics = "my-topic-2", schemaType = SchemaType.JSON)
Mono<Void> listen(Foo message) {
System.out.println(message);
return Mono.empty();
}
让我们看看我们可以消费的更多方式。
此示例直接使用 Pulsar 消息:
@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.apache.pulsar.client.api.Message<String> message) {
System.out.println(message.getValue());
return Mono.empty();
}
此示例使用包装在 Spring 消息传递信封中的记录:
@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.springframework.messaging.Message<String> message) {
System.out.println(message.getPayload());
return Mono.empty();
}
1.1. 流式处理
以上都是逐个使用单个记录的示例。 然而,使用 Reactive 的一个令人信服的理由是具有背压支持的流功能。
以下示例使用ReactivePulsarListener
要使用 POJO 流:
@ReactivePulsarListener(topics = "streaming-1", stream = true)
Flux<MessageResult<Void>> listen(Flux<org.apache.pulsar.client.api.Message<String>> messages) {
return messages
.doOnNext((msg) -> System.out.println("Received: " + msg.getValue()))
.map(MessageResult::acknowledge);
在这里,我们以Flux
的 Pulsar 消息。
此外,要在ReactivePulsarListener
level 中,您需要设置stream
属性设置为true
.
listener 方法返回一个Flux<MessageResult<Void>> 其中,每个元素表示一条已处理的消息,并保存消息 ID、值以及是否被确认。
这MessageResult 有一组静态工厂方法,可用于创建适当的MessageResult 实例。 |
根据Flux
,框架会尝试推断要使用的 schema。
如果它包含复杂类型,您仍然需要提供schemaType
上ReactivePulsarListener
.
以下侦听器使用 Spring 消息传递Message
具有复杂类型的 envelope :
@ReactivePulsarListener(topics = "streaming-2", stream = true, schemaType = SchemaType.JSON)
Flux<MessageResult<Void>> listen2(Flux<org.springframework.messaging.Message<Foo>> messages) {
return messages
.doOnNext((msg) -> System.out.println("Received: " + msg.getPayload()))
.map(MessageUtils::acknowledge);
}
listener 方法返回一个Flux<MessageResult<Void>> 其中,每个元素表示一条已处理的消息,并保存消息 ID、值以及是否被确认。
SpringMessageUtils 有一组静态工厂方法,可用于创建适当的MessageResult 实例。
这MessageUtils 为 Spring 消息提供与MessagResult 用于 Pulsar 消息。 |
不支持使用org.apache.pulsar.client.api.Messages<T> 在@ReactivePulsarListener
|
1.2. 配置 - 应用程序属性
侦听器依赖于ReactivePulsarConsumerFactory
创建和管理用于消费消息的底层 Pulsar 消费者。
Spring Boot 提供了这个消费者工厂,您可以通过指定spring.pulsar.consumer.*
应用程序属性。工厂上的大多数已配置属性将在侦听器中得到遵守,但以下情况除外:
这spring.pulsar.consumer.subscription.name 属性将被忽略,而是在未在 Comments 上指定时生成。 |
这spring.pulsar.consumer.subscription-type property 被忽略,而是取自 annotation 上的值。但是,您可以将subscriptionType = {} 以改用 Property 值作为默认值。 |
1.3. 使用 AUTO_CONSUME 的通用记录
如果无法提前知道某个 Pulsar 主题的 schema 类型,可以使用AUTO_CONSUME
schema 类型来使用通用记录。
在这种情况下,主题将消息反序列化为GenericRecord
对象使用与主题关联的架构信息。
要使用通用记录,请将schemaType = SchemaType.AUTO_CONSUME
在您的@ReactivePulsarListener
并使用GenericRecord
作为 message 参数,如下所示。
@ReactivePulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
Mono<Void> listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
GenericRecord record = message.getValue();
record.getFields().forEach((f) ->
System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
return Mono.empty();
}
这GenericRecord API 允许访问字段及其关联值 |
1.4. 消费者定制
您可以指定ReactivePulsarListenerMessageConsumerBuilderCustomizer
配置底层 Pulsar consumer builder,最终构建侦听器用来接收消息的 consumer。
请谨慎使用,因为这会提供对使用者构建器的完全访问权限,并调用其某些方法(例如create ) 可能会产生意想不到的副作用。 |
例如,下面的代码演示如何将订阅的初始位置设置为主题上最早的消息。
@ReactivePulsarListener(topics = "hello-pulsar-topic", consumerCustomizer = "myConsumerCustomizer")
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myConsumerCustomizer() {
return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}
如果您的应用程序只有一个@ReactivePulsarListener 和一个ReactivePulsarListenerMessageConsumerBuilderCustomizer bean 已注册,则将自动应用定制器。 |
您还可以使用定制器向 Consumer Builder 提供直接的 Pulsar Consumer 属性。
如果您不想使用前面提到的 Boot 配置属性或具有多个ReactivePulsarListener
方法。
以下定制器示例使用直接 Pulsar 消费者属性:
@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> directConsumerPropsCustomizer() {
return b -> b.property("subscriptionName", "subscription-1").property("topicNames", "foo-1");
}
使用的属性是直接的 Pulsar 消费者属性,而不是spring.pulsar.consumer Spring Boot 配置属性 |
2. 指定 Schema 信息
如前所述,对于 Java 原语,Spring for Apache Pulsar 框架可以推断出要在ReactivePulsarListener
.
对于非原始类型,如果未在注解上显式指定 Schema,则 Spring for Apache Pulsar 框架将尝试构建一个Schema.JSON
从 type.
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF、AUTO_CONSUME KEY_VALUE w/ INLINE 编码。 |
2.1. 自定义 Schema 映射
作为在ReactivePulsarListener
对于复杂类型,可以使用类型的映射配置 Schema Resolver。
这样就无需在侦听器上设置 schema,因为框架使用传入消息类型咨询解析程序。
2.1.1. 配置属性
架构映射可以使用spring.pulsar.defaults.type-mappings
财产。
以下示例使用application.yml
要为User
和Address
复杂对象 usingAVRO
和JSON
schemas 的 Schemas:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
这message-type 是 Message 类的完全限定名称。 |
2.1.2. Schema 解析器定制器
添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,则可以提供架构解析程序定制器来添加映射。
以下示例使用架构解析程序定制器为User
和Address
复杂对象 usingAVRO
和JSON
schemas 的 Schemas:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
2.1.3. 类型映射注解
指定要用于特定消息类型的默认架构信息的另一个选项是使用@PulsarMessage
注解。
架构信息可以通过schemaType
属性。
以下示例将系统配置为在生成或使用 JSON 类型的消息时使用 JSON 作为默认架构Foo
:
@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
有了这个配置,就不需要在侦听器上设置 schema,例如:
@ReactivePulsarListener(topics = "user-topic")
Mono<Void> listen(User user) {
System.out.println(user);
return Mono.empty();
}
3. 消息侦听器容器基础设施
在大多数情况下,我们建议使用ReactivePulsarListener
注解直接用于从 Pulsar 主题中使用,因为该模型涵盖了广泛的应用程序用例。
但是,了解如何作非常重要ReactivePulsarListener
在内部工作。
当您使用 Spring for Apache Pulsar 时,消息侦听器容器是消息使用的核心。
这ReactivePulsarListener
在幕后使用消息监听器容器基础设施来创建和管理底层 Pulsar 消费者。
3.1. ReactivePulsarMessageListenerContainer
此消息侦听器容器的协定是通过ReactivePulsarMessageListenerContainer
其默认实现会创建一个反应式 Pulsar 消费者,并连接一个使用创建的消费者的反应式消息管道。
4. 并发
在流式处理模式下使用记录 (stream = true
) 并发性自然而然地通过客户端实现中的底层 Reactive 支持来实现。
但是,在逐个处理消息时,可以指定并发以提高处理吞吐量。
只需将concurrency
属性@ReactivePulsarListener
.
此外,当concurrency > 1
你可以确保消息按键排序,从而通过设置useKeyOrderedProcessing = "true"
在注释上。
同样,ReactiveMessagePipeline
执行繁重的工作,我们只需为其设置 Properties。
5. Pulsar 标头
Pulsar 消息元数据可以作为 Spring 消息头使用。 可用标头的列表可以在 PulsarHeaders.java 中找到。
5.1. 在 OneByOne Listener 中访问
以下示例显示了在使用逐个消息侦听器时如何访问 Pulsar Headers:
@ReactivePulsarListener(topics = "some-topic")
Mono<Void> listen(String data,
@Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@Header("foo") String foo) {
System.out.println("Received " + data + " w/ id=" + messageId + " w/ foo=" + foo);
return Mono.empty();
}
在前面的示例中,我们访问messageId
message 元数据以及名为foo
.
Spring@Header
annotation 用于每个 Header 字段。
您还可以使用 Pulsar 的Message
作为 envelope 来承载负载。
这样做时,用户可以直接调用 Pulsar 消息上的相应方法来检索元数据。
但是,为方便起见,您也可以使用Header
注解。
请注意,您还可以使用 Spring 消息传递Message
envelope 来携带 payload,然后使用@Header
.
6. 消息鸣谢
7. 消息重新投递和错误处理
Apache Pulsar 提供了各种用于消息重新传递和错误处理的原生策略。 我们将看看它们,并了解如何通过 Spring for Apache Pulsar 使用它们。
7.1. 确认超时
默认情况下,除非 Consumer 崩溃,否则 Pulsar Consumer 不会重新传递消息,但你可以通过在 Pulsar Consumer 上设置 ack 超时来更改此行为。 如果 ack timeout 属性的值大于零,并且 Pulsar 消费者在该超时时间内没有确认消息,则重新传递该消息。
你可以通过消费者定制器将此属性直接指定为 Pulsar 消费者属性,例如:
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
return b -> b.property("ackTimeout", "60s");
}
7.2. 否定确认重新传递延迟
当否定确认时,Pulsar consumer 允许你指定应用程序希望如何重新传递消息。 默认情况下,在一分钟内重新传送消息,但您可以通过使用者定制器进行更改,例如:
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
return b -> b.property("negativeAckRedeliveryDelay", "10ms");
}
7.3. 死信主题
Apache Pulsar 允许应用程序在消费者上使用死信主题,并带有Shared
订阅类型。
对于Exclusive
和Failover
订阅类型,则此功能不可用。
基本思想是,如果消息重试一定次数(可能是由于 ack 超时或 nack 重新传递),一旦重试次数用完,就可以将消息发送到称为死信队列 (DLQ) 的特殊主题。
让我们通过检查一些代码片段来了解有关此功能的一些细节:
@Configuration(proxyBeanMethods = false)
class DeadLetterPolicyConfig {
@ReactivePulsarListener(
topics = "topic-with-dlp",
subscriptionType = SubscriptionType.Shared,
deadLetterPolicy = "myDeadLetterPolicy",
consumerCustomizer = "ackTimeoutCustomizer" )
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@ReactivePulsarListener(topics = "my-dlq-topic")
void listenDlq(String msg) {
System.out.println("From DLQ: " + msg);
}
@Bean
DeadLetterPolicy myDeadLetterPolicy() {
return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
}
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> ackTimeoutCustomizer() {
return b -> b.property("ackTimeout", "1s");
}
}
首先,我们有一个特殊的 beanDeadLetterPolicy
,它被命名为deadLetterPolicy
(您可以根据需要使用任何名称)。
这个 bean 指定了许多内容,例如最大交付量(在本例中为 10)和死信主题的名称 —my-dlq-topic
,在本例中。
如果未指定 DLQ 主题名称,则默认为<topicname>-<subscriptionname>-DLQ
在 Pulsar 中。
接下来,我们将此 bean 名称提供给ReactivePulsarListener
通过设置deadLetterPolicy
财产。
请注意,ReactivePulsarListener
订阅类型的值为Shared
,因为 DLQ 功能仅适用于共享订阅。
此代码主要用于演示目的,因此我们提供了一个ackTimeout
值为 1 秒。
这个想法是代码抛出异常,如果 Pulsar 在 1 秒内没有收到 ack,它会重试。
如果该周期持续 10 次(因为这是我们在DeadLetterPolicy
),则 Pulsar consumer 会将消息发布到 DLQ 主题。
我们还有另一个ReactivePulsarListener
侦听 DLQ 主题以在发布到 DLQ 主题时接收数据。
8. Pulsar Reader 支持
该框架支持以响应式方式使用 Pulsar Reader,通过ReactivePulsarReaderFactory
.
Spring Boot 提供了这个 reader 工厂,它可以配置任何spring.pulsar.reader.*
应用程序属性。