消息消费

1. Pulsar 监听器

对于 Pulsar 消费者,我们建议最终用户应用程序使用 Annotation。 要使用 ,您需要使用注释。 当你使用 Spring Boot 支持时,它会自动启用此 Comments 并配置所有必要的组件,例如消息侦听器基础设施(负责创建 Pulsar 消费者)。 使用 a 创建和管理 Pulsar 消费者,即用于消费消息的底层 Pulsar 消费者。PulsarListenerPulsarListener@EnablePulsarPulsarListenerPulsarMessageListenerContainerPulsarConsumerFactoryspring-doc.cn

Spring Boot 提供了这个消费者工厂,你可以通过指定 spring.pulsar.consumer.* 应用程序属性来进一步配置它。工厂上的大多数已配置属性将在侦听器中得到遵守,但以下情况除外spring-doc.cn

该属性将被忽略,而是在未在 annotation 上指定时生成。spring.pulsar.consumer.subscription.name
该属性将被忽略,而是从 Comments 上的值中获取。但是,您可以将 on the annotation 设置为改用 property 值作为默认值。spring.pulsar.consumer.subscription-typesubscriptionType = {}

让我们重新访问一下我们在快速浏览部分看到的代码片段:PulsarListenerspring-doc.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

您可以进一步简化此方法:spring-doc.cn

@PulsarListener
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

在这种最基本的形式中,当 annotation 上未提供 the 时,将使用自动生成的订阅名称。 同样,如果未直接提供 ,则使用主题解析过程来确定目标主题。subscriptionName@PulsarListenertopicsspring-doc.cn

在前面显示的方法中,我们以 形式接收数据,但我们没有指定任何架构类型。 在内部,该框架依赖于 Pulsar 的 schema 机制将数据转换为所需的类型。 框架检测到您期望的类型,然后根据该信息推断架构类型,并将该架构提供给使用者。 框架对所有基元类型执行此推理。 对于所有非基元类型,默认架构假定为 JSON。 如果复杂类型使用 JSON 以外的任何内容(例如 AVRO 或 KEY_VALUE),则必须使用属性在注释上提供架构类型。PulsarListenerStringStringschemaTypespring-doc.cn

以下示例显示了另一种方法,该方法采用 :PulsarListenerIntegerspring-doc.cn

@PulsarListener(subscriptionName = "my-subscription-1", topics = "my-topic-1")
public void listen(Integer message) {
   System.out.println(message);
}

以下方法展示了我们如何从主题中使用复杂类型:PulsarListenerspring-doc.cn

@PulsarListener(subscriptionName = "my-subscription-2", topics = "my-topic-2", schemaType = SchemaType.JSON)
public void listen(Foo message) {
    System.out.println(message);
}

让我们看看更多方法。spring-doc.cn

你可以直接消费 Pulsar 消息:spring-doc.cn

@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.apache.pulsar.client.api.Message<String> message) {
    System.out.println(message.getValue());
}

下面的示例使用 Spring 消息传递信封来使用记录:spring-doc.cn

@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.springframework.messaging.Message<String> message) {
    System.out.println(message.getPayload());
}

现在让我们看看如何批量使用记录。 以下示例用于批量使用记录作为 POJO:PulsarListenerspring-doc.cn

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Foo> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message));
}

请注意,在此示例中,我们将记录作为对象的集合 () 接收。 此外,要在级别启用批量使用,您需要将注释上的属性设置为 。ListPulsarListenerbatchtruespring-doc.cn

根据持有的实际类型,框架会尝试推断要使用的架构。 如果 it 包含除 JSON 之外的复杂类型,您仍需要提供 on .ListListschemaTypePulsarListenerspring-doc.cn

下面使用 Pulsar Java 客户端提供的 envelope:Messagespring-doc.cn

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Message<Foo>> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}

以下示例使用具有 Spring 消息传递类型的信封的批处理记录:Messagespring-doc.cn

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<org.springframework.messaging.Message<Foo>> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getPayload()));
}

最后,你也可以将 Pulsar 的 holder 对象用于批处理监听器:Messagesspring-doc.cn

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(org.apache.pulsar.client.api.Messages<Foo>> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}

当你使用 时,你可以直接在注解本身上提供 Pulsar 消费者属性。 如果您不想使用前面提到的 Boot 配置属性或具有多种方法,这将非常方便。PulsarListenerPulsarListenerspring-doc.cn

以下示例直接在 上使用 Pulsar 消费者属性:PulsarListenerspring-doc.cn

@PulsarListener(properties = { "subscriptionName=subscription-1", "topicNames=foo-1", "receiverQueueSize=5000" })
void listen(String message) {
}
使用的属性是直接的 Pulsar 消费者属性,而不是应用程序配置属性spring.pulsar.consumer

1.1. 使用 AUTO_CONSUME 的通用记录

如果无法提前知道某个 Pulsar topic 的 schema 类型,可以使用 schema type 来消费泛型记录。 在这种情况下,主题使用与主题关联的架构信息将消息反序列化为对象。AUTO_CONSUMEGenericRecordspring-doc.cn

要使用泛型记录,请在 your 上设置 ,并使用 type 为 Pulsar 的消息作为 message 参数,如下所示。schemaType = SchemaType.AUTO_CONSUME@PulsarListenerGenericRecordspring-doc.cn

@PulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
void listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
    GenericRecord record = message.getValue();
    record.getFields().forEach((f) ->
            System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
}
API 允许访问字段及其关联值GenericRecord

