The framework provides a Reactive counterpart for almost all supported features.spring-doc.cn

If you put the word Reactive in front of a provided imperative component, you will likely find its Reactive counterpart.spring-doc.cn

However, the following is not yet supported:spring-doc.cn

If you put the word Reactive in front of a provided imperative component, you will likely find its Reactive counterpart.spring-doc.cn

1. Preface

We recommend using a Spring-Boot-First approach for Spring for Apache Pulsar-based applications, as that simplifies things tremendously. To do so, you can add the spring-pulsar-reactive-spring-boot-starter module as a dependency.
The majority of this reference expects the reader to be using the starter and gives most directions for configuration with that in mind. However, an effort is made to call out when instructions are specific to the Spring Boot starter usage.
We recommend using a Spring-Boot-First approach for Spring for Apache Pulsar-based applications, as that simplifies things tremendously. To do so, you can add the spring-pulsar-reactive-spring-boot-starter module as a dependency.
The majority of this reference expects the reader to be using the starter and gives most directions for configuration with that in mind. However, an effort is made to call out when instructions are specific to the Spring Boot starter usage.

2. Quick Tour

We will take a quick tour of the Reactive support in Spring for Apache Pulsar by showing a sample Spring Boot application that produces and consumes in a Reactive fashion. This is a complete application and does not require any additional configuration, as long as you have a Pulsar cluster running on the default location - localhost:6650.spring-doc.cn

2.1. Dependencies

Spring Boot applications need only the spring-pulsar-reactive-spring-boot-starter dependency. The following listings show how to define the dependency for Maven and Gradle, respectively:spring-doc.cn

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-pulsar-reactive</artifactId>
        <version>3.2.11-SNAPSHOT</version>
    </dependency>
</dependencies>
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-pulsar-reactive:3.2.11-SNAPSHOT'
}

When using Version 0.2.x the above coordinates change as follows:spring-doc.cn

<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-reactive-spring-boot-starter</artifactId>
        <version>0.2.0</version>
    </dependency>
</dependencies>
dependencies {
    implementation 'org.springframework.pulsar:spring-pulsar-reactive-spring-boot-starter:0.2.0'
}

2.2. Application Code

Here is the application source code:spring-doc.cn

@SpringBootApplication
public class ReactiveSpringPulsarHelloWorld {

    public static void main(String[] args) {
        SpringApplication.run(ReactiveSpringPulsarHelloWorld.class, args);
    }

    @Bean
    ApplicationRunner runner(ReactivePulsarTemplate<String> pulsarTemplate) {
        return (args) -> pulsarTemplate.send("hello-pulsar-topic", "Hello Reactive Pulsar World!").subscribe();
    }

    @ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
    Mono<Void> listen(String message) {
        System.out.println("Reactive listener received: " + message);
        return Mono.empty();
    }
}

That is it, with just a few lines of code we have a working Spring Boot app that is producing and consuming messages from a Pulsar topic in a Reactive fashion.spring-doc.cn

Once started, the application uses a ReactivePulsarTemplate to send messages to the hello-pulsar-topic. It then consumes from the hello-pulsar-topic using a @ReactivePulsarListener.spring-doc.cn

One of the key ingredients to the simplicity is the Spring Boot starter which auto-configures and provides the required components to the application

When using Version 0.2.x the above coordinates change as follows:spring-doc.cn

<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-reactive-spring-boot-starter</artifactId>
        <version>0.2.0</version>
    </dependency>
</dependencies>
dependencies {
    implementation 'org.springframework.pulsar:spring-pulsar-reactive-spring-boot-starter:0.2.0'
}
One of the key ingredients to the simplicity is the Spring Boot starter which auto-configures and provides the required components to the application

3. Design

Here are a few key design points to keep in mind.spring-doc.cn

3.1. Apache Pulsar Reactive

The reactive support is ultimately provided by the Apache Pulsar Reactive client whose current implementation is a fully non-blocking adapter around the regular Pulsar client’s asynchronous API. This implies that the Reactive client requires the regular client.spring-doc.cn

3.2. Additive Auto-Configuration

Due to the dependence on the regular (imperative) client, the Reactive auto-configuration provided by the framework is additive to the imperative auto-configuration. In other words, The imperative starter only includes the imperative components but the reactive starter includes both imperative and reactive components.spring-doc.cn

