响应式支持
该框架为几乎所有受支持的功能提供了响应式对应项。
如果你把这个词
|
但是,尚不支持以下内容:
-
非共享订阅中的错误处理
-
通过以下方式访问 Pulsar 标头
@Header
在流媒体模式下 -
观察
1. 前言
我们建议对基于 Apache Pulsar 的应用程序使用 Spring-Boot-First 方法,因为这极大地简化了事情。为此,您可以将spring-pulsar-reactive-spring-boot-starter 模块作为依赖项。 |
此参考的大部分内容都希望读者使用Starters,并给出了考虑到这一点的大多数配置说明。但是,当指令特定于 Spring Boot Starters用法时,我们会努力指出。 |
2. 快速浏览
我们将通过展示一个以 Reactive 方式生成和使用的示例 Spring Boot 应用程序,快速浏览 Spring 中对 Apache Pulsar 的响应式支持。这是一个完整的应用程序,不需要任何额外的配置,只要你有一个在默认位置上运行的 Pulsar 集群 -localhost:6650
.
2.1. 依赖项
Spring Boot 应用程序只需要spring-pulsar-reactive-spring-boot-starter
Dependency。 以下列表分别显示了如何定义 Maven 和 Gradle 的依赖项:
-
Maven
-
Gradle
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-pulsar-reactive</artifactId>
<version>3.2.12</version>
</dependency>
</dependencies>
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-pulsar-reactive:3.2.12'
}
使用时
|
2.2. 应用代码
以下是应用程序源代码:
@SpringBootApplication
public class ReactiveSpringPulsarHelloWorld {
public static void main(String[] args) {
SpringApplication.run(ReactiveSpringPulsarHelloWorld.class, args);
}
@Bean
ApplicationRunner runner(ReactivePulsarTemplate<String> pulsarTemplate) {
return (args) -> pulsarTemplate.send("hello-pulsar-topic", "Hello Reactive Pulsar World!").subscribe();
}
@ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
Mono<Void> listen(String message) {
System.out.println("Reactive listener received: " + message);
return Mono.empty();
}
}
就是这样,只需几行代码,我们就拥有了一个有效的 Spring Boot 应用程序,它以响应式方式生成和使用来自 Pulsar 主题的消息。
启动后,应用程序会使用ReactivePulsarTemplate
将消息发送到hello-pulsar-topic
.
然后,它从hello-pulsar-topic
使用@ReactivePulsarListener
.
简单性的关键要素之一是 Spring Boot Starters,它可以自动配置并为应用程序提供所需的组件 |
3. 设计
以下是需要牢记的几个关键设计要点。
3.1. Apache Pulsar 响应式
响应式支持最终由 Apache Pulsar Reactive 客户端提供,其当前实现是围绕常规 Pulsar 客户端异步 API 的完全非阻塞适配器。 这意味着响应式客户端需要常规客户端。
4. 响应式 Pulsar 客户端
当您使用 Reactive Pulsar Spring Boot Starter 时,您将获得ReactivePulsarClient
自动配置。
默认情况下,应用程序会尝试连接到本地 Pulsar 实例pulsar://localhost:6650
.
这可以通过设置spring.pulsar.client.service-url
属性设置为不同的值。
该值必须是有效的 Pulsar 协议 URL |
还有许多其他应用程序属性(继承自适应的命令式客户端)可供配置。
请参阅spring.pulsar.client.*
应用程序属性。
4.1. 身份验证
要连接到需要身份验证的 Pulsar 集群,请按照与命令式客户端相同的步骤进行作。 同样,这是因为响应式客户端适配了处理所有安全配置的命令式客户端。
5. 消息制作
5.1. 反应性脉冲星模板
在 Pulsar 生产者端,Spring Boot 自动配置提供了一个ReactivePulsarTemplate
用于发布记录。该模板实现了一个名为ReactivePulsarOperations
并提供通过其合同发布记录的方法。
该模板提供了接受单个消息并返回Mono<MessageId>
.
它还提供了接受多个消息的发送方法(以 ReactiveStreams 的形式Publisher
类型)并返回一个Flux<MessageId>
.
对于不包含主题参数的 API 变体,主题解析过程用于确定目标主题。 |
5.1.1. 流畅的 API
该模板提供了一个流畅的构建器来处理更复杂的发送请求。
5.1.2. 消息自定义
您可以指定一个MessageSpecBuilderCustomizer
以配置传出邮件。例如,以下代码演示如何发送密钥消息:
template.newMessage(msg)
.withMessageCustomizer((mc) -> mc.key("foo-msg-key"))
.send();
5.1.3. 发送方自定义
您可以指定一个ReactiveMessageSenderBuilderCustomizer
以配置底层 Pulsar 发送器构建器,该构建器最终构建用于发送外发消息的发送者。
请谨慎使用,因为这提供了对发送方构建器的完全访问权限并调用其某些方法(例如create )可能会产生意想不到的副作用。 |
例如,以下代码演示如何禁用批处理和启用分块:
template.newMessage(msg)
.withSenderCustomizer((sc) -> sc.enableChunking(true).enableBatching(false))
.send();
另一个示例演示如何在将记录发布到分区主题时使用自定义路由。
指定您的自定义MessageRouter
在发送方构建器上实现,例如:
template.newMessage(msg)
.withSenderCustomizer((sc) -> sc.messageRouter(messageRouter))
.send();
请注意,当使用MessageRouter ,这是spring.pulsar.producer.message-routing-mode 是custom . |
5.2. 指定模式信息
如果您使用 Java 基元类型,则框架会自动为您检测模式,并且您无需指定任何模式类型来发布数据。
对于非原始类型,如果在调用ReactivePulsarTemplate
,Spring for Apache Pulsar 框架将尝试构建一个Schema.JSON
从类型。
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF 和带有内联编码的KEY_VALUE。 |
5.2.1. 自定义模式映射
作为在调用发送作时指定模式的替代方法。ReactivePulsarTemplate
对于复杂类型,可以使用类型的映射来配置模式解析器。
这样就无需指定模式,因为框架使用传出消息类型查阅解析器。
架构映射可以使用spring.pulsar.defaults.type-mappings
财产。
以下示例使用application.yml
为User
和Address
复杂对象使用AVRO
和JSON
schemas,分别:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
这message-type 是消息类的完全限定名称。 |
添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,可以提供架构解析器定制器来添加映射。
以下示例使用架构解析器定制器为User
和Address
复杂对象使用AVRO
和JSON
schemas,分别:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
使用此配置后,无需在发送作时设置指定架构。
5.3. ReactivePulsarSenderFactory
这ReactivePulsarTemplate
依赖于ReactivePulsarSenderFactory
实际创建基础发件人。
Spring Boot 提供了这个发送方工厂,它可以配置任何spring.pulsar.producer.*
应用程序属性。
5.3.1. 生产者缓存
每个底层 Pulsar 生产者都会消耗资源。
为了提高性能并避免持续创建生产者,ReactiveMessageSenderCache
在底层 Apache Pulsar Reactive 客户端中缓存它创建的生产者。
它们以 LRU 方式缓存,并在配置的时间段内未使用时逐出。
您可以通过指定任何spring.pulsar.producer.cache.*
应用程序属性。
6. 消息消费
6.1. @ReactivePulsarListener
对于 Pulsar 消费者,我们建议最终用户应用程序使用ReactivePulsarListener
注解。
使用ReactivePulsarListener
,您需要使用@EnableReactivePulsar
注解。
当您使用 Spring Boot 支持时,它会自动启用此 Comments 并配置所有必要的组件,例如消息侦听器基础设施(负责创建底层 Pulsar 消费者)。
让我们重新审视一下ReactivePulsarListener
我们在快速浏览部分看到的代码片段:
@ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
监听器方法返回一个Mono<Void> 以指示消息是否已成功处理。Mono.empty() 表示成功(确认),并Mono.error() 表示失败(否定确认)。 |
您还可以进一步简化此方法:
@ReactivePulsarListener
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
在这种最基本的形式中,当topics
未直接提供,则使用主题解析过程来确定目标主题。
同样,当subscriptionName
上未提供@ReactivePulsarListener
注释将使用自动生成的订阅名称。
在ReactivePulsarListener
方法,我们收到的数据为String
,但我们没有指定任何模式类型。
在内部,该框架依赖于 Pulsar 的模式机制将数据转换为所需的类型。
框架检测到您期望String
类型,然后根据该信息推断架构类型。
然后,它向使用者提供该架构。
对于 Java 中的所有原始类型,框架都会进行此推理。
对于任何复杂类型(例如 JSON、AVRO 等),框架无法执行此推理,用户需要使用schemaType
财产。
此示例显示了我们如何从主题中使用复杂类型:
@ReactivePulsarListener(topics = "my-topic-2", schemaType = SchemaType.JSON)
Mono<Void> listen(Foo message) {
System.out.println(message);
return Mono.empty();
}
请注意,添加了一个schemaType
属性ReactivePulsarListener
.
这是因为库无法从提供的类型推断出架构类型:Foo
.我们必须告诉框架要使用什么模式。
让我们看看更多我们可以消费的方式。
此示例直接使用 Pulsar 消息:
@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.apache.pulsar.client.api.Message<String> message) {
System.out.println(message.getValue());
return Mono.empty();
}
此示例使用包装在 Spring 消息信封中的记录:
@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.springframework.messaging.Message<String> message) {
System.out.println(message.getPayload());
return Mono.empty();
}
6.1.1. 流式处理
以上都是逐条消费单条记录的示例。 然而,使用 Reactive 的令人信服的原因之一是支持背压的流功能。
以下示例使用ReactivePulsarListener
要使用POJO流:
@ReactivePulsarListener(topics = "streaming-1", stream = true)
Flux<MessageResult<Void>> listen(Flux<org.apache.pulsar.client.api.Message<String>> messages) {
return messages
.doOnNext((msg) -> System.out.println("Received: " + msg.getValue()))
.map(MessageResult::acknowledge);
在这里,我们将记录作为Flux
脉冲星消息。
此外,要在ReactivePulsarListener
级别,您需要将stream
属性设置为true
.
监听器方法返回一个Flux<MessageResult<Void>> 其中每个元素表示已处理的消息,并保存消息 ID、值以及是否已确认。 这MessageResult 具有一组静态工厂方法,可用于创建适当的MessageResult 实例。 |
根据Flux
,框架会尝试推断要使用的模式。如果它包含复杂类型,您仍然需要提供schemaType
上ReactivePulsarListener
.
以下监听器使用 Spring 消息传递Message
具有复杂类型的信封:
@ReactivePulsarListener(topics = "streaming-2", stream = true, schemaType = SchemaType.JSON)
Flux<MessageResult<Void>> listen2(Flux<org.springframework.messaging.Message<Foo>> messages) {
return messages
.doOnNext((msg) -> System.out.println("Received: " + msg.getPayload()))
.map(MessageUtils::acknowledge);
}
监听器方法返回一个Flux<MessageResult<Void>> 其中每个元素代表已处理的消息,并保存消息 ID、值以及是否被确认。弹簧MessageUtils 具有一组静态工厂方法,可用于创建适当的MessageResult 实例。 |
6.1.2. 配置 - 应用程序属性
监听器依赖于ReactivePulsarConsumerFactory
创建和管理它用于使用消息的底层 Pulsar 消费者。Spring Boot 提供了这个消费者工厂,您可以通过指定spring.pulsar.consumer.*
应用程序属性。工厂上的大多数配置属性将在侦听器中得到尊重,但以下情况除外:
这spring.pulsar.consumer.subscription.name 属性被忽略,而是在注释上未指定时生成。 |
这spring.pulsar.consumer.subscription-type 属性被忽略,而是取自注释上的值。但是,您可以将subscriptionType = {} 改为使用属性值作为默认值。 |
6.1.3. 消费者定制
您可以指定一个ReactivePulsarListenerMessageConsumerBuilderCustomizer
配置底层 Pulsar 消费者构建器,该构建器最终构造监听器用于接收消息的消费者。
请谨慎使用,因为这可以完全访问消费者构建器并调用其某些方法(例如create )可能会产生意想不到的副作用。 |
例如,以下代码演示如何将订阅的初始位置设置为主题上最早的消息。
@ReactivePulsarListener(topics = "hello-pulsar-topic", consumerCustomizer = "myConsumerCustomizer")
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myConsumerCustomizer() {
return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}
如果您的应用程序只有一个@ReactivePulsarListener 和单个ReactivePulsarListenerMessageConsumerBuilderCustomizer bean 注册,则定制器将自动应用。 |
您还可以使用定制器向消费者构建器提供直接的 Pulsar 消费者属性。
如果您不想使用前面提到的启动配置属性或有多个ReactivePulsarListener
配置不同的方法。
以下定制器示例使用直接的 Pulsar 使用者属性:
@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> directConsumerPropsCustomizer() {
return b -> b.property("subscriptionName", "subscription-1").property("topicNames", "foo-1");
}
使用的属性是直接的 Pulsar 使用者属性,而不是spring.pulsar.consumer Spring Boot 配置属性 |
6.2. 指定模式信息
如前所述,对于 Java 原语,Spring for Apache Pulsar 框架可以推断出要在ReactivePulsarListener
.
对于非原始类型,如果未在 Comments 上显式指定 Schema,则 Spring for Apache Pulsar 框架将尝试构建一个Schema.JSON
从类型。
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF 和带有内联编码的KEY_VALUE。 |
6.2.1. 自定义模式映射
作为在ReactivePulsarListener
对于复杂类型,可以使用类型的映射来配置模式解析器。
这样就无需在侦听器上设置模式,因为框架使用传入消息类型咨询解析器。
架构映射可以使用spring.pulsar.defaults.type-mappings
财产。
以下示例使用application.yml
为User
和Address
复杂对象使用AVRO
和JSON
schemas,分别:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
这message-type 是消息类的完全限定名称。 |
添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,可以提供架构解析器定制器来添加映射。
以下示例使用架构解析器定制器为User
和Address
复杂对象使用AVRO
和JSON
schemas,分别:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
使用此配置后,无需在侦听器上设置架构,例如:
@ReactivePulsarListener(topics = "user-topic")
Mono<Void> listen(User user) {
System.out.println(user);
return Mono.empty();
}
6.3. 消息侦听器容器基础设施
在大多数情况下,我们建议使用ReactivePulsarListener
注释,以便直接从 Pulsar 主题中使用,因为该模型涵盖了广泛的应用程序用例。
但是,重要的是要了解如何ReactivePulsarListener
在内部工作。
当您使用 Spring for Apache Pulsar 时,消息侦听器容器是消息消费的核心。
这ReactivePulsarListener
在幕后使用消息监听器容器基础设施来创建和管理底层 Pulsar 消费者。
6.3.1. ReactivePulsarMessageListenerContainer
此消息侦听器容器的合约通过ReactivePulsarMessageListenerContainer
其默认实现创建一个响应式 Pulsar 使用者,并连接使用创建的使用者的响应式消息管道。
6.4. 并发
在流式处理模式下使用记录时 (stream = true
) 并发性通过客户端实现中的底层 Reactive 支持自然而然地实现。
但是,在逐个处理消息时,可以指定并发以提高处理吞吐量。
只需将concurrency
属性@ReactivePulsarListener
.
此外,当concurrency > 1
您可以确保消息按键排序,因此通过设置useKeyOrderedProcessing = "true"
在注释上。
同样,ReactiveMessagePipeline
做繁重的工作,我们只需在其上设置属性即可。
6.5. Pulsar 接头
Pulsar 消息元数据可以作为 Spring 消息头使用。 可用标头列表可以在PulsarHeaders.java中找到。
6.5.1. 在 OneByOne 监听器中访问
以下示例显示了在使用逐个消息监听器时如何访问 Pulsar 标头:
@ReactivePulsarListener(topics = "some-topic")
Mono<Void> listen(String data,
@Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@Header("foo") String foo) {
System.out.println("Received " + data + " w/ id=" + messageId + " w/ foo=" + foo);
return Mono.empty();
}
在前面的示例中,我们访问messageId
消息元数据以及名为foo
.
Spring@Header
注释用于每个标题字段。
您还可以使用 Pulsar 的Message
作为承载有效载荷的信封。
这样做时,用户可以直接调用 Pulsar 消息上的相应方法来检索元数据。
但是,为了方便起见,您也可以使用Header
注解。
请注意,您还可以使用 Spring 消息传递Message
信封来承载有效负载,然后使用@Header
.
6.6. 消息确认
该框架会自动处理消息确认。 但是,侦听器方法必须发送一个信号,指示消息是否已成功处理。 然后,容器实现使用该信号执行 ack 或 nack作。 这与命令式对应项略有不同,在命令式对应项中,除非方法抛出异常,否则信号隐含为正。
6.7. 消息重新传递和错误处理
Apache Pulsar 为消息重新传递和错误处理提供了各种本机策略。 我们将看看它们,并了解如何通过 Spring for Apache Pulsar 使用它们。
6.7.1. 确认超时
默认情况下,除非使用者崩溃,否则 Pulsar 使用者不会重新传递消息,但您可以通过在 Pulsar 使用者上设置 ack 超时来更改此行为。 如果 ack timeout 属性的值大于零,并且 Pulsar 使用者在该超时期限内未确认消息,则会重新传递该消息。
您可以通过消费者定制器将此属性直接指定为 Pulsar 消费者属性,例如:
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
return b -> b.property("ackTimeout", "60s");
}
6.7.2. 否定确认重新传递延迟
当确认否定时,Pulsar 消费者允许您指定应用程序希望如何重新传递消息。 默认设置是在一分钟内重新传递消息,但您可以通过消费者定制器进行更改,例如:
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
return b -> b.property("negativeAckRedeliveryDelay", "10ms");
}
6.7.3. 死信主题
Apache Pulsar 允许应用程序在消费者上使用死信主题,并使用Shared
订阅类型。
对于Exclusive
和Failover
订阅类型,此功能不可用。
基本思想是,如果消息重试了一定次数(可能是由于 ack 超时或 nack 重新传递),一旦重试次数用尽,就可以将消息发送到称为死信队列 (DLQ) 的特殊主题。
让我们通过检查一些代码片段来了解有关此功能的一些细节:
@Configuration(proxyBeanMethods = false)
class DeadLetterPolicyConfig {
@ReactivePulsarListener(
topics = "topic-with-dlp",
subscriptionType = SubscriptionType.Shared,
deadLetterPolicy = "myDeadLetterPolicy",
consumerCustomizer = "ackTimeoutCustomizer" )
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@ReactivePulsarListener(topics = "my-dlq-topic")
void listenDlq(String msg) {
System.out.println("From DLQ: " + msg);
}
@Bean
DeadLetterPolicy myDeadLetterPolicy() {
return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
}
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> ackTimeoutCustomizer() {
return b -> b.property("ackTimeout", "1s");
}
}
首先,我们有一个特殊的豆子DeadLetterPolicy
,并将其命名为deadLetterPolicy
(它可以是任何你想要的名称)。
此 bean 指定了许多内容,例如最大传递量(在本例中为 10)和死信主题的名称 —my-dlq-topic
,在本例中。如果未指定 DLQ 主题名称,则默认为<topicname>-<subscriptionname>-DLQ
在 Pulsar 中。接下来,我们将此 bean 名称提供给ReactivePulsarListener
通过将deadLetterPolicy
财产。 请注意,ReactivePulsarListener
订阅类型为Shared
,因为 DLQ 功能仅适用于共享订阅。此代码主要用于演示目的,因此我们提供了ackTimeout
值为 1 秒。这个想法是代码抛出异常,如果 Pulsar 在 1 秒内没有收到 ack,它就会进行重试。如果该循环持续十次(因为这是我们在DeadLetterPolicy
),Pulsar 消费者将消息发布到 DLQ 主题。我们还有另一个ReactivePulsarListener
侦听 DLQ 主题以接收发布到 DLQ 主题的数据。
6.8. Pulsar Reader 支持
Spring Boot 提供了这个读取器工厂,可以配置任何spring.pulsar.reader.*
应用程序属性。
7. 主题解决
生成或使用消息时需要目标主题。 框架在以下有序位置查找以确定主题(在第一个查找处停止):
-
用户指定
-
默认消息类型
-
全局默认值
通过默认机制之一找到主题时,无需在生产或消费 API 上指定主题。
当找不到主题时,API 将相应地抛出异常。
7.1. 用户指定
传递到正在使用的 API 中的主题具有最高的优先级(例如PulsarTemplate.send("my-topic", myMessage)
或@PulsarListener(topics = "my-topic"
).
7.2. 消息类型默认
当没有主题传递到 API 时,系统会查找为正在生成或使用的消息类型配置的消息类型到主题的映射。
可以使用spring.pulsar.defaults.type-mappings
财产。
以下示例使用application.yml
配置使用或生产时要使用的默认主题Foo
或Bar
消息:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.Foo
topic-name: foo-topic
- message-type: com.acme.Bar
topic-name: bar-topic
这message-type 是消息类的完全限定名称。 |
如果消息(或Publisher 输入)是null ,则框架将无法从中确定主题。如果您的应用程序可能会发送null 消息。 |