使用 Spring for Apache Pulsar

目录

1. 前言

我们建议对基于 Apache Pulsar 的 Spring 应用程序使用 Spring-Boot-First 方法,因为这极大地简化了事情。 为此,您可以将模块添加为依赖项。spring-pulsar-spring-boot-starter
此参考的大部分内容都希望读者使用 starter,并在考虑这一点的情况下提供大多数配置说明。 但是,当说明特定于 Spring Boot Starters用法时,会努力调用。

2. 快速导览

我们将通过展示一个生成和消费的示例 Spring Boot 应用程序来快速浏览 Spring for Apache Pulsar。 这是一个完整的应用程序,不需要任何其他配置,只要您在默认位置 - 上运行 Pulsar 集群即可。localhost:6650spring-doc.cn

2.1. 依赖项

Spring Boot 应用程序只需要依赖项。下面的清单分别显示了如何定义 Maven 和 Gradle 的依赖关系:spring-pulsar-spring-boot-starterspring-doc.cn

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-pulsar</artifactId>
        <version>3.2.12</version>
    </dependency>
</dependencies>
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-pulsar:3.2.12'
}

使用上述坐标时,更改如下:Version 0.2.xspring-doc.cn

<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-spring-boot-starter</artifactId>
        <version>0.2.0</version>
    </dependency>
</dependencies>
dependencies {
    implementation 'org.springframework.pulsar:spring-pulsar-spring-boot-starter:0.2.0'
}

2.2. 应用程序代码

下面的清单显示了该示例的 Spring Boot 应用程序案例:spring-doc.cn

@SpringBootApplication
public class PulsarBootHelloWorld {

    public static void main(String[] args) {
        SpringApplication.run(PulsarBootHelloWorld.class, args);
    }

    @Bean
    ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
        return (args) -> pulsarTemplate.send("hello-pulsar-topic", "Hello Pulsar World!");
    }

    @PulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
    void listen(String message) {
        System.out.println("Message Received: " + message);
    }
}

让我们快速了解此应用程序的更高级别详细信息。 在文档的后面部分,我们将更详细地了解这些组件。spring-doc.cn

在前面的示例中,我们严重依赖 Spring Boot 自动配置。 Spring Boot 为我们的应用程序自动配置了多个组件。 它自动为应用程序提供 ,生产者和使用者都使用该 。PulsarClientspring-doc.cn

Spring Boot 还自动配置了 ,我们将其注入应用程序中并开始向 Pulsar 主题发送记录。 应用程序将消息发送到名为 的主题。 请注意,应用程序没有指定任何 schema 信息,因为 Spring for Apache Pulsar 库会自动从你发送的数据类型中推断 schema 类型。PulsarTemplatehello-pulsarspring-doc.cn

我们使用注释来使用我们发布数据的主题。 是一个方便的注解,它将消息侦听器容器基础设施包装在 Spring for Apache Pulsar 中。 在幕后,它创建了一个消息侦听器容器来创建和管理 Pulsar 消费者。 与常规 Pulsar consumer 一样,使用时的默认订阅类型是 mode。 当记录发布到主题时,将使用记录并在控制台上打印它们。 在这种情况下,框架还会从该方法用作有效负载的数据类型 — 推断所使用的架构类型。PulsarListenerhello-pulsarPulsarListenerPulsarListenerExclusivehello-pulsarPulsarlistenerPulsarListnerStringspring-doc.cn

3. Pulsar 客户端

当您使用 Pulsar Spring Boot Starter 时,您将获得自动配置。PulsarClientspring-doc.cn

默认情况下,应用程序会尝试连接到 位于 的本地 Pulsar 实例。 这可以通过将属性设置为其他值来调整。pulsar://localhost:6650spring.pulsar.client.service-urlspring-doc.cn

该值必须是有效的 Pulsar 协议 URL

您可以通过指定任何 spring.pulsar.client.* 应用程序属性来进一步配置客户端。spring-doc.cn

如果您不使用 Starter,则需要自行配置和注册。 有一个接受构建器定制器的 that,可用于帮助实现此目的。PulsarClientDefaultPulsarClientFactory

3.1. TLS 加密 (SSL)

默认情况下,Pulsar 客户端以纯文本形式与 Pulsar 服务通信。 以下部分介绍如何配置 Pulsar 客户端以使用 TLS 加密 (SSL)。 先决条件是 Broker 也已配置为使用 TLS 加密。spring-doc.cn

Spring Boot 自动配置当前不支持任何 TLS/SSL 配置属性。 相反,你可以提供一个在 Pulsar 客户端构建器上设置必要属性的 a。 Pulsar 支持隐私增强邮件 (PEM) 和 Java 密钥库 (JKS) 证书格式。PulsarClientBuilderCustomizerspring-doc.cn

请按照以下步骤配置 TLS:spring-doc.cn

  1. 调整 Pulsar 客户端服务 url 以使用 scheme 和 TLS 端口(通常为 )。pulsar+ssl://6651spring-doc.cn

  2. 调整 admin 客户端服务 url 以使用方案和 TLS Web 端口(通常为 )。https://8443spring-doc.cn

  3. 提供客户端生成器定制器,用于在生成器上设置相关属性。spring-doc.cn

您可以在 Pulsar TLS 加密官方文档中找到有关上述内容的更多信息。spring-doc.cn

3.2. 身份验证

要连接到需要身份验证的 Pulsar 集群,您需要指定要使用的身份验证插件以及指定插件所需的任何参数。 使用 Spring Boot 自动配置时,您可以通过配置属性设置插件和插件参数(在大多数情况下)。spring-doc.cn

您需要确保 下定义的名称与 auth 插件(通常是驼峰式大小写)所期望的名称完全匹配。 Spring Boot 不会尝试对这些条目进行任何类型的松散绑定。spring.pulsar.client.authentication.param.*spring-doc.cn

例如,如果要为 auth 插件配置颁发者 URL,则必须使用 。 如果使用其他形式(如 或 ),则设置将不会应用于插件。AuthenticationOAuth2spring.pulsar.client.authentication.param.issuerUrlissuerurlissuer-urlspring-doc.cn

对 auth 参数使用环境变量通常是有问题的,因为在转换过程中会丢失区分大小写。 例如,考虑通过环境变量设置的以下 auth 参数:issuerUrlspring-doc.cn

SPRING_PULSAR_CLIENT_AUTHENTICATION_PARAM_ISSUERURL=https://some.server.com

当 Spring Boot 加载此属性时,它将使用(小写)而不是预期的(驼峰式)。 您可以通过使用 env var 的值作为 application.yml 中相关 auth 属性的值来绕过此限制。 继续上面的示例:issuerurlissuerUrlspring-doc.cn

spring:
  pulsar:
    client:
      authentication:
        param:
          issuerUrl: ${SPRING_PULSAR_CLIENT_AUTHENTICATION_PARAM_ISSUERURL}

不使用 Spring Boot 自动配置时,你可以使用 来创建身份验证,然后直接在你提供给客户端工厂的客户端定制器中的 Pulsar 客户端构建器上进行设置。org.apache.pulsar.client.api.AuthenticationFactoryspring-doc.cn

以下列表显示了如何配置每个受支持的身份验证机制。spring-doc.cn

单击此处了解 Athenz
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationAthenz
        param:
          tenantDomain: ...
          tenantService: ...
          providerDomain: ...
          privateKey: ...
          keyId: ...
这也需要 TLS 加密
单击此处获取令牌
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
        param:
          token: some-token-goes-here
单击此处查看 Basic
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationBasic
        param:
          userId: ...
          password: ...
单击此处获取 OAuth2
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
        param:
          issuerUrl: ...
          privateKey: ...
          audience: ...
          scope: ...
点击这里查看 Sasl
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationSasl
        param:
          saslJaasClientSectionName: ...
          serverType: ...
单击此处获取 mTLS (PEM)
由于此选项需要 TLS 加密,而这已经要求您提供客户端生成器定制器,因此建议直接在提供的 TLS 定制器中的客户端生成器上添加身份验证。 您可以使用 来帮助创建身份验证对象,如下所示:org.apache.pulsar.client.api.AuthenticationFactory
Authentication auth = AuthenticationFactory.TLS("/path/to/my-role.cert.pem", "/path/to/my-role.key-pk8.pem");