4. Reactive Pulsar Client

When you use the Reactive Pulsar Spring Boot Starter, you get the ReactivePulsarClient auto-configured.spring-doc.cn

By default, the application tries to connect to a local Pulsar instance at pulsar://localhost:6650. This can be adjusted by setting the spring.pulsar.client.service-url property to a different value.spring-doc.cn

The value must be a valid Pulsar Protocol URL

There are many other application properties (inherited from the adapted imperative client) available to configure. See the spring.pulsar.client.* application properties.spring-doc.cn

4.1. Authentication

To connect to a Pulsar cluster that requires authentication, follow the same steps as the imperative client. Again, this is because the reactive client adapts the imperative client which handles all security configuration.spring-doc.cn

The value must be a valid Pulsar Protocol URL

5. Message Production

5.1. ReactivePulsarTemplate

On the Pulsar producer side, Spring Boot auto-configuration provides a ReactivePulsarTemplate for publishing records. The template implements an interface called ReactivePulsarOperations and provides methods to publish records through its contract.spring-doc.cn

The template provides send methods that accept a single message and return a Mono<MessageId>. It also provides send methods that accept multiple messages (in the form of the ReactiveStreams Publisher type) and return a Flux<MessageId>.spring-doc.cn

For the API variants that do not include a topic parameter, a topic resolution process is used to determine the destination topic.

5.1.1. Fluent API

The template provides a fluent builder to handle more complicated send requests.spring-doc.cn

5.1.2. Message customization

You can specify a MessageSpecBuilderCustomizer to configure the outgoing message. For example, the following code shows how to send a keyed message:spring-doc.cn

template.newMessage(msg)
    .withMessageCustomizer((mc) -> mc.key("foo-msg-key"))
    .send();

5.1.3. Sender customization

You can specify a ReactiveMessageSenderBuilderCustomizer to configure the underlying Pulsar sender builder that ultimately constructs the sender used to send the outgoing message.spring-doc.cn

Use with caution as this gives full access to the sender builder and invoking some of its methods (such as create) may have unintended side effects.

For example, the following code shows how to disable batching and enable chunking:spring-doc.cn

template.newMessage(msg)
    .withSenderCustomizer((sc) -> sc.enableChunking(true).enableBatching(false))
    .send();

This other example shows how to use custom routing when publishing records to partitioned topics. Specify your custom MessageRouter implementation on the sender builder such as:spring-doc.cn

template.newMessage(msg)
    .withSenderCustomizer((sc) -> sc.messageRouter(messageRouter))
    .send();
Note that, when using a MessageRouter, the only valid setting for spring.pulsar.producer.message-routing-mode is custom.

5.2. Specifying Schema Information

If you use Java primitive types, the framework auto-detects the schema for you, and you need not specify any schema types for publishing the data. For non-primitive types, if the Schema is not explicitly specified when invoking send operations on the ReactivePulsarTemplate, the Spring for Apache Pulsar framework will try to build a Schema.JSON from the type.spring-doc.cn

Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, and KEY_VALUE w/ INLINE encoding.

5.2.1. Custom Schema Mapping

As an alternative to specifying the schema when invoking send operations on the ReactivePulsarTemplate for complex types, the schema resolver can be configured with mappings for the types. This removes the need to specify the schema as the framework consults the resolver using the outgoing message type.spring-doc.cn

Schema mappings can be configured with the spring.pulsar.defaults.type-mappings property. The following example uses application.yml to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:spring-doc.cn

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
The message-type is the fully-qualified name of the message class.

The preferred method of adding mappings is via the property mentioned above. However, if more control is needed you can provide a schema resolver customizer to add the mapping(s).spring-doc.cn

The following example uses a schema resolver customizer to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:spring-doc.cn

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

With this configuration in place, there is no need to set specify the schema on send operations.spring-doc.cn

5.3. ReactivePulsarSenderFactory

The ReactivePulsarTemplate relies on a ReactivePulsarSenderFactory to actually create the underlying sender.spring-doc.cn

Spring Boot provides this sender factory which can be configured with any of the spring.pulsar.producer.* application properties.spring-doc.cn

If topic information is not specified when using the sender factory APIs directly, the same topic resolution process used by the ReactivePulsarTemplate is used with the one exception that the "Message type default" step is omitted.

