1. @ReactivePulsarListener
When it comes to Pulsar consumers, we recommend that end-user applications use the ReactivePulsarListener
annotation.
To use ReactivePulsarListener
, you need to use the @EnableReactivePulsar
annotation.
When you use Spring Boot support, it automatically enables this annotation and configures all necessary components, such as the message listener infrastructure (which is responsible for creating the underlying Pulsar consumer).
Let us revisit the ReactivePulsarListener
code snippet we saw in the quick-tour section:
@ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
The listener method returns a Mono<Void> to signal whether the message was successfully processed. Mono.empty() indicates success (acknowledgment) and Mono.error() indicates failure (negative acknowledgment).
|
You can also further simplify this method:
@ReactivePulsarListener
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
In this most basic form, when the topics
are not directly provided, a topic resolution process is used to determine the destination topic.
Likewise, when the subscriptionName
is not provided on the @ReactivePulsarListener
annotation an auto-generated subscription name will be used.
In the ReactivePulsarListener
method shown earlier, we receive the data as String
, but we do not specify any schema types.
Internally, the framework relies on Pulsar’s schema mechanism to convert the data to the required type.
The framework detects that you expect the String
type and then infers the schema type based on that information and provides that schema to the consumer.
The framework does this inference for all primitive types.
For all non-primitive types the default schema is assumed to be JSON.
If a complex type is using anything besides JSON (such as AVRO or KEY_VALUE) you must provide the schema type on the annotation using the schemaType
property.
This example shows how we can consume complex types from a topic:
@ReactivePulsarListener(topics = "my-topic-2", schemaType = SchemaType.JSON)
Mono<Void> listen(Foo message) {
System.out.println(message);
return Mono.empty();
}
Let us look at a few more ways we can consume.
This example consumes the Pulsar message directly:
@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.apache.pulsar.client.api.Message<String> message) {
System.out.println(message.getValue());
return Mono.empty();
}
This example consumes the record wrapped in a Spring messaging envelope:
@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.springframework.messaging.Message<String> message) {
System.out.println(message.getPayload());
return Mono.empty();
}
1.1. Streaming
All of the above are examples of consuming a single record one-by-one. However, one of the compelling reasons to use Reactive is for the streaming capability with backpressure support.
The following example uses ReactivePulsarListener
to consume a stream of POJOs:
@ReactivePulsarListener(topics = "streaming-1", stream = true)
Flux<MessageResult<Void>> listen(Flux<org.apache.pulsar.client.api.Message<String>> messages) {
return messages
.doOnNext((msg) -> System.out.println("Received: " + msg.getValue()))
.map(MessageResult::acknowledge);
Here we receive the records as a Flux
of Pulsar messages.
In addition, to enable stream consumption at the ReactivePulsarListener
level, you need to set the stream
property on the annotation to true
.
The listener method returns a Flux<MessageResult<Void>> where each element represents a processed message and holds the message id, value and whether it was acknowledged.
The MessageResult has a set of static factory methods that can be used to create the appropriate MessageResult instance.
|
Based on the actual type of the messages in the Flux
, the framework tries to infer the schema to use.
If it contains a complex type, you still need to provide the schemaType
on ReactivePulsarListener
.
The following listener uses the Spring messaging Message
envelope with a complex type :
@ReactivePulsarListener(topics = "streaming-2", stream = true, schemaType = SchemaType.JSON)
Flux<MessageResult<Void>> listen2(Flux<org.springframework.messaging.Message<Foo>> messages) {
return messages
.doOnNext((msg) -> System.out.println("Received: " + msg.getPayload()))
.map(MessageUtils::acknowledge);
}
The listener method returns a Flux<MessageResult<Void>> where each element represents a processed message and holds the message id, value and whether it was acknowledged.
The Spring MessageUtils has a set of static factory methods that can be used to create the appropriate MessageResult instance from a Spring message.
The MessageUtils provides the same functionality for Spring messages as the set of factory methods on MessagResult does for Pulsar messages.
|
There is no support for using org.apache.pulsar.client.api.Messages<T> in a @ReactivePulsarListener
|
1.2. Configuration - Application Properties
The listener relies on the ReactivePulsarConsumerFactory
to create and manage the underlying Pulsar consumer that it uses to consume messages.
Spring Boot provides this consumer factory which you can further configure by specifying the spring.pulsar.consumer.*
application properties.
1.3. Generic records with AUTO_CONSUME
If there is no chance to know the type of schema of a Pulsar topic in advance, you can use the AUTO_CONSUME
schema type to consume generic records.
In this case, the topic deserializes messages into GenericRecord
objects using the schema info associated with the topic.
To consume generic records set the schemaType = SchemaType.AUTO_CONSUME
on your @ReactivePulsarListener
and use a Pulsar message of type GenericRecord
as the message parameter as shown below.
@ReactivePulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
Mono<Void> listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
GenericRecord record = message.getValue();
record.getFields().forEach((f) ->
System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
return Mono.empty();
}
The GenericRecord API allows access to the fields and their associated values
|
1.4. Consumer Customization
You can specify a ReactivePulsarListenerMessageConsumerBuilderCustomizer
to configure the underlying Pulsar consumer builder that ultimately constructs the consumer used by the listener to receive the messages.
Use with caution as this gives full access to the consumer builder and invoking some of its methods (such as create ) may have unintended side effects.
|
For example, the following code shows how to set the initial position of the subscription to the earliest messaage on the topic.
@ReactivePulsarListener(topics = "hello-pulsar-topic", consumerCustomizer = "myConsumerCustomizer")
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myConsumerCustomizer() {
return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}
If your application only has a single @ReactivePulsarListener and a single ReactivePulsarListenerMessageConsumerBuilderCustomizer bean registered then the customizer will be automatically applied.
|
You can also use the customizer to provide direct Pulsar consumer properties to the consumer builder.
This is convenient if you do not want to use the Boot configuration properties mentioned earlier or have multiple ReactivePulsarListener
methods whose configuration varies.
The following customizer example uses direct Pulsar consumer properties:
@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> directConsumerPropsCustomizer() {
return b -> b.property("subscriptionName", "subscription-1").property("topicNames", "foo-1");
}
The properties used are direct Pulsar consumer properties, not the spring.pulsar.consumer Spring Boot configuration properties
|
The listener method returns a Mono<Void> to signal whether the message was successfully processed. Mono.empty() indicates success (acknowledgment) and Mono.error() indicates failure (negative acknowledgment).
|
The listener method returns a Flux<MessageResult<Void>> where each element represents a processed message and holds the message id, value and whether it was acknowledged.
The MessageResult has a set of static factory methods that can be used to create the appropriate MessageResult instance.
|
The listener method returns a Flux<MessageResult<Void>> where each element represents a processed message and holds the message id, value and whether it was acknowledged.
The Spring MessageUtils has a set of static factory methods that can be used to create the appropriate MessageResult instance from a Spring message.
The MessageUtils provides the same functionality for Spring messages as the set of factory methods on MessagResult does for Pulsar messages.
|
There is no support for using org.apache.pulsar.client.api.Messages<T> in a @ReactivePulsarListener
|
The GenericRecord API allows access to the fields and their associated values
|
Use with caution as this gives full access to the consumer builder and invoking some of its methods (such as create ) may have unintended side effects.
|
If your application only has a single @ReactivePulsarListener and a single ReactivePulsarListenerMessageConsumerBuilderCustomizer bean registered then the customizer will be automatically applied.
|
The properties used are direct Pulsar consumer properties, not the spring.pulsar.consumer Spring Boot configuration properties
|
2. Specifying Schema Information
As indicated earlier, for Java primitives, the Spring for Apache Pulsar framework can infer the proper Schema to use on the ReactivePulsarListener
.
For non-primitive types, if the Schema is not explicitly specified on the annotation, the Spring for Apache Pulsar framework will try to build a Schema.JSON
from the type.
Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, AUTO_CONSUME, KEY_VALUE w/ INLINE encoding. |
2.1. Custom Schema Mapping
As an alternative to specifying the schema on the ReactivePulsarListener
for complex types, the schema resolver can be configured with mappings for the types.
This removes the need to set the schema on the listener as the framework consults the resolver using the incoming message type.
2.1.1. Configuration properties
Schema mappings can be configured with the spring.pulsar.defaults.type-mappings
property.
The following example uses application.yml
to add mappings for the User
and Address
complex objects using AVRO
and JSON
schemas, respectively:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
The message-type is the fully-qualified name of the message class.
|
2.1.2. Schema resolver customizer
The preferred method of adding mappings is via the property mentioned above. However, if more control is needed you can provide a schema resolver customizer to add the mapping(s).
The following example uses a schema resolver customizer to add mappings for the User
and Address
complex objects using AVRO
and JSON
schemas, respectively:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
2.1.3. Type mapping annotation
Another option for specifying default schema information to use for a particular message type is to mark the message class with the @PulsarMessage
annotation.
The schema info can be specified via the schemaType
attribute on the annotation.
The following example configures the system to use JSON as the default schema when producing or consuming messages of type Foo
:
@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
With this configuration in place, there is no need to set the schema on the listener, for example:
@ReactivePulsarListener(topics = "user-topic")
Mono<Void> listen(User user) {
System.out.println(user);
return Mono.empty();
}
Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, AUTO_CONSUME, KEY_VALUE w/ INLINE encoding. |
The message-type is the fully-qualified name of the message class.
|
3. Message Listener Container Infrastructure
In most scenarios, we recommend using the ReactivePulsarListener
annotation directly for consuming from a Pulsar topic as that model covers a broad set of application use cases.
However, it is important to understand how ReactivePulsarListener
works internally.
The message listener container is at the heart of message consumption when you use Spring for Apache Pulsar.
The ReactivePulsarListener
uses the message listener container infrastructure behind the scenes to create and manage the underlying Pulsar consumer.
3.1. ReactivePulsarMessageListenerContainer
The contract for this message listener container is provided through ReactivePulsarMessageListenerContainer
whose default implementation creates a reactive Pulsar consumer and wires up a reactive message pipeline that uses the created consumer.
3.2. ReactiveMessagePipeline
The pipeline is a feature of the underlying Apache Pulsar Reactive client which does the heavy lifting of receiving the data in a reactive manner and then handing it over to the provided message handler. The reactive message listener container implementation is much simpler because the pipeline handles the majority of the work.
3.3. ReactivePulsarMessageHandler
The "listener" aspect is provided by the ReactivePulsarMessageHandler
of which there are two provided implementations:
-
ReactivePulsarOneByOneMessageHandler
- handles a single message one-by-one -
ReactivePulsarStreamingHandler
- handles multiple messages via aFlux
If topic information is not specified when using the listener containers directly, the same topic resolution process used by the ReactivePulsarListener is used with the one exception that the "Message type default" step is omitted.
|
3.4. Handling Startup Failures
Message listener containers are started when the application context is refreshed.
By default, any failures encountered during startup are re-thrown and the application will fail to start.
You can adjust this behavior with the StartupFailurePolicy
on the corresponding container properties.
The available options are:
-
Stop
(default) - log and re-throw the exception, effectively stopping the application -
Continue
- log the exception, leave the container in a non-running state, but do not stop the application -
Retry
- log the exception, retry to start the container asynchronously, but do not stop the application.
The default retry behavior is to retry 3 times with a 10-second delay between each attempt. However, a custom retry template can be specified on the corresponding container properties. If the container fails to restart after the retries are exhausted, it is left in a non-running state.
3.4.1. Configuration
With Spring Boot
When using Spring Boot you can register a PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>>
bean that sets the container startup properties.
Without Spring Boot
However, if you are instead manually configuring the components, you will have to update the container startup properties accordingly when constructing the message listener container factory.
If topic information is not specified when using the listener containers directly, the same topic resolution process used by the ReactivePulsarListener is used with the one exception that the "Message type default" step is omitted.
|
4. Concurrency
When consuming records in streaming mode (stream = true
) concurrency comes naturally via the underlying Reactive support in the client implementation.
However, when handling messages one-by-one, concurrency can be specified to increase processing throughput.
Simply set the concurrency
property on @ReactivePulsarListener
.
Additionally, when concurrency > 1
you can ensure messages are ordered by key and therefore sent to the same handler by setting useKeyOrderedProcessing = "true"
on the annotation.
Again, the ReactiveMessagePipeline
does the heavy lifting, we simply set the properties on it.
5. Pulsar Headers
The Pulsar message metadata can be consumed as Spring message headers. The list of available headers can be found in PulsarHeaders.java.
5.1. Accessing In OneByOne Listener
The following example shows how you can access Pulsar Headers when using a one-by-one message listener:
@ReactivePulsarListener(topics = "some-topic")
Mono<Void> listen(String data,
@Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@Header("foo") String foo) {
System.out.println("Received " + data + " w/ id=" + messageId + " w/ foo=" + foo);
return Mono.empty();
}
In the preceding example, we access the values for the messageId
message metadata as well as a custom message property named foo
.
The Spring @Header
annotation is used for each header field.
You can also use Pulsar’s Message
as the envelope to carry the payload.
When doing so, the user can directly call the corresponding methods on the Pulsar message for retrieving the metadata.
However, as a convenience, you can also retrieve it by using the Header
annotation.
Note that you can also use the Spring messaging Message
envelope to carry the payload and then retrieve the Pulsar headers by using @Header
.
5.2. Accessing In Streaming Listener
When using a streaming message listener the header support is limited.
Only when the Flux
contains Spring org.springframework.messaging.Message
elements will the headers be populated.
Additionally, the Spring @Header
annotation can not be used to retrieve the data.
You must directly call the corresponding methods on the Spring message to retrieve the data.
6. Message Acknowledgment
The framework automatically handles message acknowledgement. However, the listener method must send a signal indicating whether the message was successfully processed. The container implementation then uses that signal to perform the ack or nack operation. This is a slightly different from its imperative counterpart where the signal is implied as positive unless the method throws an exception.
6.1. OneByOne Listener
The single message (aka OneByOne) message listener method returns a Mono<Void>
to signal whether the message was successfully processed. Mono.empty()
indicates success (acknowledgment) and Mono.error()
indicates failure (negative acknowledgment).
6.2. Streaming Listener
The streaming listener method returns a Flux<MessageResult<Void>>
where each MessageResult
element represents a processed message and holds the message id, value and whether it was acknowledged. The MessageResult
has a set of acknowledge
and negativeAcknowledge
static factory methods that can be used to create the appropriate MessageResult
instance.
7. Message Redelivery and Error Handling
Apache Pulsar provides various native strategies for message redelivery and error handling. We will take a look at them and see how to use them through Spring for Apache Pulsar.
7.1. Acknowledgment Timeout
By default, Pulsar consumers do not redeliver messages unless the consumer crashes, but you can change this behavior by setting an ack timeout on the Pulsar consumer. If the ack timeout property has a value above zero and if the Pulsar consumer does not acknowledge a message within that timeout period, the message is redelivered.
You can specify this property directly as a Pulsar consumer property via a consumer customizer such as:
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
return b -> b.property("ackTimeout", "60s");
}
7.2. Negative Acknowledgment Redelivery Delay
When acknowledging negatively, Pulsar consumer lets you specify how the application wants the message to be re-delivered. The default is to redeliver the message in one minute, but you can change it via a consumer customizer such as:
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
return b -> b.property("negativeAckRedeliveryDelay", "10ms");
}
7.3. Dead Letter Topic
Apache Pulsar lets applications use a dead letter topic on consumers with a Shared
subscription type.
For the Exclusive
and Failover
subscription types, this feature is not available.
The basic idea is that, if a message is retried a certain number of times (maybe due to an ack timeout or nack redelivery), once the number of retries are exhausted, the message can be sent to a special topic called the dead letter queue (DLQ).
Let us see some details around this feature in action by inspecting some code snippets:
@Configuration(proxyBeanMethods = false)
class DeadLetterPolicyConfig {
@ReactivePulsarListener(
topics = "topic-with-dlp",
subscriptionType = SubscriptionType.Shared,
deadLetterPolicy = "myDeadLetterPolicy",
consumerCustomizer = "ackTimeoutCustomizer" )
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@ReactivePulsarListener(topics = "my-dlq-topic")
void listenDlq(String msg) {
System.out.println("From DLQ: " + msg);
}
@Bean
DeadLetterPolicy myDeadLetterPolicy() {
return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
}
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> ackTimeoutCustomizer() {
return b -> b.property("ackTimeout", "1s");
}
}
First, we have a special bean for DeadLetterPolicy
, and it is named as deadLetterPolicy
(it can be any name as you wish).
This bean specifies a number of things, such as the max delivery (10, in this case) and the name of the dead letter topic — my-dlq-topic
, in this case.
If you do not specify a DLQ topic name, it defaults to <topicname>-<subscriptionname>-DLQ
in Pulsar.
Next, we provide this bean name to ReactivePulsarListener
by setting the deadLetterPolicy
property.
Note that the ReactivePulsarListener
has a subscription type of Shared
, as the DLQ feature only works with shared subscriptions.
This code is primarily for demonstration purposes, so we provide an ackTimeout
value of 1 second.
The idea is that the code throws the exception and, if Pulsar does not receive an ack within 1 second, it does a retry.
If that cycle continues ten times (as that is our max redelivery count in the DeadLetterPolicy
), the Pulsar consumer publishes the messages to the DLQ topic.
We have another ReactivePulsarListener
that listens on the DLQ topic to receive data as it is published to the DLQ topic.
8. Pulsar Reader Support
The framework provides support for using Pulsar Reader in a Reactive fashion via the ReactivePulsarReaderFactory
.
Spring Boot provides this reader factory which can be configured with any of the spring.pulsar.reader.*
application properties.