该框架为几乎所有支持的功能提供了 Reactive 对应项。
如果你把这个词放在提供的命令式组件前面,你很可能会找到它的 Reactive 对应物。
|
但是,尚不支持以下各项:
-
非共享订阅中的错误处理
-
通过流式模式访问 Pulsar 头文件
@Header
-
观察
如果你把这个词放在提供的命令式组件前面,你很可能会找到它的 Reactive 对应物。
|
1. 前言
我们建议对基于 Apache Pulsar 的 Spring 应用程序使用 Spring-Boot-First 方法,因为这极大地简化了事情。
为此,您可以将模块添加为依赖项。spring-pulsar-reactive-spring-boot-starter |
此参考的大部分内容都希望读者使用 starter,并在考虑这一点的情况下提供大多数配置说明。 但是,当说明特定于 Spring Boot Starters用法时,会努力调用。 |
我们建议对基于 Apache Pulsar 的 Spring 应用程序使用 Spring-Boot-First 方法,因为这极大地简化了事情。
为此,您可以将模块添加为依赖项。spring-pulsar-reactive-spring-boot-starter |
此参考的大部分内容都希望读者使用 starter,并在考虑这一点的情况下提供大多数配置说明。 但是,当说明特定于 Spring Boot Starters用法时,会努力调用。 |
2. 快速导览
我们将通过展示一个以 Reactive 方式生成和消费的示例 Spring Boot 应用程序,快速浏览一下 Spring for Apache Pulsar 中的 Reactive 支持。
这是一个完整的应用程序,不需要任何其他配置,只要您在默认位置 - 上运行 Pulsar 集群即可。localhost:6650
2.1. 依赖项
Spring Boot 应用程序只需要依赖项。下面的清单分别显示了如何定义 Maven 和 Gradle 的依赖关系:spring-pulsar-reactive-spring-boot-starter
-
Maven
-
Gradle
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-pulsar-reactive</artifactId>
<version>3.2.11-SNAPSHOT</version>
</dependency>
</dependencies>
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-pulsar-reactive:3.2.11-SNAPSHOT'
}
使用上述坐标时,更改如下:
|
2.2. 应用程序代码
以下是应用程序源代码:
@SpringBootApplication
public class ReactiveSpringPulsarHelloWorld {
public static void main(String[] args) {
SpringApplication.run(ReactiveSpringPulsarHelloWorld.class, args);
}
@Bean
ApplicationRunner runner(ReactivePulsarTemplate<String> pulsarTemplate) {
return (args) -> pulsarTemplate.send("hello-pulsar-topic", "Hello Reactive Pulsar World!").subscribe();
}
@ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
Mono<Void> listen(String message) {
System.out.println("Reactive listener received: " + message);
return Mono.empty();
}
}
就是这样,只需几行代码,我们就有了一个可以工作的 Spring Boot 应用程序,它以响应式方式生成和消费来自 Pulsar 主题的消息。
启动后,应用程序使用 向 发送消息。
然后,它使用 .ReactivePulsarTemplate
hello-pulsar-topic
hello-pulsar-topic
@ReactivePulsarListener
简单性的关键要素之一是 Spring Boot Starters,它可以自动配置并为应用程序提供所需的组件 |
使用上述坐标时,更改如下:
|
简单性的关键要素之一是 Spring Boot Starters,它可以自动配置并为应用程序提供所需的组件 |
3. 设计
以下是需要牢记的几个关键设计点。
3.1. Apache Pulsar 反应式
反应式支持最终由 Apache Pulsar Reactive 客户端提供,其当前实现是围绕常规 Pulsar 客户端异步 API 的完全非阻塞适配器。 这意味着 Reactive 客户端需要常规客户端。
4. 反应式 Pulsar 客户端
当您使用 Reactive Pulsar Spring Boot Starter 时,您将获得自动配置。ReactivePulsarClient
默认情况下,应用程序会尝试连接到 位于 的本地 Pulsar 实例。
这可以通过将属性设置为其他值来调整。pulsar://localhost:6650
spring.pulsar.client.service-url
该值必须是有效的 Pulsar 协议 URL |
还有许多其他应用程序属性(继承自改编的命令式客户端)可供配置。
请参阅 spring.pulsar.client.*
应用程序属性。
4.1. 身份验证
要连接到需要身份验证的 Pulsar 集群,请按照与命令式客户端相同的步骤进行操作。 同样,这是因为反应式 Client 端采用了处理所有安全配置的命令式 Client 端。
该值必须是有效的 Pulsar 协议 URL |
5. 消息制作
5.1. ReactivePulsarTemplate
在 Pulsar 生产者端,Spring Boot 自动配置提供了一个用于发布记录的功能。该模板实现一个称为的接口,并提供通过其协定发布记录的方法。ReactivePulsarTemplate
ReactivePulsarOperations
该模板提供了 send 方法,这些方法接受单个消息并返回 .
它还提供了接受多条消息(以 ReactiveStreams 类型的形式)并返回 .Mono<MessageId>
Publisher
Flux<MessageId>
对于不包含 topic 参数的 API 变体,将使用主题解析过程来确定目标主题。 |
5.1.1. Fluent API
该模板提供了一个 Fluent 构建器来处理更复杂的发送请求。
5.1.2. 消息自定义
您可以指定 a 来配置传出消息。例如,以下代码演示如何发送键控消息:MessageSpecBuilderCustomizer
template.newMessage(msg)
.withMessageCustomizer((mc) -> mc.key("foo-msg-key"))
.send();
5.1.3. 发件人自定义
你可以指定 a 来配置底层 Pulsar sender 构建器,该构建器最终构建用于发送外发消息的 sender。ReactiveMessageSenderBuilderCustomizer
请谨慎使用,因为这会提供对 sender builder 的完全访问权限,并且调用其某些方法(例如 )可能会产生意想不到的副作用。create |
例如,以下代码演示如何禁用批处理和启用分块:
template.newMessage(msg)
.withSenderCustomizer((sc) -> sc.enableChunking(true).enableBatching(false))
.send();
另一个示例显示了在将记录发布到分区主题时如何使用自定义路由。
在发件人生成器上指定您的自定义实施,例如:MessageRouter
template.newMessage(msg)
.withSenderCustomizer((sc) -> sc.messageRouter(messageRouter))
.send();
请注意,使用 时,唯一有效的设置为 是 。MessageRouter spring.pulsar.producer.message-routing-mode custom |
5.2. 指定 Schema 信息
如果您使用 Java 基元类型,框架会自动为您检测架构,并且您无需指定任何架构类型来发布数据。
对于非原始类型,如果在 上调用 send 操作时未显式指定 Schema ,则 Spring for Apache Pulsar 框架将尝试从该类型构建一个。ReactivePulsarTemplate
Schema.JSON
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF 和采用 INLINE 编码的 KEY_VALUE。 |
5.2.1. 自定义 Schema 映射
作为在对 for 复杂类型调用 send 操作时指定架构的替代方法,可以使用类型的映射配置架构解析程序。
这样就无需在框架使用传出消息类型咨询解析程序时指定架构。ReactivePulsarTemplate
可以使用属性配置架构映射。
以下示例用于分别使用 和 schema 为 和 complex 对象添加映射:spring.pulsar.defaults.type-mappings
application.yml
User
Address
AVRO
JSON
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
这是 message 类的完全限定名称。message-type |
添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,则可以提供架构解析程序定制器来添加映射。
以下示例使用架构解析程序定制器分别使用 and schemas 为 和 complex 对象添加映射:User
Address
AVRO
JSON
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
有了这个配置,就不需要设置 specify the schema on send 操作。
5.3. ReactivePulsarSenderFactory
这依赖于 a 来实际创建底层发件人。ReactivePulsarTemplate
ReactivePulsarSenderFactory
Spring Boot 提供了这个 sender factory,可以使用任何 spring.pulsar.producer.*
应用程序属性进行配置。
如果在直接使用发送方工厂 API 时未指定主题信息,则使用与 相同的主题解析过程,但省略了“消息类型默认”步骤。ReactivePulsarTemplate |
5.3.1. 生产者缓存
每个底层 Pulsar 生产者都会消耗资源。
为了提高性能并避免持续创建 producer,底层 Apache Pulsar Reactive 客户端会缓存它创建的 producer。
它们以 LRU 方式缓存,并在配置的时间段内未使用时被驱逐。ReactiveMessageSenderCache
你可以通过指定任何 spring.pulsar.producer.cache.*
应用程序属性来配置缓存设置。
对于不包含 topic 参数的 API 变体,将使用主题解析过程来确定目标主题。 |
请谨慎使用,因为这会提供对 sender builder 的完全访问权限,并且调用其某些方法(例如 )可能会产生意想不到的副作用。create |
请注意,使用 时,唯一有效的设置为 是 。MessageRouter spring.pulsar.producer.message-routing-mode custom |
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF 和采用 INLINE 编码的 KEY_VALUE。 |
这是 message 类的完全限定名称。message-type |
如果在直接使用发送方工厂 API 时未指定主题信息,则使用与 相同的主题解析过程,但省略了“消息类型默认”步骤。ReactivePulsarTemplate |
6. 消息消费
6.1. @ReactivePulsarListener
对于 Pulsar 消费者,我们建议最终用户应用程序使用 Annotation。
要使用 ,您需要使用注释。
当您使用 Spring Boot 支持时,它会自动启用此注释并配置所有必要的组件,例如消息侦听器基础设施(负责创建底层 Pulsar 消费者)。ReactivePulsarListener
ReactivePulsarListener
@EnableReactivePulsar
让我们重新访问一下我们在快速浏览部分看到的代码片段:ReactivePulsarListener
@ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
listener 方法返回 a 以指示消息是否已成功处理。 表示成功 (确认) 并指示失败 (否定确认)。Mono<Void> Mono.empty() Mono.error() |
您还可以进一步简化此方法:
@ReactivePulsarListener
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
在这种最基本的形式中,当未直接提供时,将使用主题解析过程来确定目标主题。
同样,当 annotation 上未提供 the 时,将使用自动生成的订阅名称。topics
subscriptionName
@ReactivePulsarListener
在前面显示的方法中,我们以 形式接收数据,但我们没有指定任何架构类型。
在内部,该框架依赖于 Pulsar 的 schema 机制将数据转换为所需的类型。
框架会检测到您期望的类型,然后根据该信息推断架构类型。
然后,它将该架构提供给使用者。
对于 Java 中的所有原始类型,框架都会执行此推理。
对于任何复杂类型(例如 JSON、AVRO 等),框架都无法执行此推理,用户需要使用属性在注释上提供架构类型。ReactivePulsarListener
String
String
schemaType
此示例显示了我们如何使用 topic 中的复杂类型:
@ReactivePulsarListener(topics = "my-topic-2", schemaType = SchemaType.JSON)
Mono<Void> listen(Foo message) {
System.out.println(message);
return Mono.empty();
}
请注意在 上添加了一个属性。
这是因为该库无法从提供的类型推断架构类型: 。我们必须告诉框架要使用什么 schema。schemaType
ReactivePulsarListener
Foo
让我们看看我们可以消费的更多方式。
此示例直接使用 Pulsar 消息:
@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.apache.pulsar.client.api.Message<String> message) {
System.out.println(message.getValue());
return Mono.empty();
}
此示例使用包装在 Spring 消息传递信封中的记录:
@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.springframework.messaging.Message<String> message) {
System.out.println(message.getPayload());
return Mono.empty();
}
6.1.1. 流
以上都是逐个使用单个记录的示例。 然而,使用 Reactive 的一个令人信服的理由是具有背压支持的流功能。
以下示例用于使用 POJO 流:ReactivePulsarListener
@ReactivePulsarListener(topics = "streaming-1", stream = true)
Flux<MessageResult<Void>> listen(Flux<org.apache.pulsar.client.api.Message<String>> messages) {
return messages
.doOnNext((msg) -> System.out.println("Received: " + msg.getValue()))
.map(MessageResult::acknowledge);
在这里,我们接收 Pulsar 消息的记录。
此外,要在级别启用流使用,您需要将注释上的属性设置为 .Flux
ReactivePulsarListener
stream
true
listener 方法返回一个 where每个元素表示一个已处理的消息,并保存消息 ID、值以及它是否被确认。
它有一组静态工厂方法,可用于创建适当的实例。Flux<MessageResult<Void>> MessageResult MessageResult |
根据 中消息的实际类型,框架会尝试推断要使用的架构。
如果它包含复杂类型,您仍然需要提供 on .Flux
schemaType
ReactivePulsarListener
以下侦听器使用具有 complex type 的 Spring 消息传递信封:Message
@ReactivePulsarListener(topics = "streaming-2", stream = true, schemaType = SchemaType.JSON)
Flux<MessageResult<Void>> listen2(Flux<org.springframework.messaging.Message<Foo>> messages) {
return messages
.doOnNext((msg) -> System.out.println("Received: " + msg.getPayload()))
.map(MessageUtils::acknowledge);
}
listener 方法返回一个 where每个元素表示一个已处理的消息,并保存消息 ID、值以及它是否被确认。
Spring 有一组静态工厂方法,可用于从 Spring 消息创建适当的实例。Flux<MessageResult<Void>> MessageUtils MessageResult |
6.1.2. 配置 - 应用程序属性
侦听器依赖于 来创建和管理它用于消费消息的底层 Pulsar 消费者。
Spring Boot 提供了这个消费者工厂,你可以通过指定 spring.pulsar.consumer.*
应用程序属性来进一步配置它。工厂上的大多数已配置属性将在侦听器中得到遵守,但以下情况除外:ReactivePulsarConsumerFactory
该属性将被忽略,而是在未在 annotation 上指定时生成。spring.pulsar.consumer.subscription.name |
该属性将被忽略,而是从 Comments 上的值中获取。但是,您可以将 on the annotation 设置为改用 property 值作为默认值。spring.pulsar.consumer.subscription-type subscriptionType = {} |
6.1.3. 消费者定制
你可以指定 a 来配置底层 Pulsar consumer builder,该 builder 最终构造侦听器用来接收消息的 consumer。ReactivePulsarListenerMessageConsumerBuilderCustomizer
请谨慎使用,因为这提供了对 Consumer Builder 的完全访问权限,并且调用其某些方法(例如 )可能会产生意想不到的副作用。create |
例如,下面的代码演示如何将订阅的初始位置设置为主题上最早的消息。
@ReactivePulsarListener(topics = "hello-pulsar-topic", consumerCustomizer = "myConsumerCustomizer")
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myConsumerCustomizer() {
return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}
如果您的应用程序只注册了一个 bean 和一个 bean,则将自动应用定制器。@ReactivePulsarListener ReactivePulsarListenerMessageConsumerBuilderCustomizer |
您还可以使用定制器向 Consumer Builder 提供直接的 Pulsar Consumer 属性。
如果您不想使用前面提到的 Boot 配置属性或有多个配置不同的方法,这将非常方便。ReactivePulsarListener
以下定制器示例使用直接 Pulsar 消费者属性:
@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> directConsumerPropsCustomizer() {
return b -> b.property("subscriptionName", "subscription-1").property("topicNames", "foo-1");
}
使用的属性是直接的 Pulsar 消费者属性,而不是 Spring Boot 配置属性spring.pulsar.consumer |
6.2. 指定 Schema 信息
如前所述,对于 Java 原语,Spring for Apache Pulsar 框架可以推断出要在 .
对于非原始类型,如果未在注解上显式指定 Schema,则 Spring for Apache Pulsar 框架将尝试从该类型构建一个。ReactivePulsarListener
Schema.JSON
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF 和采用 INLINE 编码的 KEY_VALUE。 |
6.2.1. 自定义 Schema 映射
作为在 for complex types 上指定架构的替代方法,可以使用类型的映射配置架构解析程序。
这样就无需在侦听器上设置 schema,因为框架使用传入消息类型咨询解析程序。ReactivePulsarListener
可以使用属性配置架构映射。
以下示例用于分别使用 和 schema 为 和 complex 对象添加映射:spring.pulsar.defaults.type-mappings
application.yml
User
Address
AVRO
JSON
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
这是 message 类的完全限定名称。message-type |
添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,则可以提供架构解析程序定制器来添加映射。
以下示例使用架构解析程序定制器分别使用 and schemas 为 和 complex 对象添加映射:User
Address
AVRO
JSON
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
有了这个配置,就不需要在侦听器上设置 schema,例如:
@ReactivePulsarListener(topics = "user-topic")
Mono<Void> listen(User user) {
System.out.println(user);
return Mono.empty();
}
6.3. 消息侦听器容器基础设施
在大多数情况下,我们建议直接使用 Annotation 从 Pulsar 主题消费,因为该模型涵盖了广泛的应用程序用例。
但是,了解内部的工作原理很重要。ReactivePulsarListener
ReactivePulsarListener
当您使用 Spring for Apache Pulsar 时,消息侦听器容器是消息使用的核心。
它在幕后使用消息侦听器容器基础设施来创建和管理底层 Pulsar 消费者。ReactivePulsarListener
6.3.1. ReactivePulsarMessageListenerContainer
此消息侦听器容器的 Contract 通过其默认实现创建反应式 Pulsar Consumer,并连接使用创建的 Consumer 的反应式消息管道。ReactivePulsarMessageListenerContainer
6.3.2. ReactiveMessagePipeline
管道是底层 Apache Pulsar Reactive 客户端的一项功能,它以反应式方式完成接收数据,然后将其移交给提供的消息处理程序的繁重工作。反应式消息侦听器容器实现要简单得多,因为管道处理大部分工作。
6.3.3. ReactivePulsarMessageHandler
“listener” 方面由 提供,其中有两个提供的实现:ReactivePulsarMessageHandler
-
ReactivePulsarOneByOneMessageHandler
- 逐个处理单个消息 -
ReactivePulsarStreamingHandler
- 通过Flux
如果在直接使用侦听器容器时未指定主题信息,则使用与 相同的主题解析过程,但省略了 “Message type default” 步骤。ReactivePulsarListener |
6.4. 并发
当以流模式 () 使用记录时,并发性自然是通过客户端实现中的底层 Reactive 支持实现的。stream = true
但是,在逐个处理消息时,可以指定并发以提高处理吞吐量。
只需将属性设置为 .
此外,当您可以确保消息按键排序,从而通过对 annotation 进行设置来发送到同一处理程序时。concurrency
@ReactivePulsarListener
concurrency > 1
useKeyOrderedProcessing = "true"
同样,它做了繁重的工作,我们只需设置它的属性。ReactiveMessagePipeline
6.5. Pulsar 头文件
Pulsar 消息元数据可以作为 Spring 消息头使用。 可用标头的列表可以在 PulsarHeaders.java 中找到。
6.5.1. 在 OneByOne Listener 中访问
以下示例显示了在使用逐个消息侦听器时如何访问 Pulsar Headers:
@ReactivePulsarListener(topics = "some-topic")
Mono<Void> listen(String data,
@Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@Header("foo") String foo) {
System.out.println("Received " + data + " w/ id=" + messageId + " w/ foo=" + foo);
return Mono.empty();
}
在前面的示例中,我们访问消息元数据的值以及名为 .
Spring 注解用于每个 header 字段。messageId
foo
@Header
你也可以使用 Pulsar 的 作为 envelope 来携带有效载荷。
这样做时,用户可以直接调用 Pulsar 消息上的相应方法来检索元数据。
但是,为方便起见,您也可以使用 Comments 来检索它。
请注意,您还可以使用 Spring 消息传递信封来携带有效负载,然后使用 检索 Pulsar 标头。Message
Header
Message
@Header
6.6. 消息确认
框架会自动处理消息确认。 但是,侦听器方法必须发送一个信号,指示消息是否已成功处理。 然后,容器实现使用该信号执行 ack 或 nack 操作。 这与它的命令式对应项略有不同,在命令式中,除非方法引发异常,否则信号被暗示为正。
6.7. 消息重新投递和错误处理
Apache Pulsar 提供了各种用于消息重新传递和错误处理的原生策略。 我们将看看它们,并了解如何通过 Spring for Apache Pulsar 使用它们。
6.7.1. 鸣谢超时
默认情况下,除非 Consumer 崩溃,否则 Pulsar Consumer 不会重新传递消息,但你可以通过在 Pulsar Consumer 上设置 ack 超时来更改此行为。 如果 ack timeout 属性的值大于零,并且 Pulsar 消费者在该超时时间内没有确认消息,则重新传递该消息。
你可以通过消费者定制器将此属性直接指定为 Pulsar 消费者属性,例如:
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
return b -> b.property("ackTimeout", "60s");
}
6.7.2. 否定确认重新传递延迟
当否定确认时,Pulsar consumer 允许你指定应用程序希望如何重新传递消息。 默认情况下,在一分钟内重新传送消息,但您可以通过使用者定制器进行更改,例如:
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
return b -> b.property("negativeAckRedeliveryDelay", "10ms");
}
6.7.3. Dead Letter 主题
Apache Pulsar 允许应用程序在具有订阅类型的消费者上使用死信主题。
对于 和 订阅类型,此功能不可用。
基本思想是,如果消息重试一定次数(可能是由于 ack 超时或 nack 重新传递),一旦重试次数用完,就可以将消息发送到称为死信队列 (DLQ) 的特殊主题。
让我们通过检查一些代码片段来了解有关此功能的一些细节:Shared
Exclusive
Failover
@Configuration(proxyBeanMethods = false)
class DeadLetterPolicyConfig {
@ReactivePulsarListener(
topics = "topic-with-dlp",
subscriptionType = SubscriptionType.Shared,
deadLetterPolicy = "myDeadLetterPolicy",
consumerCustomizer = "ackTimeoutCustomizer" )
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@ReactivePulsarListener(topics = "my-dlq-topic")
void listenDlq(String msg) {
System.out.println("From DLQ: " + msg);
}
@Bean
DeadLetterPolicy myDeadLetterPolicy() {
return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
}
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> ackTimeoutCustomizer() {
return b -> b.property("ackTimeout", "1s");
}
}
首先,我们有一个特殊的 bean for ,它被命名为 (它可以是你想要的任何名称)。
这个 bean 指定了许多内容,例如最大交付量(在本例中为 10)和死信主题的名称 — ,在本例中。
如果不指定 DLQ 主题名称,则默认为 in Pulsar。
接下来,我们通过设置属性来提供此 bean 名称。
请注意,它的订阅类型为 ,因为 DLQ 功能仅适用于共享订阅。
此代码主要用于演示目的,因此我们提供 1 秒的值。
这个想法是代码抛出异常,如果 Pulsar 在 1 秒内没有收到 ack,它会重试。
如果该周期持续 10 次(因为这是我们在 中的最大重新传递计数),则 Pulsar consumer 会将消息发布到 DLQ 主题。
我们还有另一个侦听 DLQ 主题,以便在数据发布到 DLQ 主题时接收数据。DeadLetterPolicy
deadLetterPolicy
my-dlq-topic
<topicname>-<subscriptionname>-DLQ
ReactivePulsarListener
deadLetterPolicy
ReactivePulsarListener
Shared
ackTimeout
DeadLetterPolicy
ReactivePulsarListener
6.8. Pulsar Reader 支持
Spring Boot 提供了这个 reader 工厂,可以使用任何 spring.pulsar.reader.*
应用程序属性进行配置。
listener 方法返回 a 以指示消息是否已成功处理。 表示成功 (确认) 并指示失败 (否定确认)。Mono<Void> Mono.empty() Mono.error() |
listener 方法返回一个 where每个元素表示一个已处理的消息,并保存消息 ID、值以及它是否被确认。
它有一组静态工厂方法,可用于创建适当的实例。Flux<MessageResult<Void>> MessageResult MessageResult |
listener 方法返回一个 where每个元素表示一个已处理的消息,并保存消息 ID、值以及它是否被确认。
Spring 有一组静态工厂方法,可用于从 Spring 消息创建适当的实例。Flux<MessageResult<Void>> MessageUtils MessageResult |
该属性将被忽略,而是在未在 annotation 上指定时生成。spring.pulsar.consumer.subscription.name |
该属性将被忽略,而是从 Comments 上的值中获取。但是,您可以将 on the annotation 设置为改用 property 值作为默认值。spring.pulsar.consumer.subscription-type subscriptionType = {} |
请谨慎使用,因为这提供了对 Consumer Builder 的完全访问权限,并且调用其某些方法(例如 )可能会产生意想不到的副作用。create |
如果您的应用程序只注册了一个 bean 和一个 bean,则将自动应用定制器。@ReactivePulsarListener ReactivePulsarListenerMessageConsumerBuilderCustomizer |
使用的属性是直接的 Pulsar 消费者属性,而不是 Spring Boot 配置属性spring.pulsar.consumer |
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF 和采用 INLINE 编码的 KEY_VALUE。 |
这是 message 类的完全限定名称。message-type |
如果在直接使用侦听器容器时未指定主题信息,则使用与 相同的主题解析过程,但省略了 “Message type default” 步骤。ReactivePulsarListener |
7. 主题解析
在生成或使用消息时,需要目标主题。 框架会查找以下有序位置以确定主题(在第一次查找时停止):
-
用户指定
-
消息类型默认
-
全局默认值
当通过默认机制之一找到主题时,无需在 produce 或 consume API 上指定主题。
当找不到主题时,API 将相应地引发异常。
7.1. 用户指定
传递到正在使用的 API 中的主题具有最高优先级(例如。 或 )。PulsarTemplate.send("my-topic", myMessage)
@PulsarListener(topics = "my-topic"
7.2. 消息类型默认
当没有主题传递到 API 中时,系统会查找为正在生成或使用的消息类型配置的消息类型到主题映射。
可以使用 属性 配置映射。
以下示例用于配置在消费或生成消息时使用的默认主题:spring.pulsar.defaults.type-mappings
application.yml
Foo
Bar
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.Foo
topic-name: foo-topic
- message-type: com.acme.Bar
topic-name: bar-topic
这是 message 类的完全限定名称。message-type |
如果消息(或输入的第一条消息)是 ,则框架将无法从中确定主题。如果您的应用程序可能发送消息,则应使用另一种方法来指定主题。Publisher null null |
这是 message 类的完全限定名称。message-type |
如果消息(或输入的第一条消息)是 ,则框架将无法从中确定主题。如果您的应用程序可能发送消息,则应使用另一种方法来指定主题。Publisher null null |