此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1Spring中文文档

此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1Spring中文文档

0.11.0.0 客户端引入了对消息中标头的支持。 从版本 2.0 开始,Spring for Apache Kafka 现在支持将这些标头映射到 和 从 .spring-messagingMessageHeadersSpring中文文档

以前的版本映射到 spring-messaging ,其中 value 属性映射到 和 其他属性(、 等)映射到标头。 情况仍然如此,但现在可以映射其他(任意)标头。ConsumerRecordProducerRecordMessage<?>payloadtopicpartition
以前的版本映射到 spring-messaging ,其中 value 属性映射到 和 其他属性(、 等)映射到标头。 情况仍然如此,但现在可以映射其他(任意)标头。ConsumerRecordProducerRecordMessage<?>payloadtopicpartition

Apache Kafka 标头有一个简单的 API,如以下接口定义所示:Spring中文文档

public interface Header {

    String key();

    byte[] value();

}

该策略用于在 Kafka 和 . 其接口定义如下:KafkaHeaderMapperHeadersMessageHeadersSpring中文文档

public interface KafkaHeaderMapper {

    void fromHeaders(MessageHeaders headers, Headers target);

    void toHeaders(Headers source, Map<String, Object> target);

}

将原始标头映射为 ,并带有用于转换为值的配置选项。SimpleKafkaHeaderMapperbyte[]StringSpring中文文档

将密钥映射到标头名称,并且为了支持出站消息的丰富标头类型,将执行 JSON 转换。 “”标头(键为 )包含 的 JSON 映射。 此标头在入站端使用,以提供将每个标头值适当转换为原始类型的转换。DefaultKafkaHeaderMapperMessageHeadersspecialspring_json_header_types<key>:<type>Spring中文文档

在入站端,所有 Kafka 实例都映射到 . 在出站端,默认情况下,除了 、 和映射到属性的标头外,所有内容都会被映射。HeaderMessageHeadersMessageHeadersidtimestampConsumerRecordSpring中文文档

通过向映射器提供模式,可以指定要为出站消息映射的标头。 以下列表显示了许多示例映射:Spring中文文档

