消息制作

1. Pulsar 模板

在 Pulsar 生产者端,Spring Boot 自动配置提供了一个PulsarTemplate用于发布记录。该模板实现了一个名为PulsarOperations并提供通过其 Contract 发布记录的方法。spring-doc.cadn.net.cn

这些发送 API 方法分为两类:sendsendAsync. 这sendmethods 使用 Pulsar producer 上的同步发送功能来阻止调用。 它们返回MessageId的消息。 这sendAsync方法调用是非阻塞的异步调用。 他们返回一个CompletableFuture,可用于在消息发布后异步接收消息 ID。spring-doc.cadn.net.cn

对于不包含 topic 参数的 API 变体,将使用主题解析过程来确定目标主题。

1.1. 简单的 API

该模板为简单的发送请求提供了一些方法(前缀为 'send')。对于更复杂的发送请求,Fluent API 允许您配置更多选项。spring-doc.cadn.net.cn

1.2. Fluent API

该模板提供了一个 Fluent 构建器来处理更复杂的发送请求。spring-doc.cadn.net.cn

1.3. 消息自定义

您可以指定TypedMessageBuilderCustomizer以配置传出消息。例如,以下代码演示如何发送键控消息:spring-doc.cadn.net.cn

template.newMessage(msg)
    .withMessageCustomizer((mb) -> mb.key("foo-msg-key"))
    .send();

1.4. 生产者定制

您可以指定ProducerBuilderCustomizer配置底层 Pulsar 生产者构建器,最终构建用于发送传出消息的生产者。spring-doc.cadn.net.cn

请谨慎使用,因为这会提供对 producer 构建器的完全访问权限,并调用其某些方法(例如create) 可能会产生意想不到的副作用。

例如,以下代码演示如何禁用批处理和启用分块:spring-doc.cadn.net.cn

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.enableChunking(true).enableBatching(false))
    .send();

另一个示例显示了在将记录发布到分区主题时如何使用自定义路由。 指定您的自定义MessageRouterProducer构建器,例如:spring-doc.cadn.net.cn

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.messageRouter(messageRouter))
    .send();
请注意,当使用MessageRouter,则spring.pulsar.producer.message-routing-modecustom.

这个另一个示例展示了如何添加ProducerInterceptor这将在将消息发布到 broker 之前拦截和更改 producer 收到的消息:spring-doc.cadn.net.cn

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.intercept(interceptor))
    .send();

定制器将仅适用于用于发送作的创建者。 如果要将定制器应用于所有生产者,则必须将它们提供给生产者工厂,如全局生产者定制中所述。spring-doc.cadn.net.cn

使用 Lambda 定制器时,必须遵循“Lambda 定制器注意事项”中描述的规则。

2. 指定 Schema 信息

如果您使用 Java 基元类型,框架会自动为您检测架构,并且您无需指定任何架构类型来发布数据。 对于非原始类型,如果在对PulsarTemplate,Spring for Apache Pulsar 框架将尝试构建一个Schema.JSON从 type.spring-doc.cadn.net.cn

当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES 和 KEY_VALUE w/ INLINE 编码。

2.1. 自定义 Schema 映射

作为在PulsarTemplate对于复杂类型,可以使用类型的映射配置 Schema Resolver。 这样就无需在框架使用传出消息类型咨询解析程序时指定架构。spring-doc.cadn.net.cn

2.1.1. 配置属性

架构映射可以使用spring.pulsar.defaults.type-mappings财产。 以下示例使用application.yml要为UserAddress复杂对象 usingAVROJSONschemas 的 Schemas:spring-doc.cadn.net.cn

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
message-type是 Message 类的完全限定名称。

2.1.2. Schema 解析器定制器

添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,则可以提供架构解析程序定制器来添加映射。spring-doc.cadn.net.cn

以下示例使用架构解析程序定制器为UserAddress复杂对象 usingAVROJSONschemas 的 Schemas:spring-doc.cadn.net.cn

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

2.1.3. 类型映射注解

指定要用于特定消息类型的默认架构信息的另一个选项是使用@PulsarMessage注解。 架构信息可以通过schemaType属性。spring-doc.cadn.net.cn

以下示例将系统配置为在生成或使用 JSON 类型的消息时使用 JSON 作为默认架构Foo:spring-doc.cadn.net.cn

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}

有了这个配置,就不需要设置 specify the schema on send作。spring-doc.cadn.net.cn

2.2. 使用 AUTO_SCHEMA 进行生产

如果无法提前知道 Pulsar topic 的 schema 类型,可以使用 AUTO_PRODUCE schema 将原始 JSON 或 Avro payload 发布为byte[]牢牢。spring-doc.cadn.net.cn

在这种情况下,创建者会验证出站字节是否与目标主题的架构兼容。spring-doc.cadn.net.cn