5.3.1. Producer Caching

Each underlying Pulsar producer consumes resources. To improve performance and avoid continual creation of producers, the ReactiveMessageSenderCache in the underlying Apache Pulsar Reactive client caches the producers that it creates. They are cached in an LRU fashion and evicted when they have not been used within a configured time period.spring-doc.cn

You can configure the cache settings by specifying any of the spring.pulsar.producer.cache.* application properties.spring-doc.cn

For the API variants that do not include a topic parameter, a topic resolution process is used to determine the destination topic.
Use with caution as this gives full access to the sender builder and invoking some of its methods (such as create) may have unintended side effects.
Note that, when using a MessageRouter, the only valid setting for spring.pulsar.producer.message-routing-mode is custom.
Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, and KEY_VALUE w/ INLINE encoding.
The message-type is the fully-qualified name of the message class.
If topic information is not specified when using the sender factory APIs directly, the same topic resolution process used by the ReactivePulsarTemplate is used with the one exception that the "Message type default" step is omitted.

6. Message Consumption

6.1. @ReactivePulsarListener

When it comes to Pulsar consumers, we recommend that end-user applications use the ReactivePulsarListener annotation. To use ReactivePulsarListener, you need to use the @EnableReactivePulsar annotation. When you use Spring Boot support, it automatically enables this annotation and configures all necessary components, such as the message listener infrastructure (which is responsible for creating the underlying Pulsar consumer).spring-doc.cn

Let us revisit the ReactivePulsarListener code snippet we saw in the quick-tour section:spring-doc.cn

@ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}
The listener method returns a Mono<Void> to signal whether the message was successfully processed. Mono.empty() indicates success (acknowledgment) and Mono.error() indicates failure (negative acknowledgment).

You can also further simplify this method:spring-doc.cn

@ReactivePulsarListener
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}

In this most basic form, when the topics are not directly provided, a topic resolution process is used to determine the destination topic. Likewise, when the subscriptionName is not provided on the @ReactivePulsarListener annotation an auto-generated subscription name will be used.spring-doc.cn

In the ReactivePulsarListener method shown earlier, we receive the data as String, but we do not specify any schema types. Internally, the framework relies on Pulsar’s schema mechanism to convert the data to the required type. The framework detects that you expect the String type and then infers the schema type based on that information. Then it provides that schema to the consumer. For all the primitive types in Java, the framework does this inference. For any complex types (such as JSON, AVRO, and others), the framework cannot do this inference and the user needs to provide the schema type on the annotation using the schemaType property.spring-doc.cn

This example shows how we can consume complex types from a topic:spring-doc.cn

@ReactivePulsarListener(topics = "my-topic-2", schemaType = SchemaType.JSON)
Mono<Void> listen(Foo message) {
    System.out.println(message);
    return Mono.empty();
}

Note the addition of a schemaType property on ReactivePulsarListener. That is because the library is not capable of inferring the schema type from the provided type: Foo. We must tell the framework what schema to use.spring-doc.cn

Let us look at a few more ways we can consume.spring-doc.cn

This example consumes the Pulsar message directly:spring-doc.cn

@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.apache.pulsar.client.api.Message<String> message) {
    System.out.println(message.getValue());
    return Mono.empty();
}

This example consumes the record wrapped in a Spring messaging envelope:spring-doc.cn

@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.springframework.messaging.Message<String> message) {
    System.out.println(message.getPayload());
    return Mono.empty();
}

6.1.1. Streaming

All of the above are examples of consuming a single record one-by-one. However, one of the compelling reasons to use Reactive is for the streaming capability with backpressure support.spring-doc.cn

The following example uses ReactivePulsarListener to consume a stream of POJOs:spring-doc.cn

