消息报头
0.11.0.0 客户端引入了对消息中标头的支持。
从版本 2.0 开始, Spring for Apache Kafka 现在支持将这些 Headers 映射到和从。spring-messaging
MessageHeaders
以前的版本映射并映射到 spring-messaging ,其中 value 属性映射到 和其他属性(、 等)映射到 headers。
情况仍然如此,但现在可以映射其他(任意)标头。ConsumerRecord ProducerRecord Message<?> payload topic partition |
Apache Kafka 标头具有一个简单的 API,如以下接口定义所示:
public interface Header {
String key();
byte[] value();
}
提供该策略以在 Kafka 和 之间映射标头条目。
其接口定义如下:KafkaHeaderMapper
Headers
MessageHeaders
public interface KafkaHeaderMapper {
void fromHeaders(MessageHeaders headers, Headers target);
void toHeaders(Headers source, Map<String, Object> target);
}
将原始标头映射为 ,其中包含用于转换为值的配置选项。SimpleKafkaHeaderMapper
byte[]
String
将键映射到报头名称,为了支持出站消息的丰富报头类型,将执行 JSON 转换。
“” 标头(键为 )包含 的 JSON 映射。
此标头在入站端使用,以提供每个标头值到原始类型的适当转换。DefaultKafkaHeaderMapper
MessageHeaders
special
spring_json_header_types
<key>:<type>
在入站端,所有 Kafka 实例都映射到 。
在出站端,默认情况下,除 、 和映射到 properties 的标头外,所有 Headers 都被映射。Header
MessageHeaders
MessageHeaders
id
timestamp
ConsumerRecord
您可以通过向映射器提供模式来指定要为出站消息映射的标头。 下面的清单显示了一些示例映射:
public DefaultKafkaHeaderMapper() { (1)
...
}
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { (2)
...
}
public DefaultKafkaHeaderMapper(String... patterns) { (3)
...
}
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { (4)
...
}
1 | 使用默认 Jackson 并映射大多数 headers,如示例前所述。ObjectMapper |
2 | 使用提供的 Jackson 并映射大多数标头,如示例前所述。ObjectMapper |
3 | 使用默认的 Jackson 并根据提供的模式映射标头。ObjectMapper |
4 | 使用提供的 Jackson 并根据提供的模式映射标头。ObjectMapper |
模式相当简单,可以包含前导通配符 (、尾随通配符或两者 (例如)。
您可以使用前导 .
与标头名称匹配的第一个模式(无论是正的还是负的)获胜。*
*.cat.*
!
当您提供自己的模式时,我们建议包括 和 ,因为这些标头在入站端是只读的。!id
!timestamp
默认情况下,映射器仅反序列化 和 中的类。
您可以通过使用该方法添加受信任的包来信任其他(或所有)包。
如果您收到来自不受信任的来源的消息,您可能希望只添加您信任的那些包。
要信任所有包,您可以使用 .java.lang java.util addTrustedPackages mapper.addTrustedPackages("*") |
在与不知道映射器的 JSON 格式的系统通信时,以原始形式映射标头值非常有用。String |
从版本 2.2.5 开始,您可以指定某些字符串值标头不应使用 JSON 进行映射,而是应映射到/从 raw 进行映射。
具有新属性; 当设置为 true 时,所有字符串值标头都将转换为使用 property (default) 。
此外,还有一个属性 ,它是 的映射 ;如果 Map 包含 Headers 名称,并且 Headers 包含一个值,则将使用 CharSet 将其映射为 Raw。
此 map 还用于当且仅当 map 值中的布尔值为 .
如果布尔值为 ,或者 Headers 名称在映射中没有值,则传入的 Headers 将简单地映射为原始未映射的 Headers。byte[]
AbstractKafkaHeaderMapper
mapAllStringsOut
byte[]
charset
UTF-8
rawMappedHeaders
header name : boolean
String
byte[]
byte[]
String
true
false
true
以下测试用例说明了此机制。
@Test
public void testSpecificStringConvert() {
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
Map<String, Boolean> rawMappedHeaders = new HashMap<>();
rawMappedHeaders.put("thisOnesAString", true);
rawMappedHeaders.put("thisOnesBytes", false);
mapper.setRawMappedHeaders(rawMappedHeaders);
Map<String, Object> headersMap = new HashMap<>();
headersMap.put("thisOnesAString", "thing1");
headersMap.put("thisOnesBytes", "thing2");
headersMap.put("alwaysRaw", "thing3".getBytes());
MessageHeaders headers = new MessageHeaders(headersMap);
Headers target = new RecordHeaders();
mapper.fromHeaders(headers, target);
assertThat(target).containsExactlyInAnyOrder(
new RecordHeader("thisOnesAString", "thing1".getBytes()),
new RecordHeader("thisOnesBytes", "thing2".getBytes()),
new RecordHeader("alwaysRaw", "thing3".getBytes()));
headersMap.clear();
mapper.toHeaders(target, headersMap);
assertThat(headersMap).contains(
entry("thisOnesAString", "thing1"),
entry("thisOnesBytes", "thing2".getBytes()),
entry("alwaysRaw", "thing3".getBytes()));
}
默认情况下,两个 Headers 映射器都会映射所有入站 Headers。 从版本 2.8.8 开始,模式也可以应用于入站映射。 要创建用于入站映射的映射器,请在相应的映射器上使用静态方法之一:
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}
public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
例如:
DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");
这将排除所有以 开头的标头,并包括所有其他标头。abc
默认情况下,只要 Jackson 在 Classpath 上,就会在 和 中使用 the 。DefaultKafkaHeaderMapper
MessagingMessageConverter
BatchMessagingMessageConverter
使用批处理转换器,转换后的标头以 as a 形式提供,其中列表位置的映射对应于有效负载中的数据位置。KafkaHeaders.BATCH_CONVERTED_HEADERS
List<Map<String, Object>>
如果没有转换器(因为 Jackson 不存在或显式设置为 ),则使用者记录中的 Headers 在 Headers 中提供未转换。
此标头是一个对象(在批处理转换器的情况下为 a),其中列表中的位置对应于有效负载中的数据位置。null
KafkaHeaders.NATIVE_HEADERS
Headers
List<Headers>
某些类型不适合 JSON 序列化,对于这些类型,可能首选简单的序列化。
具有一个名为 method 的方法,它允许您提供应以这种方式处理出站映射的类的名称。
在入站映射期间,它们被映射为 。
默认情况下,只有 和 以这种方式映射。toString() DefaultKafkaHeaderMapper addToStringClasses() String org.springframework.util.MimeType org.springframework.http.MediaType |
从版本 2.3 开始,简化了 String 值 Headers 的处理。
默认情况下,此类标头不再是 JSON 编码的(即它们没有添加 enhalation)。
该类型仍会添加到 JSON_TYPES 标头中,以便接收系统可以转换回 String (from)。
mapper 可以处理(解码)旧版本生成的 headers(它会检查前导);这样,使用 2.3 的应用程序可以使用旧版本中的记录。"..." byte[] " |
为了与早期版本兼容,如果使用 2.3 的版本生成的记录可能被使用早期版本的应用程序使用,请设置为 。
当所有应用程序都使用 2.3 或更高版本时,可以将该属性保留为其默认值 。encodeStrings true false |
@Bean
MessagingMessageConverter converter() {
MessagingMessageConverter converter = new MessagingMessageConverter();
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
mapper.setEncodeStrings(true);
converter.setHeaderMapper(mapper);
return converter;
}
如果使用 Spring Boot,它会自动将此转换器 bean 配置到自动配置的 ;否则,您应该将此转换器添加到模板中。KafkaTemplate