请参阅有关 mTLS (PEM) 的 Pulsar 官方文档。spring-doc.cn

单击此处获取 mTLS (JKS)
由于此选项需要 TLS 加密,而这已经要求您提供客户端生成器定制器,因此建议直接在提供的 TLS 定制器中的客户端生成器上添加身份验证。 您可以使用 来帮助创建身份验证对象,如下所示:org.apache.pulsar.client.api.AuthenticationFactory
Authentication auth = AuthenticationFactory.create(
        "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls",
        Map.of("keyStoreType", "JKS", "keyStorePath", "/path/to/my/keystore.jks", "keyStorePassword", "clientpw"));

请参阅有关 mTLS (JKS) 的 Pulsar 官方文档。spring-doc.cn

您可以在官方 Pulsar 安全文档中找到有关每个支持插件及其所需属性的更多信息。spring-doc.cn

4. 消息制作

4.1. Pulsar 模板

在 Pulsar 生产者端,Spring Boot 自动配置提供了一个用于发布记录的功能。该模板实现一个称为的接口,并提供通过其协定发布记录的方法。PulsarTemplatePulsarOperationsspring-doc.cn

这些发送 API 方法分为两类:和 . 这些方法通过使用 Pulsar producer 上的同步发送功能来阻止调用。 它们返回在消息保留在代理上后发布的消息的 。 方法调用是非阻塞的异步调用。 它们返回一个 ,您可以在发布消息后使用该 ID 异步接收消息 ID。sendsendAsyncsendMessageIdsendAsyncCompletableFuturespring-doc.cn

对于不包含 topic 参数的 API 变体,将使用主题解析过程来确定目标主题。

4.1.1. 简单的 API

该模板为简单的发送请求提供了一些方法(前缀为 'send')。对于更复杂的发送请求,Fluent API 允许您配置更多选项。spring-doc.cn

4.1.2. Fluent API

该模板提供了一个 Fluent 构建器来处理更复杂的发送请求。spring-doc.cn

4.1.3. 消息自定义

您可以指定 a 来配置传出消息。例如,以下代码演示如何发送键控消息:TypedMessageBuilderCustomizerspring-doc.cn

template.newMessage(msg)
    .withMessageCustomizer((mb) -> mb.key("foo-msg-key"))
    .send();

4.1.4. 生产者自定义

你可以指定 a 来配置底层 Pulsar 生产者构建器,该构建器最终构建用于发送传出消息的生产者。ProducerBuilderCustomizerspring-doc.cn

请谨慎使用,因为这提供了对 producer 构建器的完全访问权限,并且调用其某些方法(例如 )可能会产生意想不到的副作用。create

例如,以下代码演示如何禁用批处理和启用分块:spring-doc.cn

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.enableChunking(true).enableBatching(false))
    .send();

另一个示例显示了在将记录发布到分区主题时如何使用自定义路由。 在构建器上指定您的自定义实现,例如:MessageRouterProducerspring-doc.cn

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.messageRouter(messageRouter))
    .send();
请注意,使用 时,唯一有效的设置为 是 。MessageRouterspring.pulsar.producer.message-routing-modecustom

另一个示例展示了如何添加一个,它将在将生产者接收到的消息发布到代理之前拦截和更改这些消息:ProducerInterceptorspring-doc.cn

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.intercept(interceptor))
    .send();

定制器将仅适用于用于发送操作的创建者。 如果要将定制器应用于所有生产者,则必须将它们提供给生产者工厂,如全局生产者定制中所述。spring-doc.cn

使用 Lambda 定制器时,必须遵循“Lambda 定制器注意事项”中描述的规则。

4.2. 指定 Schema 信息

如果您使用 Java 基元类型,框架会自动为您检测架构,并且您无需指定任何架构类型来发布数据。 对于非原始类型,如果在 上调用 send 操作时未显式指定 Schema ,则 Spring for Apache Pulsar 框架将尝试从该类型构建一个。PulsarTemplateSchema.JSONspring-doc.cn

当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF 和采用 INLINE 编码的 KEY_VALUE。

4.2.1. 自定义 Schema 映射

作为在对 for 复杂类型调用 send 操作时指定架构的替代方法,可以使用类型的映射配置架构解析程序。 这样就无需在框架使用传出消息类型咨询解析程序时指定架构。PulsarTemplatespring-doc.cn

可以使用属性配置架构映射。 以下示例用于分别使用 和 schema 为 和 complex 对象添加映射:spring.pulsar.defaults.type-mappingsapplication.ymlUserAddressAVROJSONspring-doc.cn

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
这是 message 类的完全限定名称。message-type

添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,则可以提供架构解析程序定制器来添加映射。spring-doc.cn

以下示例使用架构解析程序定制器分别使用 and schemas 为 和 complex 对象添加映射:UserAddressAVROJSONspring-doc.cn

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

有了这个配置,就不需要设置 specify the schema on send 操作。spring-doc.cn

4.3. Pulsar Producer 工厂

它依赖于 a 来实际创建底层 producer。 Spring Boot 自动配置还提供了这个 producer 工厂,你可以通过指定任何 spring.pulsar.producer.* 应用程序属性来进一步配置它。PulsarTemplatePulsarProducerFactoryspring-doc.cn

如果直接使用生产者工厂 API 时未指定 Topic 信息,则使用与 相同的 Topic 解析流程,但省略了 Message type default(消息类型默认)步骤。PulsarTemplate

4.3.1. 全局 producer 自定义

该框架提供了合同,允许您配置用于构建每个生产者的底层构建器。 要自定义所有生成者,您可以将定制器列表传递到构造函数中。 使用多个定制器时,将按照它们在列表中的显示顺序应用它们。ProducerBuilderCustomizerPulsarProducerFactoryspring-doc.cn

如果使用 Spring Boot 自动配置,则可以将定制器指定为 bean,它们将自动传递给 ,根据其 Comments 进行排序。PulsarProducerFactory@Order

如果您只想将定制器应用于单个创建者,则可以使用 Fluent API 并在发送时指定定制器spring-doc.cn

4.4. Pulsar 生产者缓存

每个底层 Pulsar 生产者都会消耗资源。 为了提高性能并避免持续创建 Producer,Producer Factory 会缓存它创建的 Producer。 它们以 LRU 方式缓存,并在配置的时间段内未使用时被驱逐。 缓存键由足够的信息组成,以确保在后续创建请求中向调用方返回相同的创建者。spring-doc.cn

此外,你可以通过指定任何 spring.pulsar.producer.cache.* 应用程序属性来配置缓存设置。spring-doc.cn

4.4.1. 对 Lambda 定制器的注意事项

用户提供的任何创建者定制器也包含在缓存键中。 由于缓存键依赖于 的有效实现,因此在使用 Lambda 定制器时必须小心。equals/hashCodespring-doc.cn

统治:且仅当两个作为 Lambda 实现的定制器使用相同的 Lambda 实例并且不需要在其闭包之外定义任何变量时,它们才会匹配。equals/hashCode

为了澄清上述规则,我们将看几个例子。 在以下示例中,定制器定义为内联 Lambda,这意味着每次调用都使用相同的 Lambda 实例。此外,它不需要在其闭包之外的变量。因此,它将作为缓存键进行匹配。sendUserspring-doc.cn

void sendUser() {
    var user = randomUser();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> b.producerName("user"))
        .send();
}

在下一个案例中,定制器被定义为内联 Lambda,这意味着每次调用都使用相同的 Lambda 实例。但是,它需要一个 Variable outside its closure。因此,它不会作为缓存键进行匹配。sendUserspring-doc.cn

void sendUser() {
    var user = randomUser();
    var name = randomName();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> b.producerName(name))
        .send();
}

在最后一个示例中,定制器定义为内联 Lambda,这意味着对的每个调用都使用相同的 Lambda 实例。虽然它确实使用变量名称,但它并不源自其闭包之外,因此作为缓存键进行匹配。 这说明了变量可以在 Lambda 闭包中使用,甚至可以调用静态方法。sendUserspring-doc.cn