只需指定一个Schema.AUTO_PRODUCE_BYTES()在您的模板 send作中,如下例所示:spring-doc.cadn.net.cn

void sendUserAsBytes(PulsarTemplate<byte[]> template, byte[] userAsBytes) {
	template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES());
}
这仅支持 Avro 和 JSON 架构类型。

3. Pulsar 生产工厂

PulsarTemplate依赖于PulsarProducerFactory实际创建底层 producer。 Spring Boot 自动配置还提供了这个生产者工厂,你可以通过指定任何spring.pulsar.producer.*应用程序属性。spring-doc.cadn.net.cn

如果直接使用 producer factory API 时未指定 topic 信息,则PulsarTemplate省略“Message type default”步骤的一个例外一起使用。

3.1. 全局 producer 自定义

该框架提供了ProducerBuilderCustomizerContract 的 Contract,它允许您配置用于构建每个生产者的底层构建器。 要自定义所有生产者,您可以将定制器列表传递到PulsarProducerFactory构造 函数。 使用多个定制器时,将按照它们在列表中的显示顺序应用它们。spring-doc.cadn.net.cn

如果使用 Spring Boot 自动配置,则可以将定制器指定为 bean,它们将自动传递给PulsarProducerFactory,根据其@Order注解。

如果您只想将定制器应用于单个创建者,则可以使用 Fluent API 并在发送时指定定制器spring-doc.cadn.net.cn

4. Pulsar 生产者缓存

每个底层 Pulsar 生产者都会消耗资源。 为了提高性能并避免持续创建 Producer,Producer Factory 会缓存它创建的 Producer。 它们以 LRU 方式缓存,并在配置的时间段内未使用时被驱逐。 缓存键由足够的信息组成,以确保在后续创建请求中向调用方返回相同的创建者。spring-doc.cadn.net.cn

此外,您还可以通过指定任何spring.pulsar.producer.cache.*应用程序属性。spring-doc.cadn.net.cn

4.1. 注意 Lambda 定制器

用户提供的任何创建者定制器也包含在缓存键中。 由于缓存键依赖于equals/hashCode,在使用 Lambda 定制器时必须小心。spring-doc.cadn.net.cn

统治:作为 Lambda 实现的两个定制器将在equals/hashCode 当且仅当它们使用相同的 Lambda 实例,并且不需要在其闭包之外定义任何变量。

为了澄清上述规则,我们将看几个例子。 在以下示例中,定制器定义为内联 Lambda,这意味着每次调用sendUser使用相同的 Lambda 实例。此外,它不需要在其闭包之外的变量。因此,它将作为缓存键进行匹配。spring-doc.cadn.net.cn

void sendUser() {
    var user = randomUser();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> b.producerName("user"))
        .send();
}

在下一个案例中,定制器被定义为内联 Lambda,这意味着对sendUser使用相同的 Lambda 实例。但是,它需要一个 Variable outside its closure。因此,它不会作为缓存键进行匹配。spring-doc.cadn.net.cn

void sendUser() {
    var user = randomUser();
    var name = randomName();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> b.producerName(name))
        .send();
}

在最后一个示例中,定制器被定义为内联 Lambda,这意味着每次调用sendUser使用相同的 Lambda 实例。虽然它确实使用变量名称,但它并不源自其闭包之外,因此作为缓存键进行匹配。 这说明了变量可以在 Lambda 闭包中使用,甚至可以调用静态方法。spring-doc.cadn.net.cn

void sendUser() {
    var user = randomUser();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> {
           var name = SomeHelper.someStaticMethod();
           b.producerName(name);
        })
        .send();
}
统治:如果您的 Lambda 定制器未定义一次且仅定义一次(在后续调用中使用相同的实例),或者它需要在其闭包之外定义变量,则必须为定制器实现提供有效的equals/hashCode实现。
如果不遵循这些规则,则创建者缓存将始终丢失,并且您的应用程序性能将受到负面影响。

5. 在 Producer 上拦截消息

添加ProducerInterceptor允许您在将 Producer 收到的消息发布到 Broker 之前拦截和更改这些消息。 为此,您可以将拦截器列表传递到PulsarTemplate构造 函数。 当使用多个侦听器时,它们的应用 Sequence 是它们在列表中的显示顺序。spring-doc.cadn.net.cn

如果使用 Spring Boot 自动配置,则可以将拦截器指定为 Beans。 它们会自动传递到PulsarTemplate. 拦截器的排序是通过使用@Order注解如下:spring-doc.cadn.net.cn

@Bean
@Order(100)
ProducerInterceptor firstInterceptor() {
  ...
}

@Bean
@Order(200)
ProducerInterceptor secondInterceptor() {
  ...
}
如果您不使用 Starter,则需要自己配置和注册上述组件。

APP信息