Overview
Apache Kafka provides a high-level API for serializing and deserializing record values as well as their keys.
It is present with the org.apache.kafka.common.serialization.Serializer<T>
and
org.apache.kafka.common.serialization.Deserializer<T>
abstractions with some built-in implementations.
Meanwhile, we can specify serializer and deserializer classes by using Producer
or Consumer
configuration properties.
The following example shows how to do so:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
For more complex or particular cases, the KafkaConsumer
(and, therefore, KafkaProducer
) provides overloaded
constructors to accept Serializer
and Deserializer
instances for keys
and values
, respectively.
When you use this API, the DefaultKafkaProducerFactory
and DefaultKafkaConsumerFactory
also provide properties (through constructors or setter methods) to inject custom Serializer
and Deserializer
instances into the target Producer
or Consumer
.
Also, you can pass in Supplier<Serializer>
or Supplier<Deserializer>
instances through constructors - these Supplier
s are called on creation of each Producer
or Consumer
.
String serialization
Since version 2.5, Spring for Apache Kafka provides ToStringSerializer
and ParseStringDeserializer
classes that use String representation of entities.
They rely on methods toString
and some Function<String>
or BiFunction<String, Headers>
to parse the String and populate properties of an instance.
Usually, this would invoke some static method on the class, such as parse
:
ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);
By default, the ToStringSerializer
is configured to convey type information about the serialized entity in the record Headers
.
You can disable this by setting the addTypeInfo
property to false
.
This information can be used by ParseStringDeserializer
on the receiving side.
-
ToStringSerializer.ADD_TYPE_INFO_HEADERS
(defaulttrue
): You can set it tofalse
to disable this feature on theToStringSerializer
(sets theaddTypeInfo
property).
ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
String entityType = new String(header);
if (entityType.contains("Thing")) {
return Thing.parse(str);
}
else {
// ...parsing logic
}
});
You can configure the Charset
used to convert String
to/from byte[]
with the default being UTF-8
.
You can configure the deserializer with the name of the parser method using ConsumerConfig
properties:
-
ParseStringDeserializer.KEY_PARSER
-
ParseStringDeserializer.VALUE_PARSER
The properties must contain the fully qualified name of the class followed by the method name, separated by a period .
.
The method must be static and have a signature of either (String, Headers)
or (String)
.
A ToFromStringSerde
is also provided, for use with Kafka Streams.
JSON
Spring for Apache Kafka also provides JsonSerializer
and JsonDeserializer
implementations that are based on the
Jackson JSON object mapper.
The JsonSerializer
allows writing any Java object as a JSON byte[]
.
The JsonDeserializer
requires an additional Class<?> targetType
argument to allow the deserialization of a consumed byte[]
to the proper target object.
The following example shows how to create a JsonDeserializer
:
JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);
You can customize both JsonSerializer
and JsonDeserializer
with an ObjectMapper
.
You can also extend them to implement some particular configuration logic in the configure(Map<String, ?> configs, boolean isKey)
method.
Starting with version 2.3, all the JSON-aware components are configured by default with a JacksonUtils.enhancedObjectMapper()
instance, which comes with the MapperFeature.DEFAULT_VIEW_INCLUSION
and DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES
features disabled.
Also such an instance is supplied with well-known modules for custom data types, such a Java time and Kotlin support.
See JacksonUtils.enhancedObjectMapper()
JavaDocs for more information.
This method also registers a org.springframework.kafka.support.JacksonMimeTypeModule
for org.springframework.util.MimeType
objects serialization into the plain string for inter-platform compatibility over the network.
A JacksonMimeTypeModule
can be registered as a bean in the application context and it will be auto-configured into the Spring Boot ObjectMapper
instance.
Also starting with version 2.3, the JsonDeserializer
provides TypeReference
-based constructors for better handling of target generic container types.
Starting with version 2.1, you can convey type information in record Headers
, allowing the handling of multiple types.
In addition, you can configure the serializer and deserializer by using the following Kafka properties.
They have no effect if you have provided Serializer
and Deserializer
instances for KafkaConsumer
and KafkaProducer
, respectively.
Configuration Properties
-
JsonSerializer.ADD_TYPE_INFO_HEADERS
(defaulttrue
): You can set it tofalse
to disable this feature on theJsonSerializer
(sets theaddTypeInfo
property). -
JsonSerializer.TYPE_MAPPINGS
(defaultempty
): See Mapping Types. -
JsonDeserializer.USE_TYPE_INFO_HEADERS
(defaulttrue
): You can set it tofalse
to ignore headers set by the serializer. -
JsonDeserializer.REMOVE_TYPE_INFO_HEADERS
(defaulttrue
): You can set it tofalse
to retain headers set by the serializer. -
JsonDeserializer.KEY_DEFAULT_TYPE
: Fallback type for deserialization of keys if no header information is present. -
JsonDeserializer.VALUE_DEFAULT_TYPE
: Fallback type for deserialization of values if no header information is present. -
JsonDeserializer.TRUSTED_PACKAGES
(defaultjava.util
,java.lang
): Comma-delimited list of package patterns allowed for deserialization.*
means deserializing all. -
JsonDeserializer.TYPE_MAPPINGS
(defaultempty
): See Mapping Types. -
JsonDeserializer.KEY_TYPE_METHOD
(defaultempty
): See Using Methods to Determine Types. -
JsonDeserializer.VALUE_TYPE_METHOD
(defaultempty
): See Using Methods to Determine Types.
Starting with version 2.2, the type information headers (if added by the serializer) are removed by the deserializer.
You can revert to the previous behavior by setting the removeTypeHeaders
property to false
, either directly on the deserializer or with the configuration property described earlier.
Starting with version 2.8, if you construct the serializer or deserializer programmatically as shown in Programmatic Construction, the above properties will be applied by the factories, as long as you have not set any properties explicitly (using set*() methods or using the fluent API).
Previously, when creating programmatically, the configuration properties were never applied; this is still the case if you explicitly set properties on the object directly.
|
Mapping Types
Starting with version 2.2, when using JSON, you can now provide type mappings by using the properties in the preceding list.
Previously, you had to customize the type mapper within the serializer and deserializer.
Mappings consist of a comma-delimited list of token:className
pairs.
On outbound, the payload’s class name is mapped to the corresponding token.
On inbound, the token in the type header is mapped to the corresponding class name.
The following example creates a set of mappings:
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.Hat");
The corresponding objects must be compatible. |
If you use Spring Boot, you can provide these properties in the application.properties
(or yaml) file.
The following example shows how to do so:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat
You can perform only simple configuration with properties.
For more advanced configuration (such as using a custom
Setters are also provided, as an alternative to using these constructors. |
When using Spring Boot and overriding the ConsumerFactory and ProducerFactory as shown above, wild card generic types need to be used with the bean method return type.
If concrete generic types are provided instead, then Spring Boot will ignore these beans and still use the default ones.
|
Starting with version 2.2, you can explicitly configure the deserializer to use the supplied target type and ignore type information in headers by using one of the overloaded constructors that have a boolean useHeadersIfPresent
argument (which is true
by default).
The following example shows how to do so:
DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
Using Methods to Determine Types
Starting with version 2.5, you can now configure the deserializer, via properties, to invoke a method to determine the target type.
If present, this will override any of the other techniques discussed above.
This can be useful if the data is published by an application that does not use the Spring serializer and you need to deserialize to different types depending on the data, or other headers.
Set these properties to the method name - a fully qualified class name followed by the method name, separated by a period .
.
The method must be declared as public static
, have one of three signatures (String topic, byte[] data, Headers headers)
, (byte[] data, Headers headers)
or (byte[] data)
and return a Jackson JavaType
.
-
JsonDeserializer.KEY_TYPE_METHOD
:spring.json.key.type.method
-
JsonDeserializer.VALUE_TYPE_METHOD
:spring.json.value.type.method
You can use arbitrary headers or inspect the data to determine the type.
JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);
JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);
public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
// {"thisIsAFieldInThing1":"value", ...
if (data[21] == '1') {
return thing1Type;
}
else {
return thing2Type;
}
}
For more sophisticated data inspection consider using JsonPath
or similar but, the simpler the test to determine the type, the more efficient the process will be.
The following is an example of creating the deserializer programmatically (when providing the consumer factory with the deserializer in the constructor):
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);
...
public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
...
}
Programmatic Construction
When constructing the serializer/deserializer programmatically for use in the producer/consumer factory, since version 2.3, you can use the fluent API, which simplifies configuration.
@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
new JsonSerializer<MyKeyType>()
.forKeys()
.noTypeInfo(),
new JsonSerializer<MyValueType>()
.noTypeInfo());
return pf;
}
@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
new JsonDeserializer<>(MyKeyType.class)
.forKeys()
.ignoreTypeHeaders(),
new JsonDeserializer<>(MyValueType.class)
.ignoreTypeHeaders());
return cf;
}
To provide type mapping programmatically, similar to Using Methods to Determine Types, use the typeFunction
property.
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeFunction(MyUtils::thingOneOrThingTwo);
Alternatively, as long as you don’t use the fluent API to configure properties, or set them using set*()
methods, the factories will configure the serializer/deserializer using the configuration properties; see Configuration Properties.
Starting with version 2.8, if you construct the serializer or deserializer programmatically as shown in Programmatic Construction, the above properties will be applied by the factories, as long as you have not set any properties explicitly (using set*() methods or using the fluent API).
Previously, when creating programmatically, the configuration properties were never applied; this is still the case if you explicitly set properties on the object directly.
|
The corresponding objects must be compatible. |
You can perform only simple configuration with properties.
For more advanced configuration (such as using a custom
Setters are also provided, as an alternative to using these constructors. |
When using Spring Boot and overriding the ConsumerFactory and ProducerFactory as shown above, wild card generic types need to be used with the bean method return type.
If concrete generic types are provided instead, then Spring Boot will ignore these beans and still use the default ones.
|
Delegating Serializer and Deserializer
Using Headers
Version 2.3 introduced the DelegatingSerializer
and DelegatingDeserializer
, which allow producing and consuming records with different key and/or value types.
Producers must set a header DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR
to a selector value that is used to select which serializer to use for the value and DelegatingSerializer.KEY_SERIALIZATION_SELECTOR
for the key; if a match is not found, an IllegalStateException
is thrown.
For incoming records, the deserializer uses the same headers to select the deserializer to use; if a match is not found or the header is not present, the raw byte[]
is returned.
You can configure the map of selector to Serializer
/ Deserializer
via a constructor, or you can configure it via Kafka producer/consumer properties with the keys DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG
and DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG
.
For the serializer, the producer property can be a Map<String, Object>
where the key is the selector and the value is a Serializer
instance, a serializer Class
or the class name.
The property can also be a String of comma-delimited map entries, as shown below.
For the deserializer, the consumer property can be a Map<String, Object>
where the key is the selector and the value is a Deserializer
instance, a deserializer Class
or the class name.
The property can also be a String of comma-delimited map entries, as shown below.
To configure using properties, use the following syntax:
producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")
consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")
Producers would then set the DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR
header to thing1
or thing2
.
This technique supports sending different types to the same topic (or different topics).
Starting with version 2.5.1, it is not necessary to set the selector header, if the type (key or value) is one of the standard types supported by Serdes (Long , Integer , etc).
Instead, the serializer will set the header to the class name of the type.
It is not necessary to configure serializers or deserializers for these types, they will be created (once) dynamically.
|
For another technique to send different types to different topics, see Using RoutingKafkaTemplate
.
By Type
Version 2.8 introduced the DelegatingByTypeSerializer
.
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
null, new DelegatingByTypeSerializer(Map.of(
byte[].class, new ByteArraySerializer(),
Bytes.class, new BytesSerializer(),
String.class, new StringSerializer())));
}
Starting with version 2.8.3, you can configure the serializer to check if the map key is assignable from the target object, useful when a delegate serializer can serialize sub classes.
In this case, if there are amiguous matches, an ordered Map
, such as a LinkedHashMap
should be provided.
By Topic
Starting with version 2.8, the DelegatingByTopicSerializer
and DelegatingByTopicDeserializer
allow selection of a serializer/deserializer based on the topic name.
Regex Pattern
s are used to lookup the instance to use.
The map can be configured using a constructor, or via properties (a comma delimited list of pattern:serializer
).
producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArraySerializer.class.getName()
+ ", topic[5-9]:" + StringSerializer.class.getName());
...
ConsumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArrayDeserializer.class.getName()
+ ", topic[5-9]:" + StringDeserializer.class.getName());
Use KEY_SERIALIZATION_TOPIC_CONFIG
when using this for keys.
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
new IntegerSerializer(),
new DelegatingByTopicSerializer(Map.of(
Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
Pattern.compile("topic[5-9]"), new StringSerializer())),
new JsonSerializer<Object>()); // default
}
You can specify a default serializer/deserializer to use when there is no pattern match using DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULT
and DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT
.
An additional property DelegatingByTopicSerialization.CASE_SENSITIVE
(default true
), when set to false
makes the topic lookup case insensitive.
Starting with version 2.5.1, it is not necessary to set the selector header, if the type (key or value) is one of the standard types supported by Serdes (Long , Integer , etc).
Instead, the serializer will set the header to the class name of the type.
It is not necessary to configure serializers or deserializers for these types, they will be created (once) dynamically.
|
Retrying Deserializer
The RetryingDeserializer
uses a delegate Deserializer
and RetryTemplate
to retry deserialization when the delegate might have transient errors, such as network issues, during deserialization.
ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
Starting with version 3.1.2
, a RecoveryCallback
can be set on the RetryingDeserializer
optionally.
Refer to the spring-retry project for configuration of the RetryTemplate
with a retry policy, back off policy, etc.
Spring Messaging Message Conversion
Although the Serializer
and Deserializer
API is quite simple and flexible from the low-level Kafka Consumer
and Producer
perspective, you might need more flexibility at the Spring Messaging level, when using either @KafkaListener
or Spring Integration’s Apache Kafka Support.
To let you easily convert to and from org.springframework.messaging.Message
, Spring for Apache Kafka provides a MessageConverter
abstraction with the MessagingMessageConverter
implementation and its JsonMessageConverter
(and subclasses) customization.
You can inject the MessageConverter
into a KafkaTemplate
instance directly and by using AbstractKafkaListenerContainerFactory
bean definition for the @KafkaListener.containerFactory()
property.
The following example shows how to do so:
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordMessageConverter(new JsonMessageConverter());
return factory;
}
...
@KafkaListener(topics = "jsonData",
containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}
When using Spring Boot, simply define the converter as a @Bean
and Spring Boot auto configuration will wire it into the auto-configured template and container factory.
When you use a @KafkaListener
, the parameter type is provided to the message converter to assist with the conversion.
This type inference can be achieved only when the |
On the consumer side, you can configure a On the producer side, when you use Spring Integration or the
Again, using For convenience, starting with version 2.3, the framework also provides a |
Starting with version 2.7.1, message payload conversion can be delegated to a spring-messaging
SmartMessageConverter
; this enables conversion, for example, to be based on the MessageHeaders.CONTENT_TYPE
header.
The KafkaMessageConverter.fromMessage() method is called for outbound conversion to a ProducerRecord with the message payload in the ProducerRecord.value() property.
The KafkaMessageConverter.toMessage() method is called for inbound conversion from ConsumerRecord with the payload being the ConsumerRecord.value() property.
The SmartMessageConverter.toMessage() method is called to create a new outbound Message<?> from the Message passed to fromMessage() (usually by KafkaTemplate.send(Message<?> msg) ).
Similarly, in the KafkaMessageConverter.toMessage() method, after the converter has created a new Message<?> from the ConsumerRecord , the SmartMessageConverter.fromMessage() method is called and then the final inbound message is created with the newly converted payload.
In either case, if the SmartMessageConverter returns null , the original message is used.
|
When the default converter is used in the KafkaTemplate
and listener container factory, you configure the SmartMessageConverter
by calling setMessagingConverter()
on the template and via the contentTypeConverter
property on @KafkaListener
methods.
Examples:
template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
...
}
Using Spring Data Projection Interfaces
Starting with version 2.1.1, you can convert JSON to a Spring Data Projection interface instead of a concrete type. This allows very selective, and low-coupled bindings to data, including the lookup of values from multiple places inside the JSON document. For example the following interface can be defined as message payload type:
interface SomeSample {
@JsonPath({ "$.username", "$.user.name" })
String getUsername();
}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}
Accessor methods will be used to lookup the property name as field in the received JSON document by default.
The @JsonPath
expression allows customization of the value lookup, and even to define multiple JSON Path expressions, to look up values from multiple places until an expression returns an actual value.
To enable this feature, use a ProjectingMessageConverter
configured with an appropriate delegate converter (used for outbound conversion and converting non-projection interfaces).
You must also add spring-data:spring-data-commons
and com.jayway.jsonpath:json-path
to the classpath.
When used as the parameter to a @KafkaListener
method, the interface type is automatically passed to the converter as normal.
This type inference can be achieved only when the |
On the consumer side, you can configure a On the producer side, when you use Spring Integration or the
Again, using For convenience, starting with version 2.3, the framework also provides a |
The KafkaMessageConverter.fromMessage() method is called for outbound conversion to a ProducerRecord with the message payload in the ProducerRecord.value() property.
The KafkaMessageConverter.toMessage() method is called for inbound conversion from ConsumerRecord with the payload being the ConsumerRecord.value() property.
The SmartMessageConverter.toMessage() method is called to create a new outbound Message<?> from the Message passed to fromMessage() (usually by KafkaTemplate.send(Message<?> msg) ).
Similarly, in the KafkaMessageConverter.toMessage() method, after the converter has created a new Message<?> from the ConsumerRecord , the SmartMessageConverter.fromMessage() method is called and then the final inbound message is created with the newly converted payload.
In either case, if the SmartMessageConverter returns null , the original message is used.
|
Using ErrorHandlingDeserializer
When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before the poll()
returns.
To solve this problem, the ErrorHandlingDeserializer
has been introduced.
This deserializer delegates to a real deserializer (key or value).
If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer
returns a null
value and a DeserializationException
in a header that contains the cause and the raw bytes.
When you use a record-level MessageListener
, if the ConsumerRecord
contains a DeserializationException
header for either the key or value, the container’s ErrorHandler
is called with the failed ConsumerRecord
.
The record is not passed to the listener.
Alternatively, you can configure the ErrorHandlingDeserializer
to create a custom value by providing a failedDeserializationFunction
, which is a Function<FailedDeserializationInfo, T>
.
This function is invoked to create an instance of T
, which is passed to the listener in the usual fashion.
An object of type FailedDeserializationInfo
, which contains all the contextual information is provided to the function.
You can find the DeserializationException
(as a serialized Java object) in headers.
See the Javadoc for the ErrorHandlingDeserializer
for more information.
You can use the DefaultKafkaConsumerFactory
constructor that takes key and value Deserializer
objects and wire in appropriate ErrorHandlingDeserializer
instances that you have configured with the proper delegates.
Alternatively, you can use consumer configuration properties (which are used by the ErrorHandlingDeserializer
) to instantiate the delegates.
The property names are ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS
and ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS
.
The property value can be a class or class name.
The following example shows how to set these properties:
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);
The following example uses a failedDeserializationFunction
.
public class BadThing extends Thing {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}
}
public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {
@Override
public Thing apply(FailedDeserializationInfo info) {
return new BadThing(info);
}
}
The preceding example uses the following configuration:
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
...
If the consumer is configured with an ErrorHandlingDeserializer , it is important to configure the KafkaTemplate and its producer with a serializer that can handle normal objects as well as raw byte[] values, which result from deserialization exceptions.
The generic value type of the template should be Object .
One technique is to use the DelegatingByTypeSerializer ; an example follows:
|
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
When using an ErrorHandlingDeserializer
with a batch listener, you must check for the deserialization exceptions in message headers.
When used with a DefaultBatchErrorHandler
, you can use that header to determine which record the exception failed on and communicate to the error handler via a BatchListenerFailedException
.
@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
for (int i = 0; i < in.size(); i++) {
Thing thing = in.get(i);
if (thing == null
&& headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
try {
DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
if (deserEx != null) {
logger.error(deserEx, "Record at index " + i + " could not be deserialized");
}
}
catch (Exception ex) {
logger.error(ex, "Record at index " + i + " could not be deserialized");
}
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
process(thing);
}
}
SerializationUtils.byteArrayToDeserializationException()
can be used to convert the header to a DeserializationException
.
When consuming List<ConsumerRecord<?, ?>
, SerializationUtils.getExceptionFromHeader()
is used instead:
@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
for (int i = 0; i < in.size(); i++) {
ConsumerRecord<String, Thing> rec = in.get(i);
if (rec.value() == null) {
DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
if (deserEx != null) {
logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
}
process(rec.value());
}
}
If you are also using a DeadLetterPublishingRecoverer , the record published for a DeserializationException will have a record.value() of type byte[] ; this should not be serialized.
Consider using a DelegatingByTypeSerializer configured to use a ByteArraySerializer for byte[] and the normal serializer (Json, Avro, etc) for all other types.
|
Starting with version 3.1, you can add a Validator
to the ErrorHandlingDeserializer
.
If the delegate Deserializer
successfully deserializes the object, but that object fails validation, an exception is thrown similar to a deserialization exception occurring.
This allows the original raw data to be passed to the error handler.
When creating the deserializer yourself, simply call setValidator
; if you configure the serializer using properties, set the consumer configuration property ErrorHandlingDeserializer.VALIDATOR_CLASS
to the class or fully qualified class name for your Validator
.
When using Spring Boot, this property name is spring.kafka.consumer.properties.spring.deserializer.validator.class
.
If the consumer is configured with an ErrorHandlingDeserializer , it is important to configure the KafkaTemplate and its producer with a serializer that can handle normal objects as well as raw byte[] values, which result from deserialization exceptions.
The generic value type of the template should be Object .
One technique is to use the DelegatingByTypeSerializer ; an example follows:
|
If you are also using a DeadLetterPublishingRecoverer , the record published for a DeserializationException will have a record.value() of type byte[] ; this should not be serialized.
Consider using a DelegatingByTypeSerializer configured to use a ByteArraySerializer for byte[] and the normal serializer (Json, Avro, etc) for all other types.
|
Payload Conversion with Batch Listeners
You can also use a JsonMessageConverter
within a BatchMessagingMessageConverter
to convert batch messages when you use a batch listener container factory.
See Serialization, Deserialization, and Message Conversion and Spring Messaging Message Conversion for more information.
By default, the type for the conversion is inferred from the listener argument.
If you configure the JsonMessageConverter
with a DefaultJackson2TypeMapper
that has its TypePrecedence
set to TYPE_ID
(instead of the default INFERRED
), the converter uses the type information in headers (if present) instead.
This allows, for example, listener methods to be declared with interfaces instead of concrete classes.
Also, the type converter supports mapping, so the deserialization can be to a different type than the source (as long as the data is compatible).
This is also useful when you use class-level @KafkaListener
instances where the payload must have already been converted to determine which method to invoke.
The following example creates beans that use this method:
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
return factory;
}
@Bean
public JsonMessageConverter converter() {
return new JsonMessageConverter();
}
Note that, for this to work, the method signature for the conversion target must be a container object with a single generic parameter type, such as the following:
@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
Note that you can still access the batch headers.
If the batch converter has a record converter that supports it, you can also receive a list of messages where the payloads are converted according to the generic type. The following example shows how to do so:
@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen(List<Message<Foo>> fooMessages) {
...
}
ConversionService
Customization
Starting with version 2.1.1, the org.springframework.core.convert.ConversionService
used by the default org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory
to resolve parameters for the invocation of a listener method is supplied with all beans that implement any of the following interfaces:
-
org.springframework.core.convert.converter.Converter
-
org.springframework.core.convert.converter.GenericConverter
-
org.springframework.format.Formatter
This lets you further customize listener deserialization without changing the default configuration for ConsumerFactory
and KafkaListenerContainerFactory
.
Setting a custom MessageHandlerMethodFactory on the KafkaListenerEndpointRegistrar through a KafkaListenerConfigurer bean disables this feature.
|
Setting a custom MessageHandlerMethodFactory on the KafkaListenerEndpointRegistrar through a KafkaListenerConfigurer bean disables this feature.
|
Adding Custom HandlerMethodArgumentResolver
to @KafkaListener
Starting with version 2.4.2 you are able to add your own HandlerMethodArgumentResolver
and resolve custom method parameters.
All you need is to implement KafkaListenerConfigurer
and use method setCustomMethodArgumentResolvers()
from class KafkaListenerEndpointRegistrar
.
@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setCustomMethodArgumentResolvers(
new HandlerMethodArgumentResolver() {
@Override
public boolean supportsParameter(MethodParameter parameter) {
return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
}
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) {
return new CustomMethodArgument(
message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
);
}
}
);
}
}
You can also completely replace the framework’s argument resolution by adding a custom MessageHandlerMethodFactory
to the KafkaListenerEndpointRegistrar
bean.
If you do this, and your application needs to handle tombstone records, with a null
value()
(e.g. from a compacted topic), you should add a KafkaNullAwarePayloadArgumentResolver
to the factory; it must be the last resolver because it supports all types and can match arguments without a @Payload
annotation.
If you are using a DefaultMessageHandlerMethodFactory
, set this resolver as the last custom resolver; the factory will ensure that this resolver will be used before the standard PayloadMethodArgumentResolver
, which has no knowledge of KafkaNull
payloads.