1. Pulsar 模板
在 Pulsar 生产者端,Spring Boot 自动配置提供了一个用于发布记录的功能。该模板实现一个称为的接口,并提供通过其协定发布记录的方法。PulsarTemplate
PulsarOperations
这些发送 API 方法分为两类:和 .
这些方法通过使用 Pulsar producer 上的同步发送功能来阻止调用。
它们返回在消息保留在代理上后发布的消息的 。
方法调用是非阻塞的异步调用。
它们返回一个 ,您可以在发布消息后使用该 ID 异步接收消息 ID。send
sendAsync
send
MessageId
sendAsync
CompletableFuture
对于不包含 topic 参数的 API 变体,将使用主题解析过程来确定目标主题。 |
1.1. 简单的 API
该模板为简单的发送请求提供了一些方法(前缀为 'send')。对于更复杂的发送请求,Fluent API 允许您配置更多选项。
1.2. Fluent API
该模板提供了一个 Fluent 构建器来处理更复杂的发送请求。
1.3. 消息自定义
您可以指定 a 来配置传出消息。例如,以下代码演示如何发送键控消息:TypedMessageBuilderCustomizer
template.newMessage(msg)
.withMessageCustomizer((mb) -> mb.key("foo-msg-key"))
.send();
1.4. 生产者定制
你可以指定 a 来配置底层 Pulsar 生产者构建器,该构建器最终构建用于发送传出消息的生产者。ProducerBuilderCustomizer
请谨慎使用,因为这提供了对 producer 构建器的完全访问权限,并且调用其某些方法(例如 )可能会产生意想不到的副作用。create |
例如,以下代码演示如何禁用批处理和启用分块:
template.newMessage(msg)
.withProducerCustomizer((pb) -> pb.enableChunking(true).enableBatching(false))
.send();
另一个示例显示了在将记录发布到分区主题时如何使用自定义路由。
在构建器上指定您的自定义实现,例如:MessageRouter
Producer
template.newMessage(msg)
.withProducerCustomizer((pb) -> pb.messageRouter(messageRouter))
.send();
请注意,使用 时,唯一有效的设置为 是 。MessageRouter spring.pulsar.producer.message-routing-mode custom |
另一个示例展示了如何添加一个,它将在将生产者接收到的消息发布到代理之前拦截和更改这些消息:ProducerInterceptor
template.newMessage(msg)
.withProducerCustomizer((pb) -> pb.intercept(interceptor))
.send();
定制器将仅适用于用于发送操作的创建者。 如果要将定制器应用于所有生产者,则必须将它们提供给生产者工厂,如全局生产者定制中所述。
使用 Lambda 定制器时,必须遵循“Lambda 定制器注意事项”中描述的规则。 |
对于不包含 topic 参数的 API 变体,将使用主题解析过程来确定目标主题。 |
请谨慎使用,因为这提供了对 producer 构建器的完全访问权限,并且调用其某些方法(例如 )可能会产生意想不到的副作用。create |
请注意,使用 时,唯一有效的设置为 是 。MessageRouter spring.pulsar.producer.message-routing-mode custom |
使用 Lambda 定制器时,必须遵循“Lambda 定制器注意事项”中描述的规则。 |
2. 指定 Schema 信息
如果您使用 Java 基元类型,框架会自动为您检测架构,并且您无需指定任何架构类型来发布数据。
对于非原始类型,如果在 上调用 send 操作时未显式指定 Schema ,则 Spring for Apache Pulsar 框架将尝试从该类型构建一个。PulsarTemplate
Schema.JSON
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES 和 KEY_VALUE w/ INLINE 编码。 |
2.1. 自定义 Schema 映射
作为在对 for 复杂类型调用 send 操作时指定架构的替代方法,可以使用类型的映射配置架构解析程序。
这样就无需在框架使用传出消息类型咨询解析程序时指定架构。PulsarTemplate
2.1.1. 配置属性
可以使用属性配置架构映射。
以下示例用于分别使用 和 schema 为 和 complex 对象添加映射:spring.pulsar.defaults.type-mappings
application.yml
User
Address
AVRO
JSON
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
这是 message 类的完全限定名称。message-type |
2.1.2. Schema 解析器定制器
添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,则可以提供架构解析程序定制器来添加映射。
以下示例使用架构解析程序定制器分别使用 and schemas 为 和 complex 对象添加映射:User
Address
AVRO
JSON
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
2.2. 使用 AUTO_SCHEMA 进行生产
如果无法提前知道 Pulsar topic 的 schema 类型,你可以使用 AUTO_PRODUCE schema 安全地发布原始 JSON 或 Avro payload。byte[]
在这种情况下,创建者会验证出站字节是否与目标主题的架构兼容。
只需在模板上指定 request of 的 send 操作,如以下示例所示:Schema.AUTO_PRODUCE_BYTES()
void sendUserAsBytes(PulsarTemplate<byte[]> template, byte[] userAsBytes) {
template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES());
}
这仅支持 Avro 和 JSON 架构类型。 |
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES 和 KEY_VALUE w/ INLINE 编码。 |
这是 message 类的完全限定名称。message-type |
这仅支持 Avro 和 JSON 架构类型。 |
3. Pulsar 生产工厂
它依赖于 a 来实际创建底层 producer。
Spring Boot 自动配置还提供了这个 producer 工厂,你可以通过指定任何 spring.pulsar.producer.*
应用程序属性来进一步配置它。PulsarTemplate
PulsarProducerFactory
如果直接使用生产者工厂 API 时未指定 Topic 信息,则使用与 相同的 Topic 解析流程,但省略了 Message type default(消息类型默认)步骤。PulsarTemplate |
3.1. 全局 producer 自定义
该框架提供了合同,允许您配置用于构建每个生产者的底层构建器。
要自定义所有生成者,您可以将定制器列表传递到构造函数中。
使用多个定制器时,将按照它们在列表中的显示顺序应用它们。ProducerBuilderCustomizer
PulsarProducerFactory
如果使用 Spring Boot 自动配置,则可以将定制器指定为 bean,它们将自动传递给 ,根据其 Comments 进行排序。PulsarProducerFactory @Order |
如果您只想将定制器应用于单个创建者,则可以使用 Fluent API 并在发送时指定定制器。
如果直接使用生产者工厂 API 时未指定 Topic 信息,则使用与 相同的 Topic 解析流程,但省略了 Message type default(消息类型默认)步骤。PulsarTemplate |
如果使用 Spring Boot 自动配置,则可以将定制器指定为 bean,它们将自动传递给 ,根据其 Comments 进行排序。PulsarProducerFactory @Order |
4. Pulsar 生产者缓存
每个底层 Pulsar 生产者都会消耗资源。 为了提高性能并避免持续创建 Producer,Producer Factory 会缓存它创建的 Producer。 它们以 LRU 方式缓存,并在配置的时间段内未使用时被驱逐。 缓存键由足够的信息组成,以确保在后续创建请求中向调用方返回相同的创建者。
此外,你可以通过指定任何 spring.pulsar.producer.cache.*
应用程序属性来配置缓存设置。
4.1. 注意 Lambda 定制器
用户提供的任何创建者定制器也包含在缓存键中。
由于缓存键依赖于 的有效实现,因此在使用 Lambda 定制器时必须小心。equals/hashCode
统治:当且仅当两个作为 Lambda 实现的定制器使用相同的 Lambda 实例并且不需要在其闭包之外定义任何变量时,它们才会匹配。equals/hashCode |
为了澄清上述规则,我们将看几个例子。
在以下示例中,定制器定义为内联 Lambda,这意味着每次调用都使用相同的 Lambda 实例。此外,它不需要在其闭包之外的变量。因此,它将作为缓存键进行匹配。sendUser
void sendUser() {
var user = randomUser();
template.newMessage(user)
.withTopic("user-topic")
.withProducerCustomizer((b) -> b.producerName("user"))
.send();
}
在下一个案例中,定制器被定义为内联 Lambda,这意味着每次调用都使用相同的 Lambda 实例。但是,它需要一个 Variable outside its closure。因此,它不会作为缓存键进行匹配。sendUser
void sendUser() {
var user = randomUser();
var name = randomName();
template.newMessage(user)
.withTopic("user-topic")
.withProducerCustomizer((b) -> b.producerName(name))
.send();
}
在最后一个示例中,定制器定义为内联 Lambda,这意味着对的每个调用都使用相同的 Lambda 实例。虽然它确实使用变量名称,但它并不源自其闭包之外,因此将作为缓存键进行匹配。
这说明了变量可以在 Lambda 闭包中使用,甚至可以调用静态方法。sendUser
void sendUser() {
var user = randomUser();
template.newMessage(user)
.withTopic("user-topic")
.withProducerCustomizer((b) -> {
var name = SomeHelper.someStaticMethod();
b.producerName(name);
})
.send();
}
统治:如果您的 Lambda 定制器未定义一次且仅定义一次(在后续调用中使用相同的实例),或者它需要在其闭包之外定义变量,则必须提供具有有效实施的定制器实现。equals/hashCode |
如果不遵循这些规则,则创建者缓存将始终丢失,并且您的应用程序性能将受到负面影响。 |
统治:当且仅当两个作为 Lambda 实现的定制器使用相同的 Lambda 实例并且不需要在其闭包之外定义任何变量时,它们才会匹配。equals/hashCode |
统治:如果您的 Lambda 定制器未定义一次且仅定义一次(在后续调用中使用相同的实例),或者它需要在其闭包之外定义变量,则必须提供具有有效实施的定制器实现。equals/hashCode |
如果不遵循这些规则,则创建者缓存将始终丢失,并且您的应用程序性能将受到负面影响。 |
5. 在 Producer 上拦截消息
添加 a 可以在将生产者收到的消息发布到代理之前拦截和更改这些消息。
为此,您可以将拦截器列表传递到构造函数中。
当使用多个侦听器时,它们的应用 Sequence 是它们在列表中的显示顺序。ProducerInterceptor
PulsarTemplate
如果使用 Spring Boot 自动配置,则可以将拦截器指定为 Beans。
它们会自动传递到 .
拦截器的排序是通过使用 Comments 来实现的,如下所示:PulsarTemplate
@Order
@Bean
@Order(100)
ProducerInterceptor firstInterceptor() {
...
}
@Bean
@Order(200)
ProducerInterceptor secondInterceptor() {
...
}
如果您不使用 Starter,则需要自己配置和注册上述组件。 |
如果您不使用 Starter,则需要自己配置和注册上述组件。 |