void sendUser() {
    var user = randomUser();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> {
           var name = SomeHelper.someStaticMethod();
           b.producerName(name);
        })
        .send();
}
统治:如果您的 Lambda 定制器未定义一次且仅定义一次(在后续调用中使用相同的实例),或者它需要在其闭包之外定义变量,则必须提供具有有效实施的定制器实现。equals/hashCode
如果不遵循这些规则,则创建者缓存将始终丢失,并且您的应用程序性能将受到负面影响。

4.5. 在 Producer 上拦截消息

添加 a 可以在将生产者收到的消息发布到代理之前拦截和更改这些消息。 为此,您可以将拦截器列表传递到构造函数中。 当使用多个侦听器时,它们的应用 Sequence 是它们在列表中的显示顺序。ProducerInterceptorPulsarTemplatespring-doc.cn

如果使用 Spring Boot 自动配置,则可以将拦截器指定为 Beans。 它们会自动传递到 . 拦截器的排序是通过使用 Comments 来实现的,如下所示:PulsarTemplate@Orderspring-doc.cn

@Bean
@Order(100)
ProducerInterceptor firstInterceptor() {
  ...
}

@Bean
@Order(200)
ProducerInterceptor secondInterceptor() {
  ...
}
如果您不使用 Starter,则需要自己配置和注册上述组件。

5. 消息消费

5.1. Pulsar 监听器

对于 Pulsar 消费者,我们建议最终用户应用程序使用 Annotation。 要使用 ,您需要使用注释。 当你使用 Spring Boot 支持时,它会自动启用此 Comments 并配置所有必要的组件,例如消息侦听器基础设施(负责创建 Pulsar 消费者)。 使用 a 创建和管理 Pulsar 消费者,即用于消费消息的底层 Pulsar 消费者。PulsarListenerPulsarListener@EnablePulsarPulsarListenerPulsarMessageListenerContainerPulsarConsumerFactoryspring-doc.cn

Spring Boot 提供了这个消费者工厂,你可以通过指定 spring.pulsar.consumer.* 应用程序属性来进一步配置它。工厂上的大多数已配置属性将在侦听器中得到遵守,但以下情况除外spring-doc.cn

该属性将被忽略,而是在未在 annotation 上指定时生成。spring.pulsar.consumer.subscription.name
该属性将被忽略,而是从 Comments 上的值中获取。但是,您可以将 on the annotation 设置为改用 property 值作为默认值。spring.pulsar.consumer.subscription-typesubscriptionType = {}

让我们重新访问一下我们在快速浏览部分看到的代码片段:PulsarListenerspring-doc.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

您可以进一步简化此方法:spring-doc.cn

@PulsarListener
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

在这种最基本的形式中,当 annotation 上未提供 the 时,将使用自动生成的订阅名称。 同样,如果未直接提供 ,则使用主题解析过程来确定目标主题。subscriptionName@PulsarListenertopicsspring-doc.cn

在前面显示的方法中,我们以 形式接收数据,但我们没有指定任何架构类型。 在内部,该框架依赖于 Pulsar 的 schema 机制将数据转换为所需的类型。 框架会检测到您期望的类型,然后根据该信息推断架构类型。 然后,它将该架构提供给使用者。 对于 Java 中的所有原始类型,框架都会执行此推理。 对于任何复杂类型(例如 JSON、AVRO 等),框架都无法执行此推理,用户需要使用属性在注释上提供架构类型。PulsarListenerStringStringschemaTypespring-doc.cn

以下示例显示了另一种方法,该方法采用 :PulsarListenerIntegerspring-doc.cn

@PulsarListener(subscriptionName = "my-subscription-1", topics = "my-topic-1")
public void listen(Integer message) {
   System.out.println(message);
}

以下方法展示了我们如何从主题中使用复杂类型:PulsarListenerspring-doc.cn

@PulsarListener(subscriptionName = "my-subscription-2", topics = "my-topic-2", schemaType = SchemaType.JSON)
public void listen(Foo message) {
    System.out.println(message);
}

请注意在 上添加了一个属性。 这是因为该库无法从提供的类型推断架构类型: 。我们必须告诉框架要使用什么 schema。schemaTypePulsarListenerFoospring-doc.cn

让我们看看更多方法。spring-doc.cn

你可以直接消费 Pulsar 消息:spring-doc.cn

@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.apache.pulsar.client.api.Message<String> message) {
    System.out.println(message.getValue());
}

下面的示例使用 Spring 消息传递信封来使用记录:spring-doc.cn

@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.springframework.messaging.Message<String> message) {
    System.out.println(message.getPayload());
}

现在让我们看看如何批量使用记录。 以下示例用于批量使用记录作为 POJO:PulsarListenerspring-doc.cn

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Foo> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message));
}

请注意,在此示例中,我们将记录作为对象的集合 () 接收。 此外,要在级别启用批量使用,您需要将注释上的属性设置为 。ListPulsarListenerbatchtruespring-doc.cn

根据持有的实际类型,框架会尝试推断要使用的架构。 如果 包含 complex 类型,您仍需要提供 on .ListListschemaTypePulsarListenerspring-doc.cn

下面使用 Pulsar Java 客户端提供的 envelope:Messagespring-doc.cn

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Message<Foo>> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}

以下示例使用具有 Spring 消息传递类型的信封的批处理记录:Messagespring-doc.cn

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<org.springframework.messaging.Message<Foo>> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getPayload()));
}

最后,你也可以将 Pulsar 的 holder 对象用于批处理监听器:Messagesspring-doc.cn

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(org.apache.pulsar.client.api.Messages<Foo>> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}

当你使用 时,你可以直接在注解本身上提供 Pulsar 消费者属性。 如果您不想使用前面提到的 Boot 配置属性或具有多种方法,这将非常方便。PulsarListenerPulsarListenerspring-doc.cn

以下示例直接在 上使用 Pulsar 消费者属性:PulsarListenerspring-doc.cn

@PulsarListener(properties = { "subscriptionName=subscription-1", "topicNames=foo-1", "receiverQueueSize=5000" })
void listen(String message) {
}
使用的属性是直接的 Pulsar 消费者属性,而不是应用程序配置属性spring.pulsar.consumer

5.1.1. 自定义 ConsumerBuilder

您可以通过使用 a 自定义任何可用的字段,方法是提供 of 类型,然后将其提供给 ,如下所示。ConsumerBuilderPulsarListenerConsumerBuilderCustomizer@BeanPulsarListenerConsumerBuilderCustomizerPulsarListenerspring-doc.cn

@PulsarListener(topics = "hello-topic", consumerCustomizer = "myCustomizer")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

@Bean
PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
    return (builder) -> builder.consumerName("myConsumer");
}
如果您的应用程序只注册了一个 bean 和一个 bean,则将自动应用定制器。@PulsarListenerPulsarListenerConsumerBuilderCustomizer

5.2. 指定 Schema 信息

如前所述,对于 Java 原语,Spring for Apache Pulsar 框架可以推断出要在 . 对于非原始类型,如果未在注解上显式指定 Schema,则 Spring for Apache Pulsar 框架将尝试从该类型构建一个。PulsarListenerSchema.JSONspring-doc.cn

当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF 和采用 INLINE 编码的 KEY_VALUE。

5.2.1. 自定义 Schema 映射

作为在 for complex types 上指定架构的替代方法,可以使用类型的映射配置架构解析程序。 这样就无需在侦听器上设置 schema,因为框架使用传入消息类型咨询解析程序。PulsarListenerspring-doc.cn

可以使用属性配置架构映射。 以下示例用于分别使用 和 schema 为 和 complex 对象添加映射:spring.pulsar.defaults.type-mappingsapplication.ymlUserAddressAVROJSONspring-doc.cn

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
这是 message 类的完全限定名称。message-type

添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,则可以提供架构解析程序定制器来添加映射。spring-doc.cn

以下示例使用架构解析程序定制器分别使用 and schemas 为 和 complex 对象添加映射:UserAddressAVROJSONspring-doc.cn

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

有了这个配置,就不需要在侦听器上设置 schema,例如:spring-doc.cn

@PulsarListener(subscriptionName = "user-sub", topics = "user-topic")
public void listen(User user) {
    System.out.println(user);
}

5.3. 访问 Pulsar Consumer 对象