@ReactivePulsarListener(topics = "streaming-1", stream = true)
Flux<MessageResult<Void>> listen(Flux<org.apache.pulsar.client.api.Message<String>> messages) {
    return messages
        .doOnNext((msg) -> System.out.println("Received: " + msg.getValue()))
        .map(MessageResult::acknowledge);

Here we receive the records as a Flux of Pulsar messages. In addition, to enable stream consumption at the ReactivePulsarListener level, you need to set the stream property on the annotation to true.spring-doc.cn

The listener method returns a Flux<MessageResult<Void>> where each element represents a processed message and holds the message id, value and whether it was acknowledged. The MessageResult has a set of static factory methods that can be used to create the appropriate MessageResult instance.

Based on the actual type of the messages in the Flux, the framework tries to infer the schema to use. If it contains a complex type, you still need to provide the schemaType on ReactivePulsarListener.spring-doc.cn

The following listener uses the Spring messaging Message envelope with a complex type :spring-doc.cn

@ReactivePulsarListener(topics = "streaming-2", stream = true, schemaType = SchemaType.JSON)
Flux<MessageResult<Void>> listen2(Flux<org.springframework.messaging.Message<Foo>> messages) {
    return messages
        .doOnNext((msg) -> System.out.println("Received: " + msg.getPayload()))
        .map(MessageUtils::acknowledge);
}
The listener method returns a Flux<MessageResult<Void>> where each element represents a processed message and holds the message id, value and whether it was acknowledged. The Spring MessageUtils has a set of static factory methods that can be used to create the appropriate MessageResult instance from a Spring message.

6.1.2. Configuration - Application Properties

The listener relies on the ReactivePulsarConsumerFactory to create and manage the underlying Pulsar consumer that it uses to consume messages. Spring Boot provides this consumer factory which you can further configure by specifying the spring.pulsar.consumer.* application properties. Most of the configured properties on the factory will be respected in the listener with the following exceptions:spring-doc.cn

The spring.pulsar.consumer.subscription.name property is ignored and is instead generated when not specified on the annotation.
The spring.pulsar.consumer.subscription-type property is ignored and is instead taken from the value on the annotation. However, you can set the subscriptionType = {} on the annotation to instead use the property value as the default.

6.1.3. Consumer Customization

You can specify a ReactivePulsarListenerMessageConsumerBuilderCustomizer to configure the underlying Pulsar consumer builder that ultimately constructs the consumer used by the listener to receive the messages.spring-doc.cn

Use with caution as this gives full access to the consumer builder and invoking some of its methods (such as create) may have unintended side effects.

For example, the following code shows how to set the initial position of the subscription to the earliest messaage on the topic.spring-doc.cn

@ReactivePulsarListener(topics = "hello-pulsar-topic", consumerCustomizer = "myConsumerCustomizer")
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}

@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myConsumerCustomizer() {
    return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}
If your application only has a single @ReactivePulsarListener and a single ReactivePulsarListenerMessageConsumerBuilderCustomizer bean registered then the customizer will be automatically applied.

You can also use the customizer to provide direct Pulsar consumer properties to the consumer builder. This is convenient if you do not want to use the Boot configuration properties mentioned earlier or have multiple ReactivePulsarListener methods whose configuration varies.spring-doc.cn

The following customizer example uses direct Pulsar consumer properties:spring-doc.cn

@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> directConsumerPropsCustomizer() {
    return b -> b.property("subscriptionName", "subscription-1").property("topicNames", "foo-1");
}
The properties used are direct Pulsar consumer properties, not the spring.pulsar.consumer Spring Boot configuration properties

6.2. Specifying Schema Information

As indicated earlier, for Java primitives, the Spring for Apache Pulsar framework can infer the proper Schema to use on the ReactivePulsarListener. For non-primitive types, if the Schema is not explicitly specified on the annotation, the Spring for Apache Pulsar framework will try to build a Schema.JSON from the type.spring-doc.cn

Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, and KEY_VALUE w/ INLINE encoding.

6.2.1. Custom Schema Mapping

As an alternative to specifying the schema on the ReactivePulsarListener for complex types, the schema resolver can be configured with mappings for the types. This removes the need to set the schema on the listener as the framework consults the resolver using the incoming message type.spring-doc.cn

Schema mappings can be configured with the spring.pulsar.defaults.type-mappings property. The following example uses application.yml to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:spring-doc.cn

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
The message-type is the fully-qualified name of the message class.

The preferred method of adding mappings is via the property mentioned above. However, if more control is needed you can provide a schema resolver customizer to add the mapping(s).spring-doc.cn

The following example uses a schema resolver customizer to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:spring-doc.cn

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

With this configuration in place, there is no need to set the schema on the listener, for example:spring-doc.cn

@ReactivePulsarListener(topics = "user-topic")
Mono<Void> listen(User user) {
    System.out.println(user);
    return Mono.empty();
}

