The framework provides a Reactive counterpart for almost all supported features.
If you put the word
|
However, the following is not yet supported:
- Error Handling in non-shared subscriptions
- Accessing Pulsar headers via @Header in streaming mode
- Observations
1. Preface
We recommend using a Spring-Boot-First approach for Spring for Apache Pulsar-based applications, as that simplifies things tremendously.
To do so, you can add the spring-boot-starter-pulsar-reactive module as a dependency.
The majority of this reference expects the reader to be using the starter and gives most directions for configuration with that in mind. However, an effort is made to call out when instructions are specific to the Spring Boot starter usage.
2. Quick Tour
We will take a quick tour of the Reactive support in Spring for Apache Pulsar by showing a sample Spring Boot application that produces and consumes in a Reactive fashion.
This is a complete application and does not require any additional configuration, as long as you have a Pulsar cluster running at the default location, localhost:6650.
2.1. Dependencies
Spring Boot applications need only the spring-boot-starter-pulsar-reactive dependency. The following listings show how to define the dependency for Maven and Gradle, respectively:
Maven
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-pulsar-reactive</artifactId>
        <version>3.2.11-SNAPSHOT</version>
    </dependency>
</dependencies>
Gradle
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-pulsar-reactive:3.2.11-SNAPSHOT'
}
When using
|
2.2. Application Code
Here is the application source code:
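import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
import reactor.core.publisher.Mono;

@SpringBootApplication
public class ReactiveSpringPulsarHelloWorld {

    public static void main(String[] args) {
        SpringApplication.run(ReactiveSpringPulsarHelloWorld.class, args);
    }

    // Sends one message at startup; subscribe() triggers the reactive send
    @Bean
    ApplicationRunner runner(ReactivePulsarTemplate<String> pulsarTemplate) {
        return (args) -> pulsarTemplate.send("hello-pulsar-topic", "Hello Reactive Pulsar World!").subscribe();
    }

    // Returning Mono.empty() signals that the message was processed successfully
    @ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
    Mono<Void> listen(String message) {
        System.out.println("Reactive listener received: " + message);
        return Mono.empty();
    }
}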
That is it. With just a few lines of code, we have a working Spring Boot app that produces and consumes messages from a Pulsar topic in a Reactive fashion.
Once started, the application uses a ReactivePulsarTemplate to send messages to the hello-pulsar-topic.
It then consumes from the hello-pulsar-topic using a @ReactivePulsarListener.
One of the key ingredients to the simplicity is the Spring Boot starter, which auto-configures and provides the required components to the application.
3. Design
Here are a few key design points to keep in mind.
3.1. Apache Pulsar Reactive
The reactive support is ultimately provided by the Apache Pulsar Reactive client whose current implementation is a fully non-blocking adapter around the regular Pulsar client’s asynchronous API. This implies that the Reactive client requires the regular client.
3.2. Additive Auto-Configuration
Due to the dependence on the regular (imperative) client, the Reactive auto-configuration provided by the framework is additive to the imperative auto-configuration. In other words, the imperative starter includes only the imperative components, but the reactive starter includes both imperative and reactive components.
4. Reactive Pulsar Client
When you use the Reactive Pulsar Spring Boot Starter, you get the ReactivePulsarClient auto-configured.
By default, the application tries to connect to a local Pulsar instance at pulsar://localhost:6650.
This can be adjusted by setting the spring.pulsar.client.service-url property to a different value.
The value must be a valid Pulsar Protocol URL.
There are many other application properties (inherited from the adapted imperative client) available to configure.
See the spring.pulsar.client.* application properties.
4.1. Authentication
To connect to a Pulsar cluster that requires authentication, follow the same steps as the imperative client. Again, this is because the reactive client adapts the imperative client which handles all security configuration.
5. Message Production
5.1. ReactivePulsarTemplate
On the Pulsar producer side, Spring Boot auto-configuration provides a ReactivePulsarTemplate for publishing records. The template implements an interface called ReactivePulsarOperations and provides methods to publish records through its contract.
The template provides send methods that accept a single message and return a Mono<MessageId>.
It also provides send methods that accept multiple messages (in the form of the ReactiveStreams Publisher type) and return a Flux<MessageId>.
For the API variants that do not include a topic parameter, a topic resolution process is used to determine the destination topic.
5.1.1. Fluent API
The template provides a fluent builder to handle more complicated send requests.
5.1.2. Message customization
You can specify a MessageSpecBuilderCustomizer to configure the outgoing message. For example, the following code shows how to send a keyed message:
template.newMessage(msg)
.withMessageCustomizer((mc) -> mc.key("foo-msg-key"))
.send();
5.1.3. Sender customization
You can specify a ReactiveMessageSenderBuilderCustomizer to configure the underlying Pulsar sender builder that ultimately constructs the sender used to send the outgoing message.
Use with caution, as this gives full access to the sender builder, and invoking some of its methods (such as create) may have unintended side effects.
For example, the following code shows how to disable batching and enable chunking:
template.newMessage(msg)
.withSenderCustomizer((sc) -> sc.enableChunking(true).enableBatching(false))
.send();
The next example shows how to use custom routing when publishing records to partitioned topics.
Specify your custom MessageRouter implementation on the sender builder, as follows:
template.newMessage(msg)
.withSenderCustomizer((sc) -> sc.messageRouter(messageRouter))
.send();
Note that, when using a MessageRouter, the only valid setting for spring.pulsar.producer.message-routing-mode is custom.
5.2. Specifying Schema Information
If you use Java primitive types, the framework auto-detects the schema for you, and you need not specify any schema types for publishing the data.
For non-primitive types, if the Schema is not explicitly specified when invoking send operations on the ReactivePulsarTemplate, the Spring for Apache Pulsar framework will try to build a Schema.JSON from the type.
Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, and KEY_VALUE with INLINE encoding.
5.2.1. Custom Schema Mapping
As an alternative to specifying the schema when invoking send operations on the ReactivePulsarTemplate for complex types, the schema resolver can be configured with mappings for the types.
This removes the need to specify the schema, as the framework consults the resolver using the outgoing message type.
Schema mappings can be configured with the spring.pulsar.defaults.type-mappings property.
The following example uses application.yml to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
The message-type is the fully-qualified name of the message class.
The preferred method of adding mappings is via the property mentioned above. However, if more control is needed, you can provide a schema resolver customizer to add the mapping(s).
The following example uses a schema resolver customizer to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
    return (schemaResolver) -> {
        schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
        schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
    };
}
With this configuration in place, there is no need to specify the schema on send operations.
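The selection logic described in this section can be pictured with a small plain-Java model. This is only a sketch under stated assumptions: the class name, the string schema labels, the subset of simple types, and the order in which simple types and custom mappings are consulted are all illustrative choices, not the framework's resolver.

```java
import java.util.Map;

/** Illustrative model of schema selection; not the framework's schema resolver. */
final class SchemaSelectionSketch {

    // Assumed subset of simple types and the schema labels they map to.
    private static final Map<Class<?>, String> SIMPLE_TYPES = Map.of(
            String.class, "STRING",
            Integer.class, "INT32",
            Long.class, "INT64",
            Boolean.class, "BOOLEAN",
            Double.class, "DOUBLE");

    // Stands in for the configured type mappings (message type -> schema label).
    private final Map<Class<?>, String> customMappings;

    SchemaSelectionSketch(Map<Class<?>, String> customMappings) {
        this.customMappings = Map.copyOf(customMappings);
    }

    /** Picks a schema label: explicit choice, then simple type, then mapping, then JSON. */
    String select(String explicitSchemaType, Class<?> messageType) {
        if (explicitSchemaType != null) {
            return explicitSchemaType; // schema given on the send operation
        }
        String simple = SIMPLE_TYPES.get(messageType);
        if (simple != null) {
            return simple; // auto-detected for simple types
        }
        // Assumed order: a custom mapping wins over the JSON fallback.
        return customMappings.getOrDefault(messageType, "JSON");
    }
}
```

With a mapping from a hypothetical type to AVRO, that type resolves to AVRO without an explicit schema, while an unmapped complex type falls back to JSON.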
5.3. ReactivePulsarSenderFactory
The ReactivePulsarTemplate relies on a ReactivePulsarSenderFactory to actually create the underlying sender.
Spring Boot provides this sender factory, which can be configured with any of the spring.pulsar.producer.* application properties.
If topic information is not specified when using the sender factory APIs directly, the same topic resolution process used by the ReactivePulsarTemplate is used, with the one exception that the "Message type default" step is omitted.
5.3.1. Producer Caching
Each underlying Pulsar producer consumes resources.
To improve performance and avoid continual creation of producers, the ReactiveMessageSenderCache in the underlying Apache Pulsar Reactive client caches the producers that it creates.
They are cached in an LRU fashion and evicted when they have not been used within a configured time period.
You can configure the cache settings by specifying any of the spring.pulsar.producer.cache.* application properties.
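To make the caching behavior concrete, here is a toy plain-Java cache that combines LRU ordering with idle-time eviction. It is a sketch only: the class name, the size limit, and the injected clock are assumptions made for illustration, and it is not how the Apache Pulsar Reactive client implements its cache.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Toy cache showing LRU ordering plus idle-time eviction; not the client's cache. */
final class IdleEvictingLruCache<K, V> {

    private static final class Entry<V> {
        final V value;
        final long lastUsedMillis;

        Entry(V value, long lastUsedMillis) {
            this.value = value;
            this.lastUsedMillis = lastUsedMillis;
        }
    }

    private final int maxSize;
    private final long maxIdleMillis;
    private final LongSupplier clock; // injected so behavior can be checked without sleeping
    // accessOrder = true keeps the least recently used entry first in iteration order
    private final LinkedHashMap<K, Entry<V>> entries = new LinkedHashMap<>(16, 0.75f, true);

    IdleEvictingLruCache(int maxSize, long maxIdleMillis, LongSupplier clock) {
        this.maxSize = maxSize;
        this.maxIdleMillis = maxIdleMillis;
        this.clock = clock;
    }

    /** Returns the cached value for the key, creating it when absent or expired. */
    synchronized V get(K key, Function<K, V> factory) {
        long now = clock.getAsLong();
        // Drop entries that have not been used within the idle period.
        entries.values().removeIf(e -> now - e.lastUsedMillis > maxIdleMillis);
        Entry<V> existing = entries.get(key);
        V value = (existing != null) ? existing.value : factory.apply(key);
        entries.put(key, new Entry<>(value, now)); // refresh last-used time and LRU position
        if (entries.size() > maxSize) {
            Iterator<K> it = entries.keySet().iterator();
            it.next();
            it.remove(); // evict the least recently used entry
        }
        return value;
    }

    synchronized int size() {
        return entries.size();
    }
}
```

Repeated lookups for the same key reuse the cached value until it has been idle for longer than the configured period, at which point the factory is invoked again.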
6. Message Consumption
6.1. @ReactivePulsarListener
When it comes to Pulsar consumers, we recommend that end-user applications use the ReactivePulsarListener annotation.
To use ReactivePulsarListener, you need to use the @EnableReactivePulsar annotation.
When you use Spring Boot support, it automatically enables this annotation and configures all necessary components, such as the message listener infrastructure (which is responsible for creating the underlying Pulsar consumer).
Let us revisit the ReactivePulsarListener code snippet we saw in the quick-tour section:
@ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
The listener method returns a Mono<Void> to signal whether the message was successfully processed. Mono.empty() indicates success (acknowledgment) and Mono.error() indicates failure (negative acknowledgment).
You can also further simplify this method:
@ReactivePulsarListener
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
In this most basic form, when the topics are not directly provided, a topic resolution process is used to determine the destination topic.
Likewise, when the subscriptionName is not provided on the @ReactivePulsarListener annotation, an auto-generated subscription name will be used.
In the ReactivePulsarListener method shown earlier, we receive the data as String, but we do not specify any schema types.
Internally, the framework relies on Pulsar’s schema mechanism to convert the data to the required type.
The framework detects that you expect the String type and then infers the schema type based on that information.
Then it provides that schema to the consumer.
For all the primitive types in Java, the framework does this inference.
For any complex types (such as JSON, AVRO, and others), the framework cannot do this inference and the user needs to provide the schema type on the annotation using the schemaType property.
This example shows how we can consume complex types from a topic:
@ReactivePulsarListener(topics = "my-topic-2", schemaType = SchemaType.JSON)
Mono<Void> listen(Foo message) {
System.out.println(message);
return Mono.empty();
}
Note the addition of a schemaType property on ReactivePulsarListener.
That is because the library is not capable of inferring the schema type from the provided type: Foo. We must tell the framework what schema to use.
Let us look at a few more ways we can consume.
This example consumes the Pulsar message directly:
@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.apache.pulsar.client.api.Message<String> message) {
System.out.println(message.getValue());
return Mono.empty();
}
This example consumes the record wrapped in a Spring messaging envelope:
@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.springframework.messaging.Message<String> message) {
System.out.println(message.getPayload());
return Mono.empty();
}
6.1.1. Streaming
All of the above are examples of consuming a single record one-by-one. However, one of the compelling reasons to use Reactive is for the streaming capability with backpressure support.
The following example uses ReactivePulsarListener
to consume a stream of POJOs:
@ReactivePulsarListener(topics = "streaming-1", stream = true)
Flux<MessageResult<Void>> listen(Flux<org.apache.pulsar.client.api.Message<String>> messages) {
    return messages
        .doOnNext((msg) -> System.out.println("Received: " + msg.getValue()))
        .map(MessageResult::acknowledge);
}
Here we receive the records as a Flux of Pulsar messages.
In addition, to enable stream consumption at the ReactivePulsarListener level, you need to set the stream property on the annotation to true.
The listener method returns a Flux<MessageResult<Void>> where each element represents a processed message and holds the message id, value, and whether it was acknowledged.
The MessageResult has a set of static factory methods that can be used to create the appropriate MessageResult instance.
Based on the actual type of the messages in the Flux, the framework tries to infer the schema to use.
If it contains a complex type, you still need to provide the schemaType on ReactivePulsarListener.
The following listener uses the Spring messaging Message envelope with a complex type:
@ReactivePulsarListener(topics = "streaming-2", stream = true, schemaType = SchemaType.JSON)
Flux<MessageResult<Void>> listen2(Flux<org.springframework.messaging.Message<Foo>> messages) {
return messages
.doOnNext((msg) -> System.out.println("Received: " + msg.getPayload()))
.map(MessageUtils::acknowledge);
}
The listener method returns a Flux<MessageResult<Void>> where each element represents a processed message and holds the message id, value, and whether it was acknowledged.
The Spring MessageUtils has a set of static factory methods that can be used to create the appropriate MessageResult instance from a Spring message.
6.1.2. Configuration - Application Properties
The listener relies on the ReactivePulsarConsumerFactory to create and manage the underlying Pulsar consumer that it uses to consume messages.
Spring Boot provides this consumer factory, which you can further configure by specifying the spring.pulsar.consumer.* application properties.
Most of the configured properties on the factory will be respected in the listener, with the following exceptions:
The spring.pulsar.consumer.subscription.name property is ignored and is instead generated when not specified on the annotation.
The spring.pulsar.consumer.subscription-type property is ignored and is instead taken from the value on the annotation. However, you can set the subscriptionType = {} on the annotation to instead use the property value as the default.
6.1.3. Consumer Customization
You can specify a ReactivePulsarListenerMessageConsumerBuilderCustomizer to configure the underlying Pulsar consumer builder that ultimately constructs the consumer used by the listener to receive the messages.
Use with caution, as this gives full access to the consumer builder, and invoking some of its methods (such as create) may have unintended side effects.
For example, the following code shows how to set the initial position of the subscription to the earliest message on the topic:
@ReactivePulsarListener(topics = "hello-pulsar-topic", consumerCustomizer = "myConsumerCustomizer")
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myConsumerCustomizer() {
return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}
If your application has only a single @ReactivePulsarListener and a single ReactivePulsarListenerMessageConsumerBuilderCustomizer bean registered, then the customizer will be automatically applied.
You can also use the customizer to provide direct Pulsar consumer properties to the consumer builder.
This is convenient if you do not want to use the Boot configuration properties mentioned earlier or have multiple ReactivePulsarListener methods whose configuration varies.
The following customizer example uses direct Pulsar consumer properties:
@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> directConsumerPropsCustomizer() {
return b -> b.property("subscriptionName", "subscription-1").property("topicNames", "foo-1");
}
The properties used are direct Pulsar consumer properties, not the spring.pulsar.consumer Spring Boot configuration properties.
6.2. Specifying Schema Information
As indicated earlier, for Java primitives, the Spring for Apache Pulsar framework can infer the proper Schema to use on the ReactivePulsarListener.
For non-primitive types, if the Schema is not explicitly specified on the annotation, the Spring for Apache Pulsar framework will try to build a Schema.JSON from the type.
Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, and KEY_VALUE with INLINE encoding.
6.2.1. Custom Schema Mapping
As an alternative to specifying the schema on the ReactivePulsarListener for complex types, the schema resolver can be configured with mappings for the types.
This removes the need to set the schema on the listener, as the framework consults the resolver using the incoming message type.
Schema mappings can be configured with the spring.pulsar.defaults.type-mappings property.
The following example uses application.yml to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
The message-type is the fully-qualified name of the message class.
The preferred method of adding mappings is via the property mentioned above. However, if more control is needed, you can provide a schema resolver customizer to add the mapping(s).
The following example uses a schema resolver customizer to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
    return (schemaResolver) -> {
        schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
        schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
    };
}
With this configuration in place, there is no need to set the schema on the listener, for example:
@ReactivePulsarListener(topics = "user-topic")
Mono<Void> listen(User user) {
System.out.println(user);
return Mono.empty();
}
6.3. Message Listener Container Infrastructure
In most scenarios, we recommend using the ReactivePulsarListener annotation directly for consuming from a Pulsar topic, as that model covers a broad set of application use cases.
However, it is important to understand how ReactivePulsarListener works internally.
The message listener container is at the heart of message consumption when you use Spring for Apache Pulsar.
The ReactivePulsarListener uses the message listener container infrastructure behind the scenes to create and manage the underlying Pulsar consumer.
6.3.1. ReactivePulsarMessageListenerContainer
The contract for this message listener container is provided through ReactivePulsarMessageListenerContainer, whose default implementation creates a reactive Pulsar consumer and wires up a reactive message pipeline that uses the created consumer.
6.3.2. ReactiveMessagePipeline
The pipeline is a feature of the underlying Apache Pulsar Reactive client which does the heavy lifting of receiving the data in a reactive manner and then handing it over to the provided message handler. The reactive message listener container implementation is much simpler because the pipeline handles the majority of the work.
6.3.3. ReactivePulsarMessageHandler
The "listener" aspect is provided by the ReactivePulsarMessageHandler, of which there are two provided implementations:
- ReactivePulsarOneByOneMessageHandler - handles a single message one-by-one
- ReactivePulsarStreamingHandler - handles multiple messages via a Flux
If topic information is not specified when using the listener containers directly, the same topic resolution process used by the ReactivePulsarListener is used, with the one exception that the "Message type default" step is omitted.
6.4. Concurrency
When consuming records in streaming mode (stream = true), concurrency comes naturally via the underlying Reactive support in the client implementation.
However, when handling messages one-by-one, concurrency can be specified to increase processing throughput.
Simply set the concurrency property on @ReactivePulsarListener.
Additionally, when concurrency > 1, you can ensure messages are ordered by key and therefore sent to the same handler by setting useKeyOrderedProcessing = "true" on the annotation.
Again, the ReactiveMessagePipeline does the heavy lifting; we simply set the properties on it.
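The idea behind key-ordered processing can be sketched in plain Java: messages that share a key are always routed to the same handler lane, so their relative order is preserved even when several lanes run concurrently. The class, the method names, and the hash-modulo routing are assumptions made for illustration; the actual grouping strategy inside the ReactiveMessagePipeline may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Illustrative model of key-ordered dispatch; not the pipeline's implementation. */
final class KeyOrderedDispatchSketch {

    /** Picks a lane index so that equal keys always map to the same lane. */
    static int handlerIndexFor(String messageKey, int concurrency) {
        if (concurrency < 1) {
            throw new IllegalArgumentException("concurrency must be >= 1");
        }
        // floorMod keeps the index non-negative even for negative hash codes
        return Math.floorMod(messageKey.hashCode(), concurrency);
    }

    /** Assigns each (key, value) message to a lane, preserving arrival order within a lane. */
    static List<List<String>> dispatch(List<Map.Entry<String, String>> messages, int concurrency) {
        List<List<String>> lanes = new ArrayList<>();
        for (int i = 0; i < concurrency; i++) {
            lanes.add(new ArrayList<>());
        }
        for (Map.Entry<String, String> message : messages) {
            lanes.get(handlerIndexFor(message.getKey(), concurrency)).add(message.getValue());
        }
        return lanes;
    }
}
```

Because every message with a given key lands in one lane, a handler working through that lane sees those messages in arrival order, while messages with other keys can be processed in parallel.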
6.5. Pulsar Headers
The Pulsar message metadata can be consumed as Spring message headers. The list of available headers can be found in PulsarHeaders.java.
6.5.1. Accessing In OneByOne Listener
The following example shows how you can access Pulsar Headers when using a one-by-one message listener:
@ReactivePulsarListener(topics = "some-topic")
Mono<Void> listen(String data,
@Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@Header("foo") String foo) {
System.out.println("Received " + data + " w/ id=" + messageId + " w/ foo=" + foo);
return Mono.empty();
}
In the preceding example, we access the values for the messageId message metadata as well as a custom message property named foo.
The Spring @Header annotation is used for each header field.
You can also use Pulsar’s Message as the envelope to carry the payload.
When doing so, you can directly call the corresponding methods on the Pulsar message to retrieve the metadata.
However, as a convenience, you can also retrieve it by using the @Header annotation.
Note that you can also use the Spring messaging Message envelope to carry the payload and then retrieve the Pulsar headers by using @Header.
6.5.2. Accessing In Streaming Listener
When using a streaming message listener, the header support is limited.
Only when the Flux contains Spring org.springframework.messaging.Message elements will the headers be populated.
Additionally, the Spring @Header annotation cannot be used to retrieve the data.
You must directly call the corresponding methods on the Spring message to retrieve the data.
6.6. Message Acknowledgment
The framework automatically handles message acknowledgement. However, the listener method must send a signal indicating whether the message was successfully processed. The container implementation then uses that signal to perform the ack or nack operation. This is slightly different from its imperative counterpart, where the signal is implied as positive unless the method throws an exception.
6.6.1. OneByOne Listener
The single message (aka OneByOne) message listener method returns a Mono<Void> to signal whether the message was successfully processed. Mono.empty() indicates success (acknowledgment) and Mono.error() indicates failure (negative acknowledgment).
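The mapping from completion signal to ack or nack can be modeled without any Pulsar or Reactor dependency. The sketch below deliberately swaps in the JDK's CompletionStage for Mono<Void>: normal completion plays the role of Mono.empty() and exceptional completion plays the role of Mono.error(). The class and enum names are hypothetical, and treating a thrown exception like an error signal is an assumption of this sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

/** Illustrative model of signal-driven acknowledgment; CompletionStage stands in for Mono. */
final class AckSignalSketch {

    enum Outcome { ACKNOWLEDGED, NEGATIVELY_ACKNOWLEDGED }

    /** Invokes the handler and maps its completion signal to an ack or nack decision. */
    static <T> CompletionStage<Outcome> process(T message, Function<T, CompletionStage<Void>> handler) {
        CompletionStage<Void> signal;
        try {
            signal = handler.apply(message);
        } catch (RuntimeException ex) {
            // Assumption of this sketch: a thrown exception counts as a failure signal.
            signal = CompletableFuture.failedFuture(ex);
        }
        return signal.handle((ignored, error) ->
                error == null ? Outcome.ACKNOWLEDGED : Outcome.NEGATIVELY_ACKNOWLEDGED);
    }
}
```

A handler that completes normally yields ACKNOWLEDGED, and one that completes exceptionally yields NEGATIVELY_ACKNOWLEDGED, mirroring the success and failure signals described above.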
6.6.2. Streaming Listener
The streaming listener method returns a Flux<MessageResult<Void>> where each MessageResult element represents a processed message and holds the message id, value, and whether it was acknowledged. The MessageResult has a set of acknowledge and negativeAcknowledge static factory methods that can be used to create the appropriate MessageResult instance.
6.7. Message Redelivery and Error Handling
Apache Pulsar provides various native strategies for message redelivery and error handling. We will take a look at them and see how to use them through Spring for Apache Pulsar.
6.7.1. Acknowledgment Timeout
By default, Pulsar consumers do not redeliver messages unless the consumer crashes, but you can change this behavior by setting an ack timeout on the Pulsar consumer. If the ack timeout property has a value above zero and if the Pulsar consumer does not acknowledge a message within that timeout period, the message is redelivered.
You can specify this property directly as a Pulsar consumer property via a consumer customizer such as:
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
return b -> b.property("ackTimeout", "60s");
}
6.7.2. Negative Acknowledgment Redelivery Delay
When acknowledging negatively, the Pulsar consumer lets you specify how the application wants the message to be redelivered. The default is to redeliver the message in one minute, but you can change it via a consumer customizer such as:
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
return b -> b.property("negativeAckRedeliveryDelay", "10ms");
}
6.7.3. Dead Letter Topic
Apache Pulsar lets applications use a dead letter topic on consumers with a Shared subscription type.
For the Exclusive and Failover subscription types, this feature is not available.
The basic idea is that, if a message is retried a certain number of times (maybe due to an ack timeout or nack redelivery), once the number of retries is exhausted, the message can be sent to a special topic called the dead letter queue (DLQ).
Let us see some details around this feature in action by inspecting some code snippets:
@Configuration(proxyBeanMethods = false)
class DeadLetterPolicyConfig {
@ReactivePulsarListener(
topics = "topic-with-dlp",
subscriptionType = SubscriptionType.Shared,
deadLetterPolicy = "myDeadLetterPolicy",
consumerCustomizer = "ackTimeoutCustomizer" )
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@ReactivePulsarListener(topics = "my-dlq-topic")
void listenDlq(String msg) {
System.out.println("From DLQ: " + msg);
}
@Bean
DeadLetterPolicy myDeadLetterPolicy() {
return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
}
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> ackTimeoutCustomizer() {
return b -> b.property("ackTimeout", "1s");
}
}
First, we have a special bean for DeadLetterPolicy, and it is named myDeadLetterPolicy (you can use any name you wish).
This bean specifies a number of things, such as the max redelivery count (10, in this case) and the name of the dead letter topic (my-dlq-topic, in this case).
If you do not specify a DLQ topic name, it defaults to <topicname>-<subscriptionname>-DLQ in Pulsar.
Next, we provide this bean name to ReactivePulsarListener by setting the deadLetterPolicy property.
Note that the ReactivePulsarListener has a subscription type of Shared, as the DLQ feature only works with shared subscriptions.
This code is primarily for demonstration purposes, so we provide an ackTimeout value of 1 second.
The idea is that the code throws the exception and, if Pulsar does not receive an ack within 1 second, it does a retry.
If that cycle continues ten times (as that is our max redelivery count in the DeadLetterPolicy), the Pulsar consumer publishes the messages to the DLQ topic.
We have another ReactivePulsarListener that listens on the DLQ topic to receive data as it is published to the DLQ topic.
6.8. Pulsar Reader Support
The framework provides support for using Pulsar Reader in a Reactive fashion via the ReactivePulsarReaderFactory.
Spring Boot provides this reader factory, which can be configured with any of the spring.pulsar.reader.* application properties.
7. Topic Resolution
A destination topic is needed when producing or consuming messages. The framework looks in the following ordered locations to determine a topic (stopping at the first find):
- User specified
- Message type default
- Global default
When a topic is found via one of the default mechanisms, there is no need to specify the topic on the produce or consume API.
When a topic is not found, the API will throw an exception accordingly.
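The lookup order above can be pictured with a small plain-Java model. It is a sketch of the ordering only: the class name, the map of type defaults, and the exception type are illustrative assumptions, not the framework's topic resolver.

```java
import java.util.Map;

/** Illustrative model of the topic lookup order; not the framework's resolver. */
final class TopicResolutionSketch {

    private final Map<Class<?>, String> typeDefaults; // "Message type default" mappings
    private final String globalDefault;               // "Global default", may be null

    TopicResolutionSketch(Map<Class<?>, String> typeDefaults, String globalDefault) {
        this.typeDefaults = Map.copyOf(typeDefaults);
        this.globalDefault = globalDefault;
    }

    /** Returns the first topic found, checking the locations in the documented order. */
    String resolve(String userSpecified, Class<?> messageType) {
        if (userSpecified != null) {
            return userSpecified; // 1. user specified
        }
        if (messageType != null) {
            String typeDefault = typeDefaults.get(messageType);
            if (typeDefault != null) {
                return typeDefault; // 2. message type default
            }
        }
        if (globalDefault != null) {
            return globalDefault; // 3. global default
        }
        // Exception type chosen for this sketch only.
        throw new IllegalStateException("No topic could be resolved");
    }
}
```

For example, with a mapping from String to a hypothetical "string-topic" and a global default, a call that passes an explicit topic returns that topic, a call without one returns the mapped topic for String, and any other type falls through to the global default.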
7.1. User specified
A topic passed into the API being used has the highest precedence (e.g. PulsarTemplate.send("my-topic", myMessage) or @PulsarListener(topics = "my-topic")).
7.2. Message type default
When no topic is passed into the API, the system looks for a message type to topic mapping configured for the type of the message being produced or consumed.
Mappings can be configured with the spring.pulsar.defaults.type-mappings property.
The following example uses application.yml to configure default topics to use when consuming or producing Foo or Bar messages:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.Foo
topic-name: foo-topic
- message-type: com.acme.Bar
topic-name: bar-topic
The message-type is the fully-qualified name of the message class.
If the message (or the first message of a Publisher input) is null, the framework cannot determine the topic from it. Use another method to specify the topic if your application is likely to send null messages.
7.2.1. Custom topic resolver
The preferred method of adding mappings is via the property mentioned above. However, if more control is needed, you can replace the default resolver by providing your own implementation, for example:
@Bean
public MyTopicResolver topicResolver() {
return new MyTopicResolver();
}
7.3. Producer global default
The final location consulted (when producing) is the system-wide producer default topic.
It is configured via the spring.pulsar.producer.topic-name property when using the imperative API and the spring.pulsar.reactive.sender.topic-name property when using the reactive API.
7.4. Consumer global default
The final location consulted (when consuming) is the system-wide consumer default topic.
It is configured via the spring.pulsar.consumer.topics or spring.pulsar.consumer.topics-pattern property when using the imperative API and the spring.pulsar.reactive.consumer.topics or spring.pulsar.reactive.consumer.topics-pattern property when using the reactive API.