有时,你需要直接访问 Pulsar Consumer 对象。 以下示例显示了如何获取它:spring-doc.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message, org.apache.pulsar.client.api.Consumer<String> consumer) {
    System.out.println("Message Received: " + message);
    ConsumerStats stats = consumer.getStats();
    ...
}
以这种方式访问对象时,请勿调用任何会通过调用任何 receive 方法更改 Consumer 光标位置的操作。 所有此类操作都必须由容器完成。Consumer

5.4. Pulsar 消息监听器容器

现在我们已经看到了消费者端的基本交互。现在让我们深入了解一下如何与底层 Pulsar 消费者交互的内部工作原理。 请记住,对于最终用户应用程序,在大多数情况下,我们建议在使用 Spring for Apache Pulsar 时直接使用 Pulsar 主题中的注释来使用,因为该模型涵盖了广泛的应用程序用例。 但是,了解内部的工作原理很重要。本节将介绍这些详细信息。PulsarListenerPulsarListenerPulsarListenerPulsarListenerspring-doc.cn

如前所述,当您使用 Spring for Apache Pulsar 时,消息侦听器容器是消息消费的核心。 在幕后使用消息侦听器容器基础设施来创建和管理 Pulsar 消费者。 Spring for Apache Pulsar 通过 提供此消息侦听器容器的 contract。 此消息侦听器容器的默认实现通过 提供。 顾名思义,包含消息侦听器。 容器创建 Pulsar 消费者,然后运行一个单独的线程来接收和处理数据。 数据由提供的消息侦听器实现处理。PulsarListenerPulsarMessageListenerContainerDefaultPulsarMessageListenerContainerPulsarMessageListenerContainerspring-doc.cn

消息侦听器容器使用使用者的方法批量消费数据。 收到数据后,数据将移交给选定的消息侦听器实现。batchReceivespring-doc.cn

使用 Spring for Apache Pulsar 时,可以使用以下消息侦听器类型。spring-doc.cn

我们将在以下部分中看到有关这些不同消息侦听器的详细信息。spring-doc.cn

但是,在这样做之前,让我们仔细看看容器本身。spring-doc.cn

5.4.1. 默认 PulsarMessageListenerContainer

这是一个基于使用者的消息侦听器容器。 下面的清单显示了它的构造函数:spring-doc.cn

public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
			PulsarContainerProperties pulsarContainerProperties)
}

它接收一个 (用于创建使用者) 和一个对象 (包含有关容器属性的信息)。 具有以下构造函数:PulsarConsumerFactoryPulsarContainerPropertiesPulsarContainerPropertiesspring-doc.cn

public PulsarContainerProperties(String... topics)

public PulsarContainerProperties(Pattern topicPattern)

您可以通过提供给 Consumer Factory 的 Consumer 属性提供主题信息,也可以将其作为 Consumer Property 提供。 以下示例使用 :PulsarContainerPropertiesDefaultPulsarMessageListenerContainerspring-doc.cn

Map<String, Object> config = new HashMap<>();
config.put("topics", "my-topic");
PulsarConsumerFactory<String> pulsarConsumerFactorY = DefaultPulsarConsumerFactory<>(pulsarClient, config);

PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();

pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> {
		});

DefaultPulsarMessageListenerContainer<String> pulsarListenerContainer = new DefaultPulsarMessageListenerContainer(pulsarConsumerFacotyr,
        pulsarContainerProperties);

return pulsarListenerContainer;
如果在直接使用侦听器容器时未指定主题信息,则使用与 相同的主题解析过程,但省略了 “Message type default” 步骤。PulsarListener

DefaultPulsarMessageListenerContainer仅创建一个使用者。 如果要通过多个线程管理多个使用者,则需要使用 .ConcurrentPulsarMessageListenerContainerspring-doc.cn

5.4.2. ConcurrentPulsarMessageListenerContainer

ConcurrentPulsarMessageListenerContainer具有以下构造函数:spring-doc.cn

public ConcurrentPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
    PulsarContainerProperties pulsarContainerProperties)

ConcurrentPulsarMessageListenerContainer允许您通过 setter 指定属性。 仅在非独占订阅 (, , 和 ) 上允许多于的并发数。 只有在您具有独占订阅模式时,才能使用默认并发。concurrency1failoversharedkey-shared1spring-doc.cn

以下示例为订阅启用 through 注释。concurrencyPulsarListenerfailoverspring-doc.cn

@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
				subscriptionType = SubscriptionType.Failover, concurrency = "3")
void listen(String message, Consumer<String> consumer) {
    ...
    System.out.println("Current Thread: " + Thread.currentThread().getName());
    System.out.println("Current Consumer: " + consumer.getConsumerName());
}

在前面的监听器中,假设 topic 有 3 个 partition。 如果它是未分区的主题,则将并发设置为 does no (并发性) 不会执行任何操作。除了主要的活跃使用者之外,您还会获得两个空闲使用者。 如果主题具有三个以上的分区,则消息将在容器创建的使用者之间进行负载均衡。 如果运行此 ,则会看到来自不同分区的消息通过不同的使用者使用,如前面示例中的线程名称和使用者名称打印输出所暗示的那样。my-topic3PulsarListenerspring-doc.cn

当你在分区主题上以这种方式使用订阅时,Pulsar 保证消息排序。Failover

下面的清单显示了 的另一个示例,但具有 subscription 和 enabled。PulsarListenerSharedconcurrencyspring-doc.cn

@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
				subscriptionType = SubscriptionType.Shared, concurrency = "5")
void listen(String message) {
    ...
}

在前面的示例中,创建了 5 个不同的使用者(这一次,我们假设该主题有 5 个分区)。PulsarListenerspring-doc.cn

在这个版本中,没有消息排序,因为订阅不保证 Pulsar 中的任何消息排序。Shared

如果您需要消息排序,但仍需要共享订阅类型,则需要使用订阅类型。Key_Sharedspring-doc.cn

5.4.3. 消息消费

让我们看看消息侦听器容器如何实现基于单记录和基于批处理的消息使用。spring-doc.cn

单记录消耗

为了这次讨论,让我们重新审视一下我们的基本知识:PulsarListenerspring-doc.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

使用这种方法,我们必须要求 Spring for Apache Pulsar 每次都使用一条记录调用监听器方法。 我们提到了消息侦听器容器使用使用者上的方法分批使用数据。 框架检测到 ,在本例中,接收到一条记录。这意味着,在每次调用该方法时,它都需要一个 singe 记录。 尽管消息侦听器容器会批量使用记录,但它会遍历收到的批处理,并通过适配器调用侦听器方法。 正如你在上一节中看到的,它继承自 Pulsar Java 客户端提供的,它支持基本方法。PulsarListenerbatchReceivePulsarListenerPulsarRecordMessageListenerPulsarRecordMessageListenerMessageListenerreceivedspring-doc.cn

批量消耗

以下示例显示了批量消费记录:PulsarListenerspring-doc.cn

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen4(List<Foo> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message));
}

当您使用这种类型的 时,框架会检测到您处于批处理模式。 由于它已经使用 Consumer 的方法分批接收了数据,因此它通过 的适配器将整个批处理移交给侦听器方法。PulsarListenerbatchReceivePulsarBatchMessageListenerspring-doc.cn

5.5. Pulsar 头文件

Pulsar 消息元数据可以作为 Spring 消息头使用。 可用标头的列表可以在 PulsarHeaders.java 中找到。spring-doc.cn

5.5.1. 在基于单记录的 Consumer 中访问

以下示例展示了如何在使用单记录消费模式的应用程序中访问各种 Pulsar Headers:spring-doc.cn

@PulsarListener(topics = "simpleListenerWithHeaders")
void simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
                @Header(PulsarHeaders.RAW_DATA) byte[] rawData,
                @Header("foo") String foo) {

}

在前面的示例中,我们访问 和 message 元数据的值以及名为 . Spring 注解用于每个 header 字段。messageIdrawDatafoo@Headerspring-doc.cn

你也可以使用 Pulsar 的 作为 envelope 来携带有效载荷。 这样做时,用户可以直接调用 Pulsar 消息上的相应方法来检索元数据。 但是,为方便起见,您也可以使用 Comments 来检索它。 请注意,您还可以使用 Spring 消息传递信封来携带有效负载,然后使用 检索 Pulsar 标头。MessageHeaderMessage@Headerspring-doc.cn