1.2. 自定义 ConsumerBuilder

您可以通过使用 a 自定义任何可用的字段,方法是提供 of 类型,然后将其提供给 ,如下所示。ConsumerBuilderPulsarListenerConsumerBuilderCustomizer@BeanPulsarListenerConsumerBuilderCustomizerPulsarListenerspring-doc.cn

@PulsarListener(topics = "hello-topic", consumerCustomizer = "myCustomizer")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

@Bean
PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
    return (builder) -> builder.consumerName("myConsumer");
}
如果您的应用程序只注册了一个 bean 和一个 bean,则将自动应用定制器。@PulsarListenerPulsarListenerConsumerBuilderCustomizer

2. 指定 Schema 信息

如前所述,对于 Java 原语,Spring for Apache Pulsar 框架可以推断出要在 . 对于非原始类型,如果未在注解上显式指定 Schema,则 Spring for Apache Pulsar 框架将尝试从该类型构建一个。PulsarListenerSchema.JSONspring-doc.cn

当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF、AUTO_CONSUME KEY_VALUE w/ INLINE 编码。

2.1. 自定义 Schema 映射

作为在 for complex types 上指定架构的替代方法,可以使用类型的映射配置架构解析程序。 这样就无需在侦听器上设置 schema,因为框架使用传入消息类型咨询解析程序。PulsarListenerspring-doc.cn

2.1.1. 配置属性

可以使用属性配置架构映射。 以下示例用于分别使用 和 schema 为 和 complex 对象添加映射:spring.pulsar.defaults.type-mappingsapplication.ymlUserAddressAVROJSONspring-doc.cn

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
这是 message 类的完全限定名称。message-type

2.1.2. Schema 解析器定制器

添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,则可以提供架构解析程序定制器来添加映射。spring-doc.cn

以下示例使用架构解析程序定制器分别使用 and schemas 为 和 complex 对象添加映射:UserAddressAVROJSONspring-doc.cn

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

2.1.3. 类型映射注解

指定要用于特定消息类型的默认架构信息的另一个选项是使用注释标记消息类。 可以通过 annotation 上的属性指定 schema info。@PulsarMessageschemaTypespring-doc.cn

以下示例将系统配置为在生成或使用 type 为 messages 的 messages 时使用 JSON 作为默认模式:Foospring-doc.cn

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}

有了这个配置,就不需要在侦听器上设置 schema,例如:spring-doc.cn

@PulsarListener(subscriptionName = "user-sub", topics = "user-topic")
public void listen(User user) {
    System.out.println(user);
}

3. 访问 Pulsar Consumer Object

有时,你需要直接访问 Pulsar Consumer 对象。 以下示例显示了如何获取它:spring-doc.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message, org.apache.pulsar.client.api.Consumer<String> consumer) {
    System.out.println("Message Received: " + message);
    ConsumerStats stats = consumer.getStats();
    ...
}
以这种方式访问对象时,请勿调用任何会通过调用任何 receive 方法更改 Consumer 光标位置的操作。 所有此类操作都必须由容器完成。Consumer

4. Pulsar 消息监听器容器

现在我们已经看到了消费者端的基本交互。现在让我们深入了解一下如何与底层 Pulsar 消费者交互的内部工作原理。 请记住,对于最终用户应用程序,在大多数情况下,我们建议在使用 Spring for Apache Pulsar 时直接使用 Pulsar 主题中的注释来使用,因为该模型涵盖了广泛的应用程序用例。 但是,了解内部的工作原理很重要。本节将介绍这些详细信息。PulsarListenerPulsarListenerPulsarListenerPulsarListenerspring-doc.cn

如前所述,当您使用 Spring for Apache Pulsar 时,消息侦听器容器是消息消费的核心。 在幕后使用消息侦听器容器基础设施来创建和管理 Pulsar 消费者。 Spring for Apache Pulsar 通过 提供此消息侦听器容器的 contract。 此消息侦听器容器的默认实现通过 提供。 顾名思义,包含消息侦听器。 容器创建 Pulsar 消费者,然后运行一个单独的线程来接收和处理数据。 数据由提供的消息侦听器实现处理。PulsarListenerPulsarMessageListenerContainerDefaultPulsarMessageListenerContainerPulsarMessageListenerContainerspring-doc.cn

消息侦听器容器使用使用者的方法批量消费数据。 收到数据后,数据将移交给选定的消息侦听器实现。batchReceivespring-doc.cn

使用 Spring for Apache Pulsar 时,可以使用以下消息侦听器类型。spring-doc.cn

我们将在以下部分中看到有关这些不同消息侦听器的详细信息。spring-doc.cn

但是,在这样做之前,让我们仔细看看容器本身。spring-doc.cn

4.1. 默认 PulsarMessageListenerContainer

这是一个基于使用者的消息侦听器容器。 下面的清单显示了它的构造函数:spring-doc.cn

public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
			PulsarContainerProperties pulsarContainerProperties)
}

它接收一个 (用于创建使用者) 和一个对象 (包含有关容器属性的信息)。 具有以下构造函数:PulsarConsumerFactoryPulsarContainerPropertiesPulsarContainerPropertiesspring-doc.cn

public PulsarContainerProperties(String... topics)

public PulsarContainerProperties(Pattern topicPattern)

您可以通过提供给 Consumer Factory 的 Consumer 属性提供主题信息,也可以将其作为 Consumer Property 提供。 以下示例使用 :PulsarContainerPropertiesDefaultPulsarMessageListenerContainerspring-doc.cn

