对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1Spring中文文档

对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1Spring中文文档

概述

Apache Kafka 提供了一个高级 API,用于序列化和反序列化记录值及其键。 它与具有一些内置实现的 and 抽象一起存在。 同时,我们可以通过使用 or 配置属性来指定序列化程序和反序列化程序类。 以下示例演示如何执行此操作:org.apache.kafka.common.serialization.Serializer<T>org.apache.kafka.common.serialization.Deserializer<T>ProducerConsumerSpring中文文档

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

对于更复杂或更特殊的情况,(因此,)提供重载 要接受的构造函数和 和 的实例。KafkaConsumerKafkaProducerSerializerDeserializerkeysvaluesSpring中文文档

使用此 API 时,and 还提供属性(通过构造函数或 setter 方法)以将自定义和实例注入到目标或 . 此外,您可以通过构造函数传入 or 实例 - 这些 s 在创建每个 or 时调用。DefaultKafkaProducerFactoryDefaultKafkaConsumerFactorySerializerDeserializerProducerConsumerSupplier<Serializer>Supplier<Deserializer>SupplierProducerConsumerSpring中文文档

字符串序列化

从 2.5 版开始,Spring for Apache Kafka 提供了使用实体的 String 表示的类。 它们依赖于方法和一些 or 来解析 String 并填充实例的属性。 通常,这会在类上调用一些静态方法,例如:ToStringSerializerParseStringDeserializertoStringFunction<String>BiFunction<String, Headers>parseSpring中文文档

ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);

默认情况下,配置为传达有关记录中序列化实体的类型信息。 可以通过将属性设置为 来禁用此功能。 此信息可由接收方使用。ToStringSerializerHeadersaddTypeInfofalseParseStringDeserializerSpring中文文档

  • ToStringSerializer.ADD_TYPE_INFO_HEADERS(默认):您可以将其设置为在(设置属性)上禁用此功能。truefalseToStringSerializeraddTypeInfoSpring中文文档

ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
    byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
    String entityType = new String(header);

    if (entityType.contains("Thing")) {
        return Thing.parse(str);
    }
    else {
        // ...parsing logic
    }
});

您可以配置用于转换的 to/to/to,默认为 。CharsetStringbyte[]UTF-8Spring中文文档

您可以使用属性使用解析器方法的名称配置反序列化程序:ConsumerConfigSpring中文文档

这些属性必须包含类的完全限定名称,后跟方法名称,并用句点分隔。 该方法必须是静态的,并且签名为 或 。.(String, Headers)(String)Spring中文文档

还提供了 A,用于 Kafka Streams。ToFromStringSerdeSpring中文文档

JSON的

Spring for Apache Kafka 还提供和实现基于 Jackson JSON 对象映射器。 允许将任何 Java 对象写入 JSON 。 需要一个额外的参数,以允许将消耗的反序列化为正确的目标对象。 以下示例演示如何创建一个:JsonSerializerJsonDeserializerJsonSerializerbyte[]JsonDeserializerClass<?> targetTypebyte[]JsonDeserializerSpring中文文档

JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);

您可以使用 . 还可以扩展它们以在方法中实现某些特定的配置逻辑。JsonSerializerJsonDeserializerObjectMapperconfigure(Map<String, ?> configs, boolean isKey)Spring中文文档

从版本 2.3 开始,默认情况下,所有 JSON 感知组件都配置了一个实例,该实例带有禁用的 和 功能。 此外,此类实例还提供了用于自定义数据类型的知名模块,例如 Java time 和 Kotlin 支持。 有关更多信息,请参阅 JavaDocs。 此方法还会将 for 对象序列化注册到纯字符串中,以实现网络上的平台间兼容性。 可以在应用程序上下文中将 A 注册为 bean,它将自动配置到 Spring Boot ObjectMapper 实例中。JacksonUtils.enhancedObjectMapper()MapperFeature.DEFAULT_VIEW_INCLUSIONDeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIESJacksonUtils.enhancedObjectMapper()org.springframework.kafka.support.JacksonMimeTypeModuleorg.springframework.util.MimeTypeJacksonMimeTypeModuleSpring中文文档

同样从 2.3 版开始,提供了基于 的构造函数,以便更好地处理目标泛型容器类型。JsonDeserializerTypeReferenceSpring中文文档