5.5.2. 基于批量记录的 Consumer 访问

在本节中,我们将了解如何在使用批处理 consumer 的应用程序中访问各种 Pulsar Headers:spring-doc.cn

@PulsarListener(topics = "simpleBatchListenerWithHeaders", batch = true)
void simpleBatchListenerWithHeaders(List<String> data,
					@Header(PulsarHeaders.MESSAGE_ID) List<MessageId> messageIds,
					@Header(PulsarHeaders.TOPIC_NAME) List<String> topicNames, @Header("foo") List<String> fooValues) {

}

在前面的示例中,我们将数据作为 . 在提取各种 headers 时,我们也作为 a 执行此操作。 Spring for Apache Pulsar 确保 header 列表与 data 列表相对应。List<String>List<>spring-doc.cn

当您使用批处理侦听器并接收负载时,您还可以以相同的方式提取标头,并将有效负载接收为 、 、 或 。List<org.apache.pulsar.client.api.Message<?>org.apache.pulsar.client.api.Messages<?>org.springframework.messaging.Messsge<?>spring-doc.cn

5.6. 消息确认

当您使用 Spring for Apache Pulsar 时,消息确认由框架处理,除非应用程序选择退出。 在本节中,我们将详细介绍框架如何处理消息确认。spring-doc.cn

5.6.1. 消息 ACK 模式

Spring for Apache Pulsar 提供了以下确认消息的模式:spring-doc.cn

BATCH确认模式是默认模式,但您可以在 Message Listener Container (消息侦听器) 容器上更改它。 在以下部分中,我们将了解当您同时使用 的单个和批处理版本时,确认是如何工作的,以及它们如何转换为后备消息侦听器容器(并最终转换为 Pulsar 消费者)。PulsarListenerspring-doc.cn

5.6.2. 单记录模式下的自动消息确认

让我们重新审视一下基本的基于 Single Message 的 :PulsarListenerspring-doc.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

很自然地想知道,当你使用 时,确认是如何工作的,特别是如果你熟悉直接使用 Pulsar consumer。 答案归结为消息侦听器容器,因为这是 Spring for Apache Pulsar 中协调所有与消费者相关的活动的中心位置。PulsarListenerspring-doc.cn

假设您没有覆盖默认行为,那么当您使用上述 :PulsarListenerspring-doc.cn

  1. 首先,侦听器容器从 Pulsar 消费者批量接收消息。spring-doc.cn

  2. 收到的消息一次传递给一条消息。PulsarListenerspring-doc.cn

  3. 当所有记录都传递给侦听器方法并成功处理时,容器将确认来自原始批处理的所有消息。spring-doc.cn

这是正常流程。如果原始批处理中的任何记录引发异常, Spring for Apache Pulsar 会单独跟踪这些记录。 当批处理中的所有记录都被处理完时, Spring for Apache Pulsar 会确认所有成功的消息,并否定地确认 (nack) 所有失败的消息。 换句话说,当使用 using 使用默认的 ack 模式时,框架会等待从调用收到的所有记录都处理成功,然后在 Pulsar consumer 上调用该方法。 如果任何特定记录在调用处理程序方法时抛出异常,Spring for Apache Pulsar 会跟踪这些记录,并在处理整个批处理后单独调用这些记录。PulsarRecordMessageListenerBATCHbatchReceiveacknowledgenegativeAcknowledgespring-doc.cn

如果应用程序希望每条记录发生确认或否定确认,则可以启用 ack 模式。 在这种情况下,在处理每条记录后,如果没有错误,则确认消息,如果有错误,则否定确认消息。 以下示例在 Pulsar 监听器上启用 ack 模式:RECORDRECORDspring-doc.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.RECORD)
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

5.6.3. 单记录模式下的手动消息确认

您可能并不总是希望框架发送确认,而是直接从应用程序本身发送。 Spring for Apache Pulsar 提供了几种启用手动消息确认的方法。以下示例显示了其中之一:spring-doc.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Acknowledgment acknowledgment) {
    System.out.println("Message Received: " + message.getValue());
	acknowledgment.acknowledge();
}

有几件事值得在这里解释。首先,我们通过设置 来启用手动确认模式 。 启用手动 ack 模式时, Spring for Apache Pulsar 允许应用程序注入对象。 该框架通过选择兼容的消息侦听器容器来实现此目的:用于基于单个记录的消费,它允许您访问对象。ackModePulsarListenerAcknowledgmentPulsarAcknowledgingMessageListenerAcknowledgmentspring-doc.cn

该对象提供以下 API 方法:Acknowledgmentspring-doc.cn

void acknowledge();

void acknowledge(MessageId messageId);

void acknowledge(List<MessageId> messageIds);

void nack();

void nack(MessageId messageId);

你可以使用 ack 模式将此对象注入到你的 while 中,然后调用相应的方法之一。AcknowledgmentPulsarListenerMANUALspring-doc.cn

在前面的示例中,我们调用了无参数方法。 这是因为框架知道它当前在哪个下运行。 调用 时,您不需要使用 enveloper' 接收有效负载,而是使用目标类型 — ,在本例中。 您还可以通过提供消息 ID 来调用不同的变体:使用 时,必须使用信封接收有效负载。PulsarListeneracknowledgeMessageacknowledge()MessageStringacknowledgeacknowledge.acknowledge(message.getMessageId());acknowledge(messageId)Message<?>spring-doc.cn

与可能的确认类似,API 也提供了用于否定确认的选项。 请参阅前面显示的 nack 方法。Acknowledgmentspring-doc.cn

您也可以直接在 Pulsar 消费者上调用:acknowledgespring-doc.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Consumer<String> consumer) {
    System.out.println("Message Received: " + message.getValue());
	try {
		consumer.acknowledge(message);
	}
	catch (Exception e) {
		....
	}
}

直接调用底层消费者时,需要自己做错误处理。 使用 the 不需要这样做,因为框架可以为你执行此操作。 因此,在使用手动确认时,应使用 object 方法。acknowledgeAcknowledgmentAcknowledgmentspring-doc.cn

使用手动确认时,请务必了解框架完全不执行任何确认。 因此,在设计应用程序时考虑正确的确认策略非常重要。

5.6.4. 批量消费时自动确认消息

当您批量使用记录(请参阅“消息 ACK 模式”)并使用默认的 ack 模式时,当成功处理整个批次时,将确认整个批次。 如果任何记录引发异常,则对整个批处理进行否定确认。 请注意,这可能不是在生产者端批处理的同一批次。相反,这是从调用使用者返回的批处理BATCHbatchReceivespring-doc.cn

请考虑以下批处理侦听器:spring-doc.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", batch = true)
public void batchListen(List<Foo> messages) {
    for (Foo foo : messages) {
		...
    }
}

当处理传入集合(在本例中)中的所有消息时,框架会确认所有消息。messagesspring-doc.cn

在批处理模式下使用时,不允许的 ack 模式。 这可能会导致问题,因为应用程序可能不希望再次重新交付整个批处理。 在这种情况下,您需要使用确认模式。RECORDMANUALspring-doc.cn

5.6.5. 批量消费时手动消息确认

如上一节所示,当在消息侦听器容器上设置 ack 模式时,框架不会执行任何确认,无论是肯定的还是否定的。 处理此类问题完全取决于应用程序。 设置 ack 模式后, Spring for Apache Pulsar 会选择一个兼容的消息侦听器容器:用于批量消费,这样您就可以访问对象。 以下是 API 中可用的方法:MANUALMANUALPulsarBatchAcknowledgingMessageListenerAcknowledgmentAcknowledgmentspring-doc.cn

void acknowledge();

void acknowledge(MessageId messageId);

void acknowledge(List<MessageId> messageIds);

void nack();

void nack(MessageId messageId);

你可以在使用 ack 模式时将此对象注入到你的 state 中。 下面的清单显示了基于批处理的侦听器的基本示例:AcknowledgmentPulsarListenerMANUALspring-doc.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(List<Message<String>> messgaes, Acknowlegement acknowledgment) {
    for (Message<String> message : messages) {
		try {
			...
	        acknowledgment.acknowledge(message.getMessageId());
		}
		catch (Exception e) {
			acknowledgment.nack(message.getMessageId());
		}
    }
}