Map<String, Object> config = new HashMap<>();
config.put("topics", "my-topic");
PulsarConsumerFactory<String> pulsarConsumerFactorY = DefaultPulsarConsumerFactory<>(pulsarClient, config);

PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();

pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> {
		});

DefaultPulsarMessageListenerContainer<String> pulsarListenerContainer = new DefaultPulsarMessageListenerContainer(pulsarConsumerFacotyr,
        pulsarContainerProperties);

return pulsarListenerContainer;
如果在直接使用侦听器容器时未指定主题信息,则使用与 相同的主题解析过程,但省略了 “Message type default” 步骤。PulsarListener

DefaultPulsarMessageListenerContainer仅创建一个使用者。 如果要通过多个线程管理多个使用者,则需要使用 .ConcurrentPulsarMessageListenerContainerspring-doc.cn

4.2. ConcurrentPulsarMessageListenerContainer

ConcurrentPulsarMessageListenerContainer具有以下构造函数:spring-doc.cn

public ConcurrentPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
    PulsarContainerProperties pulsarContainerProperties)

ConcurrentPulsarMessageListenerContainer允许您通过 setter 指定属性。 仅在非独占订阅 (, , 和 ) 上允许多于的并发数。 只有在您具有独占订阅模式时,才能使用默认并发。concurrency1failoversharedkey-shared1spring-doc.cn

以下示例为订阅启用 through 注释。concurrencyPulsarListenerfailoverspring-doc.cn

@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
				subscriptionType = SubscriptionType.Failover, concurrency = "3")
void listen(String message, Consumer<String> consumer) {
    ...
    System.out.println("Current Thread: " + Thread.currentThread().getName());
    System.out.println("Current Consumer: " + consumer.getConsumerName());
}

在前面的监听器中,假设 topic 有 3 个 partition。 如果它是未分区的主题,则将并发设置为 does no (并发性) 不会执行任何操作。除了主要的活跃使用者之外,您还会获得两个空闲使用者。 如果主题具有三个以上的分区,则消息将在容器创建的使用者之间进行负载均衡。 如果运行此 ,则会看到来自不同分区的消息通过不同的使用者使用,如前面示例中的线程名称和使用者名称打印输出所暗示的那样。my-topic3PulsarListenerspring-doc.cn

当你在分区主题上以这种方式使用订阅时,Pulsar 保证消息排序。Failover

下面的清单显示了 的另一个示例,但具有 subscription 和 enabled。PulsarListenerSharedconcurrencyspring-doc.cn

@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
				subscriptionType = SubscriptionType.Shared, concurrency = "5")
void listen(String message) {
    ...
}

在前面的示例中,创建了 5 个不同的使用者(这一次,我们假设该主题有 5 个分区)。PulsarListenerspring-doc.cn

在这个版本中,没有消息排序,因为订阅不保证 Pulsar 中的任何消息排序。Shared

如果您需要消息排序,但仍需要共享订阅类型,则需要使用订阅类型。Key_Sharedspring-doc.cn

4.3. 使用记录

让我们看看消息侦听器容器如何实现基于单记录和基于批处理的消息使用。spring-doc.cn

单记录消耗

为了这次讨论,让我们重新审视一下我们的基本知识:PulsarListenerspring-doc.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

使用这种方法,我们必须要求 Spring for Apache Pulsar 每次都使用一条记录调用监听器方法。 我们提到了消息侦听器容器使用使用者上的方法分批使用数据。 框架检测到 ,在本例中,接收到一条记录。这意味着,在每次调用该方法时,它都需要一个 singe 记录。 尽管消息侦听器容器会批量使用记录,但它会遍历收到的批处理,并通过适配器调用侦听器方法。 正如你在上一节中看到的,它继承自 Pulsar Java 客户端提供的,它支持基本方法。PulsarListenerbatchReceivePulsarListenerPulsarRecordMessageListenerPulsarRecordMessageListenerMessageListenerreceivedspring-doc.cn

批量消耗

以下示例显示了批量消费记录:PulsarListenerspring-doc.cn

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen4(List<Foo> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message));
}

当您使用这种类型的 时,框架会检测到您处于批处理模式。 由于它已经使用 Consumer 的方法分批接收了数据,因此它通过 的适配器将整个批处理移交给侦听器方法。PulsarListenerbatchReceivePulsarBatchMessageListenerspring-doc.cn

5. Pulsar 标头

Pulsar 消息元数据可以作为 Spring 消息头使用。 可用标头的列表可以在 PulsarHeaders.java 中找到。spring-doc.cn

5.1. 在基于单记录的 Consumer 中访问

以下示例展示了如何在使用单记录消费模式的应用程序中访问各种 Pulsar Headers:spring-doc.cn

@PulsarListener(topics = "simpleListenerWithHeaders")
void simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
                @Header(PulsarHeaders.RAW_DATA) byte[] rawData,
                @Header("foo") String foo) {

}

在前面的示例中,我们访问 和 message 元数据的值以及名为 . Spring 注解用于每个 header 字段。messageIdrawDatafoo@Headerspring-doc.cn

你也可以使用 Pulsar 的 作为 envelope 来携带有效载荷。 这样做时,用户可以直接调用 Pulsar 消息上的相应方法来检索元数据。 但是,为方便起见,您也可以使用 Comments 来检索它。 请注意,您还可以使用 Spring 消息传递信封来携带有效负载,然后使用 检索 Pulsar 标头。MessageHeaderMessage@Headerspring-doc.cn

5.2. 基于批量记录的 Consumer 访问

