Message Production
1. Pulsar Template
On the Pulsar producer side, Spring Boot auto-configuration provides a PulsarTemplate for publishing records. The template implements an interface called PulsarOperations and provides methods to publish records through its contract.
There are two categories of send API methods: send and sendAsync.
The send methods are blocking: they use the synchronous send capability of the Pulsar producer and return the MessageId of the published message once the message is persisted on the broker.
The sendAsync methods are non-blocking: they return a CompletableFuture, which you can use to receive the message ID asynchronously once the message is published.
For the API variants that do not include a topic parameter, a topic resolution process is used to determine the destination topic.
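The following sketch contrasts the two styles. It assumes a PulsarTemplate<String> provided by Spring Boot auto-configuration; the GreetingPublisher class and the greetings topic are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.pulsar.core.PulsarTemplate;

// Hypothetical service illustrating the blocking and non-blocking send styles.
class GreetingPublisher {

    private final PulsarTemplate<String> template;

    GreetingPublisher(PulsarTemplate<String> template) {
        this.template = template;
    }

    // Blocks until the broker has persisted the message, then returns its ID.
    MessageId sendBlocking(String greeting) throws PulsarClientException {
        return template.send("greetings", greeting);
    }

    // Returns immediately; the future completes with the message ID once the message is published.
    CompletableFuture<MessageId> sendNonBlocking(String greeting) throws PulsarClientException {
        return template.sendAsync("greetings", greeting)
                .whenComplete((messageId, ex) -> {
                    if (ex != null) {
                        System.err.println("Send failed: " + ex.getMessage());
                    }
                });
    }
}
```

The blocking variant is simpler to reason about; the non-blocking variant lets the caller keep working and react to the outcome in the callback.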
1.1. Simple API
The template provides a handful of methods (prefixed with 'send') for simple send requests. For more complicated send requests, a fluent API lets you configure more options.
1.2. Fluent API
The template provides a fluent builder to handle more complicated send requests.
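A sketch of a fluent send request that sets the topic, the schema, and a message key before sending asynchronously. The withMessageCustomizer and withTopic calls mirror examples elsewhere in this document; withSchema and sendAsync on the builder are assumed here, so check them against your version. The Order type and the orders topic are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.springframework.pulsar.core.PulsarTemplate;

class FluentSendExample {

    // Hypothetical payload type used only for illustration.
    public static class Order {
        public String id;
        public int quantity;

        public Order() {
        }

        public Order(String id, int quantity) {
            this.id = id;
            this.quantity = quantity;
        }
    }

    static CompletableFuture<MessageId> sendOrder(PulsarTemplate<Order> template, Order order)
            throws PulsarClientException {
        return template.newMessage(order)
                .withTopic("orders")                             // explicit destination topic
                .withSchema(Schema.JSON(Order.class))            // explicit schema instead of inference
                .withMessageCustomizer((mb) -> mb.key(order.id)) // per-message option: the message key
                .sendAsync();                                    // or send() to block
    }
}
```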
1.3. Message customization
You can specify a TypedMessageBuilderCustomizer to configure the outgoing message. For example, the following code shows how to send a keyed message:
template.newMessage(msg)
    .withMessageCustomizer((mb) -> mb.key("foo-msg-key"))
    .send();
1.4. Producer customization
You can specify a ProducerBuilderCustomizer to configure the underlying Pulsar producer builder that ultimately constructs the producer used to send the outgoing message.
Use with caution: this gives full access to the producer builder, and invoking some of its methods (such as create) may have unintended side effects.
For example, the following code shows how to disable batching and enable chunking:
template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.enableChunking(true).enableBatching(false))
    .send();
The next example shows how to use custom routing when publishing records to partitioned topics.
Specify your custom MessageRouter implementation on the producer builder, for example:
template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.messageRouter(messageRouter))
    .send();
Note that when using a MessageRouter, the only valid setting for spring.pulsar.producer.message-routing-mode is custom.
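A possible MessageRouter to plug into the example above. This sketch assumes the Pulsar client's choosePartition(Message<?>, TopicMetadata) callback; the KeyHashMessageRouter name and its routing policy (hash the key, send unkeyed messages to partition 0) are hypothetical choices, not a recommendation.

```java
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.TopicMetadata;

// Hypothetical router: keyed messages go to a partition derived from the key hash,
// unkeyed messages always go to partition 0.
class KeyHashMessageRouter implements MessageRouter {

    @Override
    public int choosePartition(Message<?> msg, TopicMetadata metadata) {
        return msg.hasKey() ? partitionFor(msg.getKey(), metadata.numPartitions()) : 0;
    }

    // Math.floorMod keeps the result in [0, numPartitions) even for negative hash codes.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

An instance of this class is what the messageRouter variable in the preceding example would hold, with spring.pulsar.producer.message-routing-mode set to custom.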
The following example shows how to add a ProducerInterceptor that intercepts and mutates messages received by the producer before they are published to the brokers:
template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.intercept(interceptor))
    .send();
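The sketch below shows one possible shape for the interceptor variable used above. It assumes Pulsar's org.apache.pulsar.client.api.interceptor.ProducerInterceptor interface; the class name and its counting behavior are hypothetical. It returns each message unchanged from beforeSend and only records whether each send was acknowledged.

```java
import java.util.concurrent.atomic.AtomicLong;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;

// Hypothetical interceptor that leaves messages unchanged and counts send outcomes.
@SuppressWarnings("rawtypes")
class CountingProducerInterceptor implements ProducerInterceptor {

    final AtomicLong acked = new AtomicLong();
    final AtomicLong failed = new AtomicLong();

    @Override
    public boolean eligible(Message message) {
        return true; // intercept every message
    }

    @Override
    public Message beforeSend(Producer producer, Message message) {
        return message; // returned unchanged in this sketch
    }

    @Override
    public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {
        // A null exception means the broker acknowledged the message.
        if (exception == null) {
            acked.incrementAndGet();
        } else {
            failed.incrementAndGet();
        }
    }

    @Override
    public void close() {
        // nothing to release
    }
}
```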
The customizer applies only to the producer used for the send operation. If you want to apply a customizer to all producers, you must provide it to the producer factory as described in Global producer customization.
The rules described in “Caution on Lambda customizers” must be followed when using Lambda customizers.
2. Specifying Schema Information
If you use Java primitive types, the framework auto-detects the schema for you, and you need not specify any schema types for publishing the data.
For non-primitive types, if the Schema is not explicitly specified when invoking send operations on the PulsarTemplate, the Spring for Apache Pulsar framework tries to build a Schema.JSON from the type.
The complex schema types that are currently supported are JSON, AVRO, PROTOBUF, AUTO_PRODUCE_BYTES, and KEY_VALUE with INLINE encoding.
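A sketch of the cases described above: a primitive payload with no schema, a complex payload that falls back to JSON, and a complex payload with an explicit AVRO schema. It assumes one template per payload type; the User class and the topic names are hypothetical.

```java
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.springframework.pulsar.core.PulsarTemplate;

class SchemaExamples {

    // Hypothetical POJO used only for illustration.
    public static class User {
        public String name;
        public int age;

        public User() {
        }

        public User(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    // Primitive payload: the schema is auto-detected, so none is passed.
    static MessageId sendText(PulsarTemplate<String> template) throws PulsarClientException {
        return template.send("greetings", "hello");
    }

    // Complex payload without a schema: the framework tries Schema.JSON for the type.
    static MessageId sendUserAsJson(PulsarTemplate<User> template, User user) throws PulsarClientException {
        return template.send("user-topic", user);
    }

    // Complex payload with an explicitly specified AVRO schema.
    static MessageId sendUserAsAvro(PulsarTemplate<User> template, User user) throws PulsarClientException {
        return template.send("user-topic-avro", user, Schema.AVRO(User.class));
    }
}
```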
2.1. Custom Schema Mapping
As an alternative to specifying the schema when invoking send operations on the PulsarTemplate for complex types, the schema resolver can be configured with mappings for the types.
This removes the need to specify the schema, as the framework consults the resolver using the outgoing message type.
2.1.1. Configuration properties
Schema mappings can be configured with the spring.pulsar.defaults.type-mappings property.
The following example uses application.yml to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:
spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
The message-type is the fully-qualified name of the message class.
2.1.2. Schema resolver customizer
The preferred method of adding mappings is via the property mentioned above. However, if more control is needed, you can provide a schema resolver customizer to add the mapping(s).
The following example uses a schema resolver customizer to add mappings for the User and Address complex objects using AVRO and JSON schemas, respectively:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
    return (schemaResolver) -> {
        schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
        schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
    };
}
2.1.3. Type mapping annotation
Another option for specifying default schema information to use for a particular message type is to mark the message class with the @PulsarMessage annotation.
The schema info can be specified via the schemaType attribute on the annotation.
The following example configures the system to use JSON as the default schema when producing or consuming messages of type Foo:
@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
With this configuration in place, there is no need to specify the schema on send operations.
2.2. Producing with AUTO_SCHEMA
If you cannot know the schema of a Pulsar topic in advance, you can use an AUTO_PRODUCE schema to safely publish a raw JSON or Avro payload as a byte[].
In this case, the producer validates whether the outbound bytes are compatible with the schema of the destination topic.
Simply specify a schema of Schema.AUTO_PRODUCE_BYTES() on your template send operations, as shown in the example below:
void sendUserAsBytes(PulsarTemplate<byte[]> template, byte[] userAsBytes) throws PulsarClientException {
    template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES());
}
This is only supported with Avro and JSON schema types.
3. Pulsar Producer Factory
The PulsarTemplate relies on a PulsarProducerFactory to actually create the underlying producer.
Spring Boot auto-configuration also provides this producer factory, which you can further configure by specifying any of the spring.pulsar.producer.* application properties.
If topic information is not specified when using the producer factory APIs directly, the same topic resolution process used by the PulsarTemplate is used, with one exception: the "Message type default" step is omitted.
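When you use the producer factory directly, you can create a producer from a schema and a topic. This sketch assumes a createProducer(Schema, String) variant on PulsarProducerFactory whose topic argument may be null (which, per the paragraph above, would leave topic selection to the resolution process); the class, method, and topic names are hypothetical.

```java
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.springframework.pulsar.core.PulsarProducerFactory;

class DirectProducerExample {

    static MessageId sendDirect(PulsarProducerFactory<String> producerFactory, String text)
            throws PulsarClientException {
        // Explicit topic here; a null topic would defer to topic resolution (assumed nullable).
        try (Producer<String> producer = producerFactory.createProducer(Schema.STRING, "greetings")) {
            // Synchronous send on the raw Pulsar producer.
            return producer.send(text);
        }
    }
}
```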
3.1. Global producer customization
The framework provides the ProducerBuilderCustomizer contract, which allows you to configure the underlying builder that is used to construct each producer.
To customize all producers, you can pass a list of customizers into the PulsarProducerFactory constructor.
When using multiple customizers, they are applied in the order in which they appear in the list.
If you use Spring Boot auto-configuration, you can specify the customizers as beans, and they are passed automatically to the PulsarProducerFactory, ordered according to their @Order annotation.
If you want to apply a customizer to just a single producer, you can use the Fluent API and specify the customizer at send time.
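A sketch of global customizers registered as beans under Spring Boot auto-configuration. The bean names and the chosen settings (LZ4 compression, a 10-second send timeout) are hypothetical examples of ProducerBuilder options. Each bean returns a non-capturing Lambda created once, which also keeps it in line with the rules in “Caution on Lambda customizers”.

```java
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.CompressionType;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.pulsar.core.ProducerBuilderCustomizer;

@Configuration(proxyBeanMethods = false)
class GlobalProducerCustomizerConfig {

    // Lower @Order value: applied first, compresses messages from every producer.
    @Bean
    @Order(100)
    ProducerBuilderCustomizer<Object> compressionCustomizer() {
        return (builder) -> builder.compressionType(CompressionType.LZ4);
    }

    // Higher @Order value: applied second, fails sends not acknowledged within 10 seconds.
    @Bean
    @Order(200)
    ProducerBuilderCustomizer<Object> sendTimeoutCustomizer() {
        return (builder) -> builder.sendTimeout(10, TimeUnit.SECONDS);
    }
}
```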
4. Pulsar Producer Caching
Each underlying Pulsar producer consumes resources. To improve performance and avoid continual creation of producers, the producer factory caches the producers that it creates. They are cached in an LRU fashion and evicted when they have not been used within a configured time period. The cache key is composed of just enough information to ensure that callers are returned the same producer on subsequent creation requests.
Additionally, you can configure the cache settings by specifying any of the spring.pulsar.producer.cache.* application properties.
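To make the caching behavior concrete, here is a plain-Java sketch of a size-bounded LRU cache keyed by schema, topic, and customizers. It is an illustration built on LinkedHashMap, not the framework's implementation; the CacheKey record and its fields are hypothetical, and time-based eviction is left out.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Illustrative only: an LRU cache keyed the way the text describes. Not the framework's code.
class LruProducerCacheSketch<P> {

    // Hypothetical cache key. Record equals/hashCode compare every component,
    // including the customizer objects themselves (via List.equals).
    record CacheKey(String schemaName, String topic, List<Object> customizers) {
    }

    private final Map<CacheKey, P> cache;

    LruProducerCacheSketch(int maxSize) {
        // accessOrder = true makes iteration order least-recently-used first.
        this.cache = new LinkedHashMap<>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<CacheKey, P> eldest) {
                return size() > maxSize; // evict the least recently used entry
            }
        };
    }

    // Returns the cached producer for an equal key, or creates and caches a new one.
    P getOrCreate(CacheKey key, Function<CacheKey, P> factory) {
        return cache.computeIfAbsent(key, factory);
    }

    int size() {
        return cache.size();
    }
}
```

The key point for the next section: a lookup only hits when every key component, including each customizer, is equal to the one used when the entry was created.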
4.1. Caution on Lambda customizers
Any user-provided producer customizers are also included in the cache key.
Because the cache key relies on a valid implementation of equals/hashCode, you must take care when using Lambda customizers.
RULE: Two customizers implemented as Lambdas will match on equals/hashCode if and only if they use the same Lambda instance and do not require any variable defined outside their closure.
To clarify the rule above, consider a few examples.
In the following example, the customizer is defined as an inline Lambda, which means that each call to sendUser uses the same Lambda instance. Additionally, it requires no variable outside its closure. Therefore, it matches as a cache key.
void sendUser() throws PulsarClientException {
    var user = randomUser();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> b.producerName("user"))
        .send();
}
In this next case, the customizer is defined as an inline Lambda, which means that each call to sendUser uses the same Lambda instance. However, it requires a variable outside its closure. Therefore, it does not match as a cache key.
void sendUser() throws PulsarClientException {
    var user = randomUser();
    var name = randomName();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> b.producerName(name))
        .send();
}
In this final example, the customizer is defined as an inline Lambda, which means that each call to sendUser uses the same Lambda instance. Although it uses a variable (name), that variable is defined inside the closure rather than outside it, so the customizer matches as a cache key.
This illustrates that variables can be used within the Lambda closure, which can even make calls to static methods.
void sendUser() throws PulsarClientException {
    var user = randomUser();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> {
            var name = SomeHelper.someStaticMethod();
            b.producerName(name);
        })
        .send();
}
RULE: If your Lambda customizer is not defined once and only once (the same instance is used on subsequent calls) OR it requires variable(s) defined outside its closure then you must provide a customizer implementation with a valid equals/hashCode implementation.
If these rules are not followed, the producer cache will always miss and your application performance will suffer.
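One way to satisfy this rule is a small customizer class whose equals/hashCode compare its configuration. The sketch below assumes the ProducerBuilderCustomizer contract exposes a single customize(ProducerBuilder<T>) method; ProducerNameCustomizer is a hypothetical name. A Java record is used because it generates value-based equals and hashCode automatically.

```java
import org.apache.pulsar.client.api.ProducerBuilder;
import org.springframework.pulsar.core.ProducerBuilderCustomizer;

// Hypothetical customizer that sets the producer name. As a record, two instances
// with the same name are equal and share a hashCode, so they yield the same cache key.
record ProducerNameCustomizer<T>(String name) implements ProducerBuilderCustomizer<T> {

    @Override
    public void customize(ProducerBuilder<T> producerBuilder) {
        producerBuilder.producerName(name);
    }
}
```

In the second example above, passing new ProducerNameCustomizer<>(name) to withProducerCustomizer instead of the capturing Lambda lets calls with the same name resolve to the same cache key.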
5. Intercept Messages on the Producer
Adding a ProducerInterceptor lets you intercept and mutate messages received by the producer before they are published to the brokers.
To do so, you can pass a list of interceptors into the PulsarTemplate constructor.
When using multiple interceptors, they are applied in the order in which they appear in the list.
If you use Spring Boot auto-configuration, you can specify the interceptors as beans. They are passed automatically to the PulsarTemplate.
Ordering of the interceptors is achieved by using the @Order annotation, as follows:
@Bean
@Order(100)
ProducerInterceptor firstInterceptor() {
    ...
}

@Bean
@Order(200)
ProducerInterceptor secondInterceptor() {
    ...
}
If you are not using the starter, you will need to configure and register the aforementioned components yourself.
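Without the starter, the interceptors can be handed to the template constructor directly. This sketch assumes a PulsarTemplate constructor that takes a producer factory and a List of ProducerInterceptor; verify that signature against your Spring for Apache Pulsar version. The class and method names are hypothetical.

```java
import java.util.List;

import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
import org.springframework.pulsar.core.PulsarProducerFactory;
import org.springframework.pulsar.core.PulsarTemplate;

class ManualTemplateConfig {

    // Interceptors are applied in list order: first, then second.
    static <T> PulsarTemplate<T> templateWithInterceptors(PulsarProducerFactory<T> producerFactory,
            ProducerInterceptor first, ProducerInterceptor second) {
        return new PulsarTemplate<>(producerFactory, List.of(first, second));
    }
}
```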