使用批处理侦听器时,消息侦听器容器无法知道它当前正在处理哪个记录。 因此,要手动确认,您需要使用采用 a 或 a 的重载方法之一。 您还可以使用 for the batch listener 进行否定确认。acknowledgeMessageIdList<MessageId>MessageIdspring-doc.cn

5.7. 消息重新投递和错误处理

现在我们已经了解了消息侦听器容器基础设施及其各种功能,现在让我们尝试了解消息重新传递和错误处理。 Apache Pulsar 提供了各种用于消息重新传递和错误处理的原生策略。我们来看看它们,看看如何通过 Spring for Apache Pulsar 使用它们。PulsarListenerspring-doc.cn

5.7.1. 为消息重新投递指定 Acknowledgment Timeout(确认超时)

默认情况下,除非 Consumer 崩溃,否则 Pulsar Consumer 不会重新传递消息,但你可以通过在 Pulsar Consumer 上设置 ack 超时来更改此行为。 如果 ack timeout 属性的值大于零,并且 Pulsar 消费者在该超时时间内没有确认消息,则重新传递该消息。spring-doc.cn

当你使用 Spring for Apache Pulsar 时,你可以通过消费者定制器或使用 attribute 中的原生 Pulsar 属性来设置此属性:ackTimeoutproperties@PulsarListenerspring-doc.cn

@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
                properties = {"ackTimeout=60s"})
public void listen(String s) {
    ...
}

当你指定 ack 超时时,如果 Consumer 在 60 秒内没有发送确认,则 Pulsar 会将消息重新投递给 Consumer。spring-doc.cn

如果要为具有不同延迟的 ack timeout 指定一些高级回退选项,可以执行以下操作:spring-doc.cn

@EnablePulsar
@Configuration
class AckTimeoutRedeliveryConfig {

    @PulsarListener(subscriptionName = "withAckTimeoutRedeliveryBackoffSubscription",
            topics = "withAckTimeoutRedeliveryBackoff-test-topic",
            ackTimeoutRedeliveryBackoff = "ackTimeoutRedeliveryBackoff",
            properties = { "ackTimeout=60s" })
    void listen(String msg) {
        // some long-running process that may cause an ack timeout
    }

    @Bean
    RedeliveryBackoff ackTimeoutRedeliveryBackoff() {
        return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
                .build();
    }

}

在前面的示例中,我们为 Pulsar 指定了一个 bean,最小延迟为 1 秒,最大延迟为 10 秒,回退乘数为 2。 在发生初始 ack 超时后,通过此回退 Bean 控制消息重新传递。 在本例中,我们通过将属性设置为实际的 bean 名称 — 来为 Comments 提供回退 bean。RedeliveryBackoffPulsarListenerackTimeoutRedeliveryBackoffackTimeoutRedeliveryBackoffspring-doc.cn

5.7.2. 指定否定确认重新传递

当否定确认时,Pulsar consumer 允许你指定应用程序希望如何重新传递消息。 默认是在一分钟内重新传递消息,但你可以通过消费者定制器或使用 attribute 中的原生 Pulsar 属性来更改它:negativeAckRedeliveryDelayproperties@PulsarListenerspring-doc.cn

@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
                properties = {"negativeAckRedeliveryDelay=10ms"})
public void listen(String s) {
    ...
}

你还可以通过提供 bean 并将 bean 名称作为 PulsarProducer 上的属性来指定不同的延迟和退避机制,如下所示:RedeliveryBackoffnegativeAckRedeliveryBackoffspring-doc.cn

@EnablePulsar
@Configuration
class NegativeAckRedeliveryConfig {

    @PulsarListener(subscriptionName = "withNegRedeliveryBackoffSubscription",
            topics = "withNegRedeliveryBackoff-test-topic", negativeAckRedeliveryBackoff = "redeliveryBackoff",
            subscriptionType = SubscriptionType.Shared)
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @Bean
    RedeliveryBackoff redeliveryBackoff() {
        return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
                .build();
    }

}

5.7.3. 使用 Apache Pulsar 的 Dead Letter Topic 进行消息重投和错误处理

Apache Pulsar 允许应用程序在具有订阅类型的消费者上使用死信主题。 对于 和 订阅类型,此功能不可用。 基本思想是,如果消息重试一定次数(可能是由于 ack 超时或 nack 重新传递),一旦重试次数用完,就可以将消息发送到称为死信队列 (DLQ) 的特殊主题。 让我们通过检查一些代码片段来了解有关此功能的一些细节:SharedExclusiveFailoverspring-doc.cn

@EnablePulsar
@Configuration
class DeadLetterPolicyConfig {

    @PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
            topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
            subscriptionType = SubscriptionType.Shared, properties = { "ackTimeout=1s" })
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @PulsarListener(id = "dlqListener", topics = "my-dlq-topic")
    void listenDlq(String msg) {
        System.out.println("From DLQ: " + msg);
    }

    @Bean
    DeadLetterPolicy deadLetterPolicy() {
        return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
    }

}

首先,我们有一个特殊的 bean for ,它被命名为 (它可以是你想要的任何名称)。 这个 bean 指定了许多内容,例如最大交付量(在本例中为 10)和死信主题的名称 — ,在本例中。 如果不指定 DLQ 主题名称,则默认为 in Pulsar。 接下来,我们通过设置属性来提供此 bean 名称。 请注意,它的订阅类型为 ,因为 DLQ 功能仅适用于共享订阅。 此代码主要用于演示目的,因此我们提供 1 秒的值。 这个想法是代码抛出异常,如果 Pulsar 在 1 秒内没有收到 ack,它会重试。 如果该周期持续 10 次(因为这是我们在 中的最大重新传递计数),则 Pulsar consumer 会将消息发布到 DLQ 主题。 我们还有另一个侦听 DLQ 主题,以便在数据发布到 DLQ 主题时接收数据。DeadLetterPolicydeadLetterPolicymy-dlq-topic<topicname>-<subscriptionname>-DLQPulsarListenerdeadLetterPolicyPulsarListenerSharedackTimeoutDeadLetterPolicyPulsarListenerspring-doc.cn

使用分区主题时有关 DLQ 主题的特别说明