从版本 2.1 开始,您可以在记录中传达类型信息,允许处理多种类型。 此外,还可以使用以下 Kafka 属性配置序列化程序和反序列化程序。 如果您分别提供了 和 实例,则它们无效。HeadersSerializerDeserializerKafkaConsumerKafkaProducerSpring中文文档

配置属性

  • JsonSerializer.ADD_TYPE_INFO_HEADERS(默认):您可以将其设置为在(设置属性)上禁用此功能。truefalseJsonSerializeraddTypeInfoSpring中文文档

  • JsonSerializer.TYPE_MAPPINGS(默认):请参阅映射类型emptySpring中文文档

  • JsonDeserializer.USE_TYPE_INFO_HEADERS(默认):可以将其设置为忽略序列化程序设置的标头。truefalseSpring中文文档

  • JsonDeserializer.REMOVE_TYPE_INFO_HEADERS(默认):可以设置为保留序列化程序设置的标头。truefalseSpring中文文档

  • JsonDeserializer.KEY_DEFAULT_TYPE:如果不存在标头信息,则用于反序列化密钥的回退类型。Spring中文文档

  • JsonDeserializer.VALUE_DEFAULT_TYPE:如果不存在标头信息,则用于反序列化值的回退类型。Spring中文文档

  • JsonDeserializer.TRUSTED_PACKAGES(默认 , ):允许反序列化的包模式的逗号分隔列表。 意味着对所有内容进行反序列化。java.utiljava.lang*Spring中文文档

  • JsonDeserializer.TYPE_MAPPINGS(默认):请参阅映射类型emptySpring中文文档

  • JsonDeserializer.KEY_TYPE_METHOD(默认):请参阅使用方法确定类型emptySpring中文文档

  • JsonDeserializer.VALUE_TYPE_METHOD(默认):请参阅使用方法确定类型emptySpring中文文档

从版本 2.2 开始,反序列化程序将删除类型信息标头(如果由序列化程序添加)。 可以通过直接在反序列化程序上或使用前面所述的配置属性将属性设置为 来恢复以前的行为。removeTypeHeadersfalseSpring中文文档

从版本 2.8 开始,如果以编程方式构造序列化程序或反序列化程序,则只要未显式设置任何属性(使用方法或使用 Fluent API),工厂就会应用上述属性。 以前,以编程方式创建时,从未应用配置属性;如果直接在对象上显式设置属性,则仍然如此。set*()

映射类型

从版本 2.2 开始,使用 JSON 时,现在可以使用前面列表中的属性提供类型映射。 以前,您必须在序列化程序和反序列化程序中自定义类型映射器。 映射由逗号分隔的对列表组成。 在出站时,有效负载的类名将映射到相应的令牌。 在入站时,类型标头中的标记将映射到相应的类名。token:classNameSpring中文文档

以下示例创建一组映射:Spring中文文档

senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeSerializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.hat");
相应的对象必须兼容。

如果使用 Spring Boot,则可以在 (或 yaml) 文件中提供这些属性。 以下示例演示如何执行此操作:application.propertiesSpring中文文档

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat

您只能使用属性执行简单的配置。 对于更高级的配置(例如,在序列化程序和反序列化程序中使用自定义项),应使用接受预生成序列化程序和反序列化程序的生产者和使用者工厂构造函数。 以下 Spring Boot 示例覆盖默认工厂:ObjectMapperSpring中文文档

@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(JsonDeserializer customValueDeserializer) {
    Map<String, Object> properties = new HashMap<>();
    // properties.put(..., ...)
    // ...
    return new DefaultKafkaConsumerFactory<>(properties,
        new StringDeserializer(), customValueDeserializer);
}

@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(JsonSerializer customValueSerializer) {
    return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties(),
        new StringSerializer(), customValueSerializer);
}

还提供了 Setter,作为使用这些构造函数的替代方法。Spring中文文档

当使用 Spring Boot 并覆盖 and 时,如上所示,通配符泛型类型需要与 bean 方法返回类型一起使用。 如果提供了具体的泛型类型,则 Spring Boot 将忽略这些 bean,并且仍然使用默认的 bean。ConsumerFactoryProducerFactory