在本节中,我们将了解如何在使用批处理 consumer 的应用程序中访问各种 Pulsar Headers:spring-doc.cn

@PulsarListener(topics = "simpleBatchListenerWithHeaders", batch = true)
void simpleBatchListenerWithHeaders(List<String> data,
					@Header(PulsarHeaders.MESSAGE_ID) List<MessageId> messageIds,
					@Header(PulsarHeaders.TOPIC_NAME) List<String> topicNames, @Header("foo") List<String> fooValues) {

}

在前面的示例中,我们将数据作为 . 在提取各种 headers 时,我们也作为 a 执行此操作。 Spring for Apache Pulsar 确保 header 列表与 data 列表相对应。List<String>List<>spring-doc.cn

当您使用批处理侦听器并接收负载时,您还可以以相同的方式提取标头,并将有效负载接收为 、 、 或 。List<org.apache.pulsar.client.api.Message<?>org.apache.pulsar.client.api.Messages<?>org.springframework.messaging.Messsge<?>spring-doc.cn

6. 消息鸣谢

当您使用 Spring for Apache Pulsar 时,消息确认由框架处理,除非应用程序选择退出。 在本节中,我们将详细介绍框架如何处理消息确认。spring-doc.cn

6.1. 消息 ACK 模式

Spring for Apache Pulsar 提供了以下确认消息的模式:spring-doc.cn

BATCH确认模式是默认模式,但您可以在 Message Listener Container (消息侦听器) 容器上更改它。 在以下部分中,我们将了解当您同时使用 的单个和批处理版本时,确认是如何工作的,以及它们如何转换为后备消息侦听器容器(并最终转换为 Pulsar 消费者)。PulsarListenerspring-doc.cn

6.2. 单记录模式下的自动消息确认

让我们重新审视一下基本的基于 Single Message 的 :PulsarListenerspring-doc.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

很自然地想知道,当你使用 时,确认是如何工作的,特别是如果你熟悉直接使用 Pulsar consumer。 答案归结为消息侦听器容器,因为这是 Spring for Apache Pulsar 中协调所有与消费者相关的活动的中心位置。PulsarListenerspring-doc.cn

假设您没有覆盖默认行为,那么当您使用上述 :PulsarListenerspring-doc.cn

  1. 首先,侦听器容器从 Pulsar 消费者批量接收消息。spring-doc.cn

  2. 收到的消息一次传递给一条消息。PulsarListenerspring-doc.cn

  3. 当所有记录都传递给侦听器方法并成功处理时,容器将确认来自原始批处理的所有消息。spring-doc.cn

这是正常流程。如果原始批处理中的任何记录引发异常, Spring for Apache Pulsar 会单独跟踪这些记录。 当批处理中的所有记录都被处理完时, Spring for Apache Pulsar 会确认所有成功的消息,并否定地确认 (nack) 所有失败的消息。 换句话说,当使用 using 使用默认的 ack 模式时,框架会等待从调用收到的所有记录都处理成功,然后在 Pulsar consumer 上调用该方法。 如果任何特定记录在调用处理程序方法时抛出异常,Spring for Apache Pulsar 会跟踪这些记录,并在处理整个批处理后单独调用这些记录。PulsarRecordMessageListenerBATCHbatchReceiveacknowledgenegativeAcknowledgespring-doc.cn

如果应用程序希望每条记录发生确认或否定确认,则可以启用 ack 模式。 在这种情况下,在处理每条记录后,如果没有错误,则确认消息,如果有错误,则否定确认消息。 以下示例在 Pulsar 监听器上启用 ack 模式:RECORDRECORDspring-doc.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.RECORD)
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

6.3. 单记录模式下的手动消息确认

您可能并不总是希望框架发送确认,而是直接从应用程序本身发送。 Spring for Apache Pulsar 提供了几种启用手动消息确认的方法。以下示例显示了其中之一:spring-doc.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Acknowledgment acknowledgment) {
    System.out.println("Message Received: " + message.getValue());
	acknowledgment.acknowledge();
}

有几件事值得在这里解释。首先,我们通过设置 来启用手动确认模式 。 启用手动 ack 模式时, Spring for Apache Pulsar 允许应用程序注入对象。 该框架通过选择兼容的消息侦听器容器来实现此目的:用于基于单个记录的消费,它允许您访问对象。ackModePulsarListenerAcknowledgmentPulsarAcknowledgingMessageListenerAcknowledgmentspring-doc.cn

该对象提供以下 API 方法:Acknowledgmentspring-doc.cn

void acknowledge();

void acknowledge(MessageId messageId);

void acknowledge(List<MessageId> messageIds);

void nack();

void nack(MessageId messageId);

你可以使用 ack 模式将此对象注入到你的 while 中,然后调用相应的方法之一。AcknowledgmentPulsarListenerMANUALspring-doc.cn

在前面的示例中,我们调用了无参数方法。 这是因为框架知道它当前在哪个下运行。 调用 时,您不需要使用 enveloper' 接收有效负载,而是使用目标类型 — ,在本例中。 您还可以通过提供消息 ID 来调用不同的变体:使用 时,必须使用信封接收有效负载。PulsarListeneracknowledgeMessageacknowledge()MessageStringacknowledgeacknowledge.acknowledge(message.getMessageId());acknowledge(messageId)Message<?>spring-doc.cn

与可能的确认类似,API 也提供了用于否定确认的选项。 请参阅前面显示的 nack 方法。Acknowledgmentspring-doc.cn