6.3. Message Listener Container Infrastructure

In most scenarios, we recommend using the ReactivePulsarListener annotation directly for consuming from a Pulsar topic as that model covers a broad set of application use cases. However, it is important to understand how ReactivePulsarListener works internally.spring-doc.cn

The message listener container is at the heart of message consumption when you use Spring for Apache Pulsar. The ReactivePulsarListener uses the message listener container infrastructure behind the scenes to create and manage the underlying Pulsar consumer.spring-doc.cn

6.3.1. ReactivePulsarMessageListenerContainer

The contract for this message listener container is provided through ReactivePulsarMessageListenerContainer whose default implementation creates a reactive Pulsar consumer and wires up a reactive message pipeline that uses the created consumer.spring-doc.cn

6.3.2. ReactiveMessagePipeline

The pipeline is a feature of the underlying Apache Pulsar Reactive client which does the heavy lifting of receiving the data in a reactive manner and then handing it over to the provided message handler. The reactive message listener container implementation is much simpler because the pipeline handles the majority of the work.spring-doc.cn

6.3.3. ReactivePulsarMessageHandler

The "listener" aspect is provided by the ReactivePulsarMessageHandler of which there are two provided implementations:spring-doc.cn

  • ReactivePulsarOneByOneMessageHandler - handles a single message one-by-onespring-doc.cn

  • ReactivePulsarStreamingHandler - handles multiple messages via a Fluxspring-doc.cn

If topic information is not specified when using the listener containers directly, the same topic resolution process used by the ReactivePulsarListener is used with the one exception that the "Message type default" step is omitted.

6.4. Concurrency

When consuming records in streaming mode (stream = true) concurrency comes naturally via the underlying Reactive support in the client implementation.spring-doc.cn

However, when handling messages one-by-one, concurrency can be specified to increase processing throughput. Simply set the concurrency property on @ReactivePulsarListener. Additionally, when concurrency > 1 you can ensure messages are ordered by key and therefore sent to the same handler by setting useKeyOrderedProcessing = "true" on the annotation.spring-doc.cn

Again, the ReactiveMessagePipeline does the heavy lifting, we simply set the properties on it.spring-doc.cn

Reactive vs Imperative

Concurrency in the reactive container is different from its imperative counterpart. The latter creates multiple threads (each with a Pulsar consumer) whereas the former dispatches the messages to multiple handler instances concurrently on the Reactive parallel scheduler.spring-doc.cn

One advantage of the reactive concurrency model is that it can be used with Exclusive subscriptions whereas the imperative concurrency model can not.spring-doc.cn

6.5. Pulsar Headers

The Pulsar message metadata can be consumed as Spring message headers. The list of available headers can be found in PulsarHeaders.java.spring-doc.cn

6.5.1. Accessing In OneByOne Listener

The following example shows how you can access Pulsar Headers when using a one-by-one message listener:spring-doc.cn

@ReactivePulsarListener(topics = "some-topic")
Mono<Void> listen(String data,
        @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
        @Header("foo") String foo) {
    System.out.println("Received " + data + " w/ id=" + messageId + " w/ foo=" + foo);
    return Mono.empty();
}

In the preceding example, we access the values for the messageId message metadata as well as a custom message property named foo. The Spring @Header annotation is used for each header field.spring-doc.cn

You can also use Pulsar’s Message as the envelope to carry the payload. When doing so, the user can directly call the corresponding methods on the Pulsar message for retrieving the metadata. However, as a convenience, you can also retrieve it by using the Header annotation. Note that you can also use the Spring messaging Message envelope to carry the payload and then retrieve the Pulsar headers by using @Header.spring-doc.cn

6.5.2. Accessing In Streaming Listener

When using a streaming message listener the header support is limited. Only when the Flux contains Spring org.springframework.messaging.Message elements will the headers be populated. Additionally, the Spring @Header annotation can not be used to retrieve the data. You must directly call the corresponding methods on the Spring message to retrieve the data.spring-doc.cn

6.6. Message Acknowledgment

The framework automatically handles message acknowledgement. However, the listener method must send a signal indicating whether the message was successfully processed. The container implementation then uses that signal to perform the ack or nack operation. This is a slightly different from its imperative counterpart where the signal is implied as positive unless the method throws an exception.spring-doc.cn