从版本 2.2 开始,您可以显式配置反序列化程序以使用提供的目标类型,并使用具有布尔参数(默认情况下)的重载构造函数之一忽略标头中的类型信息。 以下示例演示如何执行此操作:useHeadersIfPresenttrueSpring中文文档

DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
        new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));

使用方法确定类型

从版本 2.5 开始,您现在可以通过属性配置反序列化程序,以调用方法来确定目标类型。 如果存在,这将覆盖上面讨论的任何其他技术。 如果数据由不使用 Spring 序列化程序的应用程序发布,并且您需要根据数据或其他标头反序列化为不同类型的数据,这可能很有用。 将这些属性设置为方法名称 - 一个完全限定的类名,后跟方法名称,以句点分隔。 该方法必须声明为 ,具有三个签名之一,或者返回一个 Jackson 。.public static(String topic, byte[] data, Headers headers)(byte[] data, Headers headers)(byte[] data)JavaTypeSpring中文文档

您可以使用任意标头或检查数据来确定类型。Spring中文文档

JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);

JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);

public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
    // {"thisIsAFieldInThing1":"value", ...
    if (data[21] == '1') {
        return thing1Type;
    }
    else {
        return thing2Type;
    }
}

对于更复杂的数据检查,请考虑使用或类似方法,但是,确定类型的测试越简单,该过程的效率就越高。JsonPathSpring中文文档

以下是以编程方式创建反序列化程序的示例(在构造函数中为消费者工厂提供解串器时):Spring中文文档

JsonDeserializer<Object> deser = new JsonDeserializer<>()
        .trustedPackages("*")
        .typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);

...

public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
    ...
}

方案化建设

从版本 2.3 开始,以编程方式构造序列化程序/反序列化程序以在生产者/使用者工厂中使用时,可以使用 Fluent API,从而简化配置。Spring中文文档

@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
    Map<String, Object> props = new HashMap<>();
    // props.put(..., ...)
    // ...
    DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
        new JsonSerializer<MyKeyType>()
            .forKeys()
            .noTypeInfo(),
        new JsonSerializer<MyValueType>()
            .noTypeInfo());
    return pf;
}

@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
    Map<String, Object> props = new HashMap<>();
    // props.put(..., ...)
    // ...
    DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
        new JsonDeserializer<>(MyKeyType.class)
            .forKeys()
            .ignoreTypeHeaders(),
        new JsonDeserializer<>(MyValueType.class)
            .ignoreTypeHeaders());
    return cf;
}

若要以编程方式提供类型映射(类似于使用方法确定类型),请使用该属性。typeFunctionSpring中文文档

JsonDeserializer<Object> deser = new JsonDeserializer<>()
        .trustedPackages("*")
        .typeFunction(MyUtils::thingOneOrThingTwo);

或者,只要您不使用 fluent API 配置属性,或者使用方法设置属性,工厂就会使用配置属性配置序列化程序/反序列化程序;请参阅配置属性set*()Spring中文文档

从版本 2.8 开始,如果以编程方式构造序列化程序或反序列化程序,则只要未显式设置任何属性(使用方法或使用 Fluent API),工厂就会应用上述属性。 以前,以编程方式创建时,从未应用配置属性;如果直接在对象上显式设置属性,则仍然如此。set*()
相应的对象必须兼容。

您只能使用属性执行简单的配置。 对于更高级的配置(例如,在序列化程序和反序列化程序中使用自定义项),应使用接受预生成序列化程序和反序列化程序的生产者和使用者工厂构造函数。 以下 Spring Boot 示例覆盖默认工厂:ObjectMapperSpring中文文档

@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(JsonDeserializer customValueDeserializer) {
    Map<String, Object> properties = new HashMap<>();
    // properties.put(..., ...)
    // ...
    return new DefaultKafkaConsumerFactory<>(properties,
        new StringDeserializer(), customValueDeserializer);
}

@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(JsonSerializer customValueSerializer) {
    return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties(),
        new StringSerializer(), customValueSerializer);
}

还提供了 Setter,作为使用这些构造函数的替代方法。Spring中文文档

当使用 Spring Boot 并覆盖 and 时,如上所示,通配符泛型类型需要与 bean 方法返回类型一起使用。 如果提供了具体的泛型类型,则 Spring Boot 将忽略这些 bean,并且仍然使用默认的 bean。ConsumerFactoryProducerFactory