您也可以直接在 Pulsar 消费者上调用:acknowledgespring-doc.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Consumer<String> consumer) {
    System.out.println("Message Received: " + message.getValue());
	try {
		consumer.acknowledge(message);
	}
	catch (Exception e) {
		....
	}
}

直接调用底层消费者时,需要自己做错误处理。 使用 the 不需要这样做,因为框架可以为你执行此操作。 因此,在使用手动确认时,应使用 object 方法。acknowledgeAcknowledgmentAcknowledgmentspring-doc.cn

使用手动确认时,请务必了解框架完全不执行任何确认。 因此,在设计应用程序时考虑正确的确认策略非常重要。

6.4. 批量消费自动确认消息

当您批量使用记录(请参阅“消息 ACK 模式”)并使用默认的 ack 模式时,当成功处理整个批次时,将确认整个批次。 如果任何记录引发异常,则对整个批处理进行否定确认。 请注意,这可能不是在生产者端批处理的同一批次。相反,这是从调用使用者返回的批处理BATCHbatchReceivespring-doc.cn

请考虑以下批处理侦听器:spring-doc.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", batch = true)
public void batchListen(List<Foo> messages) {
    for (Foo foo : messages) {
		...
    }
}

当处理传入集合(在本例中)中的所有消息时,框架会确认所有消息。messagesspring-doc.cn

在批处理模式下使用时,不允许的 ack 模式。 这可能会导致问题,因为应用程序可能不希望再次重新交付整个批处理。 在这种情况下,您需要使用确认模式。RECORDMANUALspring-doc.cn

6.5. 批量消费时手动消息确认

如上一节所示,当在消息侦听器容器上设置 ack 模式时,框架不会执行任何确认,无论是肯定的还是否定的。 处理此类问题完全取决于应用程序。 设置 ack 模式后, Spring for Apache Pulsar 会选择一个兼容的消息侦听器容器:用于批量消费,这样您就可以访问对象。 以下是 API 中可用的方法:MANUALMANUALPulsarBatchAcknowledgingMessageListenerAcknowledgmentAcknowledgmentspring-doc.cn

void acknowledge();

void acknowledge(MessageId messageId);

void acknowledge(List<MessageId> messageIds);

void nack();

void nack(MessageId messageId);

你可以在使用 ack 模式时将此对象注入到你的 state 中。 下面的清单显示了基于批处理的侦听器的基本示例:AcknowledgmentPulsarListenerMANUALspring-doc.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(List<Message<String>> messgaes, Acknowlegement acknowledgment) {
    for (Message<String> message : messages) {
		try {
			...
	        acknowledgment.acknowledge(message.getMessageId());
		}
		catch (Exception e) {
			acknowledgment.nack(message.getMessageId());
		}
    }
}

使用批处理侦听器时,消息侦听器容器无法知道它当前正在处理哪个记录。 因此,要手动确认,您需要使用采用 a 或 a 的重载方法之一。 您还可以使用 for the batch listener 进行否定确认。acknowledgeMessageIdList<MessageId>MessageIdspring-doc.cn

7. 消息重新投递和错误处理

现在我们已经了解了消息侦听器容器基础设施及其各种功能,现在让我们尝试了解消息重新传递和错误处理。 Apache Pulsar 提供了各种用于消息重新传递和错误处理的原生策略。我们来看看它们,看看如何通过 Spring for Apache Pulsar 使用它们。PulsarListenerspring-doc.cn

7.1. 指定消息重新投递的确认超时

默认情况下,除非 Consumer 崩溃,否则 Pulsar Consumer 不会重新传递消息,但你可以通过在 Pulsar Consumer 上设置 ack 超时来更改此行为。 如果 ack timeout 属性的值大于零,并且 Pulsar 消费者在该超时时间内没有确认消息,则重新传递该消息。spring-doc.cn

当你使用 Spring for Apache Pulsar 时,你可以通过消费者定制器或使用 attribute 中的原生 Pulsar 属性来设置此属性:ackTimeoutproperties@PulsarListenerspring-doc.cn

@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
                properties = {"ackTimeout=60s"})
public void listen(String s) {
    ...
}

当你指定 ack 超时时,如果 Consumer 在 60 秒内没有发送确认,则 Pulsar 会将消息重新投递给 Consumer。spring-doc.cn

如果要为具有不同延迟的 ack timeout 指定一些高级回退选项,可以执行以下操作:spring-doc.cn

@EnablePulsar
@Configuration
class AckTimeoutRedeliveryConfig {

    @PulsarListener(subscriptionName = "withAckTimeoutRedeliveryBackoffSubscription",
            topics = "withAckTimeoutRedeliveryBackoff-test-topic",
            ackTimeoutRedeliveryBackoff = "ackTimeoutRedeliveryBackoff",
            properties = { "ackTimeout=60s" })
    void listen(String msg) {
        // some long-running process that may cause an ack timeout
    }

    @Bean
    RedeliveryBackoff ackTimeoutRedeliveryBackoff() {
        return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
                .build();
    }

}

在前面的示例中,我们为 Pulsar 指定了一个 bean,最小延迟为 1 秒,最大延迟为 10 秒,回退乘数为 2。 在发生初始 ack 超时后,通过此回退 Bean 控制消息重新传递。 在本例中,我们通过将属性设置为实际的 bean 名称 — 来为 Comments 提供回退 bean。RedeliveryBackoffPulsarListenerackTimeoutRedeliveryBackoffackTimeoutRedeliveryBackoffspring-doc.cn