如果 main topic 是分区的,那么在幕后,Pulsar 会将每个分区视为一个单独的 topic。 Pulsar 将 , 其中 代表分区编号 附加到主主题名称。 问题在于,如果你不指定 DLQ 主题(与我们上面所做的相反),Pulsar 会发布到包含此信息的默认主题名称——例如:。 解决此问题的简单方法是始终提供 DLQ 主题名称。partition-<n>n`partition-<n>topic-with-dlp-partition-0-deadLetterPolicySubscription-DLQspring-doc.cn

5.7.4. Spring for Apache Pulsar 中的原生错误处理

正如我们之前提到的,Apache Pulsar 中的 DLQ 功能仅适用于共享订阅。 如果应用程序需要对非共享订阅使用一些类似的功能,该怎么办? Pulsar 不支持独占订阅和故障转移订阅的 DLQ 的主要原因是因为这些订阅类型是有订单保证的。 允许重新投递、DLQ 等实际上会不按顺序接收消息。 但是,如果应用程序对此没有问题,但更重要的是,需要此 DLQ 功能用于非共享订阅,该怎么办? 为此, Spring for Apache Pulsar 提供了一个 ,您可以在 Pulsar 中的任何订阅类型中使用它: 、 、 或 。PulsarConsumerErrorHandlerExclusiveFailoverSharedKey_Sharedspring-doc.cn

当你使用 Spring for Apache Pulsar 时,请确保不要在侦听器上设置 ack timeout 属性。PulsarConsumerErrorHandlerspring-doc.cn

让我们通过检查一些代码片段来了解一些细节:spring-doc.cn

@EnablePulsar
@Configuration
class PulsarConsumerErrorHandlerConfig {

    @Bean
    PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
            PulsarTemplate<String> pulsarTemplate) {
        return new DefaultPulsarConsumerErrorHandler<>(
                new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
    }

    @PulsarListener(id = "pulsarConsumerErrorHandler-id", subscriptionName = "pulsatConsumerErrorHandler-subscription",
                    topics = "pulsarConsumerErrorHandler-topic",
					pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler")
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @PulsarListener(id = "pceh-dltListener", topics = "my-foo-dlt")
    void listenDlt(String msg) {
        System.out.println("From DLT: " + msg);
    }

}

考虑一下咖啡豆。 这将创建一个类型的 bean,并使用 Spring for Apache Pulsar 提供的开箱即用的默认实现: 。 有一个采用 a 和 a 的构造函数。 是具有以下 API 的功能性接口:pulsarConsumerErrorHandlerPulsarConsumerErrorHandlerDefaultPulsarConsumerErrorHandlerDefaultPulsarConsumerErrorHandlerPulsarMessageRecovererFactoryorg.springframework.util.backoff.BackoffPulsarMessageRecovererFactoryspring-doc.cn

@FunctionalInterface
public interface PulsarMessageRecovererFactory<T> {

	/**
	 * Provides a message recoverer {@link PulsarMessageRecoverer}.
	 * @param consumer Pulsar consumer
	 * @return {@link PulsarMessageRecoverer}.
	 */
	PulsarMessageRecoverer<T> recovererForConsumer(Consumer<T> consumer);

}

该方法接受一个 Pulsar 消费者并返回一个 ,这是另一个功能接口。 这是 API :recovererForConsumerPulsarMessageRecovererPulsarMessageRecovererspring-doc.cn

public interface PulsarMessageRecoverer<T> {

	/**
	 * Recover a failed message, for e.g. send the message to a DLT.
	 * @param message Pulsar message
	 * @param exception exception from failed message
	 */
	void recoverMessage(Message<T> message, Exception exception);

}

Spring for Apache Pulsar 提供了一个 called 实现,它提供了一个默认实现,可以通过将消息发送到死信主题 (DLT) 来恢复消息。 我们将此实现提供给前面的构造函数。 作为第二个参数,我们提供一个 . 您还可以提供 from Spring for advanced backoff 功能。 然后,我们将这个 bean 名称作为属性提供给 . 该属性称为 。 每次消息的方法失败时,都会重试该方法。 重试次数由提供的 implementation 值控制。在我们的示例中,我们执行 10 次重试(总共 11 次尝试 — 第一次,然后是 10 次重试)。 用尽所有重试后,消息将发送到 DLT 主题。PulsarMessageRecovererFactoryPulsarDeadLetterPublishingRecovererDefaultPulsarConsumerErrorHandlerFixedBackOffExponentialBackoffPulsarConsumerErrorHandlerPulsarListenerpulsarConsumerErrorHandlerPulsarListenerBackoffspring-doc.cn

我们提供的实现使用用于将消息发布到 DLT 的 a。 在大多数情况下,从 Spring Boot 自动配置的相同内容就足够了,但需要注意分区主题。 使用分区主题和对主主题使用自定义消息路由时,必须使用不同的主题,该主题不采用自动配置,该自动配置填充了 for 的值。 您可以将 a 与以下蓝图一起使用:PulsarDeadLetterPublishingRecovererPulsarTemplatePulsarTemplatePulsarTemplatePulsarProducerFactorycustompartitionmessage-routing-modePulsarConsumerErrorHandlerspring-doc.cn

@Bean
PulsarConsumerErrorHandler<Integer> pulsarConsumerErrorHandler(PulsarClient pulsarClient) {
    PulsarProducerFactory<Integer> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Map.of());
        PulsarTemplate<Integer> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);

        BiFunction<Consumer<?>, Message<?>, String> destinationResolver =
                (c, m) -> "my-foo-dlt";

        PulsarDeadLetterPublishingRecoverer<Integer> pulsarDeadLetterPublishingRecoverer =
                new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, destinationResolver);

        return new DefaultPulsarConsumerErrorHandler<>(pulsarDeadLetterPublishingRecoverer,
                new FixedBackOff(100, 5));
}

请注意,我们提供了一个目标解析器作为第二个 constructor 参数。 如果未提供,则用作 DLT 主题名称。 使用此功能时,您应该通过设置目标解析程序而不是使用默认值来使用正确的目标名称。PulsarDeadLetterPublishingRecovererPulsarDeadLetterPublishingRecoverer<subscription-name>-<topic-name>-DLT>spring-doc.cn

使用单记录消息侦听器时,就像我们对 所做的那样,如果您使用手动确认,请确保在引发异常时不要否定地确认消息。 相反,将异常重新引发回容器。否则,容器会认为消息是单独处理的,不会触发错误处理。PulsarConsumerErrorHnadlerspring-doc.cn

最后,我们有第二个接收来自 DLT 主题的消息。PulsarListenerspring-doc.cn

到目前为止,在本节提供的示例中,我们只看到了如何与单个记录消息侦听器一起使用。 接下来,我们看看如何在批处理侦听器上使用它。PulsarConsumerErrorHandlerspring-doc.cn

5.7.5. 使用 PulsarConsumerErrorHandler 的批处理监听器

首先,我们看一个 batch 方法:PulsarListenerspring-doc.cn

@PulsarListener(subscriptionName = "batch-demo-5-sub", topics = "batch-demo-4", batch = true, concurrency = "3",
            subscriptionType = SubscriptionType.Failover,
            pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler", ackMode = AckMode.MANUAL)
void listen(List<Message<Integer>> data, Consumer<Integer> consumer, Acknowledgment acknowledgment) {
    for (Message<Integer> datum : data) {
        if (datum.getValue() == 5) {
            throw new PulsarBatchListenerFailedException("failed", datum);
        }
        acknowledgement.acknowledge(datum.getMessageId());
    }
}

@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
        PulsarTemplate<String> pulsarTemplate) {
    return new DefaultPulsarConsumerErrorHandler<>(
            new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}

@PulsarListener(subscriptionName = "my-dlt-subscription", topics = "my-foo-dlt")
void dltReceiver(Message<Integer> message) {
    System.out.println("DLT - RECEIVED: " + message.getValue());
}

我们再次为 property 提供 bean 名称。 当您使用批处理侦听器(如前面的示例所示)并希望使用 from Spring for Apache Pulsar 时,您需要使用手动确认。 这样,您可以确认所有成功的单个消息。 对于失败的 API,您必须抛出 a 并显示它失败的消息。 如果没有此异常,框架不知道如何处理失败。 重试时,容器会向侦听器发送一批新的消息,从失败的消息开始。 如果再次失败,则重试,直到重试次数用完,此时消息将发送到 DLT。 此时,容器会确认消息,并将原始批处理中的后续消息移交给侦听器。pulsarConsumerErrorHandlerPulsarConsumerErrorHandlerPulsarConsumerErrorHandlerPulsarBatchListenerFailedExceptionspring-doc.cn

5.8. PulsarListener 的消费者定制

Spring for Apache Pulsar 提供了一种便捷的方式来自定义由 . 应用程序可以为 . 下面是一个示例。PulsarListenerPulsarListenerConsumerBuilderCustomizerspring-doc.cn

@Bean
public PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
    return cb -> {
        cb.subscriptionName("modified-subscription-name");
    };
}

然后,可以将此定制器 bean 名称作为 Comments 上的属性提供,如下所示。PuslarListenerspring-doc.cn

@PulsarListener(subscriptionName = "my-subscription",
        topics = "my-topic", consumerCustomizer = "myCustomizer")
void listen(String message) {

}

框架通过 检测提供的 bean,并在创建 Pulsar Consumer 之前将此定制器应用于 Consumer 构建器。PulsarListenerspring-doc.cn

如果您有多个方法,并且每个方法都有不同的定制规则,则应创建多个定制器 bean 并在每个 Bean 上附加适当的定制器。PulsarListenerPulsarListenerspring-doc.cn

5.9. 暂停和恢复消息侦听器容器

在某些情况下,应用程序可能希望暂时暂停消息使用,然后稍后恢复。 Spring for Apache Pulsar 提供了暂停和恢复底层消息侦听器容器的能力。 当 Pulsar 消息侦听器容器暂停时,容器为从 Pulsar 消费者接收数据而进行的任何轮询都将被暂停。 同样,当容器恢复时,如果主题在暂停时添加了任何新记录,则下一次轮询将开始返回数据。spring-doc.cn

要暂停或恢复侦听器容器,请首先通过 Bean 获取容器实例,然后在容器实例上调用暂停/恢复 API - 如下面的代码片段所示:PulsarListenerEndpointRegistryspring-doc.cn

@Autowired
private PulsarListenerEndpointRegistry registry;

void someMethod() {
  PulsarMessageListenerContainer container = registry.getListenerContainer("my-listener-id");
  container.pause();
}
传递给的 id 参数是容器 ID - 在暂停/恢复 .getListenerContainer@PulsarListener@PulsarListener

5.10. Pulsar Reader 支持

该框架支持通过 .PulsarReaderFactoryspring-doc.cn

Spring Boot 提供了这个 reader 工厂,你可以通过指定任何 spring.pulsar.reader.* 应用程序属性来进一步配置它。spring-doc.cn

5.10.1. PulsarReader 注解

虽然可以直接使用,但 Spring for Apache Pulsar 提供了注释,您可以使用该注释快速读取主题,而无需自己设置任何读取器工厂。 这与背后的相同想法类似 下面是一个简单的示例。PulsarReaderFactoryPulsarReaderPulsarListener.spring-doc.cn

@PulsarReader(id = "reader-demo-id", topics = "reader-demo-topic", startMessageId = "earliest")
void read(String message) {
    //...
}

该属性是可选的,但最佳实践是提供对应用程序有意义的值。 如果未指定,则将使用自动生成的 ID。 另一方面,和 属性是必需的。 该属性可以是单个主题或逗号分隔的主题列表。 该属性指示读者从主题中的特定消息开始。 的有效值为 are 或 假设您希望读者从最早或最新的可用消息以外的主题开始任意读取消息。在这种情况下,您需要使用 a 来自定义 ,以便它知道从哪里开始。idtopicsstartMessageIdtopicsstartMessageIdstartMessageIdearliestlatest.ReaderBuilderCustomizerReaderBuilderMessageIdspring-doc.cn

5.10.2. 自定义 ReaderBuilder

您可以通过使用 Spring for Apache Pulsar 中的来自定义任何可用的字段。 您可以提供 type 的 ,然后将其提供给 ,如下所示。ReaderBuilderPulsarReaderReaderBuilderCustomizer@BeanPulsarReaderReaderBuilderCustomizerPulsarReaderspring-doc.cn

@PulsarReader(id = "reader-customizer-demo-id", topics = "reader-customizer-demo-topic",
    readerCustomizer = "myCustomizer")
void read(String message) {
    //...
}

@Bean
public PulsarReaderReaderBuilderCustomizer<String> myCustomizer() {
    return readerBuilder -> {
        readerBuilder.startMessageId(messageId); // the first message read is after this message id.
        // Any other customizations on the readerBuilder
    };
}
如果您的应用程序只注册了一个 bean 和一个 bean,则将自动应用定制器。@PulsarReaderPulsarReaderReaderBuilderCustomizer

6. 主题解析

在生成或使用消息时,需要目标主题。 框架会查找以下有序位置以确定主题(在第一次查找时停止):spring-doc.cn

当通过默认机制之一找到主题时,无需在 produce 或 consume API 上指定主题。spring-doc.cn

当找不到主题时,API 将相应地引发异常。spring-doc.cn

6.1. 用户指定

传递到正在使用的 API 中的主题具有最高优先级(例如。 或 )。PulsarTemplate.send("my-topic", myMessage)@PulsarListener(topics = "my-topic"spring-doc.cn

6.2. 消息类型默认

当没有主题传递到 API 中时,系统会查找为正在生成或使用的消息类型配置的消息类型到主题映射。spring-doc.cn

可以使用 属性 配置映射。 以下示例用于配置在消费或生成消息时使用的默认主题:spring.pulsar.defaults.type-mappingsapplication.ymlFooBarspring-doc.cn

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.Foo
          topic-name: foo-topic
        - message-type: com.acme.Bar
          topic-name: bar-topic
这是 message 类的完全限定名称。message-type
如果消息(或输入的第一条消息)是 ,则框架将无法从中确定主题。如果您的应用程序可能发送消息,则应使用另一种方法来指定主题。Publishernullnull

6.2.1. 自定义主题解析器

添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,您可以通过证明自己的实现来替换默认解析器,例如:spring-doc.cn

@Bean
public MyTopicResolver topicResolver() {
	return new MyTopicResolver();
}

6.3. 生产者全局默认值

最终查询的位置(生产时)是系统范围的生产者默认主题。 它是通过使用命令式 API 时的 property 和使用 reactive API 时的 property 进行配置的。spring.pulsar.producer.topic-namespring.pulsar.reactive.sender.topic-namespring-doc.cn

6.4. Consumer 全局默认值

查询的最后一个位置(使用时)是系统范围的使用者默认主题。 当使用命令式 API 时,它是通过 or 属性配置的,当使用反应式 API 时,它是通过 or 属性之一配置的。spring.pulsar.consumer.topicsspring.pulsar.consumer.topics-patternspring.pulsar.reactive.consumer.topicsspring.pulsar.reactive.consumer.topics-patternspring-doc.cn

7. 发布和使用分区主题

在以下示例中,我们将发布到名为 . 它是一个已分区的主题,对于此示例,我们假设该主题已创建具有三个分区。hello-pulsar-partitionedspring-doc.cn

@SpringBootApplication
public class PulsarBootPartitioned {

	public static void main(String[] args) {
		SpringApplication.run(PulsarBootPartitioned.class, "--spring.pulsar.producer.message-routing-mode=CustomPartition");
	}

	@Bean
	public ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
		pulsarTemplate.setDefaultTopicName("hello-pulsar-partitioned");
		return args -> {
			for (int i = 0; i < 10; i++) {
				pulsarTemplate.sendAsync("hello john doe 0 ", new FooRouter());
				pulsarTemplate.sendAsync("hello alice doe 1", new BarRouter());
				pulsarTemplate.sendAsync("hello buzz doe 2", new BuzzRouter());
			}
		};
	}

	@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned")
	public void listen(String message) {
		System.out.println("Message Received: " + message);
	}

    static class FooRouter implements MessageRouter {

		@Override
		public int choosePartition(Message<?> msg, TopicMetadata metadata) {
			return 0;
		}
	}

	static class BarRouter implements MessageRouter {

		@Override
		public int choosePartition(Message<?> msg, TopicMetadata metadata) {
			return 1;
		}
	}

	static class BuzzRouter implements MessageRouter {

		@Override
		public int choosePartition(Message<?> msg, TopicMetadata metadata) {
			return 2;
		}
	}

}

在前面的示例中,我们发布到分区主题,并且我们希望将一些数据段发布到特定分区。 如果将其保留为 Pulsar 的默认值,则它遵循分区分配的循环模式,我们想覆盖它。 为此,我们提供了一个带有 method 的 message router 对象。 考虑实现的 3 个消息路由器。 始终将数据发送到 分区 、 发送到分区 和 发送到分区 。 另请注意,我们现在使用 that 的方法返回一个 . 在运行应用程序时,我们还需要将 on the producer 设置为 ()。sendFooRouter0BarRouter1BuzzRouter2sendAsyncPulsarTemplateCompletableFuturemessageRoutingModeCustomPartitionspring.pulsar.producer.message-routing-modespring-doc.cn

在消费者端,我们使用 a 和 exclusive 订阅类型。 这意味着来自所有分区的数据最终都位于同一个使用者中,并且没有排序保证。PulsarListenerspring-doc.cn

如果我们希望每个分区都由单个不同的使用者使用,我们该怎么办? 我们可以切换到订阅模式并添加三个单独的使用者:failoverspring-doc.cn

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen1(String foo) {
    System.out.println("Message Received 1: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen2(String foo) {
    System.out.println("Message Received 2: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription",  topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen3(String foo) {
    System.out.println("Message Received 3: " + foo);
}

当您遵循此方法时,单个分区始终由专用使用者使用。spring-doc.cn

同样,如果你想使用 Pulsar 的共享 consumer 类型,你可以使用 subscription 类型。 但是,当您使用该模式时,您将失去任何排序保证,因为单个使用者可能会在另一个使用者有机会之前从所有分区接收消息。sharedsharedspring-doc.cn

请考虑以下示例:spring-doc.cn

@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen1(String foo) {
    System.out.println("Message Received 1: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen2(String foo) {
    System.out.println("Message Received 2: " + foo);
}