委派序列化程序和解串化程序

使用标头

版本 2.3 引入了 和 ,它允许生成和使用具有不同键和/或值类型的记录。 生产者必须将标头设置为一个选择器值,该值用于选择要用于值和键的序列化程序;如果未找到匹配项,则抛出 an。DelegatingSerializerDelegatingDeserializerDelegatingSerializer.VALUE_SERIALIZATION_SELECTORDelegatingSerializer.KEY_SERIALIZATION_SELECTORIllegalStateExceptionSpring中文文档

对于传入记录,解串化程序使用相同的标头来选择要使用的反序列化程序;如果未找到匹配项或标头不存在,则返回 RAW。byte[]Spring中文文档

您可以通过构造函数将选择器映射到 / 配置,也可以通过 Kafka 生产者/使用者属性使用 keys 和 来配置它。 对于序列化程序,producer 属性可以是 其中键是选择器,值是实例、序列化程序或类名。 该属性也可以是逗号分隔的映射条目的字符串,如下所示。SerializerDeserializerDelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIGDelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIGMap<String, Object>SerializerClassSpring中文文档

对于反序列化程序,consumer 属性可以是 其中键是选择器,值是实例、反序列化程序或类名。 该属性也可以是逗号分隔的映射条目的字符串,如下所示。Map<String, Object>DeserializerClassSpring中文文档

若要配置使用属性,请使用以下语法:Spring中文文档

producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
    "thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")

consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
    "thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")

然后,生产者会将标头设置为 或 。DelegatingSerializer.VALUE_SERIALIZATION_SELECTORthing1thing2Spring中文文档

此技术支持将不同的类型发送到同一主题(或不同主题)。Spring中文文档

从版本 2.5.1 开始,如果类型(键或值)是 (、 等) 支持的标准类型之一,则无需设置选择器标头。 相反,序列化程序会将标头设置为类型的类名。 无需为这些类型配置序列化程序或反序列化程序,它们将(一次)动态创建。SerdesLongInteger

有关将不同类型发送到不同主题的另一种技术,请参阅使用 RoutingKafkaTemplateSpring中文文档

按类型

版本 2.8 引入了 .DelegatingByTypeSerializerSpring中文文档

@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            null, new DelegatingByTypeSerializer(Map.of(
                    byte[].class, new ByteArraySerializer(),
                    Bytes.class, new BytesSerializer(),
                    String.class, new StringSerializer())));
}

从版本 2.8.3 开始,可以配置序列化程序以检查映射键是否可以从目标对象分配,这在委托序列化程序可以序列化子类时非常有用。 在这种情况下,如果存在友好匹配项,则应提供有序的 ,例如 a。MapLinkedHashMapSpring中文文档

按主题

从版本 2.8 开始,并允许根据主题名称选择序列化程序/反序列化程序。 正则表达式用于查找要使用的实例。 可以使用构造函数或通过属性(逗号分隔的 列表)来配置映射。DelegatingByTopicSerializerDelegatingByTopicDeserializerPatternpattern:serializerSpring中文文档

producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
            "topic[0-4]:" + ByteArraySerializer.class.getName()
        + ", topic[5-9]:" + StringSerializer.class.getName());
...
ConsumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
            "topic[0-4]:" + ByteArrayDeserializer.class.getName()
        + ", topic[5-9]:" + StringDeserializer.class.getName());

将其用于密钥时使用。KEY_SERIALIZATION_TOPIC_CONFIGSpring中文文档

@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            new IntegerSerializer(),
            new DelegatingByTopicSerializer(Map.of(
                    Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
                    Pattern.compile("topic[5-9]"), new StringSerializer())),
                    new JsonSerializer<Object>());  // default
}

您可以使用 和 指定缺省序列化程序/反序列化程序,以便在没有模式匹配时使用。DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULTDelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULTSpring中文文档

附加属性(默认值)设置为时,主题查找不区分大小写。DelegatingByTopicSerialization.CASE_SENSITIVEtruefalseSpring中文文档

从版本 2.5.1 开始,如果类型(键或值)是 (、 等) 支持的标准类型之一,则无需设置选择器标头。 相反,序列化程序会将标头设置为类型的类名。 无需为这些类型配置序列化程序或反序列化程序,它们将(一次)动态创建。SerdesLongInteger