7.2. 指定否定确认重新传递

当否定确认时,Pulsar consumer 允许你指定应用程序希望如何重新传递消息。 默认是在一分钟内重新传递消息,但你可以通过消费者定制器或使用 attribute 中的原生 Pulsar 属性来更改它:negativeAckRedeliveryDelayproperties@PulsarListenerspring-doc.cn

@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
                properties = {"negativeAckRedeliveryDelay=10ms"})
public void listen(String s) {
    ...
}

你还可以通过提供 bean 并将 bean 名称作为 PulsarProducer 上的属性来指定不同的延迟和退避机制,如下所示:RedeliveryBackoffnegativeAckRedeliveryBackoffspring-doc.cn

@EnablePulsar
@Configuration
class NegativeAckRedeliveryConfig {

    @PulsarListener(subscriptionName = "withNegRedeliveryBackoffSubscription",
            topics = "withNegRedeliveryBackoff-test-topic", negativeAckRedeliveryBackoff = "redeliveryBackoff",
            subscriptionType = SubscriptionType.Shared)
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @Bean
    RedeliveryBackoff redeliveryBackoff() {
        return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
                .build();
    }

}

7.3. 使用 Apache Pulsar 的死信主题进行消息重投和错误处理

Apache Pulsar 允许应用程序在具有订阅类型的消费者上使用死信主题。 对于 和 订阅类型,此功能不可用。 基本思想是,如果消息重试一定次数(可能是由于 ack 超时或 nack 重新传递),一旦重试次数用完,就可以将消息发送到称为死信队列 (DLQ) 的特殊主题。 让我们通过检查一些代码片段来了解有关此功能的一些细节:SharedExclusiveFailoverspring-doc.cn

@EnablePulsar
@Configuration
class DeadLetterPolicyConfig {

    @PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
            topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
            subscriptionType = SubscriptionType.Shared, properties = { "ackTimeout=1s" })
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @PulsarListener(id = "dlqListener", topics = "my-dlq-topic")
    void listenDlq(String msg) {
        System.out.println("From DLQ: " + msg);
    }

    @Bean
    DeadLetterPolicy deadLetterPolicy() {
        return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
    }

}

首先,我们有一个特殊的 bean for ,它被命名为 (它可以是你想要的任何名称)。 这个 bean 指定了许多内容,例如最大交付量(在本例中为 10)和死信主题的名称 — ,在本例中。 如果不指定 DLQ 主题名称,则默认为 in Pulsar。 接下来,我们通过设置属性来提供此 bean 名称。 请注意,它的订阅类型为 ,因为 DLQ 功能仅适用于共享订阅。 此代码主要用于演示目的,因此我们提供 1 秒的值。 这个想法是代码抛出异常,如果 Pulsar 在 1 秒内没有收到 ack,它会重试。 如果该周期持续 10 次(因为这是我们在 中的最大重新传递计数),则 Pulsar consumer 会将消息发布到 DLQ 主题。 我们还有另一个侦听 DLQ 主题,以便在数据发布到 DLQ 主题时接收数据。DeadLetterPolicydeadLetterPolicymy-dlq-topic<topicname>-<subscriptionname>-DLQPulsarListenerdeadLetterPolicyPulsarListenerSharedackTimeoutDeadLetterPolicyPulsarListenerspring-doc.cn

使用分区主题时有关 DLQ 主题的特别说明