6.6.1. OneByOne Listener

The single message (aka OneByOne) message listener method returns a Mono<Void> to signal whether the message was successfully processed. Mono.empty() indicates success (acknowledgment) and Mono.error() indicates failure (negative acknowledgment).spring-doc.cn

6.6.2. Streaming Listener

The streaming listener method returns a Flux<MessageResult<Void>> where each MessageResult element represents a processed message and holds the message id, value and whether it was acknowledged. The MessageResult has a set of acknowledge and negativeAcknowledge static factory methods that can be used to create the appropriate MessageResult instance.spring-doc.cn

6.7. Message Redelivery and Error Handling

Apache Pulsar provides various native strategies for message redelivery and error handling. We will take a look at them and see how to use them through Spring for Apache Pulsar.spring-doc.cn

6.7.1. Acknowledgment Timeout

By default, Pulsar consumers do not redeliver messages unless the consumer crashes, but you can change this behavior by setting an ack timeout on the Pulsar consumer. If the ack timeout property has a value above zero and if the Pulsar consumer does not acknowledge a message within that timeout period, the message is redelivered.spring-doc.cn

You can specify this property directly as a Pulsar consumer property via a consumer customizer such as:spring-doc.cn

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
    return b -> b.property("ackTimeout", "60s");
}

6.7.2. Negative Acknowledgment Redelivery Delay

When acknowledging negatively, Pulsar consumer lets you specify how the application wants the message to be re-delivered. The default is to redeliver the message in one minute, but you can change it via a consumer customizer such as:spring-doc.cn

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
    return b -> b.property("negativeAckRedeliveryDelay", "10ms");
}

6.7.3. Dead Letter Topic

Apache Pulsar lets applications use a dead letter topic on consumers with a Shared subscription type. For the Exclusive and Failover subscription types, this feature is not available. The basic idea is that, if a message is retried a certain number of times (maybe due to an ack timeout or nack redelivery), once the number of retries are exhausted, the message can be sent to a special topic called the dead letter queue (DLQ). Let us see some details around this feature in action by inspecting some code snippets:spring-doc.cn

@Configuration(proxyBeanMethods = false)
class DeadLetterPolicyConfig {

    @ReactivePulsarListener(
            topics = "topic-with-dlp",
            subscriptionType = SubscriptionType.Shared,
            deadLetterPolicy = "myDeadLetterPolicy",
            consumerCustomizer = "ackTimeoutCustomizer" )
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @ReactivePulsarListener(topics = "my-dlq-topic")
    void listenDlq(String msg) {
        System.out.println("From DLQ: " + msg);
    }

    @Bean
    DeadLetterPolicy myDeadLetterPolicy() {
        return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
    }

    @Bean
    ReactiveMessageConsumerBuilderCustomizer<String> ackTimeoutCustomizer() {
        return b -> b.property("ackTimeout", "1s");
    }
}

First, we have a special bean for DeadLetterPolicy, and it is named as deadLetterPolicy (it can be any name as you wish). This bean specifies a number of things, such as the max delivery (10, in this case) and the name of the dead letter topic — my-dlq-topic, in this case. If you do not specify a DLQ topic name, it defaults to <topicname>-<subscriptionname>-DLQ in Pulsar. Next, we provide this bean name to ReactivePulsarListener by setting the deadLetterPolicy property. Note that the ReactivePulsarListener has a subscription type of Shared, as the DLQ feature only works with shared subscriptions. This code is primarily for demonstration purposes, so we provide an ackTimeout value of 1 second. The idea is that the code throws the exception and, if Pulsar does not receive an ack within 1 second, it does a retry. If that cycle continues ten times (as that is our max redelivery count in the DeadLetterPolicy), the Pulsar consumer publishes the messages to the DLQ topic. We have another ReactivePulsarListener that listens on the DLQ topic to receive data as it is published to the DLQ topic.spring-doc.cn

Special note on DLQ topics when using partitioned topics