重试反串化程序

当委托在反序列化过程中可能出现暂时性错误(如网络问题)时,使用委托并重试反序列化。RetryingDeserializerDeserializerRetryTemplateSpring中文文档

ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
    new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
    new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));

从版本开始,可以选择在 上设置 a。3.1.2RecoveryCallbackRetryingDeserializerSpring中文文档

请参阅 spring-retry 项目,了解 with a retry 策略、back off 策略等的配置。RetryTemplateSpring中文文档

Spring 消息传递消息转换

尽管从低级 Kafka 和角度来看,和 API 非常简单和灵活,但在使用 Spring Integration 的 Apache Kafka 支持时,您可能需要在 Spring Messaging 级别上具有更大的灵活性。 为了让您轻松地与 之间的转换,Spring for Apache Kafka 提供了一个包含实现及其(和子类)自定义的抽象。 您可以直接将 注入到实例中,也可以使用属性的 Bean 定义。 以下示例演示如何执行此操作:SerializerDeserializerConsumerProducer@KafkaListenerorg.springframework.messaging.MessageMessageConverterMessagingMessageConverterJsonMessageConverterMessageConverterKafkaTemplateAbstractKafkaListenerContainerFactory@KafkaListener.containerFactory()Spring中文文档

@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordMessageConverter(new JsonMessageConverter());
    return factory;
}
...
@KafkaListener(topics = "jsonData",
                containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}

使用 Spring Boot 时,只需将转换器定义为 Spring Boot 自动配置,就会将其连接到自动配置的模板和容器工厂。@BeanSpring中文文档

使用 时,将向消息转换器提供参数类型以帮助进行转换。@KafkaListenerSpring中文文档

只有在方法级别声明注释时,才能实现这种类型推理。 对于类级别,有效负载类型用于选择要调用的方法,因此在选择方法之前必须已经转换了有效负载类型。@KafkaListener@KafkaListener@KafkaHandlerSpring中文文档

在消费者端,您可以配置一个 ;它可以处理 类型的值,因此应该与 、 或 结合使用。 (并且效率更高,因为它们避免了不必要的转换)。 如果您愿意,还可以配置与反序列化程序对应的特定子类。JsonMessageConverterConsumerRecordbyte[]BytesStringByteArrayDeserializerBytesDeserializerStringDeserializerbyte[]Bytesbyte[]StringJsonMessageConverterSpring中文文档

在生产者端,当您使用 Spring Integration 或该方法(请参阅使用 KafkaTemplate)时,必须配置与配置的 Kafka 兼容的消息转换器。KafkaTemplate.send(Message<?> message)SerializerSpring中文文档

同样,使用 or 更有效,因为它们避免了 a 到 的转换。byte[]BytesStringbyte[]Spring中文文档

为方便起见,从版本 2.3 开始,该框架还提供了一个可以序列化所有三种值类型的 IT,以便它可以与任何消息转换器一起使用。StringOrBytesSerializerSpring中文文档

从版本 2.7.1 开始,消息负载转换可以委托给 ;例如,这样就可以基于标头进行转换。spring-messagingSmartMessageConverterMessageHeaders.CONTENT_TYPESpring中文文档

调用该方法以将出站转换为属性中的消息有效负载。 调用该方法进行入站转换,有效负载为属性。 调用该方法以创建从传递给(通常由 )的新出站。 同样,在该方法中,在转换器从 创建新的 后,将调用该方法,然后使用新转换的有效负载创建最终的入站消息。 无论哪种情况,如果返回 ,则使用原始消息。KafkaMessageConverter.fromMessage()ProducerRecordProducerRecord.value()KafkaMessageConverter.toMessage()ConsumerRecordConsumerRecord.value()SmartMessageConverter.toMessage()Message<?>MessagefromMessage()KafkaTemplate.send(Message<?> msg)KafkaMessageConverter.toMessage()Message<?>ConsumerRecordSmartMessageConverter.fromMessage()SmartMessageConverternull

在 and 侦听器容器工厂中使用默认转换器时,可以通过调用模板和属性来配置 on 方法。KafkaTemplateSmartMessageConvertersetMessagingConverter()contentMessageConverter@KafkaListenerSpring中文文档

