对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0! |
序列化、反序列化和消息转换
概述
Apache Kafka 提供了一个高级 API,用于序列化和反序列化记录值及其键。
它与 和 abstractions 一起存在,并带有一些内置实现。
同时,我们可以通过 using 或 configuration 属性来指定序列化器和反序列化器类。
以下示例显示了如何执行此操作:org.apache.kafka.common.serialization.Serializer<T>
org.apache.kafka.common.serialization.Deserializer<T>
Producer
Consumer
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
对于更复杂或特殊的情况,(并且因此,) 提供重载的
构造函数 和 和 的实例。KafkaConsumer
KafkaProducer
Serializer
Deserializer
keys
values
使用此 API 时, 和 还会提供属性(通过构造函数或 setter 方法),以将自定义和实例注入目标或 .
此外,还可以通过构造函数传入 或 实例 - 这些 s 在创建每个 或 时调用。DefaultKafkaProducerFactory
DefaultKafkaConsumerFactory
Serializer
Deserializer
Producer
Consumer
Supplier<Serializer>
Supplier<Deserializer>
Supplier
Producer
Consumer
字符串序列化
从版本 2.5 开始, Spring for Apache Kafka 提供了使用实体的 String 表示的类。
它们依赖于 methods 和 some or 来解析 String 和 fill 实例的属性。
通常,这会在类上调用一些静态方法,例如:ToStringSerializer
ParseStringDeserializer
toString
Function<String>
BiFunction<String, Headers>
parse
ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);
默认情况下, , 配置为传达有关记录中序列化实体的类型信息 。
您可以通过将属性设置为 来禁用此功能。
接收方可以使用此信息。ToStringSerializer
Headers
addTypeInfo
false
ParseStringDeserializer
-
ToStringSerializer.ADD_TYPE_INFO_HEADERS
(默认):您可以将其设置为在 (设置属性) 上禁用此功能。true
false
ToStringSerializer
addTypeInfo
ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
String entityType = new String(header);
if (entityType.contains("Thing")) {
return Thing.parse(str);
}
else {
// ...parsing logic
}
});
您可以配置 用于转换为 / from 的默认值。Charset
String
byte[]
UTF-8
您可以使用 properties 使用 parser 方法的名称配置 deserializer:ConsumerConfig
-
ParseStringDeserializer.KEY_PARSER
-
ParseStringDeserializer.VALUE_PARSER
属性必须包含类的完全限定名称,后跟方法名称,用句点分隔。
该方法必须是静态的,并且具有 or 的签名。.
(String, Headers)
(String)
还提供了 A,用于 Kafka Streams。ToFromStringSerde
JSON 格式
Spring for Apache Kafka 还提供基于
Jackson JSON 对象映射器。
它允许将任何 Java 对象写入 JSON 。
这需要一个额外的参数来允许将 consumed 反序列化为适当的目标对象。
以下示例演示如何创建 :JsonSerializer
JsonDeserializer
JsonSerializer
byte[]
JsonDeserializer
Class<?> targetType
byte[]
JsonDeserializer
JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);
您可以使用 .
您还可以扩展它们以实现方法中的某些特定配置逻辑。JsonSerializer
JsonDeserializer
ObjectMapper
configure(Map<String, ?> configs, boolean isKey)
从版本 2.3 开始,默认情况下,所有 JSON 感知组件都配置了一个实例,该实例禁用了 and 功能。
此外,此类实例还提供了用于自定义数据类型的众所周知的模块,例如 Java 时间和 Kotlin 支持。
有关更多信息,请参阅 JavaDocs。
此方法还将 for objects 序列化注册到纯字符串中,以实现网络上的平台间兼容性。
A 可以在应用程序上下文中注册为 bean,并且它将被自动配置到 Spring Boot ObjectMapper
实例中。JacksonUtils.enhancedObjectMapper()
MapperFeature.DEFAULT_VIEW_INCLUSION
DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES
JacksonUtils.enhancedObjectMapper()
org.springframework.kafka.support.JacksonMimeTypeModule
org.springframework.util.MimeType
JacksonMimeTypeModule
同样从版本 2.3 开始,提供了基于 -的构造函数,以更好地处理目标通用容器类型。JsonDeserializer
TypeReference
从版本 2.1 开始,您可以在 record 中传达类型信息,从而允许处理多种类型。
此外,您可以使用以下 Kafka 属性配置序列化程序和反序列化程序。
如果您分别为 和 提供了 和 实例,则它们无效。Headers
Serializer
Deserializer
KafkaConsumer
KafkaProducer
配置属性
-
JsonSerializer.ADD_TYPE_INFO_HEADERS
(默认):您可以将其设置为在 (设置属性) 上禁用此功能。true
false
JsonSerializer
addTypeInfo
-
JsonSerializer.TYPE_MAPPINGS
(默认):请参阅映射类型。empty
-
JsonDeserializer.USE_TYPE_INFO_HEADERS
(默认):您可以将其设置为忽略序列化程序设置的标头。true
false
-
JsonDeserializer.REMOVE_TYPE_INFO_HEADERS
(默认):您可以将其设置为保留序列化程序设置的标头。true
false
-
JsonDeserializer.KEY_DEFAULT_TYPE
:如果不存在标头信息,则用于键反序列化的回退类型。 -
JsonDeserializer.VALUE_DEFAULT_TYPE
:如果不存在标头信息,则用于反序列化值的回退类型。 -
JsonDeserializer.TRUSTED_PACKAGES
(default , ):允许反序列化的包模式的逗号分隔列表。 意味着反序列化 all。java.util
java.lang
*
-
JsonDeserializer.TYPE_MAPPINGS
(默认):请参阅映射类型。empty
-
JsonDeserializer.KEY_TYPE_METHOD
(默认):请参阅使用方法确定类型。empty
-
JsonDeserializer.VALUE_TYPE_METHOD
(默认):请参阅使用方法确定类型。empty
从版本 2.2 开始,类型信息 Headers(如果由序列化器添加)由反序列化器删除。
您可以通过直接在反序列化器上或使用前面描述的 configuration 属性将属性设置为 , 来恢复到以前的行为。removeTypeHeaders
false
从版本 2.8 开始,如果你以编程方式构造序列化器或反序列化器,如 编程构造 中所示,只要你没有显式设置任何属性(使用方法或使用 Fluent API),工厂就会应用上述属性。
以前,以编程方式创建时,从不应用配置属性;如果您直接在对象上显式设置属性,则仍然会出现这种情况。set*() |
映射类型
从版本 2.2 开始,在使用 JSON 时,您现在可以使用前面列表中的属性来提供类型映射。
以前,您必须在序列化器和反序列化器中自定义类型映射器。
映射由逗号分隔的对列表组成。
在出站时,有效负载的类名将映射到相应的令牌。
在入站时,type 标头中的令牌将映射到相应的类名。token:className
以下示例创建一组映射:
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.Hat");
相应的对象必须兼容。 |
如果使用 Spring Boot,则可以在 (或 yaml) 文件中提供这些属性。
以下示例显示了如何执行此操作:application.properties
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat
您只能使用属性执行简单配置。
对于更高级的配置(例如在序列化器和反序列化器中使用自定义),你应该使用接受预构建的序列化器和反序列化器的 producer 和 consumer factory 构造函数。
Spring Boot 示例覆盖了默认工厂:
还提供了 Setter,作为使用这些构造函数的替代方法。 |
当使用 Spring Boot 并覆盖 and 时,如上所述,通配符泛型类型需要与 bean 方法返回类型一起使用。
如果提供了具体的泛型类型,则 Spring Boot 将忽略这些 bean 并仍然使用默认 bean。ConsumerFactory ProducerFactory |
从版本 2.2 开始,你可以显式地将反序列化器配置为使用提供的目标类型,并通过使用具有 boolean 参数的重载构造函数之一(默认情况下)来忽略 Headers 中的类型信息。
以下示例显示了如何执行此操作:useHeadersIfPresent
true
DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
使用方法确定类型
从版本 2.5 开始,您现在可以通过 properties 配置反序列化器,以调用方法来确定目标类型。
如果存在,这将覆盖上面讨论的任何其他技术。
如果数据是由不使用 Spring 序列化器的应用程序发布的,并且您需要根据数据或其他 Headers 反序列化为不同的类型,那么这可能很有用。
将这些属性设置为方法名称 - 一个完全限定的类名,后跟方法名,用句点分隔。
该方法必须声明为 , have one of three signatures 或 and return a Jackson 。.
public static
(String topic, byte[] data, Headers headers)
(byte[] data, Headers headers)
(byte[] data)
JavaType
-
JsonDeserializer.KEY_TYPE_METHOD
:spring.json.key.type.method
-
JsonDeserializer.VALUE_TYPE_METHOD
:spring.json.value.type.method
您可以使用任意标头或检查数据以确定类型。
JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);
JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);
public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
// {"thisIsAFieldInThing1":"value", ...
if (data[21] == '1') {
return thing1Type;
}
else {
return thing2Type;
}
}
对于更复杂的数据检查,请考虑使用 或类似方法,但是,用于确定类型的测试越简单,过程的效率就越高。JsonPath
以下是以编程方式创建反序列化器的示例(在构造函数中为 Consumer Factory 提供反序列化器时):
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);
...
public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
...
}
程序化构建
当以编程方式构造序列化器/反序列化器以在 producer/consumer 工厂中使用时,从版本 2.3 开始,你可以使用 Fluent API,这简化了配置。
@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
new JsonSerializer<MyKeyType>()
.forKeys()
.noTypeInfo(),
new JsonSerializer<MyValueType>()
.noTypeInfo());
return pf;
}
@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
new JsonDeserializer<>(MyKeyType.class)
.forKeys()
.ignoreTypeHeaders(),
new JsonDeserializer<>(MyValueType.class)
.ignoreTypeHeaders());
return cf;
}
若要以编程方式提供类型映射,类似于 Using Methods to Determine Types,请使用该属性。typeFunction
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeFunction(MyUtils::thingOneOrThingTwo);
或者,只要你不使用 Fluent API 来配置属性,或者使用方法设置它们,工厂就会使用配置属性配置序列化器/反序列化器;请参阅配置属性。set*()
委托序列化器和反序列化器
使用标头
版本 2.3 引入了 和 ,它允许生成和使用具有不同键和/或值类型的记录。
生产者必须将 Headers 设置为选择器值,该值用于选择要用于值和键的序列化器;如果未找到匹配项,则引发 AN。DelegatingSerializer
DelegatingDeserializer
DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR
DelegatingSerializer.KEY_SERIALIZATION_SELECTOR
IllegalStateException
对于传入记录,反序列化器使用相同的 Headers 来选择要使用的反序列化器;如果未找到匹配项或标头不存在,则返回 RAW。byte[]
您可以通过构造函数配置 selector 到 / 的映射,也可以通过 Kafka 生产者/使用者属性使用键 和 进行配置。
对于序列化程序,producer 属性可以是 a,其中 key 是选择器,value 是实例、序列化程序或类名。
该属性也可以是逗号分隔的映射条目的 String,如下所示。Serializer
Deserializer
DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG
DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG
Map<String, Object>
Serializer
Class
对于反序列化器,consumer 属性可以是 a,其中 key 是选择器,value 是实例、反序列化器或类名。
该属性也可以是逗号分隔的映射条目的 String,如下所示。Map<String, Object>
Deserializer
Class
要使用 properties 进行配置,请使用以下语法:
producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")
consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")
然后,创建者会将标头设置为 或 。DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR
thing1
thing2
此技术支持向同一主题(或不同主题)发送不同类型的内容。
从版本 2.5.1 开始,如果类型(键或值)是 (, , etc) 支持的标准类型之一,则无需设置 selector 标头。
相反,序列化程序会将标头设置为类型的类名。
没有必要为这些类型配置 serializer 或 deserializers,它们将动态创建(一次)。Serdes Long Integer |
有关将不同类型发送到不同主题的另一种技术,请参阅使用 RoutingKafkaTemplate
。
按类型
版本 2.8 引入了 .DelegatingByTypeSerializer
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
null, new DelegatingByTypeSerializer(Map.of(
byte[].class, new ByteArraySerializer(),
Bytes.class, new BytesSerializer(),
String.class, new StringSerializer())));
}
从版本 2.8.3 开始,你可以配置序列化器来检查是否可以从目标对象分配 map 键,这在委托序列化器可以序列化子类时很有用。
在这种情况下,如果存在友好匹配项,则应提供 ordered ,例如 a 。Map
LinkedHashMap
按主题
从版本 2.8 开始, 和 允许根据主题名称选择序列化器/反序列化器。
Regex 用于查找要使用的实例。
可以使用构造函数或通过 properties(逗号分隔的列表)配置映射。DelegatingByTopicSerializer
DelegatingByTopicDeserializer
Pattern
pattern:serializer
producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArraySerializer.class.getName()
+ ", topic[5-9]:" + StringSerializer.class.getName());
...
ConsumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArrayDeserializer.class.getName()
+ ", topic[5-9]:" + StringDeserializer.class.getName());
在将此选项用于键时使用。KEY_SERIALIZATION_TOPIC_CONFIG
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
new IntegerSerializer(),
new DelegatingByTopicSerializer(Map.of(
Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
Pattern.compile("topic[5-9]"), new StringSerializer())),
new JsonSerializer<Object>()); // default
}
您可以使用 和 指定在没有模式匹配时使用的默认序列化器/反序列化器。DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULT
DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT
当设置为 时,附加属性 (default ) 使主题查找不区分大小写。DelegatingByTopicSerialization.CASE_SENSITIVE
true
false
重试 Deserializer
当委托在反序列化期间可能存在暂时性错误(如网络问题)时,使用委托 和 重试反序列化。RetryingDeserializer
Deserializer
RetryTemplate
ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
从 version 开始,可以在 上设置 a。3.1.2
RecoveryCallback
RetryingDeserializer
有关使用重试策略、回退策略等的配置,请参阅 spring-retry 项目。RetryTemplate
Spring 消息传递消息转换
尽管从低级 Kafka 和角度来看,和 API 非常简单灵活,但在使用 Spring Integration的 Apache Kafka 支持时,您可能需要在 Spring 消息传递级别具有更大的灵活性。
为了让您轻松地转换为 Spring for Apache Kafka 提供了带有实现及其(和子类)自定义的抽象。
您可以直接将 the 注入到实例中,也可以通过对属性使用 bean 定义来注入。
以下示例显示了如何执行此操作:Serializer
Deserializer
Consumer
Producer
@KafkaListener
org.springframework.messaging.Message
MessageConverter
MessagingMessageConverter
JsonMessageConverter
MessageConverter
KafkaTemplate
AbstractKafkaListenerContainerFactory
@KafkaListener.containerFactory()
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordMessageConverter(new JsonMessageConverter());
return factory;
}
...
@KafkaListener(topics = "jsonData",
containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}
使用 Spring Boot 时,只需将转换器定义为 a ,Spring Boot 自动配置就会将其连接到自动配置的模板和容器工厂中。@Bean
当您使用 时,将向消息转换器提供参数类型以帮助进行转换。@KafkaListener
只有在方法级别声明 Comments 时,才能实现此类型推断。
使用 class-level ,有效负载类型用于选择要调用的方法,因此在选择方法之前必须已转换该方法。 |
在使用者端,您可以配置 ;它可以处理 类型的值,因此应与 、 或 结合使用。
( 并且效率更高,因为它们避免了不必要的转换)。
如果你愿意,你还可以配置对应于 deserializer 的特定子类。 在生产者端,当您使用 Spring Integration 或方法(请参阅 使用
同样,使用 or 更有效,因为它们避免了 to 转换。 为方便起见,从版本 2.3 开始,该框架还提供了一个可以序列化所有三种值类型的工具,以便它可以与任何消息转换器一起使用。 |
从版本 2.7.1 开始,消息负载转换可以委托给 ;例如,这允许基于 Headers 进行转换。spring-messaging
SmartMessageConverter
MessageHeaders.CONTENT_TYPE
调用该方法以将消息有效负载出站转换为 a。
该方法用于入站转化 from,有效负载为属性。
调用该方法以从传递给 (通常由 ) 创建新的出站。
同样,在该方法中,在转换器从 创建新的 后,将调用该方法,然后使用新转换的有效负载创建最终的入站消息。
在任一情况下,如果 returns ,则使用原始消息。KafkaMessageConverter.fromMessage() ProducerRecord ProducerRecord.value() KafkaMessageConverter.toMessage() ConsumerRecord ConsumerRecord.value() SmartMessageConverter.toMessage() Message<?> Message fromMessage() KafkaTemplate.send(Message<?> msg) KafkaMessageConverter.toMessage() Message<?> ConsumerRecord SmartMessageConverter.fromMessage() SmartMessageConverter null |
当 和 listener 容器工厂中使用默认转换器时,您可以通过调用模板和属性 on methods 来配置 。KafkaTemplate
SmartMessageConverter
setMessagingConverter()
contentTypeConverter
@KafkaListener
例子:
template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
...
}
使用 Spring Data Projection 接口
从版本 2.1.1 开始,你可以将 JSON 转换为 Spring Data Projection 接口,而不是具体类型。 这允许对数据进行非常有选择性的低耦合绑定,包括从 JSON 文档中的多个位置查找值。 例如,可以将以下接口定义为消息有效负载类型:
interface SomeSample {
@JsonPath({ "$.username", "$.user.name" })
String getUsername();
}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}
默认情况下,将使用访问器方法在接收到的 JSON 文档中查找属性名称作为字段。
该表达式允许自定义值查找,甚至定义多个 JSON 路径表达式,以便从多个位置查找值,直到表达式返回实际值。@JsonPath
要启用此功能,请使用配置了适当的代理转换器(用于出站转换和转换非投影接口)。
您还必须将 和 添加到 Classpath 中。ProjectingMessageConverter
spring-data:spring-data-commons
com.jayway.jsonpath:json-path
当用作方法的参数时,接口类型会像往常一样自动传递给转换器。@KafkaListener
用ErrorHandlingDeserializer
当反序列化器无法反序列化消息时, Spring 无法处理该问题,因为它发生在返回之前。
为了解决这个问题,引入了 。
此 deserializer 委托给真正的 deserializer (key 或 value)。
如果委托无法反序列化记录内容,则 将在包含原因和原始字节的标头中返回一个值和 a。
使用 record-level 时,如果 包含 key 或 value 的标头,则使用 failed 调用容器的 。
记录不会传递给侦听器。poll()
ErrorHandlingDeserializer
ErrorHandlingDeserializer
null
DeserializationException
MessageListener
ConsumerRecord
DeserializationException
ErrorHandler
ConsumerRecord
或者,您也可以通过提供 来配置 以创建自定义值,即 .
调用此函数以创建 的实例,该实例以通常的方式传递给侦听器。
向函数提供包含所有上下文信息的 object 类型的对象。
您可以在 headers 中找到 (作为序列化的 Java 对象)。
有关更多信息,请参阅 Javadoc 。ErrorHandlingDeserializer
failedDeserializationFunction
Function<FailedDeserializationInfo, T>
T
FailedDeserializationInfo
DeserializationException
ErrorHandlingDeserializer
您可以使用采用键和值对象的构造函数,并连接已使用适当委托配置的相应实例。
或者,您也可以使用使用者配置属性(由 使用)来实例化委托。
属性名称为 和 。
属性值可以是类或类名。
以下示例显示如何设置这些属性:DefaultKafkaConsumerFactory
Deserializer
ErrorHandlingDeserializer
ErrorHandlingDeserializer
ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS
ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);
以下示例使用 .failedDeserializationFunction
public class BadThing extends Thing {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}
}
public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {
@Override
public Thing apply(FailedDeserializationInfo info) {
return new BadThing(info);
}
}
前面的示例使用以下配置:
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
...
如果 consumer 配置了 ,则必须使用 serializer 配置 the 及其 producer ,该序列化器可以处理普通对象以及由反序列化异常导致的原始值。
模板的 generic value type 应为 。
一种方法是使用 ;示例如下:ErrorHandlingDeserializer KafkaTemplate byte[] Object DelegatingByTypeSerializer |
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
将 与 batch listener 一起使用时,必须检查消息标头中的反序列化异常。
与 一起使用时,您可以使用该标头来确定异常失败的记录,并通过 与错误处理程序通信。ErrorHandlingDeserializer
DefaultBatchErrorHandler
BatchListenerFailedException
@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
for (int i = 0; i < in.size(); i++) {
Thing thing = in.get(i);
if (thing == null
&& headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
try {
DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
if (deserEx != null) {
logger.error(deserEx, "Record at index " + i + " could not be deserialized");
}
}
catch (Exception ex) {
logger.error(ex, "Record at index " + i + " could not be deserialized");
}
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
process(thing);
}
}
SerializationUtils.byteArrayToDeserializationException()
可用于将标头转换为 .DeserializationException
当使用 , 时,改用:List<ConsumerRecord<?, ?>
SerializationUtils.getExceptionFromHeader()
@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
for (int i = 0; i < in.size(); i++) {
ConsumerRecord<String, Thing> rec = in.get(i);
if (rec.value() == null) {
DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
if (deserEx != null) {
logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
}
process(rec.value());
}
}
如果您还使用 ,则为 a 发布的记录将具有 a 类型的 ;这不应该被序列化。
考虑使用配置为使用 for 和普通序列化器(Json、Avro 等)来处理所有其他类型的序列化程序。DeadLetterPublishingRecoverer DeserializationException record.value() byte[] DelegatingByTypeSerializer ByteArraySerializer byte[] |
从版本 3.1 开始,您可以向 .
如果委托成功反序列化对象,但该对象未通过验证,则会引发类似于发生的反序列化异常的异常。
这允许将原始原始数据传递给错误处理程序。
当自己创建反序列化器时,只需调用 ;如果使用 Properties 配置序列化程序,请将 Consumer Configuration 属性设置为 .
使用 Spring Boot 时,此属性名称为 .Validator
ErrorHandlingDeserializer
Deserializer
setValidator
ErrorHandlingDeserializer.VALIDATOR_CLASS
Validator
spring.kafka.consumer.properties.spring.deserializer.validator.class
使用批量侦听器进行有效负载转换
当您使用批处理侦听器容器工厂时,您还可以使用 within a 来转换批处理消息。
有关更多信息,请参见序列化、反序列化和消息转换和 Spring 消息传递消息转换。JsonMessageConverter
BatchMessagingMessageConverter
默认情况下,转换的类型是从 listener 参数推断的。
如果使用 a 将其设置为 (而不是 default) 进行配置,则转换器将改用 headers 中的类型信息(如果存在)。
例如,这允许使用接口而不是具体类来声明侦听器方法。
此外,类型转换器支持映射,因此反序列化可以是与源不同的类型(只要数据兼容)。
当您使用类级 @KafkaListener
实例时,这也很有用,其中有效负载必须已转换以确定要调用的方法。
下面的示例创建使用此方法的 bean:JsonMessageConverter
DefaultJackson2TypeMapper
TypePrecedence
TYPE_ID
INFERRED
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
return factory;
}
@Bean
public JsonMessageConverter converter() {
return new JsonMessageConverter();
}
请注意,要使其正常工作,转换目标的方法签名必须是具有单个泛型参数类型的容器对象,如下所示:
@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
请注意,您仍然可以访问批处理标头。
如果批处理转换器具有支持它的记录转换器,则您还可以接收一个消息列表,其中的有效负载根据泛型类型进行转换。 以下示例显示了如何执行此操作:
@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen(List<Message<Foo>> fooMessages) {
...
}
ConversionService
定制
从版本 2.1.1 开始,默认情况下用于解析调用侦听器方法的参数的 bean 与实现以下任何接口的所有 bean 一起提供:org.springframework.core.convert.ConversionService
org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory
-
org.springframework.core.convert.converter.Converter
-
org.springframework.core.convert.converter.GenericConverter
-
org.springframework.format.Formatter
这样,您就可以进一步自定义侦听器反序列化,而无需更改 和 的默认配置。ConsumerFactory
KafkaListenerContainerFactory
在 通过 bean 上设置 custom 将禁用此功能。MessageHandlerMethodFactory KafkaListenerEndpointRegistrar KafkaListenerConfigurer |
将 Custom 添加到HandlerMethodArgumentResolver
@KafkaListener
从版本 2.4.2 开始,您可以添加自己的和解析自定义方法参数。
你只需要实现和使用类 中的方法。HandlerMethodArgumentResolver
KafkaListenerConfigurer
setCustomMethodArgumentResolvers()
KafkaListenerEndpointRegistrar
@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setCustomMethodArgumentResolvers(
new HandlerMethodArgumentResolver() {
@Override
public boolean supportsParameter(MethodParameter parameter) {
return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
}
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) {
return new CustomMethodArgument(
message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
);
}
}
);
}
}
您还可以通过向 Bean 添加自定义来完全替换框架的参数解析。
如果你这样做了,并且你的应用程序需要处理 tombstone 记录,使用 a (例如,来自压缩的主题),你应该将 a 添加到工厂;它必须是最后一个解析程序,因为它支持所有类型的解析器,并且可以在没有注释的情况下匹配参数。
如果您使用的是 ,请将此解析程序设置为最后一个自定义解析程序;工厂将确保此解析器将在 standard 之前使用,该 standard 对有效负载一无所知。MessageHandlerMethodFactory
KafkaListenerEndpointRegistrar
null
value()
KafkaNullAwarePayloadArgumentResolver
@Payload
DefaultMessageHandlerMethodFactory
PayloadMethodArgumentResolver
KafkaNull