When using log compaction, you can send and receive messages with null payloads to identify the deletion of a key.
You can also receive null values for other reasons, such as a deserializer that might return null when it cannot deserialize a value.
Producing Null Payloads
You can send a null value with the ReactivePulsarTemplate by passing a null message parameter value to one of the send methods, for example:
reactiveTemplate
.send(null, Schema.STRING)
.subscribe();
When sending null values, you must specify the schema type because the system cannot determine the type of the message from a null payload.
Consuming Null Payloads
For @ReactivePulsarListener, the null payload is passed into the listener method based on the type of its message parameter as follows:
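Parameter type | Passed-in value |
---|---|
primitive | null |
user-defined | null |
org.apache.pulsar.client.api.Message<T> | non-null Pulsar message whose getValue() returns null |
org.springframework.messaging.Message<T> | non-null Spring message whose getPayload() returns PulsarNull |
Flux<org.apache.pulsar.client.api.Message<T>> | non-null flux whose entries are non-null Pulsar messages whose getValue() returns null |
Flux<org.springframework.messaging.Message<T>> | non-null flux whose entries are non-null Spring messages whose getPayload() returns PulsarNull |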
When the passed-in value is null (that is, for single-record listeners with primitive or user-defined types), you must use the @Payload parameter annotation with required = false.
When using the Spring org.springframework.messaging.Message for your listener payload type, its generic type information must be wide enough to accept Message<PulsarNull> (for example, Message, Message<?>, or Message<Object>).
This is because the Spring Message does not allow null values for its payload and instead uses the PulsarNull placeholder.
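The placeholder approach described above can be illustrated with a small, library-free sketch. It does not use the Spring or Pulsar APIs: Envelope and NullPlaceholder are hypothetical stand-ins for the Spring Message and PulsarNull, introduced only to show why a receiver needs a wide generic type when a placeholder replaces a null payload.

```java
import java.util.Objects;

public class PlaceholderSketch {

    /** Hypothetical stand-in for a placeholder type such as PulsarNull. */
    enum NullPlaceholder { INSTANCE }

    /** Hypothetical envelope that, like the Spring Message, rejects null payloads. */
    static final class Envelope<T> {
        private final T payload;

        Envelope(T payload) {
            this.payload = Objects.requireNonNull(payload, "payload must not be null");
        }

        T payload() {
            return payload;
        }
    }

    /** Wraps a possibly-null value, substituting the placeholder for null. */
    static Envelope<?> wrap(Object value) {
        return new Envelope<>(value != null ? value : NullPlaceholder.INSTANCE);
    }

    /**
     * A receiver typed as Envelope<?> accepts both real payloads and the
     * placeholder. A receiver typed as Envelope<String> could not accept
     * an envelope that carries the placeholder.
     */
    static String describe(Envelope<?> envelope) {
        if (envelope.payload() instanceof NullPlaceholder) {
            return "null payload";
        }
        return "payload: " + envelope.payload();
    }

    public static void main(String[] args) {
        System.out.println(describe(wrap("hello"))); // prints "payload: hello"
        System.out.println(describe(wrap(null)));    // prints "null payload"
    }
}
```

In a real listener, the equivalent check would be made against the payload of the Spring Message rather than against this hypothetical envelope.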
If the message is a tombstone for a compacted log, you usually also need the key so that your application can determine which key was "deleted".
The following example shows such a configuration:
@ReactivePulsarListener(
topics = "my-topic",
subscriptionName = "my-topic-sub",
schemaType = SchemaType.STRING)
Mono<Void> myListener(
@Payload(required = false) String msg,
@Header(PulsarHeaders.KEY) String key) {
...
}
When using a streaming message listener (Flux), header support is limited, so it is less useful in the log compaction scenario.
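As a rough illustration of what an application might do with the key and the nullable payload, the following library-free sketch keeps an in-memory view of the latest value per key and treats a null payload as a tombstone. The class CompactedViewSketch and its methods are hypothetical and not part of Spring for Apache Pulsar. In a real application, apply would be called from a listener method such as the one shown above, with the msg and key parameters.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CompactedViewSketch {

    // Latest value per key, as a consumer of a compacted topic might track it.
    private final Map<String, String> latest = new LinkedHashMap<>();

    /** Applies one record: a null payload is a tombstone and removes the key. */
    void apply(String key, String payload) {
        if (payload == null) {
            latest.remove(key);
        } else {
            latest.put(key, payload);
        }
    }

    /** Returns a copy of the current view so callers cannot mutate it. */
    Map<String, String> snapshot() {
        return new LinkedHashMap<>(latest);
    }

    public static void main(String[] args) {
        CompactedViewSketch view = new CompactedViewSketch();
        view.apply("user-1", "alice");
        view.apply("user-2", "bob");
        view.apply("user-1", null); // tombstone: "user-1" was deleted
        System.out.println(view.snapshot()); // prints {user-2=bob}
    }
}
```

Without the key, the tombstone could not be matched to an entry in the view, which is why the listener usually needs both parameters.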