template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
    contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
    ...
}

使用 Spring 数据投影接口

从版本 2.1.1 开始,您可以将 JSON 转换为 Spring Data Projection 接口,而不是具体类型。 这允许对数据进行非常有选择性和低耦合的绑定,包括从 JSON 文档中的多个位置查找值。 例如,以下接口可以定义为消息负载类型:Spring中文文档

interface SomeSample {

  @JsonPath({ "$.username", "$.user.name" })
  String getUsername();

}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
    String username = in.getUsername();
    ...
}

默认情况下,访问器方法将用于在收到的 JSON 文档中将属性名称查找为字段。 该表达式允许自定义值查找,甚至定义多个 JSON 路径表达式,以便从多个位置查找值,直到表达式返回实际值。@JsonPathSpring中文文档

要启用此功能,请使用配置了适当委托转换器(用于出站转换和转换非投影接口)的转换器。 还必须将 和 添加到类路径中。ProjectingMessageConverterspring-data:spring-data-commonscom.jayway.jsonpath:json-pathSpring中文文档

当用作方法的参数时,接口类型会像往常一样自动传递给转换器。@KafkaListenerSpring中文文档

只有在方法级别声明注释时,才能实现这种类型推理。 对于类级别,有效负载类型用于选择要调用的方法,因此在选择方法之前必须已经转换了有效负载类型。@KafkaListener@KafkaListener@KafkaHandlerSpring中文文档

在消费者端,您可以配置一个 ;它可以处理 类型的值,因此应该与 、 或 结合使用。 (并且效率更高,因为它们避免了不必要的转换)。 如果您愿意,还可以配置与反序列化程序对应的特定子类。JsonMessageConverterConsumerRecordbyte[]BytesStringByteArrayDeserializerBytesDeserializerStringDeserializerbyte[]Bytesbyte[]StringJsonMessageConverterSpring中文文档

在生产者端,当您使用 Spring Integration 或该方法(请参阅使用 KafkaTemplate)时,必须配置与配置的 Kafka 兼容的消息转换器。KafkaTemplate.send(Message<?> message)SerializerSpring中文文档

同样,使用 or 更有效,因为它们避免了 a 到 的转换。byte[]BytesStringbyte[]Spring中文文档

为方便起见,从版本 2.3 开始,该框架还提供了一个可以序列化所有三种值类型的 IT,以便它可以与任何消息转换器一起使用。StringOrBytesSerializerSpring中文文档

调用该方法以将出站转换为属性中的消息有效负载。 调用该方法进行入站转换,有效负载为属性。 调用该方法以创建从传递给(通常由 )的新出站。 同样,在该方法中,在转换器从 创建新的 后,将调用该方法,然后使用新转换的有效负载创建最终的入站消息。 无论哪种情况,如果返回 ,则使用原始消息。KafkaMessageConverter.fromMessage()ProducerRecordProducerRecord.value()KafkaMessageConverter.toMessage()ConsumerRecordConsumerRecord.value()SmartMessageConverter.toMessage()Message<?>MessagefromMessage()KafkaTemplate.send(Message<?> msg)KafkaMessageConverter.toMessage()Message<?>ConsumerRecordSmartMessageConverter.fromMessage()SmartMessageConverternull

ErrorHandlingDeserializer

当反序列化程序无法反序列化消息时,Spring 无法处理该问题,因为它发生在返回之前。 为了解决这个问题,已经引入了。 此解串化程序委托给真正的解串化器(键或值)。 如果委托无法反序列化记录内容,则在包含原因和原始字节的标头中返回一个值和 a。 当您使用记录级别时,如果 包含键或值的标头,则容器的 将使用 failed 调用。 记录不会传递给侦听器。poll()ErrorHandlingDeserializerErrorHandlingDeserializernullDeserializationExceptionMessageListenerConsumerRecordDeserializationExceptionErrorHandlerConsumerRecordSpring中文文档

或者,您可以通过提供 来配置以创建自定义值,该值是 . 调用此函数以创建 的实例,该实例以通常的方式传递给侦听器。 向函数提供一个类型为 的对象,其中包含所有上下文信息。 您可以在标头中找到(作为序列化的 Java 对象)。 有关更多信息,请参见 JavadocErrorHandlingDeserializerfailedDeserializationFunctionFunction<FailedDeserializationInfo, T>TFailedDeserializationInfoDeserializationExceptionErrorHandlingDeserializerSpring中文文档