public DefaultKafkaHeaderMapper() { (1)
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { (2)
    ...
}

public DefaultKafkaHeaderMapper(String... patterns) { (3)
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { (4)
    ...
}
1 使用默认的 Jackson 并映射大多数标头,如示例前面所述。ObjectMapper
2 使用提供的 Jackson 并映射大多数标头,如示例前面所述。ObjectMapper
3 使用默认的 Jackson 并根据提供的模式映射标头。ObjectMapper
4 使用提供的 Jackson 并根据提供的模式映射标头。ObjectMapper
1 使用默认的 Jackson 并映射大多数标头,如示例前面所述。ObjectMapper
2 使用提供的 Jackson 并映射大多数标头,如示例前面所述。ObjectMapper
3 使用默认的 Jackson 并根据提供的模式映射标头。ObjectMapper
4 使用提供的 Jackson 并根据提供的模式映射标头。ObjectMapper

模式相当简单,可以包含前导通配符 ()、尾随通配符或两者兼而有之(例如,)。 您可以使用前导 来否定模式。 与标头名称匹配的第一个模式(无论是正数还是负数)获胜。**.cat.*!Spring中文文档

当您提供自己的模式时,我们建议包括 和 ,因为这些标头在入站端是只读的。!id!timestampSpring中文文档

默认情况下,映射器仅反序列化 和 中的类。 通过使用该方法添加受信任的包,可以信任其他(或所有)包。 如果收到来自不受信任来源的消息,您可能希望仅添加信任的那些包。 要信任所有包,可以使用 .java.langjava.utiladdTrustedPackagesmapper.addTrustedPackages("*")
默认情况下,映射器仅反序列化 和 中的类。 通过使用该方法添加受信任的包,可以信任其他(或所有)包。 如果收到来自不受信任来源的消息,您可能希望仅添加信任的那些包。 要信任所有包,可以使用 .java.langjava.utiladdTrustedPackagesmapper.addTrustedPackages("*")
在与不知道映射器的 JSON 格式的系统通信时,以原始形式映射标头值非常有用。String
在与不知道映射器的 JSON 格式的系统通信时,以原始形式映射标头值非常有用。String

从版本 2.2.5 开始,您可以指定某些字符串值标头不应使用 JSON 映射,而应与原始 . 具有新属性; 设置为 true 时,所有字符串值标头都将转换为 using the property (default )。 此外,还有一个属性,它是 的映射;如果映射包含标头名称,并且标头包含值,则将使用 charset 将其映射为 RAW格式。 此映射还用于将原始传入标头映射到使用字符集,当且仅当映射值中的布尔值为 。 如果布尔值为 ,或者标头名称不在具有值的映射中,则传入标头将简单地映射为未映射的原始标头。byte[]AbstractKafkaHeaderMappermapAllStringsOutbyte[]charsetUTF-8rawMappedHeadersheader name : booleanStringbyte[]byte[]StringtruefalsetrueSpring中文文档

以下测试用例说明了此机制。Spring中文文档

@Test
public void testSpecificStringConvert() {
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    Map<String, Boolean> rawMappedHeaders = new HashMap<>();
    rawMappedHeaders.put("thisOnesAString", true);
    rawMappedHeaders.put("thisOnesBytes", false);
    mapper.setRawMappedHeaders(rawMappedHeaders);
    Map<String, Object> headersMap = new HashMap<>();
    headersMap.put("thisOnesAString", "thing1");
    headersMap.put("thisOnesBytes", "thing2");
    headersMap.put("alwaysRaw", "thing3".getBytes());
    MessageHeaders headers = new MessageHeaders(headersMap);
    Headers target = new RecordHeaders();
    mapper.fromHeaders(headers, target);
    assertThat(target).containsExactlyInAnyOrder(
            new RecordHeader("thisOnesAString", "thing1".getBytes()),
            new RecordHeader("thisOnesBytes", "thing2".getBytes()),
            new RecordHeader("alwaysRaw", "thing3".getBytes()));
    headersMap.clear();
    mapper.toHeaders(target, headersMap);
    assertThat(headersMap).contains(
            entry("thisOnesAString", "thing1"),
            entry("thisOnesBytes", "thing2".getBytes()),
            entry("alwaysRaw", "thing3".getBytes()));
}

默认情况下,两个标头映射器都映射所有入站标头。 从版本 2.8.8 开始,这些模式也可以应用于入站映射。 若要为入站映射创建映射器,请在相应的映射器上使用静态方法之一:Spring中文文档

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}

public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");

这将排除所有以 开头并包括所有其他标头。abcSpring中文文档

默认情况下,只要 Jackson 在类路径上,就会在 和 中使用 。DefaultKafkaHeaderMapperMessagingMessageConverterBatchMessagingMessageConverterSpring中文文档

使用批处理转换器,转换后的标题在 as 中可用,其中列表位置的映射对应于有效负载中的数据位置。KafkaHeaders.BATCH_CONVERTED_HEADERSList<Map<String, Object>>Spring中文文档

如果没有转换器(因为 Jackson 不存在或显式设置为 ),则使用者记录中的标头将在标头中提供未转换的标头。 此标头是一个对象(在批处理转换器的情况下为 a),其中列表中的位置对应于有效负载中的数据位置。nullKafkaHeaders.NATIVE_HEADERSHeadersList<Headers>Spring中文文档

某些类型不适合 JSON 序列化,对于这些类型,简单序列化可能是首选。 该方法称为,可用于提供应以这种方式处理出站映射的类的名称。 在入站映射期间,它们映射为 . 默认情况下,只有 和 以这种方式映射。toString()DefaultKafkaHeaderMapperaddToStringClasses()Stringorg.springframework.util.MimeTypeorg.springframework.http.MediaType
某些类型不适合 JSON 序列化,对于这些类型,简单序列化可能是首选。 该方法称为,可用于提供应以这种方式处理出站映射的类的名称。 在入站映射期间,它们映射为 . 默认情况下,只有 和 以这种方式映射。toString()DefaultKafkaHeaderMapperaddToStringClasses()Stringorg.springframework.util.MimeTypeorg.springframework.http.MediaType
从版本 2.3 开始,简化了对 String 值标头的处理。 默认情况下,此类标头不再进行 JSON 编码(即它们没有添加封闭)。 该类型仍会添加到 JSON_TYPES 标头中,以便接收系统可以转换回 String (from )。 映射器可以处理(解码)旧版本生成的标头(它检查前导);这样,使用 2.3 的应用程序可以使用旧版本中的记录。"..."byte[]"
从版本 2.3 开始,简化了对 String 值标头的处理。 默认情况下,此类标头不再进行 JSON 编码(即它们没有添加封闭)。 该类型仍会添加到 JSON_TYPES 标头中,以便接收系统可以转换回 String (from )。 映射器可以处理(解码)旧版本生成的标头(它检查前导);这样,使用 2.3 的应用程序可以使用旧版本中的记录。"..."byte[]"
若要与早期版本兼容,请设置为 ,如果使用 2.3 的版本生成的记录可能会被使用早期版本的应用程序使用。 当所有应用程序都使用 2.3 或更高版本时,可以将该属性保留为其默认值 。encodeStringstruefalse
若要与早期版本兼容,请设置为 ,如果使用 2.3 的版本生成的记录可能会被使用早期版本的应用程序使用。 当所有应用程序都使用 2.3 或更高版本时,可以将该属性保留为其默认值 。encodeStringstruefalse
@Bean
MessagingMessageConverter converter() {
    MessagingMessageConverter converter = new MessagingMessageConverter();
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    mapper.setEncodeStrings(true);
    converter.setHeaderMapper(mapper);
    return converter;
}

如果使用 Spring Boot,它会自动将此转换器 Bean 配置成自动配置的 ;否则,您应该将此转换器添加到模板中。KafkaTemplateSpring中文文档