If the main topic is partitioned, behind the scenes, each partition is treated as a separate topic by Pulsar. Pulsar appends partition-<n>, where n stands for the partition number to the main topic name. The problem is that, if you do not specify a DLQ topic (as opposed to what we did above), Pulsar publishes to a default topic name that has this `partition-<n> info in it — for example: topic-with-dlp-partition-0-deadLetterPolicySubscription-DLQ. The easy way to solve this is to provide a DLQ topic name always.spring-doc.cn

6.8. Pulsar Reader Support

The framework provides support for using Pulsar Reader in a Reactive fashion via the ReactivePulsarReaderFactory.spring-doc.cn

Spring Boot provides this reader factory which can be configured with any of the spring.pulsar.reader.* application properties.spring-doc.cn

The listener method returns a Mono<Void> to signal whether the message was successfully processed. Mono.empty() indicates success (acknowledgment) and Mono.error() indicates failure (negative acknowledgment).
The listener method returns a Flux<MessageResult<Void>> where each element represents a processed message and holds the message id, value and whether it was acknowledged. The MessageResult has a set of static factory methods that can be used to create the appropriate MessageResult instance.
The listener method returns a Flux<MessageResult<Void>> where each element represents a processed message and holds the message id, value and whether it was acknowledged. The Spring MessageUtils has a set of static factory methods that can be used to create the appropriate MessageResult instance from a Spring message.
The spring.pulsar.consumer.subscription.name property is ignored and is instead generated when not specified on the annotation.
The spring.pulsar.consumer.subscription-type property is ignored and is instead taken from the value on the annotation. However, you can set the subscriptionType = {} on the annotation to instead use the property value as the default.
Use with caution as this gives full access to the consumer builder and invoking some of its methods (such as create) may have unintended side effects.
If your application only has a single @ReactivePulsarListener and a single ReactivePulsarListenerMessageConsumerBuilderCustomizer bean registered then the customizer will be automatically applied.
The properties used are direct Pulsar consumer properties, not the spring.pulsar.consumer Spring Boot configuration properties
Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, and KEY_VALUE w/ INLINE encoding.
The message-type is the fully-qualified name of the message class.
If topic information is not specified when using the listener containers directly, the same topic resolution process used by the ReactivePulsarListener is used with the one exception that the "Message type default" step is omitted.

7. Topic Resolution

A destination topic is needed when producing or consuming messages. The framework looks in the following ordered locations to determine a topic (stopping at the first find):spring-doc.cn

When a topic is found via one of the default mechanisms, there is no need to specify the topic on the produce or consume API.spring-doc.cn

When a topic is not found, the API will throw an exception accordingly.spring-doc.cn

7.1. User specified

A topic passed into the API being used has the highest precedence (eg. PulsarTemplate.send("my-topic", myMessage) or @PulsarListener(topics = "my-topic").spring-doc.cn

7.2. Message type default

When no topic is passed into the API, the system looks for a message type to topic mapping configured for the type of the message being produced or consumed.spring-doc.cn

Mappings can be configured with the spring.pulsar.defaults.type-mappings property. The following example uses application.yml to configure default topics to use when consuming or producing Foo or Bar messages:spring-doc.cn

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.Foo
          topic-name: foo-topic
        - message-type: com.acme.Bar
          topic-name: bar-topic
The message-type is the fully-qualified name of the message class.
If the message (or the first message of a Publisher input) is null, the framework won’t be able to determine the topic from it. Another method shall be used to specify the topic if your application is likely to send null messages.

7.2.1. Custom topic resolver

The preferred method of adding mappings is via the property mentioned above. However, if more control is needed you can replace the default resolver by proving your own implementation, for example:spring-doc.cn

@Bean
public MyTopicResolver topicResolver() {
	return new MyTopicResolver();
}

7.3. Producer global default

The final location consulted (when producing) is the system-wide producer default topic. It is configured via the spring.pulsar.producer.topic-name property when using the imperative API and the spring.pulsar.reactive.sender.topic-name property when using the reactive API.spring-doc.cn

7.4. Consumer global default

The final location consulted (when consuming) is the system-wide consumer default topic. It is configured via the spring.pulsar.consumer.topics or spring.pulsar.consumer.topics-pattern property when using the imperative API and one of the spring.pulsar.reactive.consumer.topics or spring.pulsar.reactive.consumer.topics-pattern property when using the reactive API.spring-doc.cn

The message-type is the fully-qualified name of the message class.
If the message (or the first message of a Publisher input) is null, the framework won’t be able to determine the topic from it. Another method shall be used to specify the topic if your application is likely to send null messages.