可以使用构造函数,该构造函数采用键和值对象,并在已使用适当委托配置的适当实例中进行连接。 或者,可以使用使用者配置属性(由 使用 )来实例化委托。 属性名称为 和 。 属性值可以是类或类名。 下面的示例演示如何设置这些属性:DefaultKafkaConsumerFactoryDeserializerErrorHandlingDeserializerErrorHandlingDeserializerErrorHandlingDeserializer.KEY_DESERIALIZER_CLASSErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASSSpring中文文档

... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);

以下示例使用 .failedDeserializationFunctionSpring中文文档

public class BadThing extends Thing {

  private final FailedDeserializationInfo failedDeserializationInfo;

  public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
    this.failedDeserializationInfo = failedDeserializationInfo;
  }

  public FailedDeserializationInfo getFailedDeserializationInfo() {
    return this.failedDeserializationInfo;
  }

}

public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {

  @Override
  public Thing apply(FailedDeserializationInfo info) {
    return new BadThing(info);
  }

}

前面的示例使用以下配置:Spring中文文档

...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
...
如果使用者配置了 ,则必须使用 序列化程序为其 及其生产者配置序列化程序,该序列化程序可以处理由反序列化异常导致的正常对象和原始值。 模板的泛型值类型应为 。 一种技术是使用 ;示例如下:ErrorHandlingDeserializerKafkaTemplatebyte[]ObjectDelegatingByTypeSerializer
@Bean
public ProducerFactory<String, Object> producerFactory() {
  return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
    new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
          MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
  return new KafkaTemplate<>(producerFactory());
}

将 an 与批处理侦听器一起使用时,必须检查消息标头中的反序列化异常。 当与 一起使用时,可以使用该标头来确定异常失败的记录,并通过 与错误处理程序通信。ErrorHandlingDeserializerDefaultBatchErrorHandlerBatchListenerFailedExceptionSpring中文文档

@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
    for (int i = 0; i < in.size(); i++) {
        Thing thing = in.get(i);
        if (thing == null
                && headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
            try {
                DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
                        headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
                if (deserEx != null) {
                    logger.error(deserEx, "Record at index " + i + " could not be deserialized");
                }
            }
            catch (Exception ex) {
                logger.error(ex, "Record at index " + i + " could not be deserialized");
            }
            throw new BatchListenerFailedException("Deserialization", deserEx, i);
        }
        process(thing);
    }
}

SerializationUtils.byteArrayToDeserializationException()可用于将标头转换为 .DeserializationExceptionSpring中文文档

消费时,改用 :List<ConsumerRecord<?, ?>SerializationUtils.getExceptionFromHeader()Spring中文文档

@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
    for (int i = 0; i < in.size(); i++) {
        ConsumerRecord<String, Thing> rec = in.get(i);
        if (rec.value() == null) {
            DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
                    SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
            if (deserEx != null) {
                logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
                throw new BatchListenerFailedException("Deserialization", deserEx, i);
            }
        }
        process(rec.value());
    }
}
如果您还使用 ,则为 a 发布的记录将具有 的 类型;这不应该被序列化。 请考虑将配置为使用 for 和普通序列化程序(Json、Avro 等)用于所有其他类型。DeadLetterPublishingRecovererDeserializationExceptionrecord.value()byte[]DelegatingByTypeSerializerByteArraySerializerbyte[]

从版本 3.1 开始,您可以将 a 添加到 . 如果委托成功反序列化对象,但该对象未通过验证,则会引发类似于反序列化异常的异常。 这允许将原始原始数据传递到错误处理程序。 如果自己创建解串器,只需调用;如果使用 Properties 配置序列化程序,请将 Consumer Configuration 属性设置为 . 使用 Spring Boot 时,此属性名称为 。ValidatorErrorHandlingDeserializerDeserializersetValidatorErrorHandlingDeserializer.VALIDATOR_CLASSValidatorspring.kafka.consumer.properties.spring.deserializer.validator.classSpring中文文档