如果 main topic 是分区的,那么在幕后,Pulsar 会将每个分区视为一个单独的 topic。 Pulsar 将 , 其中 代表分区编号 附加到主主题名称。 问题在于,如果你不指定 DLQ 主题(与我们上面所做的相反),Pulsar 会发布到包含此信息的默认主题名称——例如:。 解决此问题的简单方法是始终提供 DLQ 主题名称。partition-<n>n`partition-<n>topic-with-dlp-partition-0-deadLetterPolicySubscription-DLQspring-doc.cn

7.4. Spring for Apache Pulsar 中的原生错误处理

正如我们之前提到的,Apache Pulsar 中的 DLQ 功能仅适用于共享订阅。 如果应用程序需要对非共享订阅使用一些类似的功能,该怎么办? Pulsar 不支持独占订阅和故障转移订阅的 DLQ 的主要原因是因为这些订阅类型是有订单保证的。 允许重新投递、DLQ 等实际上会不按顺序接收消息。 但是,如果应用程序对此没有问题,但更重要的是,需要此 DLQ 功能用于非共享订阅,该怎么办? 为此, Spring for Apache Pulsar 提供了一个 ,您可以在 Pulsar 中的任何订阅类型中使用它: 、 、 或 。PulsarConsumerErrorHandlerExclusiveFailoverSharedKey_Sharedspring-doc.cn

当你使用 Spring for Apache Pulsar 时,请确保不要在侦听器上设置 ack timeout 属性。PulsarConsumerErrorHandlerspring-doc.cn

让我们通过检查一些代码片段来了解一些细节:spring-doc.cn

@EnablePulsar
@Configuration
class PulsarConsumerErrorHandlerConfig {

    @Bean
    PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
            PulsarTemplate<String> pulsarTemplate) {
        return new DefaultPulsarConsumerErrorHandler<>(
                new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
    }

    @PulsarListener(id = "pulsarConsumerErrorHandler-id", subscriptionName = "pulsatConsumerErrorHandler-subscription",
                    topics = "pulsarConsumerErrorHandler-topic",
					pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler")
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @PulsarListener(id = "pceh-dltListener", topics = "my-foo-dlt")
    void listenDlt(String msg) {
        System.out.println("From DLT: " + msg);
    }

}

考虑一下咖啡豆。 这将创建一个类型的 bean,并使用 Spring for Apache Pulsar 提供的开箱即用的默认实现: 。 有一个采用 a 和 a 的构造函数。 是具有以下 API 的功能性接口:pulsarConsumerErrorHandlerPulsarConsumerErrorHandlerDefaultPulsarConsumerErrorHandlerDefaultPulsarConsumerErrorHandlerPulsarMessageRecovererFactoryorg.springframework.util.backoff.BackoffPulsarMessageRecovererFactoryspring-doc.cn

@FunctionalInterface
public interface PulsarMessageRecovererFactory<T> {

	/**
	 * Provides a message recoverer {@link PulsarMessageRecoverer}.
	 * @param consumer Pulsar consumer
	 * @return {@link PulsarMessageRecoverer}.
	 */
	PulsarMessageRecoverer<T> recovererForConsumer(Consumer<T> consumer);

}

该方法接受一个 Pulsar 消费者并返回一个 ,这是另一个功能接口。 这是 API :recovererForConsumerPulsarMessageRecovererPulsarMessageRecovererspring-doc.cn

public interface PulsarMessageRecoverer<T> {

	/**
	 * Recover a failed message, for e.g. send the message to a DLT.
	 * @param message Pulsar message
	 * @param exception exception from failed message
	 */
	void recoverMessage(Message<T> message, Exception exception);

}

Spring for Apache Pulsar 提供了一个 called 实现,它提供了一个默认实现,可以通过将消息发送到死信主题 (DLT) 来恢复消息。 我们将此实现提供给前面的构造函数。 作为第二个参数,我们提供一个 . 您还可以提供 from Spring for advanced backoff 功能。 然后,我们将这个 bean 名称作为属性提供给 . 该属性称为 。 每次消息的方法失败时,都会重试该方法。 重试次数由提供的 implementation 值控制。在我们的示例中,我们执行 10 次重试(总共 11 次尝试 — 第一次,然后是 10 次重试)。 用尽所有重试后,消息将发送到 DLT 主题。PulsarMessageRecovererFactoryPulsarDeadLetterPublishingRecovererDefaultPulsarConsumerErrorHandlerFixedBackOffExponentialBackoffPulsarConsumerErrorHandlerPulsarListenerpulsarConsumerErrorHandlerPulsarListenerBackoffspring-doc.cn

我们提供的实现使用用于将消息发布到 DLT 的 a。 在大多数情况下,从 Spring Boot 自动配置的相同内容就足够了,但需要注意分区主题。 使用分区主题和对主主题使用自定义消息路由时,必须使用不同的主题,该主题不采用自动配置,该自动配置填充了 for 的值。 您可以将 a 与以下蓝图一起使用:PulsarDeadLetterPublishingRecovererPulsarTemplatePulsarTemplatePulsarTemplatePulsarProducerFactorycustompartitionmessage-routing-modePulsarConsumerErrorHandlerspring-doc.cn

@Bean
PulsarConsumerErrorHandler<Integer> pulsarConsumerErrorHandler(PulsarClient pulsarClient) {
    PulsarProducerFactory<Integer> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Map.of());
        PulsarTemplate<Integer> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);

        BiFunction<Consumer<?>, Message<?>, String> destinationResolver =
                (c, m) -> "my-foo-dlt";

        PulsarDeadLetterPublishingRecoverer<Integer> pulsarDeadLetterPublishingRecoverer =
                new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, destinationResolver);

        return new DefaultPulsarConsumerErrorHandler<>(pulsarDeadLetterPublishingRecoverer,
                new FixedBackOff(100, 5));
}

请注意,我们提供了一个目标解析器作为第二个 constructor 参数。 如果未提供,则用作 DLT 主题名称。 使用此功能时,您应该通过设置目标解析程序而不是使用默认值来使用正确的目标名称。PulsarDeadLetterPublishingRecovererPulsarDeadLetterPublishingRecoverer<subscription-name>-<topic-name>-DLT>spring-doc.cn

使用单记录消息侦听器时,就像我们对 所做的那样,如果您使用手动确认,请确保在引发异常时不要否定地确认消息。 相反,将异常重新引发回容器。否则,容器会认为消息是单独处理的,不会触发错误处理。PulsarConsumerErrorHnadlerspring-doc.cn

最后,我们有第二个接收来自 DLT 主题的消息。PulsarListenerspring-doc.cn

到目前为止,在本节提供的示例中,我们只看到了如何与单个记录消息侦听器一起使用。 接下来,我们看看如何在批处理侦听器上使用它。PulsarConsumerErrorHandlerspring-doc.cn

7.5. 使用 PulsarConsumerErrorHandler 的批处理监听器

首先,我们看一个 batch 方法:PulsarListenerspring-doc.cn

@PulsarListener(subscriptionName = "batch-demo-5-sub", topics = "batch-demo-4", batch = true, concurrency = "3",
            subscriptionType = SubscriptionType.Failover,
            pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler", ackMode = AckMode.MANUAL)
void listen(List<Message<Integer>> data, Consumer<Integer> consumer, Acknowledgment acknowledgment) {
    for (Message<Integer> datum : data) {
        if (datum.getValue() == 5) {
            throw new PulsarBatchListenerFailedException("failed", datum);
        }
        acknowledgement.acknowledge(datum.getMessageId());
    }
}

@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
        PulsarTemplate<String> pulsarTemplate) {
    return new DefaultPulsarConsumerErrorHandler<>(
            new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}

@PulsarListener(subscriptionName = "my-dlt-subscription", topics = "my-foo-dlt")
void dltReceiver(Message<Integer> message) {
    System.out.println("DLT - RECEIVED: " + message.getValue());
}

我们再次为 property 提供 bean 名称。 当您使用批处理侦听器(如前面的示例所示)并希望使用 from Spring for Apache Pulsar 时,您需要使用手动确认。 这样,您可以确认所有成功的单个消息。 对于失败的 API,您必须抛出 a 并显示它失败的消息。 如果没有此异常,框架不知道如何处理失败。 重试时,容器会向侦听器发送一批新的消息,从失败的消息开始。 如果再次失败,则重试,直到重试次数用完,此时消息将发送到 DLT。 此时,容器会确认消息,并将原始批处理中的后续消息移交给侦听器。pulsarConsumerErrorHandlerPulsarConsumerErrorHandlerPulsarConsumerErrorHandlerPulsarBatchListenerFailedExceptionspring-doc.cn

8. PulsarListener 上的消费者定制

Spring for Apache Pulsar 提供了一种便捷的方式来自定义由 . 应用程序可以为 . 下面是一个示例。PulsarListenerPulsarListenerConsumerBuilderCustomizerspring-doc.cn

@Bean
public PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
    return cb -> {
        cb.subscriptionName("modified-subscription-name");
    };
}

然后,可以将此定制器 bean 名称作为 Comments 上的属性提供,如下所示。PuslarListenerspring-doc.cn

@PulsarListener(subscriptionName = "my-subscription",
        topics = "my-topic", consumerCustomizer = "myCustomizer")
void listen(String message) {

}

框架通过 检测提供的 bean,并在创建 Pulsar Consumer 之前将此定制器应用于 Consumer 构建器。PulsarListenerspring-doc.cn

如果您有多个方法,并且每个方法都有不同的定制规则,则应创建多个定制器 bean 并在每个 Bean 上附加适当的定制器。PulsarListenerPulsarListenerspring-doc.cn

9. 暂停和恢复消息侦听器容器

在某些情况下,应用程序可能希望暂时暂停消息使用,然后稍后恢复。 Spring for Apache Pulsar 提供了暂停和恢复底层消息侦听器容器的能力。 当 Pulsar 消息侦听器容器暂停时,容器为从 Pulsar 消费者接收数据而进行的任何轮询都将被暂停。 同样,当容器恢复时,如果主题在暂停时添加了任何新记录,则下一次轮询将开始返回数据。spring-doc.cn

要暂停或恢复侦听器容器,请首先通过 Bean 获取容器实例,然后在容器实例上调用暂停/恢复 API - 如下面的代码片段所示:PulsarListenerEndpointRegistryspring-doc.cn

@Autowired
private PulsarListenerEndpointRegistry registry;

void someMethod() {
  PulsarMessageListenerContainer container = registry.getListenerContainer("my-listener-id");
  container.pause();
}
传递给的 id 参数是容器 ID - 在暂停/恢复 .getListenerContainer@PulsarListener@PulsarListener

10. Pulsar Reader 支持

该框架支持通过 .PulsarReaderFactoryspring-doc.cn

Spring Boot 提供了这个 reader 工厂,你可以通过指定任何 spring.pulsar.reader.* 应用程序属性来进一步配置它。spring-doc.cn

10.1. PulsarReader 注解

虽然可以直接使用,但 Spring for Apache Pulsar 提供了注释,您可以使用该注释快速读取主题,而无需自己设置任何读取器工厂。 这与背后的相同想法类似 下面是一个简单的示例。PulsarReaderFactoryPulsarReaderPulsarListener.spring-doc.cn

@PulsarReader(id = "reader-demo-id", topics = "reader-demo-topic", startMessageId = "earliest")
void read(String message) {
    //...
}

该属性是可选的,但最佳实践是提供对应用程序有意义的值。 如果未指定,则将使用自动生成的 ID。 另一方面,和 属性是必需的。 该属性可以是单个主题或逗号分隔的主题列表。 该属性指示读者从主题中的特定消息开始。 的有效值为 are 或 假设您希望读者从最早或最新的可用消息以外的主题开始任意读取消息。在这种情况下,您需要使用 a 来自定义 ,以便它知道从哪里开始。idtopicsstartMessageIdtopicsstartMessageIdstartMessageIdearliestlatest.ReaderBuilderCustomizerReaderBuilderMessageIdspring-doc.cn

10.2. 自定义 ReaderBuilder

您可以通过使用 Spring for Apache Pulsar 中的来自定义任何可用的字段。 您可以提供 type 的 ,然后将其提供给 ,如下所示。ReaderBuilderPulsarReaderReaderBuilderCustomizer@BeanPulsarReaderReaderBuilderCustomizerPulsarReaderspring-doc.cn

@PulsarReader(id = "reader-customizer-demo-id", topics = "reader-customizer-demo-topic",
    readerCustomizer = "myCustomizer")
void read(String message) {
    //...
}

@Bean
public PulsarReaderReaderBuilderCustomizer<String> myCustomizer() {
    return readerBuilder -> {
        readerBuilder.startMessageId(messageId); // the first message read is after this message id.
        // Any other customizations on the readerBuilder
    };
}
如果您的应用程序只注册了一个 bean 和一个 bean,则将自动应用定制器。@PulsarReaderPulsarReaderReaderBuilderCustomizer