1. ReactivePulsarTemplate

在 Pulsar 生产者端,Spring Boot 自动配置提供了一个用于发布记录的功能。该模板实现一个称为的接口,并提供通过其协定发布记录的方法。ReactivePulsarTemplateReactivePulsarOperationsspring-doc.cn

该模板提供了 send 方法,这些方法接受单个消息并返回 . 它还提供了接受多条消息(以 ReactiveStreams 类型的形式)并返回 .Mono<MessageId>PublisherFlux<MessageId>spring-doc.cn

对于不包含 topic 参数的 API 变体,将使用主题解析过程来确定目标主题。

1.1. Fluent API

该模板提供了一个 Fluent 构建器来处理更复杂的发送请求。spring-doc.cn

1.2. 消息自定义

您可以指定 a 来配置传出消息。例如,以下代码演示如何发送键控消息:MessageSpecBuilderCustomizerspring-doc.cn

template.newMessage(msg)
    .withMessageCustomizer((mc) -> mc.key("foo-msg-key"))
    .send();

1.3. 发件人自定义

你可以指定 a 来配置底层 Pulsar sender 构建器,该构建器最终构建用于发送外发消息的 sender。ReactiveMessageSenderBuilderCustomizerspring-doc.cn

请谨慎使用,因为这会提供对 sender builder 的完全访问权限,并且调用其某些方法(例如 )可能会产生意想不到的副作用。create

例如,以下代码演示如何禁用批处理和启用分块:spring-doc.cn

template.newMessage(msg)
    .withSenderCustomizer((sc) -> sc.enableChunking(true).enableBatching(false))
    .send();

另一个示例显示了在将记录发布到分区主题时如何使用自定义路由。 在发件人生成器上指定您的自定义实施,例如:MessageRouterspring-doc.cn

template.newMessage(msg)
    .withSenderCustomizer((sc) -> sc.messageRouter(messageRouter))
    .send();
请注意,使用 时,唯一有效的设置为 是 。MessageRouterspring.pulsar.producer.message-routing-modecustom
对于不包含 topic 参数的 API 变体,将使用主题解析过程来确定目标主题。
请谨慎使用,因为这会提供对 sender builder 的完全访问权限,并且调用其某些方法(例如 )可能会产生意想不到的副作用。create
请注意,使用 时,唯一有效的设置为 是 。MessageRouterspring.pulsar.producer.message-routing-modecustom

2. 指定 Schema 信息

如果您使用 Java 基元类型,框架会自动为您检测架构,并且您无需指定任何架构类型来发布数据。 对于非原始类型,如果在 上调用 send 操作时未显式指定 Schema ,则 Spring for Apache Pulsar 框架将尝试从该类型构建一个。ReactivePulsarTemplateSchema.JSONspring-doc.cn

当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES 和 KEY_VALUE w/ INLINE 编码。

2.1. 自定义 Schema 映射

作为在对 for 复杂类型调用 send 操作时指定架构的替代方法,可以使用类型的映射配置架构解析程序。 这样就无需在框架使用传出消息类型咨询解析程序时指定架构。ReactivePulsarTemplatespring-doc.cn

2.1.1. 配置属性

可以使用属性配置架构映射。 以下示例用于分别使用 和 schema 为 和 complex 对象添加映射:spring.pulsar.defaults.type-mappingsapplication.ymlUserAddressAVROJSONspring-doc.cn

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
这是 message 类的完全限定名称。message-type

2.1.2. Schema 解析器定制器

添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,则可以提供架构解析程序定制器来添加映射。spring-doc.cn

以下示例使用架构解析程序定制器分别使用 and schemas 为 和 complex 对象添加映射:UserAddressAVROJSONspring-doc.cn

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

2.1.3. 类型映射注解

指定要用于特定消息类型的默认架构信息的另一个选项是使用注释标记消息类。 可以通过 annotation 上的属性指定 schema info。@PulsarMessageschemaTypespring-doc.cn

以下示例将系统配置为在生成或使用 type 为 messages 的 messages 时使用 JSON 作为默认模式:Foospring-doc.cn

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}

有了这个配置,就不需要设置 specify the schema on send 操作。spring-doc.cn

2.2. 使用 AUTO_SCHEMA 进行生产

如果无法提前知道 Pulsar topic 的 schema 类型,你可以使用 AUTO_PRODUCE schema 安全地发布原始 JSON 或 Avro payload。byte[]spring-doc.cn

在这种情况下,创建者会验证出站字节是否与目标主题的架构兼容。spring-doc.cn

只需在模板上指定 request of 的 send 操作,如以下示例所示:Schema.AUTO_PRODUCE_BYTES()spring-doc.cn

void sendUserAsBytes(ReactivePulsarTemplate<byte[]> template, byte[] userAsBytes) {
	template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES()).subscribe();
}
这仅支持 Avro 和 JSON 架构类型。
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES 和 KEY_VALUE w/ INLINE 编码。
这是 message 类的完全限定名称。message-type
这仅支持 Avro 和 JSON 架构类型。

3. 响应式 PulsarSenderFactory

这依赖于 a 来实际创建底层发件人。ReactivePulsarTemplateReactivePulsarSenderFactoryspring-doc.cn

Spring Boot 提供了这个 sender factory,可以使用任何 spring.pulsar.producer.* 应用程序属性进行配置。spring-doc.cn

如果在直接使用发送方工厂 API 时未指定主题信息,则使用与 相同的主题解析过程,但省略了“消息类型默认”步骤。ReactivePulsarTemplate

3.1. 生产者缓存

每个底层 Pulsar 生产者都会消耗资源。 为了提高性能并避免持续创建 producer,底层 Apache Pulsar Reactive 客户端会缓存它创建的 producer。 它们以 LRU 方式缓存,并在配置的时间段内未使用时被驱逐。ReactiveMessageSenderCachespring-doc.cn

你可以通过指定任何 spring.pulsar.producer.cache.* 应用程序属性来配置缓存设置。spring-doc.cn

如果在直接使用发送方工厂 API 时未指定主题信息,则使用与 相同的主题解析过程,但省略了“消息类型默认”步骤。ReactivePulsarTemplate