如果使用者配置了 ,则必须使用 序列化程序为其 及其生产者配置序列化程序,该序列化程序可以处理由反序列化异常导致的正常对象和原始值。 模板的泛型值类型应为 。 一种技术是使用 ;示例如下:ErrorHandlingDeserializerKafkaTemplatebyte[]ObjectDelegatingByTypeSerializer
如果您还使用 ,则为 a 发布的记录将具有 的 类型;这不应该被序列化。 请考虑将配置为使用 for 和普通序列化程序(Json、Avro 等)用于所有其他类型。DeadLetterPublishingRecovererDeserializationExceptionrecord.value()byte[]DelegatingByTypeSerializerByteArraySerializerbyte[]

使用批处理侦听器进行有效负载转换

在使用批处理侦听器容器工厂时,还可以使用 within a 来转换批处理消息。 有关更多信息,请参阅序列化、反序列化和消息转换Spring 消息传递转换JsonMessageConverterBatchMessagingMessageConverterSpring中文文档

默认情况下,转换的类型是从侦听器参数推断出来的。 如果使用 a 将其设置为(而不是默认值)进行配置,则转换器将改用标头中的类型信息(如果存在)。 例如,这允许使用接口而不是具体类来声明侦听器方法。 此外,类型转换器支持映射,因此反序列化可以与源不同(只要数据兼容)。 当您使用类级@KafkaListener实例时,这也很有用,在这些实例中,有效负载必须已转换以确定要调用的方法。 以下示例创建使用此方法的 Bean:JsonMessageConverterDefaultJackson2TypeMapperTypePrecedenceTYPE_IDINFERREDSpring中文文档

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
    return factory;
}

@Bean
public JsonMessageConverter converter() {
    return new JsonMessageConverter();
}

请注意,要使此功能正常,转换目标的方法签名必须是具有单个泛型参数类型的容器对象,如下所示:Spring中文文档

@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

请注意,您仍然可以访问批处理标头。Spring中文文档

如果批处理转换器具有支持它的记录转换器,则还可以接收根据泛型类型转换有效负载的消息列表。 以下示例演示如何执行此操作:Spring中文文档

@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen(List<Message<Foo>> fooMessages) {
    ...
}

ConversionService定制

从 V2.1.1 开始,默认情况下用于解析用于调用侦听器方法的参数与实现以下任何接口的所有 Bean 一起提供:org.springframework.core.convert.ConversionServiceorg.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactorySpring中文文档

这样,您就可以进一步自定义侦听器反序列化,而无需更改 和 的默认配置。ConsumerFactoryKafkaListenerContainerFactorySpring中文文档

在 through a bean 上设置自定义将禁用此功能。MessageHandlerMethodFactoryKafkaListenerEndpointRegistrarKafkaListenerConfigurer
在 through a bean 上设置自定义将禁用此功能。MessageHandlerMethodFactoryKafkaListenerEndpointRegistrarKafkaListenerConfigurer

将自定义添加到HandlerMethodArgumentResolver@KafkaListener

从版本 2.4.2 开始,您可以添加自己的参数并解析自定义方法参数。 您所需要的只是实现和使用类中的方法。HandlerMethodArgumentResolverKafkaListenerConfigurersetCustomMethodArgumentResolvers()KafkaListenerEndpointRegistrarSpring中文文档

@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setCustomMethodArgumentResolvers(
            new HandlerMethodArgumentResolver() {

                @Override
                public boolean supportsParameter(MethodParameter parameter) {
                    return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
                }

                @Override
                public Object resolveArgument(MethodParameter parameter, Message<?> message) {
                    return new CustomMethodArgument(
                        message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
                    );
                }
            }
        );
    }

}

您还可以通过向 Bean 添加自定义来完全替换框架的参数解析。 如果这样做,并且应用程序需要处理逻辑删除记录,则应将逻辑删除记录添加到工厂中(例如,来自压缩主题);它必须是最后一个解析器,因为它支持所有类型,并且可以在没有注释的情况下匹配参数。 如果您使用的是 ,请将此解析程序设置为最后一个自定义解析程序;工厂将确保该解析器将在标准之前使用,该标准不了解有效载荷。MessageHandlerMethodFactoryKafkaListenerEndpointRegistrarnullvalue()KafkaNullAwarePayloadArgumentResolver@PayloadDefaultMessageHandlerMethodFactoryPayloadMethodArgumentResolverKafkaNullSpring中文文档