4. 参考资料
参考文档的这一部分详细介绍了构成 Spring for Apache Kafka 的各种组件。 主要章节介绍了使用 Spring 开发 Kafka 应用程序的核心类。
4.1. 使用 Spring for Apache Kafka
本节详细解释了影响使用 Spring for Apache Kafka 的各种问题。 有关快速但不太详细的介绍,请参阅 快速浏览。
4.1.1. 连接到 Kafka
从版本 2.5 开始,它们中的每一个都扩展了 .
这允许在运行时通过向其配置中添加 a 来更改引导服务器: .
将对所有新连接调用此函数以获取服务器列表。
Consumer 和 Producer 通常存在很长时间。
要关闭现有 Producer,请调用 .
要关闭现有的 Consumers,请在 and/or 和任何其他侦听器容器 bean 上调用 (然后 )。KafkaResourceFactory
Supplier<String>
setBootstrapServersSupplier(() → …)
reset()
DefaultKafkaProducerFactory
stop()
start()
KafkaListenerEndpointRegistry
stop()
start()
为方便起见,该框架还提供了一个支持两组引导服务器的 Bootstrap Server;其中 1 个随时处于活动状态。
配置 并将其添加到生产者和使用者工厂,以及 ,通过调用 .
当你想切换时,打电话 or 和 call 生产者工厂建立新的连接;对于使用者和所有侦听器容器。
当使用 s 和 bean.ABSwitchCluster
ABSwitchCluster
KafkaAdmin
setBootstrapServersSupplier()
primary()
secondary()
reset()
stop()
start()
@KafkaListener
stop()
start()
KafkaListenerEndpointRegistry
有关更多信息,请参阅 Javadocs。
Factory 侦听器
从版本 2.5 开始,可以将 和 配置为在创建或关闭 producer 或 consumer 时接收通知。DefaultKafkaProducerFactory
DefaultKafkaConsumerFactory
Listener
interface Listener<K, V> {
default void producerAdded(String id, Producer<K, V> producer) {
}
default void producerRemoved(String id, Producer<K, V> producer) {
}
}
interface Listener<K, V> {
default void consumerAdded(String id, Consumer<K, V> consumer) {
}
default void consumerRemoved(String id, Consumer<K, V> consumer) {
}
}
在每种情况下,都是通过将属性(从创建后获得)附加到 factory 属性来创建的,以 .id
client-id
metrics()
beanName
.
例如,这些侦听器可用于在创建新 Client 端时创建和绑定 Micrometer 实例(并在 Client 端关闭时关闭它)。KafkaClientMetrics
该框架提供的侦听器正是这样做的;请参阅 Micrometer 本机度量。
4.1.2. 配置 Topic
如果您在应用程序上下文中定义了一个 Bean,它可以自动向代理添加主题。
为此,您可以向应用程序上下文添加 for each 主题。
版本 2.3 引入了一个新类,使创建此类 bean 更加方便。
以下示例显示了如何执行此操作:KafkaAdmin
NewTopic
@Bean
TopicBuilder
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
public NewTopic topic3() {
return TopicBuilder.name("thing3")
.assignReplicas(0, Arrays.asList(0, 1))
.assignReplicas(1, Arrays.asList(1, 2))
.assignReplicas(2, Arrays.asList(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
fun admin() = KafkaAdmin(mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092"))
@Bean
fun topic1() =
TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build()
@Bean
fun topic2() =
TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build()
@Bean
fun topic3() =
TopicBuilder.name("thing3")
.assignReplicas(0, Arrays.asList(0, 1))
.assignReplicas(1, Arrays.asList(1, 2))
.assignReplicas(2, Arrays.asList(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build()
从版本 2.6 开始,您可以省略 and/or,并且代理默认值将应用于这些属性。
代理版本必须至少为 2.4.0 才能支持此功能 - 请参阅 KIP-464。.partitions()
replicas()
@Bean
public NewTopic topic4() {
return TopicBuilder.name("defaultBoth")
.build();
}
@Bean
public NewTopic topic5() {
return TopicBuilder.name("defaultPart")
.replicas(1)
.build();
}
@Bean
public NewTopic topic6() {
return TopicBuilder.name("defaultRepl")
.partitions(3)
.build();
}
@Bean
fun topic4() = TopicBuilder.name("defaultBoth").build()
@Bean
fun topic5() = TopicBuilder.name("defaultPart").replicas(1).build()
@Bean
fun topic6() = TopicBuilder.name("defaultRepl").partitions(3).build()
从版本 2.7 开始,你可以在单个 bean 定义中声明多个 s:NewTopic
KafkaAdmin.NewTopics
@Bean
public KafkaAdmin.NewTopics topics456() {
return new NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build());
}
@Bean
fun topics456() = KafkaAdmin.NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build()
)
使用 Spring Boot 时,会自动注册一个 bean,因此您只需要(和/或)s。KafkaAdmin NewTopic NewTopics @Bean |
默认情况下,如果代理不可用,则会记录一条消息,但会继续加载上下文。
您可以以编程方式调用 admin 的方法,以便稍后重试。
如果您希望将此情况视为致命情况,请将 admin 的属性设置为 。
然后,上下文无法初始化。initialize()
fatalIfBrokerNotAvailable
true
如果 broker 支持它(1.0.0 或更高版本),则如果发现现有主题的分区数少于 .NewTopic.numPartitions |
从版本 2.7 开始,提供了在运行时创建和检查主题的方法。KafkaAdmin
-
createOrModifyTopics
-
describeTopics
有关更高级的功能,您可以直接使用 。
以下示例显示了如何执行此操作:AdminClient
@Autowired
private KafkaAdmin admin;
...
AdminClient client = AdminClient.create(admin.getConfigurationProperties());
...
client.close();
4.1.3. 发送消息
本节介绍如何发送消息。
用KafkaTemplate
本节介绍如何使用 发送消息。KafkaTemplate
概述
它包装了一个生产者,并提供了将数据发送到 Kafka 主题的便捷方法。
下面的清单显示了 :KafkaTemplate
KafkaTemplate
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
T doInKafka(Producer<K, V> producer);
}
有关更多详细信息,请参阅 Javadoc。
API 要求已向模板提供默认主题。sendDefault
API 将 a 作为参数,并将此时间戳存储在记录中。
用户提供的时间戳的存储方式取决于在 Kafka 主题上配置的时间戳类型。
如果主题配置为使用 ,则会记录用户指定的时间戳(如果未指定,则生成)。
如果主题配置为使用 ,则忽略用户指定的时间戳,并且代理会添加本地代理时间。timestamp
CREATE_TIME
LOG_APPEND_TIME
要使用该模板,您可以配置生产者工厂并在模板的构造函数中提供它。 以下示例显示了如何执行此操作:
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
从版本 2.5 开始,您现在可以覆盖工厂的属性,以从同一工厂创建具有不同生产者配置的模板。ProducerConfig
@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
return new KafkaTemplate<>(pf,
Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}
请注意,可以使用不同的狭窄泛型类型引用类型的 bean(例如由 Spring Boot 自动配置的 bean)。ProducerFactory<?, ?>
您还可以使用标准定义配置模板。<bean/>
然后,要使用模板,您可以调用其方法之一。
当您将方法与参数一起使用时,主题、分区和密钥信息将在消息标头中提供,其中包括以下项目:Message<?>
-
KafkaHeaders.TOPIC
-
KafkaHeaders.PARTITION_ID
-
KafkaHeaders.MESSAGE_KEY
-
KafkaHeaders.TIMESTAMP
消息有效负载是数据。
(可选)您可以使用 a 配置 ,以获取包含发送结果(成功或失败)的异步回调,而不是等待 完成。
下面的清单显示了接口的定义:KafkaTemplate
ProducerListener
Future
ProducerListener
public interface ProducerListener<K, V> {
void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
Exception exception);
}
默认情况下,模板配置了 ,该模板会记录错误,并在发送成功时不执行任何操作。LoggingProducerListener
为方便起见,如果您只想实现其中一种方法,则提供了默认方法实现。
请注意,send 方法返回一个 .
您可以向侦听器注册回调,以异步接收发送结果。
以下示例显示了如何执行此操作:ListenableFuture<SendResult>
ListenableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
...
}
@Override
public void onFailure(Throwable ex) {
...
}
});
SendResult
具有两个属性 a 和 。
有关这些对象的信息,请参阅 Kafka API 文档。ProducerRecord
RecordMetadata
in 可以转换为 ;其属性包含 Failed 记录。Throwable
onFailure
KafkaProducerException
failedProducerRecord
从版本 2.5 开始,您可以使用 a 而不是 a ,从而更容易提取失败的 ,而无需强制转换 :KafkaSendCallback
ListenableFutureCallback
ProducerRecord
Throwable
ListenableFuture<SendResult<Integer, String>> future = template.send("topic", 1, "thing");
future.addCallback(new KafkaSendCallback<Integer, String>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
...
}
@Override
public void onFailure(KafkaProducerException ex) {
ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
...
}
});
您还可以使用一对 lambda:
ListenableFuture<SendResult<Integer, String>> future = template.send("topic", 1, "thing");
future.addCallback(result -> {
...
}, (KafkaFailureCallback<Integer, String>) ex -> {
ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
...
});
如果你希望阻塞发送线程等待结果,你可以调用 future 的方法;建议使用带有 timeout 的方法。
您可能希望在等待之前调用,或者为方便起见,模板具有一个构造函数,该构造函数带有一个参数,该参数会导致模板在每次发送时调用。
仅当您设置了 producer 属性并希望立即发送部分批处理时,才需要刷新。get()
flush()
autoFlush
flush()
linger.ms
例子
本节介绍向 Kafka 发送消息的示例:
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
ListenableFuture<SendResult<Integer, String>> future = template.send(record);
future.addCallback(new KafkaSendCallback<Integer, String>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
handleSuccess(data);
}
@Override
public void onFailure(KafkaProducerException ex) {
handleFailure(data, record, ex);
}
});
}
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
请注意,原因在于属性。ExecutionException
KafkaProducerException
failedProducerRecord
用RoutingKafkaTemplate
从版本 2.5 开始,您可以使用 a 在运行时根据目标名称选择生产者。RoutingKafkaTemplate
topic
路由模板不支持 transactions、 、 或 操作,因为这些操作的主题未知。execute flush metrics |
该模板需要 的 映射到实例。
此 map 应排序(例如 a ),因为它是按顺序遍历的;您应该在开始时添加更具体的模式。java.util.regex.Pattern
ProducerFactory<Object, Object>
LinkedHashMap
以下简单的 Spring Boot 应用程序提供了一个示例,说明如何使用同一模板发送到不同的主题,每个主题使用不同的值序列化器。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
ProducerFactory<Object, Object> pf) {
// Clone the PF with a different Serializer, register with Spring for shutdown
Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
map.put(Pattern.compile("two"), bytesPF);
map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
return new RoutingKafkaTemplate(map);
}
@Bean
public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
return args -> {
routingTemplate.send("one", "thing1");
routingTemplate.send("two", "thing2".getBytes());
};
}
}
此示例的相应 显示在 Annotation Properties 中。@KafkaListener
有关实现类似结果的另一种技术,但具有将不同类型的发送到同一主题的附加功能,请参阅 Delegating Serializer 和 Deserializer。
用DefaultKafkaProducerFactory
如使用 KafkaTemplate
所示,a 用于创建生产者。ProducerFactory
如果不使用 Transactions,默认情况下,它会创建一个供所有客户端使用的单例 producer,如 javadocs 中所建议的那样。
但是,如果您调用模板,这可能会导致使用同一生产者的其他线程出现延迟。
从版本 2.3 开始,它有一个新属性 。
当设置为 时,工厂将为每个线程创建(并缓存)一个单独的 producer,以避免此问题。DefaultKafkaProducerFactory
KafkaProducer
flush()
DefaultKafkaProducerFactory
producerPerThread
true
When is 时,当不再需要 producer 时,用户代码必须在工厂上调用。
这将以物理方式关闭生产者并将其从 .
调用 or 不会清理这些生产者。producerPerThread true closeThreadBoundProducer() ThreadLocal reset() destroy() |
另请参阅 KafkaTemplate
事务性和非事务性发布。
创建时,可以通过调用仅接受属性Map的构造函数(请参阅使用KafkaTemplate
中的示例)从配置中选取键和/或值类,或者可以将实例传递给构造函数(在这种情况下,所有s共享相同的实例)。
或者,您可以提供 s (从版本 2.3 开始),该 s 将用于为每个实例获取单独的实例:DefaultKafkaProducerFactory
Serializer
Serializer
DefaultKafkaProducerFactory
Producer
Supplier<Serializer>
Serializer
Producer
@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}
@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}
从版本 2.5.10 开始,您现在可以在创建工厂后更新 producer 属性。
这可能很有用,例如,如果您必须在凭证更改后更新 SSL 密钥/信任存储位置。
这些更改不会影响现有的生产者实例;调用 以关闭任何现有 Producer,以便使用新属性创建新的 Producer。
注意:您不能将事务性生产者工厂更改为非事务性工厂,反之亦然。reset()
现在提供了两种新方法:
void updateConfigs(Map<String, Object> updates);
void removeConfig(String configKey);
从版本 2.8 开始,如果你提供序列化器作为对象(在构造函数中或通过 setter),工厂将调用该方法以使用配置属性配置它们。configure()
用ReplyingKafkaTemplate
版本 2.1.3 引入了一个子类 of 来提供请求/回复语义。
该类已命名,并且具有两个附加方法;方法签名如下:KafkaTemplate
ReplyingKafkaTemplate
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
Duration replyTimeout);
(另请参阅 请求/回复消息<?>
s)。
结果是异步填充结果(或异常,用于超时)。
结果还有一个属性,它是调用 .
您可以使用此 future 来确定 send 操作的结果。ListenableFuture
sendFuture
KafkaTemplate.send()
如果使用第一种方法,或者参数为 ,则使用模板的属性(默认为 5 秒)。replyTimeout
null
defaultReplyTimeout
从版本 2.8.8 开始,模板具有新方法 。
如果 reply 容器配置为避免在初始化容器之前发送请求和发送回复,则这非常有用。waitForAssignment
auto.offset.reset=latest
当使用手动分区分配(无组管理)时,等待的持续时间必须大于容器的属性,因为只有在第一次轮询完成之后才会发送通知。pollTimeout |
以下 Spring Boot 应用程序显示了如何使用该功能的示例:
@SpringBootApplication
public class KRequestingApplication {
public static void main(String[] args) {
SpringApplication.run(KRequestingApplication.class, args).close();
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
return args -> {
if (!template.waitForAssignment(Duration.ofSeconds(10))) {
throw new IllegalStateException("Reply container did not initialize");
}
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
System.out.println("Sent ok: " + sendResult.getRecordMetadata());
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
System.out.println("Return value: " + consumerRecord.value());
};
}
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
ProducerFactory<String, String> pf,
ConcurrentMessageListenerContainer<String, String> repliesContainer) {
return new ReplyingKafkaTemplate<>(pf, repliesContainer);
}
@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> repliesContainer =
containerFactory.createContainer("kReplies");
repliesContainer.getContainerProperties().setGroupId("repliesGroup");
repliesContainer.setAutoStartup(false);
return repliesContainer;
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean
public NewTopic kReplies() {
return TopicBuilder.name("kReplies")
.partitions(10)
.replicas(2)
.build();
}
}
请注意,我们可以使用 Boot 的自动配置的容器工厂来创建回复容器。
如果 non-tanvial deserializer 用于回复,请考虑使用 ErrorHandlingDeserializer
委托给配置的 deserializer。
配置后,将异常完成,您可以捕获 ,并在其属性中使用 。RequestReplyFuture
ExecutionException
DeserializationException
cause
从版本 2.6.7 开始,除了检测 s 之外,模板还将调用该函数(如果提供)。
如果它返回异常,则 future 将异常完成。DeserializationException
replyErrorChecker
下面是一个示例:
template.setReplyErrorChecker(record -> {
Header error = record.headers().lastHeader("serverSentAnError");
if (error != null) {
return new MyException(new String(error.value()));
}
else {
return null;
}
});
...
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
...
}
catch (InterruptedException e) {
...
}
catch (ExecutionException e) {
if (e.getCause instanceof MyException) {
...
}
}
catch (TimeoutException e) {
...
}
模板设置一个 header(默认命名),该 header 必须由服务器端回显。KafkaHeaders.CORRELATION_ID
在这种情况下,以下应用程序会响应:@KafkaListener
@SpringBootApplication
public class KReplyingApplication {
public static void main(String[] args) {
SpringApplication.run(KReplyingApplication.class, args);
}
@KafkaListener(id="server", topics = "kRequests")
@SendTo // use default replyTo expression
public String listen(String in) {
System.out.println("Server received: " + in);
return in.toUpperCase();
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean // not required if Jackson is on the classpath
public MessagingMessageConverter simpleMapperConverter() {
MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
return messagingMessageConverter;
}
}
基础架构回显相关 ID 并确定回复主题。@KafkaListener
有关发送回复的更多信息,请参阅使用 @SendTo
转发侦听器结果。
该模板使用默认标头来指示回复转到的主题。KafKaHeaders.REPLY_TOPIC
从版本 2.2 开始,模板会尝试从配置的回复容器中检测回复主题或分区。
如果容器配置为侦听单个主题或单个 ,则用于设置回复标头。
如果容器配置为其他方式,则用户必须设置回复标头。
在这种情况下,将在初始化期间写入日志消息。
以下示例使用 :TopicPartitionOffset
INFO
KafkaHeaders.REPLY_TOPIC
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));
使用单个回复进行配置时,只要每个实例侦听不同的分区,就可以对多个模板使用相同的回复主题。
使用单个回复主题进行配置时,每个实例都必须使用不同的 .
在这种情况下,所有实例都会收到每个回复,但只有发送请求的实例会找到相关 ID。
这对于自动扩展可能很有用,但会产生额外网络流量的开销,并且丢弃每个不需要的回复的成本很小。
使用此设置时,我们建议您将模板的 设置为 ,这样可以降低对 DEBUG 而不是默认 ERROR 的意外回复的日志记录级别。TopicPartitionOffset
group.id
sharedReplyTopic
true
以下是配置回复容器以使用相同的共享回复主题的示例:
@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
Properties props = new Properties();
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
container.getContainerProperties().setKafkaConsumerProperties(props);
return container;
}
如果您有多个客户端实例,并且未按照上一段所述配置它们,则每个实例都需要一个专用的回复主题。
另一种方法是为每个实例设置 并使用专用分区。
它包含一个四字节的 int (big-endian)。
服务器必须使用此标头将回复路由到正确的分区( 执行此操作)。
但是,在这种情况下,回复容器不能使用 Kafka 的组管理功能,并且必须配置为侦听固定分区(在其构造函数中使用 a)。KafkaHeaders.REPLY_PARTITION Header @KafkaListener TopicPartitionOffset ContainerProperties |
要求 Jackson 位于 Classpath 上(对于 )。
如果不可用,则消息转换器没有 Headers 映射器,因此您必须使用 ,如前所述。DefaultKafkaHeaderMapper @KafkaListener MessagingMessageConverter SimpleKafkaHeaderMapper |
默认情况下,使用 3 个标头:
-
KafkaHeaders.CORRELATION_ID
- 用于将回复与请求相关联 -
KafkaHeaders.REPLY_TOPIC
- 用于告诉服务器在哪里回复 -
KafkaHeaders.REPLY_PARTITION
- (可选)用于告诉服务器要回复哪个分区
基础设施使用这些标头名称来路由回复。@KafkaListener
从版本 2.3 开始,您可以自定义标头名称 - 模板具有 3 个属性、 、 和 。
如果您的服务器不是 Spring 应用程序(或不使用 ),这将非常有用。correlationHeaderName
replyTopicHeaderName
replyPartitionHeaderName
@KafkaListener
请求/回复Message<?>
版本 2.7 向 send 和 receive 的抽象添加了方法:ReplyingKafkaTemplate
spring-messaging
Message<?>
RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);
<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
ParameterizedTypeReference<P> returnType);
这些将使用模板的 default ,还有一些重载版本可能会在方法调用中超时。replyTimeout
如果使用者或模板可以通过回复消息中的配置或类型元数据转换有效负载,而无需任何其他信息,请使用第一种方法。Deserializer
MessageConverter
如果需要为 return 类型提供类型信息,请使用第二种方法来帮助消息转换器。 这也允许同一个模板接收不同的类型,即使回复中没有类型元数据,例如当服务器端不是 Spring 应用程序时。 以下是后者的示例:
@Bean
ReplyingKafkaTemplate<String, String, String> template(
ProducerFactory<String, String> pf,
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> replyContainer =
factory.createContainer("replies");
replyContainer.getContainerProperties().setGroupId("request.replies");
ReplyingKafkaTemplate<String, String, String> template =
new ReplyingKafkaTemplate<>(pf, replyContainer);
template.setMessageConverter(new ByteArrayJsonMessageConverter());
template.setDefaultTopic("requests");
return template;
}
@Bean
fun template(
pf: ProducerFactory<String?, String>?,
factory: ConcurrentKafkaListenerContainerFactory<String?, String?>
): ReplyingKafkaTemplate<String?, String, String?> {
val replyContainer = factory.createContainer("replies")
replyContainer.containerProperties.groupId = "request.replies"
val template = ReplyingKafkaTemplate(pf, replyContainer)
template.messageConverter = ByteArrayJsonMessageConverter()
template.defaultTopic = "requests"
return template
}
RequestReplyTypedMessageFuture<String, String, Thing> future1 =
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());
RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
val future1: RequestReplyTypedMessageFuture<String?, String?, Thing?>? =
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
object : ParameterizedTypeReference<Thing?>() {})
log.info(future1?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata?.toString())
val thing = future1?.get(10, TimeUnit.SECONDS)?.payload
log.info(thing.toString())
val future2: RequestReplyTypedMessageFuture<String?, String?, List<Thing?>?>? =
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
object : ParameterizedTypeReference<List<Thing?>?>() {})
log.info(future2?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
val things = future2?.get(10, TimeUnit.SECONDS)?.payload
things?.forEach(Consumer { thing1: Thing? -> log.info(thing1.toString()) })
回复类型 message<?>
当 返回 ,且版本低于 2.5 时,必须填充回复主题和相关 ID 标头。
在此示例中,我们使用请求中的回复主题标头:@KafkaListener
Message<?>
@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo)
.setHeader(KafkaHeaders.MESSAGE_KEY, 42)
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.build();
}
这还显示了如何在回复记录上设置键。
从版本 2.5 开始,框架将检测是否缺少这些 Headers 并使用主题填充它们 - 根据值确定的主题或传入的 Headers(如果存在)。
它还将回显传入的 和 ,如果存在。@SendTo
KafkaHeaders.REPLY_TOPIC
KafkaHeaders.CORRELATION_ID
KafkaHeaders.REPLY_PARTITION
@KafkaListener(id = "requestor", topics = "request")
@SendTo // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.MESSAGE_KEY, 42)
.build();
}
聚合多个回复
使用 ReplyingKafkaTemplate
中的模板严格适用于单个请求/回复场景。
如果一条消息的多个接收者返回回复,您可以使用 .
这是 Scatter-Gather Enterprise Integration Pattern 的客户端实现。AggregatingReplyingKafkaTemplate
与 一样,构造函数采用 producer 工厂和侦听器容器来接收回复;它有第三个参数,每次收到回复时都会查询该参数;当谓词返回时,s 的集合用于完成该方法返回的 s。ReplyingKafkaTemplate
AggregatingReplyingKafkaTemplate
BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy
true
ConsumerRecord
Future
sendAndReceive
还有一个附加属性 (default false)。
当此设置为 时,而不是使用 完成 future ,部分结果将正常完成 future(只要至少收到一条回复记录)。returnPartialOnTimeout
true
KafkaReplyTimeoutException
从版本 2.3.5 开始,谓词也会在超时后调用(如果为 )。
第一个参数是当前记录列表;第二个原因是此调用是否由于超时。
谓词可以修改记录列表。returnPartialOnTimeout
true
true
AggregatingReplyingKafkaTemplate<Integer, String, String> template =
new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
future.get(30, TimeUnit.SECONDS);
请注意,返回类型是 a,其值是 s 的集合。
“外部”不是“真实”记录,它由模板合成,作为为请求收到的实际回复记录的持有者。
当发生正常发布时(发布策略返回 true),主题设置为 ;如果为 true,并且发生超时(并且至少收到了一条回复记录),则主题将设置为 。
该模板为这些 “topic” 名称提供常量静态变量:ConsumerRecord
ConsumerRecord
ConsumerRecord
aggregatedResults
returnPartialOnTimeout
partialResultsAfterTimeout
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a normal release by the release strategy.
*/
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a timeout.
*/
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";
中的 real s 包含从中接收回复的实际主题。ConsumerRecord
Collection
回复的侦听器容器必须配置为 或 ;consumer 属性必须是(自 2.3 版以来的默认值)。
为避免任何丢失消息的可能性,模板仅在未完成请求为零时(即当 release 策略释放最后一个未完成的请求时)提交 offsets。
重新平衡后,可能会出现重复的回复投放;对于任何正在进行的请求,这些都将被忽略;当收到已发布回复的重复回复时,您可能会看到错误日志消息。AckMode.MANUAL AckMode.MANUAL_IMMEDIATE enable.auto.commit false |
如果你将 ErrorHandlingDeserializer 与此聚合模板一起使用,框架将不会自动检测 s。
相反,记录(带有值)将完整返回,但 Headers 中存在反序列化异常。
建议应用程序调用 utility 方法方法来确定是否发生了反序列化异常。
有关更多信息,请参阅其 javadocs。
此聚合模板也不要求 the;您应该对回复的每个元素执行检查。DeserializationException null ReplyingKafkaTemplate.checkDeserialization() replyErrorChecker |
4.1.4. 接收消息
您可以通过配置 并提供消息侦听器或使用注释来接收消息。MessageListenerContainer
@KafkaListener
消息侦听器
使用消息侦听器容器时,必须提供侦听器以接收数据。 消息侦听器目前支持 8 个接口。 下面的清单显示了这些接口:
public interface MessageListener<K, V> { (1)
void onMessage(ConsumerRecord<K, V> data);
}
public interface AcknowledgingMessageListener<K, V> { (2)
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { (3)
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { (4)
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
public interface BatchMessageListener<K, V> { (5)
void onMessage(List<ConsumerRecord<K, V>> data);
}
public interface BatchAcknowledgingMessageListener<K, V> { (6)
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { (7)
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { (8)
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
1 | 使用自动提交或容器管理的提交方法之一时,使用此接口处理从 Kafka 使用者操作接收的单个实例。ConsumerRecord poll() |
2 | 使用其中一个手动提交方法时,使用此接口处理从 Kafka 使用者操作接收的单个实例。ConsumerRecord poll() |
3 | 使用自动提交或容器管理的提交方法之一时,使用此接口处理从 Kafka 使用者操作接收的单个实例。
提供对对象的访问。ConsumerRecord poll() Consumer |
4 | 使用其中一个手动提交方法时,使用此接口处理从 Kafka 使用者操作接收的单个实例。
提供对对象的访问。ConsumerRecord poll() Consumer |
5 | 使用此接口可在使用自动提交或容器管理的提交方法之一时处理从 Kafka 使用者操作收到的所有实例。 在使用此接口时不受支持,因为侦听器将获得完整的批处理。ConsumerRecord poll() AckMode.RECORD |
6 | 使用手动提交方法之一时,使用此接口处理从 Kafka 使用者操作收到的所有实例。ConsumerRecord poll() |
7 | 使用此接口可在使用自动提交或容器管理的提交方法之一时处理从 Kafka 使用者操作收到的所有实例。 在使用此接口时不受支持,因为侦听器将获得完整的批处理。
提供对对象的访问。ConsumerRecord poll() AckMode.RECORD Consumer |
8 | 使用手动提交方法之一时,使用此接口处理从 Kafka 使用者操作收到的所有实例。
提供对对象的访问。ConsumerRecord poll() Consumer |
该对象不是线程安全的。
您只能在调用侦听器的线程上调用其方法。Consumer |
您不应在侦听器中执行任何影响使用者位置和/或提交偏移量的方法;容器需要管理此类信息。Consumer<?, ?> |
消息侦听器容器
提供了两种实现:MessageListenerContainer
-
KafkaMessageListenerContainer
-
ConcurrentMessageListenerContainer
从单个线程上的所有主题或分区接收所有消息。
委托给一个或多个实例以提供多线程使用。KafkaMessageListenerContainer
ConcurrentMessageListenerContainer
KafkaMessageListenerContainer
从版本 2.2.7 开始,你可以向侦听器容器添加 ;它将在调用侦听器之前调用,以允许检查或修改记录。
如果侦听器返回 null,则不会调用侦听器。
从版本 2.7 开始,它有额外的方法,这些方法在 listener 退出后调用(通常,或通过抛出异常)。
此外,从版本 2.7 开始,现在有一个 ,为 Batch 侦听器提供类似的功能。
此外,(和 ) 提供对 .
例如,这可能用于访问侦听器中的使用者指标。RecordInterceptor
BatchInterceptor
ConsumerAwareRecordInterceptor
BatchInterceptor
Consumer<?, ?>
你不应该在这些拦截器中执行任何影响消费者位置和/或提交偏移量的方法;容器需要管理此类信息。 |
如果拦截器更改记录(通过创建新记录),则 , , 和 必须保持不变,以避免意外的副作用,例如记录丢失。topic partition offset |
和 可用于调用多个拦截器。CompositeRecordInterceptor
CompositeBatchInterceptor
默认情况下,从版本 2.8 开始,当使用事务时,在事务开始之前调用拦截器。
你可以将侦听器容器的属性设置为在事务启动后调用侦听器。interceptBeforeTx
false
从版本 2.3.8、2.4.6 开始,现在支持在并发大于 1 时进行静态成员资格。
的后缀为 with starting at 。
这与 increased 一起可用于减少再平衡事件,例如,当应用程序实例重新启动时。ConcurrentMessageListenerContainer
group.instance.id
-n
n
1
session.timeout.ms
用KafkaMessageListenerContainer
以下构造函数可用:
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它接收对象中有关主题和分区以及其他配置的信息。 具有以下构造函数:ConsumerFactory
ContainerProperties
ContainerProperties
public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
第一个构造函数采用一个参数数组来显式指示容器要使用哪些分区(使用 consumer 方法)并使用可选的初始偏移量。
默认情况下,正值是绝对偏移量。
默认情况下,负值是相对于分区中的当前最后一个偏移量。
为此,提供了一个额外的参数。
如果为 ,则初始偏移量(正或负)相对于此使用者的当前位置。
偏移量在容器启动时应用。
第二个选项采用一系列主题,Kafka 根据属性分配分区 — 在组中分配分区。
第三个版本使用正则表达式来选择主题。TopicPartitionOffset
assign()
TopicPartitionOffset
boolean
true
group.id
Pattern
要将 a 分配给容器,您可以在创建 Container 时使用该方法。
以下示例显示了如何执行此操作:MessageListener
ContainerProps.setMessageListener
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
请注意,在创建 时,使用仅接受上述属性的构造函数意味着从配置中获取键和值类。
或者,可以将实例传递给 key 和/或 value 的构造函数,在这种情况下,所有 Consumer 共享相同的实例。
另一种选择是提供 s (从版本 2.3 开始),该 s 将用于为每个实例获取单独的实例:DefaultKafkaConsumerFactory
Deserializer
Deserializer
DefaultKafkaConsumerFactory
Supplier<Deserializer>
Deserializer
Consumer
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
有关您可以设置的各种属性的更多信息,请参阅 Javadoc。ContainerProperties
从版本 2.1.1 开始,一个名为 .
启用 and logging 后,每个侦听器容器都会写入一条日志消息,汇总其配置属性。logContainerConfig
true
INFO
默认情况下,主题偏移提交的日志记录在日志记录级别执行。
从版本 2.1.2 开始,called 中的属性允许您指定这些消息的日志级别。
例如,要将日志级别更改为 ,可以使用 。DEBUG
ContainerProperties
commitLogLevel
INFO
containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
从版本 2.2 开始,添加了一个名为 的新容器属性 (default: 自 2.3.4 起)。
如果代理上不存在任何已配置的主题,这将阻止容器启动。
如果容器配置为侦听主题模式 (regex),则它不适用。
以前,容器线程在方法中循环,等待主题出现,同时记录许多消息。
除了日志之外,没有迹象表明存在问题。missingTopicsFatal
false
consumer.poll()
从版本 2.8 开始,引入了一个新的 container 属性。
这会导致容器在从 .
例如,当已配置的用户被拒绝读取特定主题的访问权限或凭证不正确时,可能会发生这种情况。
定义允许容器在授予适当权限时恢复。authExceptionRetryInterval
AuthenticationException
AuthorizationException
KafkaConsumer
authExceptionRetryInterval
默认情况下,未配置间隔 - 身份验证和授权错误被视为致命错误,这会导致容器停止。 |
从版本 2.8 开始,在创建 consumer 工厂时,如果你提供反序列化器作为对象(在构造函数中或通过 setter),工厂将调用该方法以使用配置属性配置它们。configure()
用ConcurrentMessageListenerContainer
单个构造函数类似于构造函数。
下面的清单显示了构造函数的签名:KafkaListenerContainer
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它还有一个属性。
例如,创建 3 个实例。concurrency
container.setConcurrency(3)
KafkaMessageListenerContainer
对于第一个构造函数,Kafka 使用其组管理功能在使用者之间分配分区。
侦听多个主题时,默认分区分布可能不是您所期望的。
例如,如果您有 3 个主题,每个主题有 5 个分区,并且您想要使用 ,则您只会看到 5 个活动使用者,每个使用者都从每个主题分配了一个分区,而其他 10 个使用者处于空闲状态。
这是因为默认的 Kafka 是(参见其 Javadoc)。
对于此方案,您可能需要考虑使用 instead,它将分区分布到所有使用者之间。
然后,为每个使用者分配一个主题或分区。
要更改 ,可以在提供给 的属性中设置 consumer 属性 () 。 使用 Spring Boot 时,可以按如下方式分配 set 策略:
|
当容器属性配置为 s 时,将在委托实例之间分配实例。TopicPartitionOffset
ConcurrentMessageListenerContainer
TopicPartitionOffset
KafkaMessageListenerContainer
例如,如果提供了 6 个实例,并且 is ;每个容器获取两个分区。
对于五个实例,两个容器获得两个分区,第三个容器获得一个分区。
如果 大于 的数量,则向下调整 ,以便每个容器获得一个分区。TopicPartitionOffset
concurrency
3
TopicPartitionOffset
concurrency
TopicPartitions
concurrency
该属性(如果已设置)将附加 where is the consumer instance that he correspond to the concurrency.
启用 JMX 时,需要为 MBean 提供唯一名称。client.id -n n |
从版本 1.3 开始,提供对底层 .
如果为 ,该方法将返回所有目标实例的指标。
这些量度按为基础提供的 分组到 中。MessageListenerContainer
KafkaConsumer
ConcurrentMessageListenerContainer
metrics()
KafkaMessageListenerContainer
Map<MetricName, ? extends Metric>
client-id
KafkaConsumer
从版本 2.3 开始,它提供了一个选项,可以让侦听器容器中的主循环在两次调用之间休眠。
从提供的选项中选择实际休眠间隔作为最小值,以及使用者配置与当前记录批处理时间之间的差值。ContainerProperties
idleBetweenPolls
KafkaConsumer.poll()
max.poll.interval.ms
提交偏移量
提供了几个选项来提交偏移量。
如果 consumer 属性为 ,Kafka 会根据其配置自动提交偏移量。
如果是 ,则容器支持多个设置(在下一个列表中描述)。
默认值为 .
从版本 2.3 开始,除非在配置中明确设置,否则框架将设置为 。
以前,如果未设置该属性,则使用 Kafka default ()。enable.auto.commit
true
false
AckMode
AckMode
BATCH
enable.auto.commit
false
true
consumer 方法返回一个或多个 .
为每条记录调用 the。
以下列表描述了容器对每个容器执行的操作(当未使用事务时):poll()
ConsumerRecords
MessageListener
AckMode
-
RECORD
:当侦听器在处理记录后返回时提交偏移量。 -
BATCH
:当 返回的所有记录都已处理完毕后,提交偏移量。poll()
-
TIME
:当 返回的所有记录都已处理完时,只要超过自上次提交以来的 时间,就提交偏移量。poll()
ackTime
-
COUNT
:只要自上次提交以来已收到记录,则在处理完 返回的所有记录后提交偏移量。poll()
ackCount
-
COUNT_TIME
:类似于 和 ,但如果任一条件为 ,则执行提交。TIME
COUNT
true
-
MANUAL
:消息侦听器负责 。 之后,应用相同的语义。acknowledge()
Acknowledgment
BATCH
-
MANUAL_IMMEDIATE
:当侦听器调用该方法时,立即提交偏移量。Acknowledgment.acknowledge()
使用事务时,偏移量将发送到事务,语义等效于 或 ,具体取决于侦听器类型(记录或批处理)。RECORD
BATCH
MANUAL ,并要求侦听器为 an 或 a 。
请参见消息侦听器。MANUAL_IMMEDIATE AcknowledgingMessageListener BatchAcknowledgingMessageListener |
根据容器属性,使用使用者上的 or 方法。 是默认的;另请参阅 。
请参阅获取异步提交的结果;默认回调是记录错误(以及调试级别的成功)的回调。syncCommits
commitSync()
commitAsync()
syncCommits
true
setSyncCommitTimeout
setCommitCallback
LoggingCommitCallback
因为侦听器容器有自己的提交偏移量的机制,所以它更喜欢 Kafka 是。
从版本 2.3 开始,它会无条件地将其设置为 false,除非在 consumer 工厂中特别设置或容器的 consumer 属性覆盖。ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
false
有以下方法:Acknowledgment
public interface Acknowledgment {
void acknowledge();
}
此方法使侦听器可以控制何时提交偏移量。
从版本 2.3 开始,该接口有两个额外的方法 和 .
第一个用于记录侦听器,第二个用于批处理侦听器。
为您的侦听器类型调用错误的方法将引发 .Acknowledgment
nack(long sleep)
nack(int index, long sleep)
IllegalStateException
如果要提交部分批处理,使用 时,使用事务时,将 设置为 ;调用会将成功处理的记录的偏移量发送到事务。nack() AckMode MANUAL nack() |
nack() 只能在调用侦听器的使用者线程上调用。 |
nack() 在使用 Out of Order Commits 时不允许。 |
使用记录侦听器时,调用时,将提交任何待处理的偏移量,丢弃上次轮询的剩余记录,并在其分区上执行查找,以便在下一次重新交付失败的记录和未处理的记录。
通过设置参数,可以在重新交付之前暂停使用者。
这与在容器配置了 .nack()
poll()
sleep
DefaultErrorHandler
使用批处理侦听器时,您可以指定发生故障的批处理中的索引。
调用时,将在索引之前为记录提交偏移量,并在失败和丢弃的记录的分区上执行查找,以便在下一个 .nack()
poll()
有关更多信息,请参阅容器错误处理程序。
使用者在休眠期间暂停,以便我们继续轮询 broker 以保持使用者处于活动状态。
实际休眠时间及其分辨率取决于容器的 default-5 秒。
最短睡眠时间等于 ,所有睡眠时间都是它的倍数。
对于较短的睡眠时间,或者为了提高其准确性,请考虑减少容器的 .pollTimeout pollTimeout pollTimeout |
手动提交偏移量
通常,在使用 or 时,必须按顺序确认确认,因为 Kafka 不维护每条记录的状态,只维护每个组/分区的已提交偏移量。
从版本 2.8 开始,您现在可以设置 container property ,该属性允许以任何顺序确认 poll 返回的记录的确认。
侦听器容器将延迟无序提交,直到收到缺少的确认。
使用者将被暂停(不传送新记录),直到上一次轮询的所有偏移量都已提交。AckMode.MANUAL
AckMode.MANUAL_IMMEDIATE
asyncAcks
虽然此功能允许应用程序异步处理记录,但应理解,它增加了失败后重复交付的可能性。 |
@KafkaListener
注解
该 Comments 用于将 Bean 方法指定为侦听器容器的侦听器。
该 bean 包装在一个配置了各种功能(例如转换器)中,以便在必要时转换数据以匹配方法参数。@KafkaListener
MessagingMessageListenerAdapter
您可以使用 or 属性占位符 () 在 SpEL 上配置注释上的大多数属性。
有关更多信息,请参阅 Javadoc。#{…}
${…}
录制侦听器
该注释为简单的 POJO 侦听器提供了一种机制。
以下示例演示如何使用它:@KafkaListener
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
}
}
此机制需要其中一个类上的注释和侦听器容器工厂,该工厂用于配置底层 .
默认情况下,需要一个 name 为 的 bean。
以下示例演示如何使用:@EnableKafka
@Configuration
ConcurrentMessageListenerContainer
kafkaListenerContainerFactory
ConcurrentMessageListenerContainer
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
...
return props;
}
}
请注意,要设置容器属性,您必须在工厂中使用该方法。
它用作注入到容器中的实际属性的模板。getContainerProperties()
从版本 2.1.1 开始,您现在可以为注释创建的使用者设置属性。
后缀为 ,其中 是一个整数,表示使用并发时的容器编号。client.id
clientIdPrefix
-n
n
从版本 2.2 开始,您现在可以通过在 Comments 本身上使用 properties 来覆盖容器工厂的 和 properties。
属性可以是简单值、属性占位符或 SPEL 表达式。
以下示例显示了如何执行此操作:concurrency
autoStartup
@KafkaListener(id = "myListener", topics = "myTopic",
autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
...
}
显式分区分配
您还可以使用显式主题和分区(以及可选的其初始偏移量)配置 POJO 侦听器。 以下示例显示了如何执行此操作:
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
您可以在 or 属性中指定每个分区,但不能同时指定两者。partitions
partitionOffsets
与大多数 Comments 属性一样,您可以使用 SPEL 表达式;有关如何生成大型分区列表的示例,请参阅手动分配所有分区。
从版本 2.5.5 开始,您可以将初始偏移量应用于所有分配的分区:
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" },
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
通配符表示属性中的所有分区。
每个 中只能有一个带有通配符的 .*
partitions
@PartitionOffset
@TopicPartition
此外,当侦听器实现 时,现在会调用 ,即使使用手动分配也是如此。
例如,这允许在当时执行任何任意 seek 操作。ConsumerSeekAware
onPartitionsAssigned
从版本 2.6.4 开始,您可以指定以逗号分隔的分区列表或分区范围:
@KafkaListener(id = "pp", autoStartup = "false",
topicPartitions = @TopicPartition(topic = "topic1",
partitions = "0-5, 7, 10-15"))
public void process(String in) {
...
}
范围是非独占的;上面的示例将分配 partitions 。0, 1, 2, 3, 4, 5, 7, 10, 11, 12, 13, 14, 15
指定初始偏移量时,可以使用相同的技术:
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1",
partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
初始偏移量将应用于所有 6 个分区。
手动确认
使用 manual 时,您还可以为侦听器提供 .
以下示例还显示了如何使用其他容器工厂。AckMode
Acknowledgment
@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
消费者记录元数据
最后,有关记录的元数据可从消息标头中获得。 您可以使用以下报头名称来检索邮件的报头:
-
KafkaHeaders.OFFSET
-
KafkaHeaders.RECEIVED_MESSAGE_KEY
-
KafkaHeaders.RECEIVED_TOPIC
-
KafkaHeaders.RECEIVED_PARTITION_ID
-
KafkaHeaders.RECEIVED_TIMESTAMP
-
KafkaHeaders.TIMESTAMP_TYPE
从版本 2.5 开始,如果传入记录具有键,则不存在;以前,标头中填充了一个值。
此更改是为了使框架与不存在有值标头的约定保持一致。RECEIVED_MESSAGE_KEY
null
null
spring-messaging
null
以下示例演示如何使用标头:
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
从版本 2.5 开始,您可以在参数中接收记录元数据,而不是使用离散标头。ConsumerRecordMetadata
@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
...
}
这包含除 key 和 value 之外的所有数据。ConsumerRecord
Batch 侦听器
从版本 1.1 开始,您可以配置方法来接收从使用者轮询接收的整批使用者记录。
要配置侦听器容器工厂以创建批处理侦听器,您可以设置该属性。
以下示例显示了如何执行此操作:@KafkaListener
batchListener
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true); // <<<<<<<<<<<<<<<<<<<<<<<<<
return factory;
}
从版本 2.8 开始,你可以使用 Comments 上的属性覆盖工厂的专有属性。
这与对 Container Error Handlers 的更改一起,允许将同一工厂用于记录侦听器和批处理侦听器。batchListener batch @KafkaListener |
以下示例显示如何接收有效负载列表:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
...
}
topic 、 partition 、 offset 等在与有效负载并行的标头中可用。 以下示例演示如何使用标头:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
或者,您可以在每条消息中接收具有每个偏移量和其他详细信息的 of 对象,但它必须是在方法上定义的唯一参数(使用手动提交时,除了 optional 和/或 parameters)。
以下示例显示了如何执行此操作:List
Message<?>
Acknowledgment
Consumer<?, ?>
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List<Message<?>> list) {
...
}
@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List<Message<?>> list, Acknowledgment ack) {
...
}
@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
...
}
在这种情况下,不会对有效负载执行任何转换。
如果配置了 ,则还可以向参数添加泛型类型,并转换有效负载。
有关更多信息,请参阅使用 Batch Listeners 进行有效负载转换。BatchMessagingMessageConverter
RecordMessageConverter
Message
您还可以接收对象列表,但它必须是方法上定义的唯一参数(在使用手动提交和参数时,除了 optional )。
以下示例显示了如何执行此操作:ConsumerRecord<?, ?>
Acknowledgment
Consumer<?, ?>
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
...
}
@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
...
}
从版本 2.2 开始,侦听器可以接收该方法返回的完整对象,从而允许侦听器访问其他方法,例如(返回列表中的实例)和(获取选择性记录)。
同样,这必须是方法上的唯一参数(使用手动提交或参数时,除了 optional 之外)。
以下示例显示了如何执行此操作:ConsumerRecords<?, ?>
poll()
partitions()
TopicPartition
records(TopicPartition)
Acknowledgment
Consumer<?, ?>
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
...
}
如果容器工厂已配置,则侦听器将忽略该 Factory,并发出一条日志消息。
如果使用侦听器的形式,则只能使用批处理侦听器筛选记录。
默认情况下,一次筛选一个记录;从版本 2.8 开始,您可以覆盖以在一次调用中筛选整个批处理。RecordFilterStrategy ConsumerRecords<?, ?> WARN <List<?>> filterBatch |
注释属性
从版本 2.0 开始,该属性(如果存在)将用作 Kafka 使用者属性,覆盖使用者工厂中配置的属性(如果存在)。
您还可以显式设置或设置为 false 以恢复以前使用 Consumer Factory 的行为。id
group.id
groupId
idIsGroup
group.id
您可以在大多数 Comments 属性中使用属性占位符或 SPEL 表达式,如下例所示:
@KafkaListener(topics = "${some.property}")
@KafkaListener(topics = "#{someBean.someProperty}",
groupId = "#{someBean.someProperty}.group")
从版本 2.1.2 开始,SpEL 表达式支持特殊标记:.
它是一个伪 Bean 名称,表示存在此 Comments 的当前 Bean 实例。__listener
请考虑以下示例:
@Bean
public Listener listener1() {
return new Listener("topic1");
}
@Bean
public Listener listener2() {
return new Listener("topic2");
}
给定上一个示例中的 bean,然后我们可以使用以下内容:
public class Listener {
private final String topic;
public Listener(String topic) {
this.topic = topic;
}
@KafkaListener(topics = "#{__listener.topic}",
groupId = "#{__listener.topic}.group")
public void listen(...) {
...
}
public String getTopic() {
return this.topic;
}
}
如果在不太可能的情况下,您有一个名为 的实际 bean,则可以使用该属性更改表达式标记。
以下示例显示了如何执行此操作:__listener
beanRef
@KafkaListener(beanRef = "__x", topics = "#{__x.topic}",
groupId = "#{__x.topic}.group")
从版本 2.2.4 开始,您可以直接在 Comments 上指定 Kafka consumer 属性,这些属性将覆盖在 consumer 工厂中配置的具有相同名称的任何属性。您不能以这种方式指定 and 属性;他们将被忽视;对这些对象使用 and annotation 属性。group.id
client.id
groupId
clientIdPrefix
这些属性指定为具有常规 Java 文件格式的单个字符串: , , 或 .Properties
foo:bar
foo=bar
foo bar
@KafkaListener(topics = "myTopic", groupId = "group", properties = {
"max.poll.interval.ms:60000",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
以下是 使用 RoutingKafkaTemplate
中示例的相应侦听器示例。
@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
System.out.println("1: " + in);
}
@KafkaListener(id = "two", topics = "two",
properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
System.out.println("2: " + new String(in));
}
获取 Consumergroup.id
在多个容器中运行相同的侦听器代码时,能够确定记录来自哪个容器(由其使用者属性标识)可能很有用。group.id
您可以调用侦听器线程来执行此操作。
或者,您可以在 method 参数中访问组 ID。KafkaUtils.getConsumerGroupId()
@KafkaListener(id = "bar", topicPattern = "${topicTwo:annotated2}", exposeGroupId = "${always:true}")
public void listener(@Payload String foo,
@Header(KafkaHeaders.GROUP_ID) String groupId) {
...
}
这在接收 a of 记录的记录侦听器和批处理侦听器中可用。
它在接收参数的批处理侦听器中不可用。
在这种情况下,请使用 the mechanism。List<?> ConsumerRecords<?, ?> KafkaUtils |
容器线程命名
侦听器容器目前使用两个任务执行程序,一个用于调用使用者,另一个用于在 kafka 使用者属性为时调用侦听器。
您可以通过设置容器的 .
使用池化执行程序时,请确保有足够的线程可用于处理使用它们的所有容器之间的并发。
使用 时,每个使用方 () 都使用每个使用方的线程。enable.auto.commit
false
consumerExecutor
listenerExecutor
ContainerProperties
ConcurrentMessageListenerContainer
concurrency
如果您未提供使用者执行程序,则使用 a。
此执行程序创建名称类似于 (consumer thread) 的线程。
对于 ,线程名称的一部分变为 ,其中表示使用者实例。 每次启动容器时递增。
因此,如果 bean 名称为 ,则此容器中的线程将在容器首次启动后命名为 ,以此类推;等,在停止和随后的启动之后。SimpleAsyncTaskExecutor
<beanName>-C-1
ConcurrentMessageListenerContainer
<beanName>
<beanName>-m
m
n
container
container-0-C-1
container-1-C-1
container-0-C-2
container-1-C-2
@KafkaListener
作为元注释
从版本 2.2 开始,您现在可以用作 meta 注释。
以下示例显示了如何执行此操作:@KafkaListener
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@KafkaListener
public @interface MyThreeConsumersListener {
@AliasFor(annotation = KafkaListener.class, attribute = "id")
String id();
@AliasFor(annotation = KafkaListener.class, attribute = "topics")
String[] topics();
@AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
String concurrency() default "3";
}
您必须为、 、 或 (and、通常,或者除非您在 Consumer Factory 配置中指定了 a) 中的至少一个别名。
以下示例显示了如何执行此操作:topics
topicPattern
topicPartitions
id
groupId
group.id
@MyThreeConsumersListener(id = "my.group", topics = "my.topic")
public void listen1(String in) {
...
}
@KafkaListener
在类上
在类级别使用时,必须在方法级别指定。
在传送消息时,转换后的消息负载类型用于确定要调用的方法。
以下示例显示了如何执行此操作:@KafkaListener
@KafkaHandler
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String foo) {
...
}
@KafkaHandler
public void listen(Integer bar) {
...
}
@KafkaHandler(isDefault = true)
public void listenDefault(Object object) {
...
}
}
从版本 2.1.3 开始,如果其他方法不匹配,则可以将方法指定为调用的默认方法。
最多可以指定一种方法。
使用方法时,有效负载必须已转换为域对象(以便可以执行匹配)。
使用自定义反序列化器 , 或 the 并将其设置为 .
有关更多信息,请参见 序列化、反序列化和消息转换。@KafkaHandler
@KafkaHandler
JsonDeserializer
JsonMessageConverter
TypePrecedence
TYPE_ID
由于 Spring 解析方法参数的方式存在一些限制,因此默认无法接收离散的 Headers。它必须使用 Consumer Record Metadata 中所述的 。@KafkaHandler ConsumerRecordMetadata |
例如:
@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
...
}
如果对象是 ;该参数还将获得对 .String
topic
object
如果您需要 default 方法中有关记录的元数据,请使用以下命令:
@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
String topic = meta.topic();
...
}
@KafkaListener
属性修改
从版本 2.7.2 开始,您现在可以在创建容器之前以编程方式修改注释属性。
为此,请将一个或多个添加到应用程序上下文中。 是 a,并且必须返回属性映射。
属性值可以包含 SPEL 和/或属性占位符;在执行任何解析之前调用增强器。
如果存在多个 enhancer,并且它们 implements ,则将按顺序调用它们。KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer
AnnotationEnhancer
BiFunction<Map<String, Object>, AnnotatedElement, Map<String, Object>
Ordered
AnnotationEnhancer 必须声明 bean 定义,因为它们在应用程序上下文生命周期的早期就是必需的。static |
示例如下:
@Bean
public static AnnotationEnhancer groupIdEnhancer() {
return (attrs, element) -> {
attrs.put("groupId", attrs.get("id") + "." + (element instanceof Class
? ((Class<?>) element).getSimpleName()
: ((Method) element).getDeclaringClass().getSimpleName()
+ "." + ((Method) element).getName()));
return attrs;
};
}
@KafkaListener
生命周期管理
为 Comments 创建的侦听器容器不是应用程序上下文中的 Bean。
相反,它们注册到类型为 的基础结构 Bean。
这个 bean 由框架自动声明并管理容器的生命周期;它将自动启动任何已设置为 的容器。
所有容器工厂创建的所有容器必须位于同一 .
有关更多信息,请参阅 Listener Container Auto Startup。
您可以使用注册表以编程方式管理生命周期。
启动或停止注册表将启动或停止所有已注册的容器。
或者,您可以使用其属性获取对单个容器的引用。
您可以在注释上进行设置,这将覆盖在容器工厂中配置的默认设置。
您可以从应用程序上下文(例如自动布线)获取对 bean 的引用,以管理其已注册的容器。
以下示例说明如何执行此操作:@KafkaListener
KafkaListenerEndpointRegistry
autoStartup
true
phase
id
autoStartup
@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;
...
this.registry.getListenerContainer("myContainer").start();
...
注册表仅维护其管理的容器的生命周期;声明为 bean 的容器不受注册表管理,可以从应用程序上下文中获取。
可以通过调用 registry 的方法获取托管容器的集合。
版本2.2.5添加了一个 convenience method ,它返回所有容器的集合,包括由注册表管理的容器和声明为bean的容器。
返回的集合将包括任何已初始化的原型 bean,但它不会初始化任何惰性 bean 声明。getListenerContainers()
getAllListenerContainers()
刷新应用程序上下文后注册的端点将立即启动,无论其属性如何,以符合契约,其中仅在应用程序上下文初始化期间考虑。
延迟注册的一个示例是具有 in prototype 范围的 Bean,其中实例是在初始化上下文后创建的。
从版本 2.8.7 开始,您可以将 registry 的属性设置为,然后容器的属性将定义是否启动容器。autoStartup SmartLifecycle autoStartup @KafkaListener alwaysStartAfterRefresh false autoStartup |
@KafkaListener
@Payload
验证
从版本 2.2 开始,现在添加 a 来验证参数变得更加容易。
以前,您必须配置自定义并将其添加到注册商。
现在,您可以将验证器添加到注册商本身。
以下代码演示如何执行此操作:Validator
@KafkaListener
@Payload
DefaultMessageHandlerMethodFactory
@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {
...
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(new MyValidator());
}
}
当你将 Spring Boot 与验证Starters一起使用时,会自动配置 a,如下例所示:LocalValidatorFactoryBean |
@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {
@Autowired
private LocalValidatorFactoryBean validator;
...
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(this.validator);
}
}
以下示例演示如何验证:
public static class ValidatedClass {
@Max(10)
private int bar;
public int getBar() {
return this.bar;
}
public void setBar(int bar) {
this.bar = bar;
}
}
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
...
}
@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
return (m, e) -> {
...
};
}
从版本 2.5.11 开始,验证现在适用于类级侦听器中方法的有效负载。
请参阅 类@KafkaListener
。@KafkaHandler
重新平衡侦听器
ContainerProperties
具有一个名为 的属性,该属性采用 Kafka 客户端接口的实现。
如果未提供此属性,则容器将配置一个日志记录侦听器,用于记录 级别的再平衡事件。
框架还添加了一个子接口。
下面的清单显示了接口定义:consumerRebalanceListener
ConsumerRebalanceListener
INFO
ConsumerAwareRebalanceListener
ConsumerAwareRebalanceListener
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}
请注意,当 partitions 被撤销时,有两个回调。 第一个会立即调用。 第二个 is 在提交任何待处理的偏移量后调用。 如果您希望在某些外部存储库中维护偏移量,这非常有用,如下例所示:
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// acknowledge any pending Acknowledgments (if using manual acks)
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// ...
store(consumer.position(partition));
// ...
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// ...
consumer.seek(partition, offsetTracker.getOffset() + 1);
// ...
}
});
从版本 2.4 开始,添加了一个新方法(类似于 中的同名方法)。
的默认实现 on 只是调用 。
默认实现 on 不执行任何操作。
为侦听器容器提供自定义侦听器(任一类型)时,您的实现不能从 中调用,这一点很重要。
如果实现,则应覆盖 default 方法。
这是因为侦听器容器将在对你的实现调用该方法后从其实现中调用自己的容器。
如果你的实现委托给默认行为,每次在容器的侦听器上调用该方法时,都会调用两次。onPartitionsLost() ConsumerRebalanceLister ConsumerRebalanceLister onPartionsRevoked ConsumerAwareRebalanceListener onPartitionsRevoked onPartitionsLost ConsumerRebalanceListener onPartitionsRevoked onPartitionsLost onPartitionsRevoked Consumer |
转发侦听器结果@SendTo
从版本 2.0 开始,如果您还使用注释对 a 进行注释,并且方法调用返回结果,则结果将转发到由 .@KafkaListener
@SendTo
@SendTo
该值可以有多种形式:@SendTo
-
@SendTo("someTopic")
路由到 Literal 主题 -
@SendTo("#{someExpression}")
路由到通过在应用程序上下文初始化期间计算表达式一次来确定的主题。 -
@SendTo("!{someExpression}")
路由到通过在运行时评估表达式确定的主题。 用于评估的对象具有三个属性:#root
-
request
:入站(或批处理侦听器的对象))ConsumerRecord
ConsumerRecords
-
source
:从 .org.springframework.messaging.Message<?>
request
-
result
:该方法返回结果。
-
-
@SendTo
(no properties):这被视为(自版本 2.1.3 起)。!{source.headers['kafka_replyTopic']}
从版本 2.1.11 和 2.2.1 开始,属性占位符在 values 中解析。@SendTo
表达式评估的结果必须是表示主题名称的 a。
以下示例显示了各种使用方法:String
@SendTo
@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
...
}
@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
...
}
@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {
@KafkaHandler
public String foo(String in) {
...
}
@KafkaHandler
@SendTo("!{'annotated25reply2'}")
public String bar(@Payload(required = false) KafkaNull nul,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
...
}
}
为了支持 ,必须为侦听器容器工厂提供一个 (在其属性中),用于发送回复。
这应该是 a 而不是 a,它在客户端用于请求/回复处理。
使用 Spring Boot 时,boot 会自动将模板配置到工厂中;在配置您自己的工厂时,必须按照以下示例所示进行设置。@SendTo KafkaTemplate replyTemplate KafkaTemplate ReplyingKafkaTemplate |
从版本 2.2 开始,你可以向侦听器容器工厂添加 a。
将查询此信息以确定要在回复消息中设置的标头。
以下示例演示如何添加 :ReplyHeadersConfigurer
ReplyHeadersConfigurer
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
return factory;
}
如果需要,您还可以添加更多标题。 以下示例显示了如何执行此操作:
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {
@Override
public boolean shouldCopy(String headerName, Object headerValue) {
return false;
}
@Override
public Map<String, Object> additionalHeaders() {
return Collections.singletonMap("qux", "fiz");
}
});
return factory;
}
使用 时,必须在其属性中配置 with a 以执行发送。@SendTo
ConcurrentKafkaListenerContainerFactory
KafkaTemplate
replyTemplate
除非你使用请求/回复语义,否则只使用简单的方法,因此你可能希望创建一个子类来生成分区或键。
以下示例显示了如何执行此操作:send(topic, value) |
@Bean
public KafkaTemplate<String, String> myReplyingTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory()) {
@Override
public ListenableFuture<SendResult<String, String>> send(String topic, String data) {
return super.send(topic, partitionForData(data), keyForData(data), data);
}
...
};
}
如果 listener 方法返回 或 ,则 listener 方法负责设置回复的消息标头。
例如,在处理来自 的请求时,您可以执行以下操作:
|
当使用请求/回复语义时,发送方可以请求目标分区。
即使未返回任何结果,您也可以对方法进行注释。
这是为了允许配置可以将有关失败消息传送的信息转发到某个主题。
以下示例显示了如何执行此操作:
有关更多信息,请参阅处理异常。 |
如果侦听器方法返回 ,则默认情况下,在发送值时,每个元素都会有一条记录。
从版本 2.3.5 开始,将属性 on 设置为 ,整个结果将作为 single 的值发送。
这需要在回复模板的 producer 配置中使用合适的序列化器。
但是,如果回复为 ,则忽略该属性,并且每条消息将单独发送。Iterable splitIterables @KafkaListener false ProducerRecord Iterable<Message<?>> |
筛选消息
Spring for Apache Kafka 项目还通过类提供了一些帮助,该类可以将 .
此类采用一个实现,您可以在其中实现该方法以指示消息是重复的,应丢弃。
这有一个名为 的附加属性,该属性指示适配器是否应确认丢弃的记录。
这是默认的。FilteringMessageListenerAdapter
MessageListener
RecordFilterStrategy
filter
ackDiscarded
false
当您使用 时,请在容器工厂上设置 (和可选 ),以便将侦听器包装在适当的过滤适配器中。@KafkaListener
RecordFilterStrategy
ackDiscarded
此外,还提供了 a ,用于使用批处理消息侦听器。FilteringBatchMessageListenerAdapter
如果您收到 a 而不是 ,则忽略 ,因为 是不可变的。FilteringBatchMessageListenerAdapter @KafkaListener ConsumerRecords<?, ?> List<ConsumerRecord<?, ?>> ConsumerRecords |
从版本 2.8.4 开始,你可以通过使用侦听器注释上的属性来覆盖侦听器容器工厂的默认值。RecordFilterStrategy
filter
@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
...
}
重试投放
请参见 处理异常.DefaultErrorHandler
按顺序启动 s@KafkaListener
一个常见的用例是在另一个侦听器使用主题中的所有记录后启动侦听器。
例如,您可能希望在处理来自其他主题的记录之前将一个或多个压缩主题的内容加载到内存中。
从版本 2.7.3 开始,引入了一个新组件。
它使用该属性将容器分组在一起,并在当前组中的所有容器都处于空闲状态时启动下一个组中的容器。ContainerGroupSequencer
@KafkaListener
containerGroup
最好用一个例子来说明。
@KafkaListener(id = "listen1", topics = "topic1", containerGroup = "g1", concurrency = "2")
public void listen1(String in) {
}
@KafkaListener(id = "listen2", topics = "topic2", containerGroup = "g1", concurrency = "2")
public void listen2(String in) {
}
@KafkaListener(id = "listen3", topics = "topic3", containerGroup = "g2", concurrency = "2")
public void listen3(String in) {
}
@KafkaListener(id = "listen4", topics = "topic4", containerGroup = "g2", concurrency = "2")
public void listen4(String in) {
}
@Bean
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
}
在这里,我们有 4 个侦听器,分为两组,和 。g1
g2
在应用程序上下文初始化期间,排序器将所提供组中所有容器的属性设置为 。
它还将任何容器(尚未设置)的 设置为 提供的值(在本例中为 5000 毫秒)。
然后,当 sequencer 由应用程序上下文启动时,将启动第一组中的容器。
收到 s 时,每个容器中的每个子容器都将停止。
当 中的所有子容器都停止时,父容器将停止。
当一个组中的所有容器都已停止时,将启动下一个组中的容器。
组中的组或容器的数量没有限制。autoStartup
false
idleEventInterval
ListenerContainerIdleEvent
ConcurrentMessageListenerContainer
默认情况下,最后一个组(上面)中的容器在空闲时不会停止。
要修改该行为,请在 sequencer 上设置为 。g2
stopLastGroupWhenIdle
true
顺便说一句;以前,每个组中的容器都被添加到一个 Bean 类型的 Bean 中,其 Bean 名称为 .
这些集合现在已被弃用,取而代之的是 bean 名称为组名称的 bean,后缀为 ;在上面的示例中,将有 2 个 bean 和 。
这些 bean 将在将来的发行版中删除。Collection<MessageListenerContainer>
containerGroup
ContainerGroup
.group
g1.group
g2.group
Collection
用于接收KafkaTemplate
本节介绍如何使用 来接收消息。KafkaTemplate
从版本 2.8 开始,该模板有四种方法:receive()
ConsumerRecord<K, V> receive(String topic, int partition, long offset);
ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout);
ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested);
ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout);
如您所见,您需要知道需要检索的记录的分区和偏移量;将为每个操作创建 (并关闭) 一个新的。Consumer
使用最后两种方法,将单独检索每条记录,并将结果组合到一个对象中。
为请求创建 s 时,仅支持正的绝对偏移量。ConsumerRecords
TopicPartitionOffset
4.1.5. 侦听器容器属性
财产 | 违约 | 描述 |
---|---|---|
1 |
当 is 或 . |
|
|
对象链(例如 around 建议)包装消息侦听器,按顺序调用。 |
|
Batch |
控制 offset 的提交频率 - 请参阅 Committing Offsets。 |
|
5000 |
在 is 或 . |
|
LATEST_ONLY _NO_TX |
是否在分配时提交初始位置;默认情况下,初始偏移量仅在 is 时提交,并且即使存在事务管理器,它也不会在事务中运行。
有关可用选项的更多信息,请参阅 javadocs 。 |
|
|
如果不是 null,则当 Kafka 客户端抛出 or 时,将在轮询之间休眠。
当 null 时,此类异常被视为致命异常,容器将停止。 |
|
(空字符串) |
consumer 属性的前缀。
覆盖 Consumer Factory 属性;在 concurrent 容器中,将添加为每个使用者实例的后缀。 |
|
假 |
设置为 在收到 a 时始终检查标头。
当使用者代码无法确定 an 是否已配置时(例如,当使用委托反序列化器时)很有用。 |
|
假 |
设置为 在收到 a 时始终检查标头。
当使用者代码无法确定 an 是否已配置时(例如,当使用委托反序列化器时)很有用。 |
|
|
如果存在,并且是在提交完成后调用的回调。 |
|
|
的提供程序 ;默认情况下,提供程序会创建具有空元数据的偏移量和元数据。提供程序提供了一种自定义元数据的方法。 |
|
调试 |
与提交偏移量相关的日志的日志记录级别。 |
|
|
再平衡侦听器;请参阅 重新平衡侦听器。 |
|
30 秒 |
在记录错误之前等待使用者启动的时间;例如,如果您使用的 Task Executor 线程不足,则可能会发生这种情况。 |
|
|
用于运行使用者线程的任务执行程序。
默认执行程序创建名为 ;使用 ,名称是 bean 名称;其中 name 是 Bean 名称,后缀为 其中 n 对于每个子容器递增。 |
|
|
请参阅 Delivery Attempts 标头。 |
|
|
Exactly Once 语义模式;请参阅 Exactly Once 语义。 |
|
|
当使用事务创建者生成的记录时,如果使用者位于分区的末尾,则滞后可能会错误地报告为大于零,因为伪记录用于指示事务提交/回滚,并且可能存在回滚记录。
这在功能上不会影响消费者,但一些用户表示担心 “滞后” 不为零。
将此属性设置为,容器将更正此类误报的偏移量。
该检查在下一次轮询之前执行,以避免增加提交处理的复杂性。
在撰写本文时,只有当使用者配置了 1 且大于 1 时,才会纠正滞后。
有关更多信息,请参阅 KAFKA-10683。 |
|
|
覆盖 consumer 属性;由 or 属性自动设置。 |
|
5.0 |
该 Multiplier 在收到任何记录之前应用。
收到记录后,将不再应用乘数。
自 2.8 版起可用。 |
|
0 |
用于通过在轮询之间休眠线程来减慢投放速度。
处理一批记录的时间加上此值必须小于 consumer 属性。 |
|
|
设置后,将启用发布 s,请参阅 应用程序事件 和 检测空闲和无响应的使用者。
另请参阅 。 |
|
|
设置后,将启用发布 s,请参阅 应用程序事件 和 检测空闲和无响应的使用者。 |
|
没有 |
用于覆盖在 Consumer Factory 上配置的任何任意 Consumer 属性。 |
|
|
设置为 true 可在 INFO 级别记录所有容器属性。 |
|
|
消息侦听器。 |
|
|
是否维护使用者线程的 Micrometer 计时器。 |
|
|
如果代理上不存在已配置的主题,则为 true 时阻止容器启动。 |
|
30 秒 |
检查 s 的使用者线程状态的频率。
请参阅 和 。 |
|
3.0 |
乘以来确定是否发布 .
看。 |
|
|
设置为 false 可记录完整的使用者记录(错误、调试日志等),而不仅仅是 .
荒废的。
替换为 。 |
|
5000 |
传入的超时。 |
|
|
要在其上运行使用者监视器任务的计划程序。 |
|
10000 |
在所有使用者停止之前和发布容器停止事件之前阻止方法的最长时间(以毫秒为单位)。 |
|
|
如果引发 a,则停止侦听器容器。
有关更多信息,请参阅 After-rollback Processor 。 |
|
|
当容器停止时,请在当前记录之后停止处理,而不是在处理上一次轮询的所有记录之后停止处理。 |
|
见 desc. |
使用批处理侦听器时,如果为 ,则调用侦听器,并将轮询结果拆分为子批处理,每个分区一个子批处理。
默认,除非使用 Transactions 时 - 请参阅 Exactly Once 语义。 |
|
|
当 为 时要使用的超时。
如果未设置,容器将尝试确定 consumer 属性并使用该属性;否则将使用 60 秒。 |
|
|
是否对偏移量使用 sync 或 async commits;看。 |
|
不适用 |
配置的主题、主题模式或显式分配的主题/分区。
互斥;必须至少提供一个;由构造函数强制执行。 |
|
|
请参阅 事务。 |
财产 | 违约 | 描述 |
---|---|---|
|
在事务回滚后调用。 |
|
应用程序上下文 |
事件发布者。 |
|
见 desc. |
已弃用 - 请参阅 。 |
|
|
将 a 设置为在调用批处理侦听器之前调用 ;不适用于 Record 侦听器。
另请参阅 。 |
|
Bean 名称 |
容器的 Bean 名称;后缀为 子容器。 |
|
见 desc. |
|
|
|
容器 properties 实例。 |
|
见 desc. |
已弃用 - 请参阅 。 |
|
见 desc. |
已弃用 - 请参阅 。 |
|
见 desc. |
如果存在,则为来自 Consumer Factory 的属性。 |
|
|
确定是在事务开始之前还是之后调用 the 。 |
|
见 desc. |
用户配置的容器的 Bean 名称或 s 的属性。 |
|
零 |
要在标头中填充的值。
使用 ,此值是从属性获取的。
此标头可以在各种位置使用,例如 ,也可以在侦听器代码本身中使用。 |
|
(只读) |
如果已请求使用者暂停,则为 True。 |
|
|
设置 a to 在调用记录侦听器之前调用 ;不适用于批处理侦听器。
另请参阅 。 |
|
30 秒 |
当 container 属性为 时,等待操作完成的时间(以秒为单位)。 |
财产 | 违约 | 描述 |
---|---|---|
(只读) |
当前分配给此容器的分区(显式或不显式)。 |
|
(只读) |
当前分配给此容器的分区(显式或不显式)。 |
|
|
由并发容器使用,为每个子容器的使用者提供唯一的 . |
|
不适用 |
如果已请求暂停,并且使用者实际上已暂停,则为 True。 |
财产 | 违约 | 描述 |
---|---|---|
|
设置为 false 以禁止向 consumer 属性添加后缀,当 the 仅为 1 时。 |
|
(只读) |
当前分配给此容器的子 s 的分区的聚合(显式或不显式)。 |
|
(只读) |
当前分配给此容器的子 s 的分区(显式或非显式),由子容器的 consumer 的属性作为键。 |
|
1 |
要管理的子 s 的数量。 |
|
不适用 |
如果已请求暂停,并且所有子容器的使用者实际上都已暂停,则为 True。 |
|
不适用 |
对所有子 s 的引用。 |
4.1.6. 动态创建容器
有几种技术可用于在运行时创建侦听器容器。 本节探讨了其中的一些技术。
MessageListener 实现
如果你直接实现自己的侦听器,你可以简单地使用容器工厂为该侦听器创建一个原始容器:
public class MyListener implements MessageListener<String, String> {
@Override
public void onMessage(ConsumerRecord<String, String> data) {
// ...
}
}
private ConcurrentMessageListenerContainer<String, String> createContainer(
ConcurrentKafkaListenerContainerFactory<String, String> factory, String topic, String group) {
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
container.getContainerProperties().setMessageListener(new MyListener());
container.getContainerProperties().setGroupId(group);
container.setBeanName(group);
container.start();
return container;
}
class MyListener : MessageListener<String?, String?> {
override fun onMessage(data: ConsumerRecord<String?, String?>) {
// ...
}
}
private fun createContainer(
factory: ConcurrentKafkaListenerContainerFactory<String, String>, topic: String, group: String
): ConcurrentMessageListenerContainer<String, String> {
val container = factory.createContainer(topic)
container.containerProperties.messageListener = MyListener()
container.containerProperties.groupId = group
container.beanName = group
container.start()
return container
}
原型 Bean
可以通过将 bean 声明为 prototype 来动态创建带有 Comments 的方法的容器:@KafkaListener
public class MyPojo {
private final String id;
private final String topic;
public MyPojo(String id, String topic) {
this.id = id;
this.topic = topic;
}
public String getId() {
return this.id;
}
public String getTopic() {
return this.topic;
}
@KafkaListener(id = "#{__listener.id}", topics = "#{__listener.topic}")
public void listen(String in) {
System.out.println(in);
}
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
MyPojo pojo(String id, String topic) {
return new MyPojo(id, topic);
}
applicationContext.getBean(MyPojo.class, "one", "topic2");
applicationContext.getBean(MyPojo.class, "two", "topic3");
class MyPojo(id: String?, topic: String?) {
@KafkaListener(id = "#{__listener.id}", topics = ["#{__listener.topics}"])
fun listen(`in`: String?) {
println(`in`)
}
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
fun pojo(id: String?, topic: String?): MyPojo {
return MyPojo(id, topic)
}
applicationContext.getBean(MyPojo::class.java, "one", arrayOf("topic2"))
applicationContext.getBean(MyPojo::class.java, "two", arrayOf("topic3"))
侦听器必须具有唯一的 ID。
从版本 2.8.9 开始,它有一个新方法允许您重复使用 id。
取消注册容器不会取消注册容器,您必须自己执行此操作。KafkaListenerEndpointRegistry unregisterListenerContainer(String id) stop() |
4.1.7. 应用程序事件
以下 Spring 应用程序事件由侦听器容器及其使用者发布:
-
ConsumerStartingEvent
- 在 Consumer 线程首次启动时发布,在它开始轮询之前。 -
ConsumerStartedEvent
- 在消费者即将开始轮询时发布。 -
ConsumerFailedToStartEvent
- 发布 如果 No 在容器属性中发布。 此事件可能表示配置的任务执行程序没有足够的线程来支持使用它的容器及其并发性。 出现此情况时,还会记录错误消息。ConsumerStartingEvent
consumerStartTimeout
-
ListenerContainerIdleEvent
:在未收到任何消息时发布(如果已配置)。idleInterval
-
ListenerContainerNoLongerIdleEvent
:在之前发布 .ListenerContainerIdleEvent
-
ListenerContainerPartitionIdleEvent
:未从中的该分区收到任何消息时发布(如果已配置)。idlePartitionEventInterval
-
ListenerContainerPartitionNoLongerIdleEvent
:当从之前发布 .ListenerContainerPartitionIdleEvent
-
NonResponsiveConsumerEvent
:当使用者似乎在方法中被阻止时发布。poll
-
ConsumerPartitionPausedEvent
:当分区暂停时,由每个使用者发布。 -
ConsumerPartitionResumedEvent
:每个 Consumer 在恢复分区时发布的 -
ConsumerPausedEvent
:在容器暂停时由每个使用者发布。 -
ConsumerResumedEvent
:容器恢复时由每个使用者发布。 -
ConsumerStoppingEvent
:由每个使用者在停止前发布。 -
ConsumerStoppedEvent
:在 Consumer 关闭后发布。 请参见线程安全。 -
ContainerStoppedEvent
:当所有使用者都已停止时发布。
默认情况下,应用程序上下文的事件 multicaster 在调用线程上调用事件侦听器。
如果将 multicaster 更改为使用异步执行程序,则当事件包含对使用者的引用时,不得调用任何方法。Consumer |
具有以下属性:ListenerContainerIdleEvent
-
source
:发布事件的侦听器容器实例。 -
container
:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
id
:侦听器 ID(或容器 Bean 名称)。 -
idleTime
:发布事件时容器处于空闲状态的时间。 -
topicPartitions
:在生成事件时为容器分配的主题和分区。 -
consumer
:对 Kafka 对象的引用。 例如,如果之前调用了使用者的方法,则在收到事件时可以调用该方法。Consumer
pause()
resume()
-
paused
:容器当前是否暂停。 有关更多信息,请参阅暂停和恢复侦听器容器。
具有相同的属性,但 和 除外。ListenerContainerNoLongerIdleEvent
idleTime
paused
具有以下属性:ListenerContainerPartitionIdleEvent
-
source
:发布事件的侦听器容器实例。 -
container
:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
id
:侦听器 ID(或容器 Bean 名称)。 -
idleTime
:发布事件时分区消耗处于空闲状态的时间。 -
topicPartition
:触发事件的主题和分区。 -
consumer
:对 Kafka 对象的引用。 例如,如果之前调用了使用者的方法,则在收到事件时可以调用该方法。Consumer
pause()
resume()
-
paused
:该使用者的分区使用当前是否暂停。 有关更多信息,请参阅暂停和恢复侦听器容器。
具有相同的属性,但 和 除外。ListenerContainerPartitionNoLongerIdleEvent
idleTime
paused
具有以下属性:NonResponsiveConsumerEvent
-
source
:发布事件的侦听器容器实例。 -
container
:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
id
:侦听器 ID(或容器 Bean 名称)。 -
timeSinceLastPoll
:容器上次调用 之前的时间。poll()
-
topicPartitions
:在生成事件时为容器分配的主题和分区。 -
consumer
:对 Kafka 对象的引用。 例如,如果之前调用了使用者的方法,则在收到事件时可以调用该方法。Consumer
pause()
resume()
-
paused
:容器当前是否暂停。 有关更多信息,请参阅暂停和恢复侦听器容器。
、 和 事件 具有以下属性:ConsumerPausedEvent
ConsumerResumedEvent
ConsumerStopping
-
source
:发布事件的侦听器容器实例。 -
container
:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
partitions
:涉及的实例。TopicPartition
事件的 , 具有以下属性:ConsumerPartitionPausedEvent
ConsumerPartitionResumedEvent
-
source
:发布事件的侦听器容器实例。 -
container
:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
partition
:涉及的实例。TopicPartition
、 、 和 事件 具有以下属性:ConsumerStartingEvent
ConsumerStartingEvent
ConsumerFailedToStartEvent
ConsumerStoppedEvent
ContainerStoppedEvent
-
source
:发布事件的侦听器容器实例。 -
container
:侦听器容器或父侦听器容器(如果源容器是子容器)。
所有容器(无论是子容器还是父容器)都会发布 。
对于父容器,source 和 container 属性相同。ContainerStoppedEvent
此外,它还具有以下附加属性:ConsumerStoppedEvent
-
reason
-
NORMAL
- 消费者正常停止 (容器已停止)。 -
ERROR
- A 被抛出。java.lang.Error
-
FENCED
- 事务生成者已屏蔽,容器属性为 .stopContainerWhenFenced
true
-
AUTH
- 引发了 OR 且 未配置。AuthenticationException
AuthorizationException
authExceptionRetryInterval
-
NO_OFFSET
- 分区没有偏移量,策略为 。auto.offset.reset
none
-
在出现此类情况后,您可以使用此事件重新启动容器:
if (event.getReason.equals(Reason.FENCED)) {
event.getSource(MessageListenerContainer.class).start();
}
检测空闲和无响应的使用者
虽然效率很高,但异步使用者的一个问题是检测它们何时处于空闲状态。 如果在一段时间内没有消息到达,您可能需要采取一些措施。
您可以将侦听器容器配置为在一段时间后没有消息传递时发布。
当容器处于空闲状态时,每毫秒发布一次事件。ListenerContainerIdleEvent
idleEventInterval
要配置此功能,请在容器上设置
以下示例显示了如何执行此操作:idleEventInterval
@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
...
containerProps.setIdleEventInterval(60000L);
...
KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(...);
return container;
}
以下示例演示如何为 :idleEventInterval
@KafkaListener
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.getContainerProperties().setIdleEventInterval(60000L);
...
return factory;
}
在上述每种情况下,当容器处于空闲状态时,每分钟发布一次事件。
如果由于某种原因,consumer 方法没有退出,则不会收到任何消息,并且无法生成空闲事件(当无法访问代理时,这是早期版本的问题)。
在这种情况下,如果 poll 未在属性中返回,则容器会发布 。
默认情况下,每个容器中每 30 秒执行一次此检查。
您可以通过在配置侦听器容器时在 中设置 (默认 30 秒) 和 (默认 3.0) 属性来修改此行为。
这应该大于以避免由于争用条件而获得虚假事件。
接收此类事件可让您停止容器,从而唤醒使用者,使其可以停止。poll()
kafka-clients
NonResponsiveConsumerEvent
3x
pollTimeout
monitorInterval
noPollThreshold
ContainerProperties
noPollThreshold
1.0
从版本 2.6.2 开始,如果容器发布了 ,它将在随后收到记录时发布 。ListenerContainerIdleEvent
ListenerContainerNoLongerIdleEvent
事件消耗
您可以通过实现来捕获这些事件 — 通用侦听器或缩小范围以仅接收此特定事件的侦听器。
您还可以使用 Spring Framework 4.2 中引入的 。ApplicationListener
@EventListener
下一个示例将 和 合并为一个类。
您应该了解应用程序侦听器会获取所有容器的事件,因此,如果您想根据哪个容器处于空闲状态来执行特定操作,则可能需要检查侦听器 ID。
您也可以将 用于此目的。@KafkaListener
@EventListener
@EventListener
condition
有关事件属性的信息,请参阅应用程序事件。
该事件通常在使用者线程上发布,因此与对象交互是安全的。Consumer
以下示例同时使用 和 :@KafkaListener
@EventListener
public class Listener {
@KafkaListener(id = "qux", topics = "annotated")
public void listen4(@Payload String foo, Acknowledgment ack) {
...
}
@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void eventHandler(ListenerContainerIdleEvent event) {
...
}
}
事件侦听器查看所有容器的事件。
因此,在前面的示例中,我们根据侦听器 ID 缩小接收的事件范围。
由于为 创建的容器支持并发,因此实际容器的命名方式为 ,其中 是支持并发的每个实例的唯一值。
这就是我们在 CONDITION 中使用的原因。@KafkaListener id-n n startsWith |
如果您希望使用 idle 事件来停止 lister 容器,则不应在调用侦听器的线程上调用。
这样做会导致延迟和不必要的日志消息。
相反,您应该将事件移交给其他线程,然后该线程可以停止容器。
此外,如果容器实例是子容器,则不应使用容器实例。
您应该改为停止并发容器。container.stop() stop() |
空闲时的当前位置
注意,当检测到 idle 时,你可以通过在 Listener 中实现来获取当前位置。
请参见 寻找特定偏移量。ConsumerSeekAware
onIdleContainer()
4.1.8. Topic/Partition 初始偏移量
有几种方法可以设置分区的初始偏移量。
手动分配分区时,您可以在配置的参数中设置初始偏移量(如果需要)(请参阅 消息侦听器容器)。
您也可以随时寻找特定的偏移量。TopicPartitionOffset
当您在 broker 分配分区的位置使用组管理时:
-
对于新的 ,初始偏移量由消费者属性 ( 或 ) 确定。
group.id
auto.offset.reset
earliest
latest
-
对于现有组 ID,初始偏移量是该组 ID 的当前偏移量。 但是,您可以在初始化期间(或之后的任何时间)寻找特定的偏移量。
4.1.9. 寻找特定的偏移量
为了进行查找,您的侦听器必须实现 ,它具有以下方法:ConsumerSeekAware
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
void onPartitionsRevoked(Collection<TopicPartition> partitions)
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
当容器启动时以及分配分区时,将调用 The 。
在初始化后的任意时间进行 seek 时,您应该使用此回调。
您应该保存对回调的引用。
如果您在多个容器(或 )中使用相同的侦听器,则应将回调存储在 a 或由侦听器键控的其他结构中。registerSeekCallback
ConcurrentMessageListenerContainer
ThreadLocal
Thread
使用组管理时,在分配分区时调用。
例如,您可以使用此方法通过调用 callback 来设置 partition 的初始偏移量。
您还可以使用此方法将此线程的回调与分配的分区相关联(请参阅下面的示例)。
您必须使用 callback 参数,而不是传递给 .
从版本 2.5.5 开始,即使使用手动分区分配,也会调用此方法。onPartitionsAssigned
registerSeekCallback
onPartitionsRevoked
在容器停止或 Kafka 撤销分配时调用。
您应该丢弃此线程的回调并删除与已撤销分区的任何关联。
该回调有以下方法:
void seek(String topic, int partition, long offset);
void seekToBeginning(String topic, int partition);
void seekToBeginning(Collection=<TopicPartitions> partitions);
void seekToEnd(String topic, int partition);
void seekToEnd(Collection=<TopicPartitions> partitions);
void seekRelative(String topic, int partition, long offset, boolean toCurrent);
void seekToTimestamp(String topic, int partition, long timestamp);
void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);
seekRelative
在版本 2.3 中添加,以执行相对查找。
-
offset
negative 和 - 相对于分区末尾查找。toCurrent
false
-
offset
positive 和 - seek 相对于分区的开头。toCurrent
false
-
offset
negative 和 - 相对于当前位置查找 (rewind)。toCurrent
true
-
offset
positive 和 - 相对于当前位置进行搜索(快进)。toCurrent
true
这些方法也在 2.3 版中添加。seekToTimestamp
在 or 方法中为多个分区查找相同的时间戳时,首选第二种方法,因为在对使用者方法的单次调用中查找时间戳的偏移量效率更高。
从其他位置调用时,容器将收集所有时间戳查找请求,并对 .onIdleContainer onPartitionsAssigned offsetsForTimes offsetsForTimes |
您还可以在检测到空闲容器时执行查找操作。
有关如何启用空闲容器检测的信息,请参阅检测空闲和无响应的使用者。onIdleContainer()
接受集合的方法很有用,例如,在处理压缩的主题时,并且您希望在每次启动应用程序时都寻找开头:seekToBeginning |
public class MyListener implements ConsumerSeekAware {
...
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
callback.seekToBeginning(assignments.keySet());
}
}
要在运行时任意查找,请使用 for 相应线程的回调引用。registerSeekCallback
这是一个简单的 Spring Boot 应用程序,它演示了如何使用回调;它向主题发送 10 条记录;点击控制台会导致所有分区查找到开头。<Enter>
@SpringBootApplication
public class SeekExampleApplication {
public static void main(String[] args) {
SpringApplication.run(SeekExampleApplication.class, args);
}
@Bean
public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
return args -> {
IntStream.range(0, 10).forEach(i -> template.send(
new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
while (true) {
System.in.read();
listener.seekToStart();
}
};
}
@Bean
public NewTopic topic() {
return new NewTopic("seekExample", 3, (short) 1);
}
}
@Component
class Listener implements ConsumerSeekAware {
private static final Logger logger = LoggerFactory.getLogger(Listener.class);
private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();
private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
this.callbackForThread.set(callback);
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
partitions.forEach(tp -> this.callbacks.remove(tp));
this.callbackForThread.remove();
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
@KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
public void listen(ConsumerRecord<String, String> in) {
logger.info(in.toString());
}
public void seekToStart() {
this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
}
}
为了简单起见,版本 2.3 添加了类,该类跟踪要用于主题/分区的回调。
以下示例演示如何在每次容器空闲时在每个分区中查找处理的最后一条记录。
它还具有允许任意外部调用按一条记录倒带分区的方法。AbstractConsumerSeekAware
public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {
@KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
public void listen(String in) {
...
}
@Override
public void onIdleContainer(Map<org.apache.kafka.common.TopicPartition, Long> assignments,
ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind all partitions one record.
*/
public void rewindAllOneRecord() {
getSeekCallbacks()
.forEach((tp, callback) ->
callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind one partition one record.
*/
public void rewindOnePartitionOneRecord(String topic, int partition) {
getSeekCallbackFor(new org.apache.kafka.common.TopicPartition(topic, partition))
.seekRelative(topic, partition, -1, true);
}
}
版本 2.6 向 abstract 类添加了便捷方法:
-
seekToBeginning()
- 查找所有已分配的分区到开头 -
seekToEnd()
- 查找所有已分配的分区到末尾 -
seekToTimestamp(long time)
- 将所有分配的分区查找到该时间戳表示的偏移量。
例:
public class MyListener extends AbstractConsumerSeekAware {
@KafkaListener(...)
void listn(...) {
...
}
}
public class SomeOtherBean {
MyListener listener;
...
void someMethod() {
this.listener.seekToTimestamp(System.currentTimeMillis - 60_000);
}
}
4.1.10. 容器工厂
如 @KafkaListener
Annotation 中所述,a 用于为带 Comments 的方法创建容器。ConcurrentKafkaListenerContainerFactory
从版本 2.2 开始,您可以使用同一工厂创建任何 .
如果要创建多个具有相似属性的容器,或者希望使用一些外部配置的工厂,例如 Spring Boot 自动配置提供的工厂,这可能很有用。
创建容器后,您可以进一步修改其属性,其中许多属性是使用 设置的。
以下示例配置了一个 :ConcurrentMessageListenerContainer
container.getContainerProperties()
ConcurrentMessageListenerContainer
@Bean
public ConcurrentMessageListenerContainer<String, String>(
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> container =
factory.createContainer("topic1", "topic2");
container.setMessageListener(m -> { ... } );
return container;
}
以这种方式创建的容器不会添加到终端节点注册表中。
应将它们创建为定义,以便将它们注册到应用程序上下文中。@Bean |
从版本 2.3.4 开始,您可以在创建和配置每个容器后向工厂添加 以进一步配置每个容器。ContainerCustomizer
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setContainerCustomizer(container -> { /* customize the container */ });
return factory;
}
4.1.11. 线程安全
使用并发消息侦听器容器时,将在所有使用者线程上调用单个侦听器实例。 因此,侦听器需要是线程安全的,最好使用无状态侦听器。 如果无法使侦听器线程安全,或者添加同步会显著降低添加并发的好处,则可以使用以下几种技术之一:
-
将容器与 prototype 范围的 bean 一起使用,以便每个容器都获得自己的实例(使用 时无法执行此操作)。
n
concurrency=1
MessageListener
@KafkaListener
-
将状态保留在实例中。
ThreadLocal<?>
-
将单例侦听器委托给在 (或类似范围) 中声明的 bean。
SimpleThreadScope
为了便于清理线程状态(对于前面列表中的第二项和第三项),从版本 2.2 开始,侦听器容器在每个线程退出时发布一个。
您可以通过 or 方法使用这些事件,以从范围中删除实例或线程范围的 bean。
请注意,它不会销毁具有销毁接口(例如 )的 bean,因此您应该自己创建实例。ConsumerStoppedEvent
ApplicationListener
@EventListener
ThreadLocal<?>
remove()
SimpleThreadScope
DisposableBean
destroy()
默认情况下,应用程序上下文的事件 multicaster 在调用线程上调用事件侦听器。 如果将 multicaster 更改为使用异步执行程序,则线程清理无效。 |
4.1.12. 监控
监视侦听器性能
从版本 2.3 开始,如果在 class path 上检测到 micrometer s,并且应用程序上下文中存在 single ,则侦听器容器将自动为侦听器创建和更新 Micrometer s。
可以通过将 设置为 来禁用计时器。Timer
Micrometer
MeterRegistry
ContainerProperty
micrometerEnabled
false
维护两个计时器 - 一个用于成功调用侦听器,另一个用于失败。
计时器已命名并具有以下标签:spring.kafka.listener
-
name
:(容器 Bean 名称) -
result
:或success
failure
-
exception
:或none
ListenerExecutionFailedException
您可以使用 该属性添加其他标记。ContainerProperties
micrometerTags
使用并发容器,将为每个线程创建计时器,并且标记后缀为 where n is to 。name -n 0 concurrency-1 |
监控 KafkaTemplate 性能
从版本 2.5 开始,如果在 class path 上检测到 micrometer s for send 操作,并且应用程序上下文中存在 single ,则模板将自动创建和更新 Micrometer s for send 操作。
可以通过将模板的属性设置为 来禁用计时器。Timer
Micrometer
MeterRegistry
micrometerEnabled
false
维护两个计时器 - 一个用于成功调用侦听器,另一个用于失败。
计时器已命名并具有以下标签:spring.kafka.template
-
name
:(模板 Bean 名称) -
result
:或success
failure
-
exception
: 或失败的异常类名称none
您可以使用模板的属性添加其他标记。micrometerTags
Micrometer 原生指标
从版本 2.5 开始,该框架提供了 Factory Listeners 来在创建和关闭 producer 和 consumer 时管理 Micrometer 实例。KafkaClientMetrics
要启用此功能,只需将侦听器添加到您的 producer 和 consumer 工厂:
@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
Map<String, Object> configs = consumerConfigs();
...
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
...
cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return cf;
}
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return pf;
}
传递给侦听器的使用者/生产者将添加到带有 tag name 的计量器标签中。id
spring.id
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
.tag("customTag", "customTagValue")
.tag("spring.id", "myProducerFactory.myClientId-1")
.functionCounter()
.count()
为 - 请参阅 KafkaStreams Micrometer Support 提供了类似的侦听器。StreamsBuilderFactoryBean
4.1.13. 事务
本节介绍 Spring for Apache Kafka 如何支持事务。
概述
0.11.0.0 客户端库添加了对事务的支持。 Spring for Apache Kafka 通过以下方式添加支持:
-
KafkaTransactionManager
:与常规的 Spring 事务支持(等)一起使用。@Transactional
TransactionTemplate
-
事务
KafkaMessageListenerContainer
-
本地事务
KafkaTemplate
-
与其他事务管理器的事务同步
通过向 提供 来启用事务。
在这种情况下,工厂不是管理单个 shared ,而是维护事务 producer 的缓存。
当用户调用生产者时,它会返回到缓存以供重用,而不是实际关闭。
每个生产者的属性是 + ,其中 starts with 并为每个新的生产者递增,除非事务由具有基于记录的侦听器的侦听器容器启动。
在这种情况下,是 。
这是为了正确支持击剑僵尸,如此处所述。
此新行为已在版本 1.3.7、2.0.6、2.1.10 和 2.2.0 中添加。
如果要恢复到以前的行为,可以将 上的属性设置为 。DefaultKafkaProducerFactory
transactionIdPrefix
Producer
close()
transactional.id
transactionIdPrefix
n
n
0
transactional.id
<transactionIdPrefix>.<group.id>.<topic>.<partition>
producerPerConsumerPartition
DefaultKafkaProducerFactory
false
虽然批处理侦听器支持事务,但默认情况下,不支持僵尸隔离,因为一个批处理可能包含来自多个主题或分区的记录。
但是,从版本 2.3.2 开始,如果将 container 属性设置为 true,则支持僵尸围栏。
在这种情况下,从上次轮询收到的每个分区都会调用一次批处理侦听器,就像每次轮询仅返回单个分区的记录一样。
这是从 2.5 版本开始默认的,当时使用 ;如果您正在使用事务但不关心僵尸围栏,请将其设置为 。
另请参阅 Exactly Once 语义。subBatchPerPartition true EOSMode.ALPHA false |
另请参阅 transactionIdPrefix
。
使用 Spring Boot,只需要设置属性 - Boot 将自动配置一个 bean 并将其连接到侦听器容器中。spring.kafka.producer.transaction-id-prefix
KafkaTransactionManager
从版本 2.5.8 开始,您现在可以在 producer 工厂上配置该属性。
当使用可能为 broker 的 .
使用 current 时,这可能会导致 a 没有 rebalance。
通过将 小于 ,如果生产者超过其最大年龄,工厂将刷新生产者。maxAge transactional.id.expiration.ms kafka-clients ProducerFencedException maxAge transactional.id.expiration.ms |
用KafkaTransactionManager
它是 Spring Framework 的 .
在其构造函数中提供了对 producer 工厂的引用。
如果您提供自定义生产者工厂,则它必须支持事务。
看。KafkaTransactionManager
PlatformTransactionManager
ProducerFactory.transactionCapable()
你可以将 与普通的 Spring 事务支持(、 、 和其他)一起使用。
如果事务处于活动状态,则在事务范围内执行的任何操作都使用事务的 .
管理器提交或回滚事务,具体取决于成功或失败。
您必须将 配置为使用与事务管理器相同的 。KafkaTransactionManager
@Transactional
TransactionTemplate
KafkaTemplate
Producer
KafkaTemplate
ProducerFactory
事务同步
本节引用仅限生产者事务(不是由侦听器容器启动的事务);有关在容器启动事务时链接事务的信息,请参阅Using Consumer-Initiated Transaction。
如果要将记录发送到 kafka 并执行一些数据库更新,则可以使用普通的 Spring 事务管理,例如。DataSourceTransactionManager
@Transactional
public void process(List<Thing> things) {
things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
updateDb(things);
}
Comments 的拦截器启动事务,并将事务与该事务管理器同步;每次发送都将参与该交易。
当方法退出时,数据库事务将提交,然后是 Kafka 事务。
如果您希望以相反的顺序执行提交(首先是 Kafka),请使用嵌套方法,其中 outer 方法配置为使用 ,而 internal 方法配置为使用 .@Transactional
KafkaTemplate
@Transactional
DataSourceTransactionManager
KafkaTransactionManager
有关在 Kafka 优先或 DB 优先配置中同步 JDBC 和 Kafka 事务的应用程序示例,请参阅与其他事务管理器的 Kafka 事务示例。
从版本 2.5.17、2.6.12、2.7.9 和 2.8.0 开始,如果同步事务上的提交失败(在主事务提交之后),将向调用方抛出异常。 以前,这会被静默忽略(在 debug 时记录)。 如有必要,应用程序应采取补救措施来补偿已提交的主事务。 |
使用使用者发起的事务
从 2.7 版开始,现在已经弃用了;有关更多信息,请参阅其 super class 的 Javadocs。
相反,在容器中使用 a 来启动 Kafka 事务,并使用 Comments 侦听器方法来启动另一个事务。ChainedKafkaTransactionManager
ChainedTransactionManager
KafkaTransactionManager
@Transactional
有关链接 JDBC 和 Kafka 事务的示例应用程序,请参阅使用其他事务管理器的 Kafka 事务示例。
KafkaTemplate
本地事务
您可以使用 在本地事务中执行一系列操作。
以下示例显示了如何执行此操作:KafkaTemplate
boolean result = template.executeInTransaction(t -> {
t.sendDefault("thing1", "thing2");
t.sendDefault("cat", "hat");
return true;
});
回调中的参数是模板本身 ()。
如果回调正常退出,则提交事务。
如果引发异常,则回滚事务。this
如果存在正在处理的(或已同步的)事务,则不会使用它。
相反,使用新的 “嵌套” 事务。KafkaTransactionManager |
transactionIdPrefix
如概述中所述,producer 工厂配置了此属性以构建 producer 属性。
指定此属性时存在二分法,即使用 运行应用程序的多个实例时,它在所有实例上都必须相同,以便在侦听器容器线程上生成记录时满足隔离僵尸(在概述中也提到)。
但是,当使用不是由侦听器容器启动的事务生成记录时,每个实例上的前缀必须不同。
版本 2.3 使配置更简单,尤其是在 Spring Boot 应用程序中。
在以前的版本中,您必须创建两个生产者工厂和 s——一个用于在侦听器容器线程上生成记录,另一个用于由方法上的事务拦截器启动或由事务拦截器启动的独立事务。transactional.id
EOSMode.ALPHA
KafkaTemplate
kafkaTemplate.executeInTransaction()
@Transactional
现在,您可以覆盖工厂对 和 的 .transactionalIdPrefix
KafkaTemplate
KafkaTransactionManager
当为侦听器容器使用事务管理器和模板时,你通常会将其保留为 producer 工厂的 property 的默认值。
使用 时,此值对于所有应用程序实例都应相同。
有了它,不再需要使用相同的 ,即使对于消费者发起的交易也是如此;事实上,它在每个实例上都必须是唯一的,这与生产者发起的事务相同。
对于由模板(或事务管理器)启动的事务,应分别在模板和事务管理器上设置属性。
此属性在每个应用程序实例上必须具有不同的值。EOSMode.ALPHA
EOSMode.BETA
transactional.id
@Transaction
在使用时(代理版本 >= 2.5)时,此问题(不同的规则)已消除;请参阅 Exactly Once 语义。transactional.id EOSMode.BETA |
KafkaTemplate
事务性和非事务性发布
通常,当 a 是事务性的(配置了支持事务的 producer 工厂)时,需要事务。
事务可以由 、 方法、调用 或侦听器容器(如果配置了 )启动 。
任何在事务范围之外使用模板的尝试都会导致模板抛出 .
从版本 2.4.3 开始,您可以将模板的属性设置为 。
在这种情况下,模板将通过调用 's 方法允许操作在没有事务的情况下运行;Producer 将像往常一样被缓存或线程绑定,以便重用。
请参见使用 DefaultKafkaProducerFactory
。KafkaTemplate
TransactionTemplate
@Transactional
executeInTransaction
KafkaTransactionManager
IllegalStateException
allowNonTransactional
true
ProducerFactory
createNonTransactionalProducer()
使用 Batch 侦听器的事务
当侦听器在使用事务时失败时,将调用 在回滚发生后执行一些操作。
将 default 与记录侦听器一起使用时,将执行查找,以便重新交付失败的记录。
但是,使用批处理侦听器时,将重新交付整个批处理,因为框架不知道批处理中的哪条记录失败。
有关更多信息,请参阅 After-rollback Processor 。AfterRollbackProcessor
AfterRollbackProcessor
使用批处理侦听器时,版本 2.4.2 引入了一种替代机制来处理处理批处理时的故障。这。
当设置为 true 的容器工厂配置了 时,一次使用一条记录调用侦听器。
这将启用批处理中的错误处理,同时仍可以停止处理整个批处理,具体取决于异常类型。
提供了一个默认值,可以使用标准(如 .
以下测试用例配置代码段说明了如何使用此功能:BatchToRecordAdapter
batchListener
BatchToRecordAdapter
BatchToRecordAdapter
ConsumerRecordRecoverer
DeadLetterPublishingRecoverer
public static class TestListener {
final List<String> values = new ArrayList<>();
@KafkaListener(id = "batchRecordAdapter", topics = "test")
public void listen(String data) {
values.add(data);
if ("bar".equals(data)) {
throw new RuntimeException("reject partial");
}
}
}
@Configuration
@EnableKafka
public static class Config {
ConsumerRecord<?, ?> failed;
@Bean
public TestListener test() {
return new TestListener();
}
@Bean
public ConsumerFactory<?, ?> consumerFactory() {
return mock(ConsumerFactory.class);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) -> {
this.failed = record;
}));
return factory;
}
}
4.1.14. 恰好一次语义
您可以为侦听器容器提供实例。
配置后,容器会在调用侦听器之前启动事务。
侦听器执行的任何操作都参与事务。
如果侦听器成功处理了记录(或使用多个记录,则使用 ),则容器会在事务管理器提交事务之前使用 ),将偏移量发送到事务。
如果侦听器引发异常,则回滚事务并重新定位使用者,以便可以在下一次轮询时检索回滚的记录。
有关更多信息以及处理反复失败的记录,请参阅 After-rollback Processor 。KafkaAwareTransactionManager
KafkaTemplate
BatchMessageListener
producer.sendOffsetsToTransaction()
使用事务可实现 Exactly Once 语义 (EOS)。
这意味着,对于序列,可以保证序列只完成一次。
(读取和进程具有至少 1 次语义)。read→process-write
Spring for Apache Kafka 版本 2.5 及更高版本支持两种 EOS 模式:
-
ALPHA
- 别名 (已弃用)V1
-
BETA
- 别名 (已弃用)V2
-
V1
- 又名击剑(自版本 0.11.0.0 起)transactional.id
-
V2
- 又名 fetch-offset-request fencing (自 2.5 版本起)
使用 mode ,如果启动了另一个具有相同实例的实例,则生产者将被“隔离”。
Spring 通过使用 for each 来管理这一点;当发生再平衡时,新实例将使用相同的实例,而旧的 Producer 将被隔离。V1
transactional.id
Producer
group.id/topic/partition
transactional.id
使用 mode ,每个都没有必要有一个生产者,因为消费者元数据与偏移量一起发送到事务,并且代理可以使用该信息来确定生产者是否被隔离。V2
group.id/topic/partition
从版本 2.6 开始,默认值为 .EOSMode
V2
要将容器配置为使用 mode ,请将 container 属性设置为 ,以恢复到之前的行为。ALPHA
EOSMode
ALPHA
使用 (default),您的代理必须是 2.5 或更高版本; 版本 3.0 中,创建器将不再回退到 ;如果代理不支持 ,则会引发异常。
如果您的代理版本早于 2.5,则必须将 设置为 ,将设置保留为 ,如果您使用的是批处理侦听器,则应设置为 。V2 kafka-clients V1 V2 EOSMode V1 DefaultKafkaProducerFactory producerPerConsumerPartition true subBatchPerPartition true |
当您的代理升级到 2.5 或更高版本时,您应该将模式切换到 ,但生产者的数量将保持不变。
然后,您可以使用 set to 对应用程序进行滚动升级,以减少生产者的数量;您也不应再设置 Container 属性。V2
producerPerConsumerPartition
false
subBatchPerPartition
如果您的代理已经是 2.5 或更高版本,则应将属性设置为 ,以减少所需的生成者数量。DefaultKafkaProducerFactory
producerPerConsumerPartition
false
当 与 一起使用时,在所有应用程序实例中必须是唯一的。EOSMode.V2 producerPerConsumerPartition=false transactional.id |
使用 mode 时,不再需要将 设置为 ;它将默认为 when the is .V2
subBatchPerPartition
true
false
EOSMode
V2
有关更多信息,请参阅 KIP-447。
V1
并且之前是 和 ;它们已更改以使框架与 KIP-732 保持一致。V2
ALPHA
BETA
4.1.15. 将 Spring Bean 连接到生产者/消费者拦截器
Apache Kafka 提供了一种向生产者和使用者添加拦截器的机制。
这些对象由 Kafka 而不是 Spring 管理,因此普通的 Spring 依赖注入不适用于依赖的 Spring Bean 中的连接。
但是,您可以使用 interceptor 方法手动连接这些依赖项。
下面的 Spring Boot 应用程序展示了如何通过覆盖 boot 的默认工厂以将一些依赖的 bean 添加到配置属性中来做到这一点。config()
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(SomeBean someBean) {
Map<String, Object> consumerProperties = new HashMap<>();
// consumerProperties.put(..., ...)
// ...
consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
consumerProperties.put("some.bean", someBean);
return new DefaultKafkaConsumerFactory<>(consumerProperties);
}
@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(SomeBean someBean) {
Map<String, Object> producerProperties = new HashMap<>();
// producerProperties.put(..., ...)
// ...
Map<String, Object> producerProperties = properties.buildProducerProperties();
producerProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
producerProperties.put("some.bean", someBean);
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(producerProperties);
return factory;
}
@Bean
public SomeBean someBean() {
return new SomeBean();
}
@KafkaListener(id = "kgk897", topics = "kgh897")
public void listen(String in) {
System.out.println("Received " + in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> template.send("kgh897", "test");
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kgh897")
.partitions(1)
.replicas(1)
.build();
}
}
public class SomeBean {
public void someMethod(String what) {
System.out.println(what + " in my foo bean");
}
}
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {
private SomeBean bean;
@Override
public void configure(Map<String, ?> configs) {
this.bean = (SomeBean) configs.get("some.bean");
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
this.bean.someMethod("producer interceptor");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
}
public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {
private SomeBean bean;
@Override
public void configure(Map<String, ?> configs) {
this.bean = (SomeBean) configs.get("some.bean");
}
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
this.bean.someMethod("consumer interceptor");
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
}
}
结果:
producer interceptor in my foo bean
consumer interceptor in my foo bean
Received test
4.1.16. 暂停和恢复侦听器容器
版本 2.1.3 向侦听器容器添加了 and 方法。
以前,您可以在 a 中暂停使用者,然后通过侦听 a 来恢复它,它提供对对象的访问。
虽然您可以使用事件侦听器在空闲容器中暂停使用者,但在某些情况下,这不是线程安全的,因为无法保证在使用者线程上调用事件侦听器。
要安全地暂停和恢复使用者,您应该在侦听器容器上使用 and 方法。
A 在下一个 ;a 在 current 返回后立即生效。
当容器暂停时,它会继续向使用者发送,从而避免在使用组管理时进行再平衡,但它不会检索任何记录。
有关更多信息,请参阅 Kafka 文档。pause()
resume()
ConsumerAwareMessageListener
ListenerContainerIdleEvent
Consumer
pause
resume
pause()
poll()
resume()
poll()
poll()
从版本 2.1.5 开始,您可以调用 以查看是否已调用。
但是,使用者可能实际上尚未暂停。 如果所有实例实际上都已暂停,则返回 true。isPauseRequested()
pause()
isConsumerPaused()
Consumer
此外(也是从 2.1.5 开始),实例以容器作为属性和属性中涉及的实例发布。ConsumerPausedEvent
ConsumerResumedEvent
source
TopicPartition
partitions
以下简单的 Spring Boot 应用程序通过使用容器注册表来获取对方法容器的引用并暂停或恢复其使用者以及接收相应的事件:@KafkaListener
@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
@Override
public void onApplicationEvent(KafkaEvent event) {
System.out.println(event);
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
KafkaTemplate<String, String> template) {
return args -> {
template.send("pause.resume.topic", "thing1");
Thread.sleep(10_000);
System.out.println("pausing");
registry.getListenerContainer("pause.resume").pause();
Thread.sleep(10_000);
template.send("pause.resume.topic", "thing2");
Thread.sleep(10_000);
System.out.println("resuming");
registry.getListenerContainer("pause.resume").resume();
Thread.sleep(10_000);
};
}
@KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("pause.resume.topic")
.partitions(2)
.replicas(1)
.build();
}
}
下面的清单显示了前面示例的结果:
partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2
4.1.17. 暂停和恢复侦听器容器上的分区
从版本 2.7 开始,您可以通过使用侦听器容器中的 and 方法来暂停和恢复分配给该使用者的特定分区的使用。
暂停和恢复分别发生在与 和 方法类似的 and 方法之前和之后。
如果已请求该分区的 pause,则该方法返回 true。
如果该分区已有效暂停,则该方法返回 true。pausePartition(TopicPartition topicPartition)
resumePartition(TopicPartition topicPartition)
poll()
pause()
resume()
isPartitionPauseRequested()
isPartitionPaused()
此外,从版本 2.7 开始,实例以容器作为属性和实例发布。ConsumerPartitionPausedEvent
ConsumerPartitionResumedEvent
source
TopicPartition
4.1.18. 序列化、反序列化和消息转换
概述
Apache Kafka 提供了一个高级 API,用于序列化和反序列化记录值及其键。
它与 和 abstractions 一起存在,并带有一些内置实现。
同时,我们可以通过 using 或 configuration 属性来指定序列化器和反序列化器类。
以下示例显示了如何执行此操作:org.apache.kafka.common.serialization.Serializer<T>
org.apache.kafka.common.serialization.Deserializer<T>
Producer
Consumer
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
对于更复杂或特殊的情况,(并且因此,) 提供重载的
构造函数 和 和 的实例。KafkaConsumer
KafkaProducer
Serializer
Deserializer
keys
values
使用此 API 时, 和 还会提供属性(通过构造函数或 setter 方法),以将自定义和实例注入目标或 .
此外,还可以通过构造函数传入 或 实例 - 这些 s 在创建每个 或 时调用。DefaultKafkaProducerFactory
DefaultKafkaConsumerFactory
Serializer
Deserializer
Producer
Consumer
Supplier<Serializer>
Supplier<Deserializer>
Supplier
Producer
Consumer
字符串序列化
从版本 2.5 开始, Spring for Apache Kafka 提供了使用实体的 String 表示的类。
它们依赖于 methods 和 some or 来解析 String 和 fill 实例的属性。
通常,这会在类上调用一些静态方法,例如:ToStringSerializer
ParseStringDeserializer
toString
Function<String>
BiFunction<String, Headers>
parse
ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);
默认情况下, , 配置为传达有关记录中序列化实体的类型信息 。
您可以通过将属性设置为 false 来禁用此功能。
接收方可以使用此信息。ToStringSerializer
Headers
addTypeInfo
ParseStringDeserializer
-
ToStringSerializer.ADD_TYPE_INFO_HEADERS
(默认):您可以将其设置为在 (设置属性) 上禁用此功能。true
false
ToStringSerializer
addTypeInfo
ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
String entityType = new String(header);
if (entityType.contains("Thing")) {
return Thing.parse(str);
}
else {
// ...parsing logic
}
});
您可以配置 用于转换为 / from 的默认值。Charset
String
byte[]
UTF-8
您可以使用 properties 使用 parser 方法的名称配置 deserializer:ConsumerConfig
-
ParseStringDeserializer.KEY_PARSER
-
ParseStringDeserializer.VALUE_PARSER
属性必须包含类的完全限定名称,后跟方法名称,用句点分隔。
该方法必须是静态的,并且具有 or 的签名。.
(String, Headers)
(String)
还提供了 A,用于 Kafka Streams。ToFromStringSerde
JSON 格式
Spring for Apache Kafka 还提供基于
Jackson JSON 对象映射器。
它允许将任何 Java 对象写入 JSON 。
这需要一个额外的参数来允许将 consumed 反序列化为适当的目标对象。
以下示例演示如何创建 :JsonSerializer
JsonDeserializer
JsonSerializer
byte[]
JsonDeserializer
Class<?> targetType
byte[]
JsonDeserializer
JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);
您可以使用 .
您还可以扩展它们以实现方法中的某些特定配置逻辑。JsonSerializer
JsonDeserializer
ObjectMapper
configure(Map<String, ?> configs, boolean isKey)
从版本 2.3 开始,默认情况下,所有 JSON 感知组件都配置了一个实例,该实例禁用了 and 功能。
此外,此类实例还提供了用于自定义数据类型的众所周知的模块,例如 Java 时间和 Kotlin 支持。
有关更多信息,请参阅 JavaDocs。
此方法还将 for objects 序列化注册到纯字符串中,以实现网络上的平台间兼容性。
A 可以在应用程序上下文中注册为 bean,并且它将被自动配置到 Spring Boot ObjectMapper
实例中。JacksonUtils.enhancedObjectMapper()
MapperFeature.DEFAULT_VIEW_INCLUSION
DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES
JacksonUtils.enhancedObjectMapper()
org.springframework.kafka.support.JacksonMimeTypeModule
org.springframework.util.MimeType
JacksonMimeTypeModule
同样从版本 2.3 开始,提供了基于 -的构造函数,以更好地处理目标通用容器类型。JsonDeserializer
TypeReference
从版本 2.1 开始,您可以在 record 中传达类型信息,从而允许处理多种类型。
此外,您可以使用以下 Kafka 属性配置序列化程序和反序列化程序。
如果您分别为 和 提供了 和 实例,则它们无效。Headers
Serializer
Deserializer
KafkaConsumer
KafkaProducer
配置属性
-
JsonSerializer.ADD_TYPE_INFO_HEADERS
(默认):您可以将其设置为在 (设置属性) 上禁用此功能。true
false
JsonSerializer
addTypeInfo
-
JsonSerializer.TYPE_MAPPINGS
(默认):请参阅映射类型。empty
-
JsonDeserializer.USE_TYPE_INFO_HEADERS
(默认):您可以将其设置为忽略序列化程序设置的标头。true
false
-
JsonDeserializer.REMOVE_TYPE_INFO_HEADERS
(默认):您可以将其设置为保留序列化程序设置的标头。true
false
-
JsonDeserializer.KEY_DEFAULT_TYPE
:如果不存在标头信息,则用于键反序列化的回退类型。 -
JsonDeserializer.VALUE_DEFAULT_TYPE
:如果不存在标头信息,则用于反序列化值的回退类型。 -
JsonDeserializer.TRUSTED_PACKAGES
(default , ):允许反序列化的包模式的逗号分隔列表。 表示 deserialize all。java.util
java.lang
*
-
JsonDeserializer.TYPE_MAPPINGS
(默认):请参阅映射类型。empty
-
JsonDeserializer.KEY_TYPE_METHOD
(默认):请参阅使用方法确定类型。empty
-
JsonDeserializer.VALUE_TYPE_METHOD
(默认):请参阅使用方法确定类型。empty
从版本 2.2 开始,类型信息 Headers(如果由序列化器添加)由反序列化器删除。
您可以通过直接在反序列化器上或使用前面描述的 configuration 属性将属性设置为 , 来恢复到以前的行为。removeTypeHeaders
false
从版本 2.8 开始,如果你以编程方式构造序列化器或反序列化器,如 编程构造 中所示,只要你没有显式设置任何属性(使用方法或使用 Fluent API),工厂就会应用上述属性。
以前,以编程方式创建时,从不应用配置属性;如果您直接在对象上显式设置属性,则仍然会出现这种情况。set*() |
映射类型
从版本 2.2 开始,在使用 JSON 时,您现在可以使用前面列表中的属性来提供类型映射。
以前,您必须在序列化器和反序列化器中自定义类型映射器。
映射由逗号分隔的对列表组成。
在出站时,有效负载的类名将映射到相应的令牌。
在入站时,type 标头中的令牌将映射到相应的类名。token:className
以下示例创建一组映射:
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeSerializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.hat");
相应的对象必须兼容。 |
如果使用 Spring Boot,则可以在 (或 yaml) 文件中提供这些属性。
以下示例显示了如何执行此操作:application.properties
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat
您只能使用属性执行简单配置。
对于更高级的配置(例如在序列化器和反序列化器中使用自定义),你应该使用接受预构建的序列化器和反序列化器的 producer 和 consumer factory 构造函数。
Spring Boot 示例覆盖了默认工厂:
还提供了 Setter,作为使用这些构造函数的替代方法。 |
从版本 2.2 开始,你可以显式地将反序列化器配置为使用提供的目标类型,并通过使用具有布尔值的重载构造函数之一(默认情况下)来忽略 Headers 中的类型信息。
以下示例显示了如何执行此操作:useHeadersIfPresent
true
DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
使用方法确定类型
从版本 2.5 开始,您现在可以通过 properties 配置反序列化器,以调用方法来确定目标类型。
如果存在,这将覆盖上面讨论的任何其他技术。
如果数据是由不使用 Spring 序列化器的应用程序发布的,并且您需要根据数据或其他 Headers 反序列化为不同的类型,那么这可能很有用。
将这些属性设置为方法名称 - 一个完全限定的类名,后跟方法名,用句点分隔。
该方法必须声明为 , have one of three signatures 或 and return a Jackson 。.
public static
(String topic, byte[] data, Headers headers)
(byte[] data, Headers headers)
(byte[] data)
JavaType
-
JsonDeserializer.KEY_TYPE_METHOD
:spring.json.key.type.method
-
JsonDeserializer.VALUE_TYPE_METHOD
:spring.json.value.type.method
您可以使用任意标头或检查数据以确定类型。
JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);
JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);
public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
// {"thisIsAFieldInThing1":"value", ...
if (data[21] == '1') {
return thing1Type;
}
else {
return thing2Type;
}
}
对于更复杂的数据检查,请考虑使用 或类似方法,但是,用于确定类型的测试越简单,过程的效率就越高。JsonPath
以下是以编程方式创建反序列化器的示例(在构造函数中为 Consumer Factory 提供反序列化器时):
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);
...
public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
...
}
程序化构建
当以编程方式构造序列化器/反序列化器以在 producer/consumer 工厂中使用时,从版本 2.3 开始,你可以使用 Fluent API,这简化了配置。
@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
new JsonSerializer<MyKeyType>()
.forKeys()
.noTypeInfo(),
new JsonSerializer<MyValueType>()
.noTypeInfo());
return pf;
}
@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
new JsonDeserializer<>(MyKeyType.class)
.forKeys()
.ignoreTypeHeaders(),
new JsonDeserializer<>(MyValueType.class)
.ignoreTypeHeaders());
return cf;
}
若要以编程方式提供类型映射,类似于 Using Methods to Determine Types,请使用该属性。typeFunction
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeFunction(MyUtils::thingOneOrThingTwo);
或者,只要你不使用 Fluent API 来配置属性,或者使用方法设置它们,工厂就会使用配置属性配置序列化器/反序列化器;请参阅配置属性。set*()
委托序列化器和反序列化器
使用标头
版本 2.3 引入了 和 ,它允许生成和使用具有不同键和/或值类型的记录。
生产者必须将 Headers 设置为选择器值,该值用于选择要用于值和键的序列化器;如果未找到匹配项,则引发 AN。DelegatingSerializer
DelegatingDeserializer
DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR
DelegatingSerializer.KEY_SERIALIZATION_SELECTOR
IllegalStateException
对于传入记录,反序列化器使用相同的 Headers 来选择要使用的反序列化器;如果未找到匹配项或标头不存在,则返回 RAW。byte[]
您可以通过构造函数配置 selector 到 / 的映射,也可以通过 Kafka 生产者/使用者属性使用键 和 进行配置。
对于序列化程序,producer 属性可以是 a,其中 key 是选择器,value 是实例、序列化程序或类名。
该属性也可以是逗号分隔的映射条目的 String,如下所示。Serializer
Deserializer
DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG
DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG
Map<String, Object>
Serializer
Class
对于反序列化器,consumer 属性可以是 a,其中 key 是选择器,value 是实例、反序列化器或类名。
该属性也可以是逗号分隔的映射条目的 String,如下所示。Map<String, Object>
Deserializer
Class
要使用 properties 进行配置,请使用以下语法:
producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")
consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")
然后,创建者会将标头设置为 或 。DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR
thing1
thing2
此技术支持向同一主题(或不同主题)发送不同类型的内容。
从版本 2.5.1 开始,如果类型(键或值)是 (, , etc) 支持的标准类型之一,则无需设置 selector 标头。
相反,序列化程序会将标头设置为类型的类名。
没有必要为这些类型配置 serializer 或 deserializers,它们将动态创建(一次)。Serdes Long Integer |
有关将不同类型发送到不同主题的另一种技术,请参阅使用 RoutingKafkaTemplate
。
按类型
版本 2.8 引入了 .DelegatingByTypeSerializer
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
null, new DelegatingByTypeSerializer(Map.of(
byte[].class, new ByteArraySerializer(),
Bytes.class, new BytesSerializer(),
String.class, new StringSerializer())));
}
从版本 2.8.3 开始,你可以配置序列化器来检查是否可以从目标对象分配 map 键,这在委托序列化器可以序列化子类时很有用。
在这种情况下,如果存在友好匹配项,则应提供 ordered ,例如 a 。Map
LinkedHashMap
按主题
从版本 2.8 开始, 和 允许根据主题名称选择序列化器/反序列化器。
Regex 用于查找要使用的实例。
可以使用构造函数或通过 properties(逗号分隔的列表)配置映射。DelegatingByTopicSerializer
DelegatingByTopicDeserializer
Pattern
pattern:serializer
producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArraySerializer.class.getName()
+ ", topic[5-9]:" + StringSerializer.class.getName());
...
ConsumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArrayDeserializer.class.getName()
+ ", topic[5-9]:" + StringDeserializer.class.getName());
在将此选项用于键时使用。KEY_SERIALIZATION_TOPIC_CONFIG
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
null,
new DelegatingByTopicSerializer(Map.of(
Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
Pattern.compile("topic[5-9]"), new StringSerializer())),
new JsonSerializer<Object>()); // default
}
您可以使用 和 指定在没有模式匹配时使用的默认序列化器/反序列化器。DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULT
DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT
当设置为 时,附加属性 (default ) 使主题查找不区分大小写。DelegatingByTopicSerialization.CASE_SENSITIVE
true
false
重试 Deserializer
当委托在反序列化期间可能存在暂时性错误(如网络问题)时,使用委托 and 重试反序列化。RetryingDeserializer
Deserializer
RetryTemplate
ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
有关使用重试策略、回退策略等的配置,请参阅 spring-retry 项目。RetryTemplate
Spring 消息传递消息转换
尽管从低级 Kafka 和角度来看,和 API 非常简单灵活,但在使用 Spring Integration的 Apache Kafka 支持时,您可能需要在 Spring 消息传递级别具有更大的灵活性。
为了让您轻松地转换为 Spring for Apache Kafka 提供了带有实现及其(和子类)自定义的抽象。
您可以直接将 the 注入到实例中,也可以通过对属性使用 bean 定义来注入。
以下示例显示了如何执行此操作:Serializer
Deserializer
Consumer
Producer
@KafkaListener
org.springframework.messaging.Message
MessageConverter
MessagingMessageConverter
JsonMessageConverter
MessageConverter
KafkaTemplate
AbstractKafkaListenerContainerFactory
@KafkaListener.containerFactory()
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new JsonMessageConverter());
return factory;
}
...
@KafkaListener(topics = "jsonData",
containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}
使用 Spring Boot 时,只需将转换器定义为 a ,Spring Boot 自动配置就会将其连接到自动配置的模板和容器工厂中。@Bean
当您使用 时,将向消息转换器提供参数类型以帮助进行转换。@KafkaListener
只有在方法级别声明 Comments 时,才能实现此类型推断。
使用 class-level ,有效负载类型用于选择要调用的方法,因此在选择方法之前必须已转换该方法。 |
在使用者端,您可以配置 ;它可以处理 类型的值,因此应与 、 或 结合使用。
( 并且效率更高,因为它们避免了不必要的转换)。
如果你愿意,你还可以配置对应于 deserializer 的特定子类。 在生产者端,当您使用 Spring Integration 或方法(请参阅 使用
同样,使用 or 更有效,因为它们避免了 to 转换。 为方便起见,从版本 2.3 开始,该框架还提供了一个可以序列化所有三种值类型的工具,以便它可以与任何消息转换器一起使用。 |
从版本 2.7.1 开始,消息负载转换可以委托给 ;例如,这允许基于 Headers 进行转换。spring-messaging
SmartMessageConverter
MessageHeaders.CONTENT_TYPE
调用该方法以将消息有效负载出站转换为 a。
该方法用于入站转化 from,有效负载为属性。
调用该方法以从传递给 'fromMessage()' (通常由 ) 创建新的出站。
同样,在该方法中,在转换器从 创建新的 后,将调用该方法,然后使用新转换的有效负载创建最终的入站消息。
在任一情况下,如果 returns ,则使用原始消息。KafkaMessageConverter.fromMessage() ProducerRecord ProducerRecord.value() KafkaMessageConverter.toMessage() ConsumerRecord ConsumerRecord.value() SmartMessageConverter.toMessage() Message<?> Message KafkaTemplate.send(Message<?> msg) KafkaMessageConverter.toMessage() Message<?> ConsumerRecord SmartMessageConverter.fromMessage() SmartMessageConverter null |
当 和 listener 容器工厂中使用默认转换器时,您可以通过调用模板和属性 on methods 来配置 。KafkaTemplate
SmartMessageConverter
setMessagingConverter()
contentMessageConverter
@KafkaListener
例子:
template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
...
}
使用 Spring Data Projection 接口
从版本 2.1.1 开始,你可以将 JSON 转换为 Spring Data Projection 接口,而不是具体类型。 这允许对数据进行非常有选择性的低耦合绑定,包括从 JSON 文档中的多个位置查找值。 例如,可以将以下接口定义为消息有效负载类型:
interface SomeSample {
@JsonPath({ "$.username", "$.user.name" })
String getUsername();
}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}
默认情况下,将使用访问器方法在接收到的 JSON 文档中查找属性名称作为字段。
该表达式允许自定义值查找,甚至定义多个 JSON 路径表达式,以便从多个位置查找值,直到表达式返回实际值。@JsonPath
要启用此功能,请使用配置了适当的代理转换器(用于出站转换和转换非投影接口)。
还必须将 and 添加到类路径中。ProjectingMessageConverter
spring-data:spring-data-commons
com.jayway.jsonpath:json-path
当用作方法的参数时,接口类型会像往常一样自动传递给转换器。@KafkaListener
用ErrorHandlingDeserializer
当反序列化器无法反序列化消息时, Spring 无法处理该问题,因为它发生在返回之前。
为了解决这个问题,引入了 。
此 deserializer 委托给真正的 deserializer (key 或 value)。
如果委托无法反序列化记录内容,则 将在包含原因和原始字节的标头中返回一个值和 a。
使用 record-level 时,如果 包含 key 或 value 的标头,则使用 failed 调用容器的 。
记录不会传递给侦听器。poll()
ErrorHandlingDeserializer
ErrorHandlingDeserializer
null
DeserializationException
MessageListener
ConsumerRecord
DeserializationException
ErrorHandler
ConsumerRecord
或者,您也可以通过提供 来配置 以创建自定义值,即 .
调用此函数以创建 的实例,该实例以通常的方式传递给侦听器。
向函数提供包含所有上下文信息的 object 类型的对象。
您可以在 headers 中找到 (作为序列化的 Java 对象)。
有关更多信息,请参阅 Javadoc 。ErrorHandlingDeserializer
failedDeserializationFunction
Function<FailedDeserializationInfo, T>
T
FailedDeserializationInfo
DeserializationException
ErrorHandlingDeserializer
您可以使用采用键和值对象的构造函数,并连接已使用适当委托配置的相应实例。
或者,您也可以使用使用者配置属性(由 使用)来实例化委托。
属性名称为 和 。
属性值可以是类或类名。
以下示例显示如何设置这些属性:DefaultKafkaConsumerFactory
Deserializer
ErrorHandlingDeserializer
ErrorHandlingDeserializer
ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS
ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);
以下示例使用 .failedDeserializationFunction
public class BadFoo extends Foo {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadFoo(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}
}
public class FailedFooProvider implements Function<FailedDeserializationInfo, Foo> {
@Override
public Foo apply(FailedDeserializationInfo info) {
return new BadFoo(info);
}
}
前面的示例使用以下配置:
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedFooProvider.class);
...
如果 consumer 配置了 ,那么使用 serializer 配置 the 及其 producer 是很重要的,该序列化器可以处理普通对象以及由反序列化异常导致的原始值。
模板的 generic value type 应为 。
一种方法是使用 ;示例如下:ErrorHandlingDeserializer KafkaTemplate byte[] Object DelegatingByTypeSerializer |
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
将 与 batch listener 一起使用时,必须检查消息标头中的反序列化异常。
与 一起使用时,您可以使用该标头来确定异常失败的记录,并通过 与错误处理程序通信。ErrorHandlingDeserializer
DefaultBatchErrorHandler
BatchListenerFailedException
@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
for (int i = 0; i < in.size(); i++) {
Thing thing = in.get(i);
if (thing == null
&& headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
DeserializationException deserEx = ListenerUtils.byteArrayToDeserializationException(this.logger,
(byte[]) headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
if (deserEx != null) {
logger.error(deserEx, "Record at index " + i + " could not be deserialized");
}
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
process(thing);
}
}
ListenerUtils.byteArrayToDeserializationException()
可用于将标头转换为 .DeserializationException
当使用 , 时,改用:List<ConsumerRecord<?, ?>
ListenerUtils.getExceptionFromHeader()
@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
for (int i = 0; i < in.size(); i++) {
ConsumerRecord<String, Thing> rec = in.get(i);
if (rec.value() == null) {
DeserializationException deserEx = ListenerUtils.getExceptionFromHeader(rec,
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
if (deserEx != null) {
logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
}
process(rec.value());
}
}
如果您还使用 ,则为 a 发布的记录将具有 a 类型的 ;这不应该被序列化。
考虑使用配置为使用 for 和普通序列化器(Json、Avro 等)来处理所有其他类型的序列化程序。DeadLetterPublishingRecoverer DeserializationException record.value() byte[] DelegatingByTypeSerializer ByteArraySerializer byte[] |
使用批量侦听器进行有效负载转换
当您使用批处理侦听器容器工厂时,您还可以使用 within a 来转换批处理消息。
有关更多信息,请参见序列化、反序列化和消息转换和 Spring 消息传递消息转换。JsonMessageConverter
BatchMessagingMessageConverter
默认情况下,转换的类型是从 listener 参数推断的。
如果使用 a 将其设置为 (而不是 default) 进行配置,则转换器将改用 headers 中的类型信息(如果存在)。
例如,这允许使用接口而不是具体类来声明侦听器方法。
此外,类型转换器支持映射,因此反序列化可以是与源不同的类型(只要数据兼容)。
当您使用类级 @KafkaListener
实例时,这也很有用,其中有效负载必须已转换以确定要调用的方法。
下面的示例创建使用此方法的 bean:JsonMessageConverter
DefaultJackson2TypeMapper
TypePrecedence
TYPE_ID
INFERRED
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setMessageConverter(new BatchMessagingMessageConverter(converter()));
return factory;
}
@Bean
public JsonMessageConverter converter() {
return new JsonMessageConverter();
}
请注意,要使其正常工作,转换目标的方法签名必须是具有单个泛型参数类型的容器对象,如下所示:
@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
请注意,您仍然可以访问批处理标头。
如果批处理转换器具有支持它的记录转换器,则您还可以接收一个消息列表,其中的有效负载根据泛型类型进行转换。 以下示例显示了如何执行此操作:
@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen1(List<Message<Foo>> fooMessages) {
...
}
ConversionService
定制
从版本 2.1.1 开始,默认情况下用于解析调用侦听器方法的参数的 bean 与实现以下任何接口的所有 bean 一起提供:org.springframework.core.convert.ConversionService
o.s.messaging.handler.annotation.support.MessageHandlerMethodFactory
-
org.springframework.core.convert.converter.Converter
-
org.springframework.core.convert.converter.GenericConverter
-
org.springframework.format.Formatter
这样,您就可以进一步自定义侦听器反序列化,而无需更改 和 的默认配置。ConsumerFactory
KafkaListenerContainerFactory
在 通过 bean 上设置 custom 将禁用此功能。MessageHandlerMethodFactory KafkaListenerEndpointRegistrar KafkaListenerConfigurer |
将 custom 添加到HandlerMethodArgumentResolver
@KafkaListener
从版本 2.4.2 开始,您可以添加自己的和解析自定义方法参数。
你只需要实现和使用类 中的方法。HandlerMethodArgumentResolver
KafkaListenerConfigurer
setCustomMethodArgumentResolvers()
KafkaListenerEndpointRegistrar
@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setCustomMethodArgumentResolvers(
new HandlerMethodArgumentResolver() {
@Override
public boolean supportsParameter(MethodParameter parameter) {
return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
}
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) {
return new CustomMethodArgument(
message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
);
}
}
);
}
}
您还可以通过向 Bean 添加自定义来完全替换框架的参数解析。
如果你这样做了,并且你的应用程序需要处理 tombstone 记录,使用 a (例如,来自压缩的主题),你应该将 a 添加到工厂;它必须是最后一个解析程序,因为它支持所有类型的解析器,并且可以在没有注释的情况下匹配参数。
如果您使用的是 ,请将此解析程序设置为最后一个自定义解析程序;工厂将确保此解析器将在 standard 之前使用,该 standard 对有效负载一无所知。MessageHandlerMethodFactory
KafkaListenerEndpointRegistrar
null
value()
KafkaNullAwarePayloadArgumentResolver
@Payload
DefaultMessageHandlerMethodFactory
PayloadMethodArgumentResolver
KafkaNull
4.1.19. 消息头
0.11.0.0 客户端引入了对消息中标头的支持。
从版本 2.0 开始, Spring for Apache Kafka 现在支持将这些 Headers 映射到和从。spring-messaging
MessageHeaders
以前的版本映射并映射到 spring-messaging ,其中 value 属性映射到 和其他属性(、 等)映射到 headers。
情况仍然如此,但现在可以映射其他(任意)标头。ConsumerRecord ProducerRecord Message<?> payload topic partition |
Apache Kafka 标头具有一个简单的 API,如以下接口定义所示:
public interface Header {
String key();
byte[] value();
}
提供该策略以在 Kafka 和 之间映射标头条目。
其接口定义如下:KafkaHeaderMapper
Headers
MessageHeaders
public interface KafkaHeaderMapper {
void fromHeaders(MessageHeaders headers, Headers target);
void toHeaders(Headers source, Map<String, Object> target);
}
将原始标头映射为 ,其中包含用于转换为值的配置选项。SimpleKafkaHeaderMapper
byte[]
String
将键映射到报头名称,为了支持出站消息的丰富报头类型,将执行 JSON 转换。
“特殊”标头(键为 )包含 的 JSON 映射。
此标头在入站端使用,以提供每个标头值到原始类型的适当转换。DefaultKafkaHeaderMapper
MessageHeaders
spring_json_header_types
<key>:<type>
在入站端,所有 Kafka 实例都映射到 。
在出站端,默认情况下,除 、 和映射到 properties 的标头外,所有 Headers 都被映射。Header
MessageHeaders
MessageHeaders
id
timestamp
ConsumerRecord
您可以通过向映射器提供模式来指定要为出站消息映射的标头。 下面的清单显示了一些示例映射:
public DefaultKafkaHeaderMapper() { (1)
...
}
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { (2)
...
}
public DefaultKafkaHeaderMapper(String... patterns) { (3)
...
}
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { (4)
...
}
1 | 使用默认 Jackson 并映射大多数 headers,如示例前所述。ObjectMapper |
2 | 使用提供的 Jackson 并映射大多数标头,如示例前所述。ObjectMapper |
3 | 使用默认的 Jackson 并根据提供的模式映射标头。ObjectMapper |
4 | 使用提供的 Jackson 并根据提供的模式映射标头。ObjectMapper |
模式相当简单,可以包含前导通配符 (、尾随通配符或两者 (例如)。
您可以使用前导 .
与标头名称匹配的第一个模式(无论是正的还是负的)获胜。
.cat.*
!
当您提供自己的模式时,我们建议包括 和 ,因为这些标头在入站端是只读的。!id
!timestamp
默认情况下,映射器仅反序列化 和 中的类。
您可以通过使用该方法添加受信任的包来信任其他(或所有)包。
如果您收到来自不受信任的来源的消息,您可能希望只添加您信任的那些包。
要信任所有包,您可以使用 .java.lang java.util addTrustedPackages mapper.addTrustedPackages("*") |
在与不知道映射器的 JSON 格式的系统通信时,以原始形式映射标头值非常有用。String |
从版本 2.2.5 开始,您可以指定某些字符串值标头不应使用 JSON 进行映射,而是应映射到/从 raw 进行映射。
具有新属性; 当设置为 true 时,所有字符串值标头都将转换为使用 property (default) 。
此外,还有一个属性 ,它是 的映射 ;如果 Map 包含 Headers 名称,并且 Headers 包含一个值,则将使用 CharSet 将其映射为 Raw。
此 map 还用于当且仅当 map 值中的布尔值为 .
如果布尔值为 ,或者 Headers 名称在映射中没有值,则传入的 Headers 将简单地映射为原始未映射的 Headers。byte[]
AbstractKafkaHeaderMapper
mapAllStringsOut
byte[]
charset
UTF-8
rawMappedHeaders
header name : boolean
String
byte[]
byte[]
String
true
false
true
以下测试用例说明了此机制。
@Test
public void testSpecificStringConvert() {
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
Map<String, Boolean> rawMappedHeaders = new HashMap<>();
rawMappedHeaders.put("thisOnesAString", true);
rawMappedHeaders.put("thisOnesBytes", false);
mapper.setRawMappedHeaders(rawMappedHeaders);
Map<String, Object> headersMap = new HashMap<>();
headersMap.put("thisOnesAString", "thing1");
headersMap.put("thisOnesBytes", "thing2");
headersMap.put("alwaysRaw", "thing3".getBytes());
MessageHeaders headers = new MessageHeaders(headersMap);
Headers target = new RecordHeaders();
mapper.fromHeaders(headers, target);
assertThat(target).containsExactlyInAnyOrder(
new RecordHeader("thisOnesAString", "thing1".getBytes()),
new RecordHeader("thisOnesBytes", "thing2".getBytes()),
new RecordHeader("alwaysRaw", "thing3".getBytes()));
headersMap.clear();
mapper.toHeaders(target, headersMap);
assertThat(headersMap).contains(
entry("thisOnesAString", "thing1"),
entry("thisOnesBytes", "thing2".getBytes()),
entry("alwaysRaw", "thing3".getBytes()));
}
默认情况下,两个 Headers 映射器都会映射所有入站 Headers。 从版本 2.8.8 开始,模式也可以应用于入站映射。 要创建用于入站映射的映射器,请在相应的映射器上使用静态方法之一:
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}
public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
例如:
DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");
这将排除所有以 开头的标头,并包括所有其他标头。abc
默认情况下,只要 Jackson 位于类路径上,就会在 和 中使用 。DefaultKafkaHeaderMapper
MessagingMessageConverter
BatchMessagingMessageConverter
使用批处理转换器,转换后的标头以 as a 形式提供,其中列表位置的映射对应于有效负载中的数据位置。KafkaHeaders.BATCH_CONVERTED_HEADERS
List<Map<String, Object>>
如果没有转换器(因为 Jackson 不存在或显式设置为 ),则使用者记录中的 Headers 在 Headers 中提供未转换。
此标头是一个对象(在批处理转换器的情况下为 a),其中列表中的位置对应于有效负载中的数据位置。null
KafkaHeaders.NATIVE_HEADERS
Headers
List<Headers>
某些类型不适合 JSON 序列化,对于这些类型,可能首选简单的序列化。
具有一个名为 method 的方法,它允许您提供应以这种方式处理出站映射的类的名称。
在入站映射期间,它们被映射为 。
默认情况下,只有 和 以这种方式映射。toString() DefaultKafkaHeaderMapper addToStringClasses() String org.springframework.util.MimeType org.springframework.http.MediaType |
从版本 2.3 开始,简化了 String 值 Headers 的处理。
默认情况下,此类标头不再是 JSON 编码的(即它们没有添加 enhalation)。
该类型仍会添加到 JSON_TYPES 标头中,以便接收系统可以转换回 String (from)。
mapper 可以处理(解码)旧版本生成的 headers(它会检查前导);这样,使用 2.3 的应用程序可以使用旧版本中的记录。"…" byte[] " |
为了与早期版本兼容,如果使用 2.3 的版本生成的记录可能被使用早期版本的应用程序使用,请设置为 。
当所有应用程序都使用 2.3 或更高版本时,可以将该属性保留为其默认值 。encodeStrings true false |
@Bean
MessagingMessageConverter converter() {
MessagingMessageConverter converter = new MessagingMessageConverter();
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
mapper.setEncodeStrings(true);
converter.setHeaderMapper(mapper);
return converter;
}
如果使用 Spring Boot,它会自动将此转换器 bean 配置到自动配置的 ;否则,您应该将此转换器添加到模板中。KafkaTemplate
4.1.20. 'Tombstone' 记录的 Null 有效负载和日志压缩
当您使用 Log Compaction 时,您可以发送和接收带有负载的消息,以识别密钥的删除。null
您还可以出于其他原因接收值,例如,当 a 无法反序列化值时可能会返回该值。null
Deserializer
null
要使用 发送有效负载,可以将 null 传递到方法的 value 参数中。
一个例外是变体。
由于不能有有效负载,因此您可以使用名为 的特殊有效负载类型,框架会发送 .
为方便起见,提供了 static。null
KafkaTemplate
send()
send(Message<?> message)
spring-messaging
Message<?>
null
KafkaNull
null
KafkaNull.INSTANCE
当您使用消息侦听器容器时,收到的容器具有一个 .ConsumerRecord
null
value()
要配置 以处理有效负载,必须将注释与 .
如果它是压缩日志的逻辑删除消息,您通常还需要该密钥,以便您的应用程序可以确定哪个密钥被“删除”。
以下示例显示了此类配置:@KafkaListener
null
@Payload
required = false
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
// value == null represents key deletion
}
当您将类级别与多个方法一起使用时,需要一些额外的配置。
具体来说,您需要一个具有有效负载的方法。
以下示例显示如何配置一个:@KafkaListener
@KafkaHandler
@KafkaHandler
KafkaNull
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String cat) {
...
}
@KafkaHandler
public void listen(Integer hat) {
...
}
@KafkaHandler
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
...
}
}
请注意,参数是 ,而不是 。null
KafkaNull
请参阅手动分配所有分区。 |
此功能需要使用 框架将在使用默认 .
使用自定义时,请参阅将自定义 HandlerMethodArgumentResolver 添加到@KafkaListener 。KafkaNullAwarePayloadArgumentResolver MessageHandlerMethodFactory MessageHandlerMethodFactory |
4.1.21. 处理异常
本节介绍如何处理在使用 Spring for Apache Kafka 时可能出现的各种异常。
侦听器错误处理程序
从版本 2.0 开始,注解有一个新属性:.@KafkaListener
errorHandler
您可以使用 来提供实现的 bean 名称。
这个功能接口有一个方法,如下面的清单所示:errorHandler
KafkaListenerErrorHandler
@FunctionalInterface
public interface KafkaListenerErrorHandler {
Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;
}
您可以访问消息转换器生成的 spring-messaging 对象和侦听器抛出的异常,该异常包装在 .
错误处理程序可以引发原始异常或新异常,该异常将引发到容器中。
错误处理程序返回的任何内容都将被忽略。Message<?>
ListenerExecutionFailedException
从版本 2.7 开始,您可以在 和 上设置属性,这会导致将 raw 添加到 headers 中的 converted。
这很有用,例如,如果您希望在 listener 错误处理程序中使用 。
它可能用于请求/回复方案,即您希望在一定次数的重试后,在死信主题中捕获失败的记录后,将失败结果发送给发件人。rawRecordHeader
MessagingMessageConverter
BatchMessagingMessageConverter
ConsumerRecord
Message<?>
KafkaHeaders.RAW_DATA
DeadLetterPublishingRecoverer
@Bean
KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
return (msg, ex) -> {
if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
return "FAILED";
}
throw ex;
};
}
它有一个子接口 (),可以通过以下方法访问 consumer 对象:ConsumerAwareListenerErrorHandler
Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);
例如,如果您的错误处理程序实现了此接口,则可以相应地调整偏移量。 例如,要重置偏移量以重播失败的消息,您可以执行以下操作:
@Bean
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
return (m, e, c) -> {
this.listen3Exception = e;
MessageHeaders headers = m.getHeaders();
c.seek(new org.apache.kafka.common.TopicPartition(
headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
headers.get(KafkaHeaders.OFFSET, Long.class));
return null;
};
}
同样,您可以对批处理侦听器执行以下操作:
@Bean
public ConsumerAwareListenerErrorHandler listen10ErrorHandler() {
return (m, e, c) -> {
this.listen10Exception = e;
MessageHeaders headers = m.getHeaders();
List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);
List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);
List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);
Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
for (int i = 0; i < topics.size(); i++) {
int index = i;
offsetsToReset.compute(new TopicPartition(topics.get(i), partitions.get(i)),
(k, v) -> v == null ? offsets.get(index) : Math.min(v, offsets.get(index)));
}
offsetsToReset.forEach((k, v) -> c.seek(k, v));
return null;
};
}
这会将批处理中的每个主题/分区重置为批处理中的最低偏移量。
前面的两个示例是简单的实现,您可能希望在错误处理程序中进行更多检查。 |
容器错误处理程序
从版本 2.8 开始,legacy 和 interfaces 已被新的 .
这些错误处理程序可以处理记录侦听器和批处理侦听器的错误,从而允许单个侦听器容器工厂为这两种类型的侦听器创建容器。 提供了替换大多数旧版框架错误处理程序实现的实现,并弃用了旧版错误处理程序。
侦听器容器和侦听器容器工厂仍支持传统接口;它们将在将来的发行版中弃用。ErrorHandler
BatchErrorHandler
CommonErrorHandler
CommonErrorHandler
有关将自定义错误处理程序迁移到 的信息,请参阅将自定义旧版错误处理程序实现迁移到 CommonErrorHandler
。CommonErrorHandler
使用事务时,默认情况下不会配置错误处理程序,以便异常将回滚事务。
事务容器的错误处理由 AfterRollbackProcessor
处理。
如果您在使用事务时提供自定义错误处理程序,并且您希望回滚事务,它必须引发异常。
此接口有一个 default 方法,容器调用该方法来确定如果错误处理程序返回而不引发异常,是否应提交偏移量;默认情况下,它返回 true。isAckAfterHandle()
通常,当错误未被“处理”时(例如,在执行 seek 操作之后),框架提供的错误处理程序将引发异常。
默认情况下,此类异常由容器记录在 level .
所有框架错误处理程序都进行了扩展,这允许您控制记录这些异常的级别。ERROR
KafkaExceptionLogLevelAware
/**
* Set the level at which the exception thrown by this handler is logged.
* @param logLevel the level (default ERROR).
*/
public void setLogLevel(KafkaException.Level logLevel) {
...
}
您可以指定要用于容器工厂中所有侦听器的全局错误处理程序。 以下示例显示了如何执行此操作:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setCommonErrorHandler(myErrorHandler);
...
return factory;
}
默认情况下,如果带注释的侦听器方法引发异常,则会将其抛出到容器中,并根据容器配置处理消息。
容器在调用错误处理程序之前提交任何待处理的偏移量提交。
如果您使用的是 Spring Boot,则只需将错误处理程序添加为 a,Boot 就会将其添加到自动配置的工厂中。@Bean
DefaultErrorHandler
这个新的错误处理程序取代了 和 ,它们现在是多个版本的默认错误处理程序。
一个区别是,批处理侦听器的回退行为(当引发除 a 以外的异常时)等效于重试完整批处理。SeekToCurrentErrorHandler
RecoveringBatchErrorHandler
BatchListenerFailedException
错误处理程序可以恢复 (跳过) 不断失败的记录。
默认情况下,在 10 次失败后,将记录失败的记录(在 级别)。
您可以使用自定义 recoverer () 和 a 来配置处理程序,该 () 和控制每个之间的交付尝试和延迟。
使用 with 会导致(有效地)无限次重试。
以下示例配置三次尝试后的恢复:ERROR
BiConsumer
BackOff
FixedBackOff
FixedBackOff.UNLIMITED_ATTEMPTS
DefaultErrorHandler errorHandler =
new DefaultErrorHandler((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
要使用此处理程序的自定义实例配置侦听器容器,请将其添加到容器工厂。
例如,对于容器工厂,您可以按如下方式添加:@KafkaListener
DefaultErrorHandler
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
return factory;
}
对于记录侦听器,这将重试投放最多 2 次(3 次投放尝试),后退 1 秒,而不是默认配置 ()。
在重试次数用尽后,只会记录失败。FixedBackOff(0L, 9)
举个例子;如果返回 6 条记录(每个分区 0、1、2 各 2 条),并且侦听器在第 4 条记录上引发异常,则容器通过提交前三条消息的偏移量来确认前三条消息。
寻求分区 1 的偏移量 1 和分区 2 的偏移量 0。
next 返回 3 个未处理的记录。poll
DefaultErrorHandler
poll()
如果是 ,则容器在调用错误处理程序之前提交前两个分区的偏移量。AckMode
BATCH
对于批处理侦听器,侦听器必须引发一个,指示批处理中的哪些记录失败。BatchListenerFailedException
事件的顺序是:
-
在索引之前提交记录的偏移量。
-
如果重试次数未用尽,则执行 seek 操作,以便重新传送所有剩余记录(包括失败的记录)。
-
如果重试次数已用尽,请尝试恢复失败的记录(仅限默认日志)并执行查找,以便重新传递剩余记录(不包括失败的记录)。 已提交已恢复记录的偏移量
-
如果重试已用尽且恢复失败,则执行查找,就像重试未用尽一样。
默认 recoverer 会在重试次数用尽后记录失败的记录。
您可以使用自定义恢复器,也可以使用框架提供的恢复器,例如 DeadLetterPublishingRecoverer
。
当使用 POJO 批处理侦听器(例如 )并且您没有完整的使用者记录要添加到异常时,您只需添加失败记录的索引:List<Thing>
@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
for (int i = 0; i < records.size(); i++) {
try {
process(things.get(i));
}
catch (Exception e) {
throw new BatchListenerFailedException("Failed to process", i);
}
}
}
当容器配置了 时,可以将错误处理程序配置为提交已恢复记录的偏移量;将属性设置为 。AckMode.MANUAL_IMMEDIATE
commitRecovered
true
另请参阅发布死信记录。
使用事务时,类似的功能由 .
请参阅 After-rollback Processor (回滚后处理器)。DefaultAfterRollbackProcessor
这会将某些异常视为致命异常,并且会跳过此类异常的重试;在第一次失败时调用 recoverer。
默认情况下,被视为致命的异常包括:DefaultErrorHandler
-
DeserializationException
-
MessageConversionException
-
ConversionException
-
MethodArgumentResolutionException
-
NoSuchMethodException
-
ClassCastException
,因为这些异常不太可能在重试的投放中得到解决。
您可以向 not-retryable 类别添加更多异常类型,或完全替换分类异常的映射。
有关更多信息,请参阅 Javadocs 和 以及 .DefaultErrorHandler.addNotRetryableException()
DefaultErrorHandler.setClassifications()
spring-retry
BinaryExceptionClassifier
下面是一个添加到不可重试异常的示例:IllegalArgumentException
@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
handler.addNotRetryableExceptions(IllegalArgumentException.class);
return handler;
}
错误处理程序可以配置一个或多个 s,用于接收重试和恢复进度的通知。
从版本 2.8.10 开始,添加了批处理侦听器的方法。RetryListener
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
}
default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
}
default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
}
}
有关更多信息,请参阅 javadocs。
如果 recoverer 失败(引发异常),则失败的记录将包含在 seek 中。
如果 recoverer 失败,则默认情况下将重置 ,并且重新交付将在再次尝试恢复之前再次通过回退。
要在恢复失败后跳过重试,请将错误处理程序的 设置为 。BackOff resetStateOnRecoveryFailure false |
您可以向错误处理程序提供 a ,以根据失败的记录和/或异常来确定要使用的:BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>
BackOff
handler.setBackOffFunction((record, ex) -> { ... });
如果函数返回 ,则将使用处理程序的默认值。null
BackOff
设置为 ,如果异常类型在两次失败之间发生更改,则重试序列将重新启动(包括选择新的 ,如果已配置)。
默认情况下,不考虑异常类型。resetStateOnExceptionChange
true
BackOff
另请参阅 Delivery Attempts 标头。
使用批处理错误处理程序的转换错误
从版本 2.8 开始,当将 a 与 、 a 或 a 以及 一起使用时,批处理侦听器现在可以正确处理转换错误。
发生转换错误时,有效负载将设置为 null,并向记录标头添加反序列化异常,类似于 .
侦听器中提供了 s 的列表,因此侦听器可以抛出 a,指示发生转换异常的第一个索引。MessageConverter
ByteArrayDeserializer
BytesDeserializer
StringDeserializer
DefaultErrorHandler
ErrorHandlingDeserializer
ConversionException
BatchListenerFailedException
例:
@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
for (int i = 0; i < in.size(); i++) {
Foo foo = in.get(i);
if (foo == null && exceptions.get(i) != null) {
throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
}
process(foo);
}
}
重试完整批处理
现在是批处理侦听器的回退行为,其中侦听器会引发除 .DefaultErrorHandler
BatchListenerFailedException
无法保证在重新交付批次时,该批次具有相同的记录数和/或重新交付的记录具有相同的顺序。
因此,不可能轻松维护批处理的重试状态。
它采用以下方法。
如果批处理侦听器引发的异常不是 a ,则从内存中的记录批处理中执行重试。
为了避免在延长的重试序列期间发生重新平衡,错误处理程序会暂停使用者,在每次重试时在休眠前轮询使用者以进行回退,然后再次调用侦听器。
如果/当重试用尽时,将为批处理中的每条记录调用 the。
如果 recoverer 抛出异常,或者线程在休眠期间中断,则该批记录将在下一次轮询时重新传递。
在退出之前,无论结果如何,使用者都会恢复。FallbackBatchErrorHandler
BatchListenerFailedException
ConsumerRecordRecoverer
此机制不能用于事务。 |
在等待间隔时,错误处理程序将以短暂的睡眠循环,直到达到所需的延迟,同时检查容器是否已停止,允许睡眠在停止后很快退出,而不是导致延迟。BackOff
stop()
容器停止错误处理程序
如果侦听器引发异常,则停止容器。
对于记录侦听器,当 is 时,将提交已处理记录的偏移量。
对于记录侦听器,当 为 任何手动值时,将提交已确认记录的偏移量。
对于记录侦听器,wWhen the is 或对于批处理侦听器,在重新启动容器时重放整个批处理。CommonContainerStoppingErrorHandler
AckMode
RECORD
AckMode
AckMode
BATCH
容器停止后,将引发包装 the 的异常。
这是为了使事务回滚(如果启用了事务)。ListenerExecutionFailedException
委派错误处理程序
可以根据异常类型委托给不同的错误处理程序。
例如,您可能希望为大多数异常调用 a,或为其他异常调用 a。CommonDelegatingErrorHandler
DefaultErrorHandler
CommonContainerStoppingErrorHandler
日志记录错误处理程序
它只是记录异常;使用 Record 侦听器时,上一次轮询的剩余记录将传递给侦听器。
对于批处理侦听器,将记录批处理中的所有记录。CommonLoggingErrorHandler
对 Record 和 Batch 侦听器使用不同的常见错误处理程序
如果您希望对记录和批处理侦听器使用不同的错误处理策略,则提供了允许为每个侦听器类型配置特定错误处理程序的 。CommonMixedErrorHandler
常见错误处理程序 Summery
-
DefaultErrorHandler
-
CommonContainerStoppingErrorHandler
-
CommonDelegatingErrorHandler
-
CommonLoggingErrorHandler
-
CommonMixedErrorHandler
旧版错误处理程序及其替换
旧版错误处理程序 | 更换 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
无替换,与无限 . |
|
|
|
无替换 - 使用并引发除 . |
将自定义旧版错误处理程序实现迁移到CommonErrorHandler
请参阅 中的 javadocs。CommonErrorHandler
要替换 or 实现,您应该 implement 并保留 return (default)。
您还应该实现 - 以处理发生在记录处理范围之外的异常(例如消费者错误)。ErrorHandler
ConsumerAwareErrorHandler
handleRecord()
remainingRecords()
false
handleOtherException()
要替换实现,您应该 implement 和 override 以返回 。
您还应该实现 - 以处理发生在记录处理范围之外的异常(例如消费者错误)。RemainingRecordsErrorHandler
handleRemaining()
remainingRecords()
true
handleOtherException()
要替换任何 implementation,您应该 implement You also should implement - 以处理发生在记录处理范围之外的异常(例如消费者错误)。BatchErrorHandler
handleBatch()
handleOtherException()
回滚后处理器
使用事务时,如果侦听器引发异常(并且错误处理程序(如果存在)引发异常),则事务将回滚。
默认情况下,任何未处理的记录(包括失败的记录)都会在下次轮询时重新获取。
这是通过在 .
使用批处理侦听器,将重新处理整个记录批次(容器不知道批处理中的哪条记录失败)。
要修改此行为,您可以使用自定义 .
例如,对于基于记录的侦听器,您可能希望跟踪失败的记录,并在尝试一定次数后放弃,也许是通过将其发布到死信主题。seek
DefaultAfterRollbackProcessor
AfterRollbackProcessor
从版本 2.2 开始,现在可以恢复(跳过)不断失败的记录。
默认情况下,在 10 次失败后,将记录失败的记录(在 级别)。
您可以使用自定义 recoverer () 和最大故障数来配置处理器。
将属性设置为负数会导致无限次重试。
以下示例配置三次尝试后的恢复:DefaultAfterRollbackProcessor
ERROR
BiConsumer
maxFailures
AfterRollbackProcessor<String, String> processor =
new DefaultAfterRollbackProcessor((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
当您不使用事务时,可以通过配置 .
请参阅 容器错误处理程序。DefaultErrorHandler
批处理侦听器无法进行恢复,因为框架不知道批处理中的哪条记录不断失败。 在这种情况下,应用程序侦听器必须处理不断失败的记录。 |
另请参阅发布死信记录。
从版本 2.2.5 开始,可以在新事务中调用(在失败的事务回滚后启动)。
然后,如果您使用 to publish a failed record,则处理器会将原始 topic/partition 中恢复的记录的偏移量发送到事务。
要启用此功能,请在 .DefaultAfterRollbackProcessor
DeadLetterPublishingRecoverer
commitRecovered
kafkaTemplate
DefaultAfterRollbackProcessor
如果 recoverer 失败(引发异常),则失败的记录将包含在 seek 中。
从版本 2.5.5 开始,如果 recoverer 失败,则默认情况下将重置 ,并且重新交付将再次通过回退,然后再尝试恢复。
在早期版本中,不会重置 ,并在下次失败时重新尝试恢复。
要恢复到以前的行为,请将处理器的 属性设置为 。BackOff BackOff resetStateOnRecoveryFailure false |
从版本 2.6 开始,您现在可以为处理器提供一个 ,以根据失败的记录和/或异常来确定要使用的:BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>
BackOff
handler.setBackOffFunction((record, ex) -> { ... });
如果函数返回 ,则将使用处理器的默认值。null
BackOff
从版本 2.6.3 开始,如果异常类型在失败之间发生变化,则设置为 并且重试序列将重新启动(包括选择新的,如果已配置)。
默认情况下,不考虑异常类型。resetStateOnExceptionChange
true
BackOff
从版本 2.3.1 开始,类似于 ,将某些异常视为致命的,并且对于此类异常,将跳过重试;在第一次失败时调用 recoverer。
默认情况下,被视为致命的异常包括:DefaultErrorHandler
DefaultAfterRollbackProcessor
-
DeserializationException
-
MessageConversionException
-
ConversionException
-
MethodArgumentResolutionException
-
NoSuchMethodException
-
ClassCastException
,因为这些异常不太可能在重试的投放中得到解决。
您可以向 not-retryable 类别添加更多异常类型,或完全替换分类异常的映射。
有关更多信息,请参阅 Javadocs,以及 .DefaultAfterRollbackProcessor.setClassifications()
spring-retry
BinaryExceptionClassifier
下面是一个添加到不可重试异常的示例:IllegalArgumentException
@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
processor.addNotRetryableException(IllegalArgumentException.class);
return processor;
}
另请参阅 Delivery Attempts 标头。
使用 current 时,容器无法检测 a 是由再平衡引起的,还是由于超时或到期而撤销了生产者。
因为,在大多数情况下,这是由再平衡引起的,所以容器不会调用 (因为寻找分区是不合适的,因为我们不再被分配它们)。
如果您确保超时足够大以处理每个事务并定期执行“空”事务(例如通过 a ),则可以避免由于超时和到期而导致的隔离。
或者,您可以将 container 属性设置为,容器将停止,从而避免丢失记录。
您可以使用 a 并检查 属性 来检测此情况。
由于该事件还引用了容器,因此您可以使用此事件重新启动容器。kafka-clients ProducerFencedException transactional.id AfterRollbackProcessor ListenerContainerIdleEvent stopContainerWhenFenced true ConsumerStoppedEvent Reason FENCED |
从版本 2.7 开始,在等待间隔时,错误处理程序将以短暂的睡眠循环,直到达到所需的延迟,同时检查容器是否已停止,允许睡眠在之后很快退出,而不是导致延迟。BackOff
stop()
从版本 2.7 开始,处理器可以配置一个或多个 s,以接收重试和恢复进度的通知。RetryListener
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
}
有关更多信息,请参阅 javadocs。
Delivery Attempts 标头
以下内容仅适用于记录侦听器,不适用于批处理侦听器。
从版本 2.5 开始,当使用 或 implements 时,可以启用向记录添加 header () 。
此标头的值是从 1 开始的递增整数。
当接收 raw 时,整数位于 .ErrorHandler
AfterRollbackProcessor
DeliveryAttemptAware
KafkaHeaders.DELIVERY_ATTEMPT
kafka_deliveryAttempt
ConsumerRecord<?, ?>
byte[4]
int delivery = ByteBuffer.wrap(record.headers()
.lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
.getInt()
与 or 一起使用时,可以通过将它作为参数添加到 listener 方法中来获取。@KafkaListener
DefaultKafkaHeaderMapper
SimpleKafkaHeaderMapper
@Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery
要启用此标头的填充,请将 container 属性设置为 。
默认情况下,它是禁用的,以避免查找每条记录的状态并添加标头的 (小) 开销。deliveryAttemptHeader
true
的 和 支持此功能。DefaultErrorHandler
DefaultAfterRollbackProcessor
侦听器信息报头
在某些情况下,能够知道侦听器在哪个容器中运行非常有用。
从版本 2.8.4 开始,您现在可以在侦听器容器上设置属性,或在 Comments 上设置属性。
然后,容器会将 this 添加到所有传入消息的标头中;然后,它可以用于 Record interceptor、filters 等,或者用于侦听器本身。listenerInfo
info
@KafkaListener
KafkaListener.LISTENER_INFO
@KafkaListener(id = "something", topic = "topic", filter = "someFilter",
info = "this is the something listener")
public void listen2(@Payload Thing thing,
@Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
...
}
在 or 实现中使用时,标头在使用者记录中作为字节数组,使用 's 属性进行转换。RecordInterceptor
RecordFilterStrategy
KafkaListenerAnnotationBeanPostProcessor
charSet
标头映射器也会在从使用者记录创建时转换为,并且永远不会将此标头映射到出站记录上。String
MessageHeaders
对于 POJO 批处理侦听器,从版本 2.8.6 开始,标头将复制到批处理的每个成员中,并且在转换后也可用作单个参数。String
@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
info = "info for batch")
public void listen(List<Thing> list,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
@Header(KafkaHeaders.LISTENER_INFO) String info) {
...
}
如果批处理侦听器具有过滤器,并且过滤器导致空批次,则需要添加到参数中,因为该信息不适用于空批次。required = false @Header |
如果您收到信息,则位于每个 .List<Message<Thing>>
KafkaHeaders.LISTENER_INFO
Message<?>
有关使用批处理的更多信息,请参阅批处理侦听器。
发布死信记录
当记录达到最大失败数时,您可以使用 record recoverer 配置 和 。
框架提供了 ,它将失败的消息发布到另一个主题。
recoverer 需要一个 ,用于发送记录。
您还可以(可选)使用 对其进行配置,调用该 . 以解析目标主题和分区。DefaultErrorHandler
DefaultAfterRollbackProcessor
DeadLetterPublishingRecoverer
KafkaTemplate<Object, Object>
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
默认情况下,死信记录将发送到名为 (原始主题名称后缀为 ) 的主题,以及与原始记录相同的分区。
因此,当您使用默认解析程序时,死信主题必须至少具有与原始主题一样多的分区。<originalTopic>.DLT .DLT
|
如果返回的 具有负 partition,则 中未设置该分区,因此 Kafka 会选择该分区。
从版本 2.2.4 开始,任何(例如,在方法中检测到异常时抛出)都使用属性进行了增强。
这允许目标解析程序使用此 ,以及 to select the dead letter 主题中的信息。TopicPartition
ProducerRecord
ListenerExecutionFailedException
@KafkaListener
groupId
ConsumerRecord
以下示例显示如何连接自定义目标解析程序:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
(r, e) -> {
if (e instanceof FooException) {
return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
}
else {
return new TopicPartition(r.topic() + ".other.failures", r.partition());
}
});
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));
发送到死信主题的记录通过以下标头进行增强:
-
KafkaHeaders.DLT_EXCEPTION_FQCN
:Exception 类名称(通常为 ,但也可以是其他)。ListenerExecutionFailedException
-
KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN
:异常原因类名(如果存在)(自版本 2.8 起)。 -
KafkaHeaders.DLT_EXCEPTION_STACKTRACE
:异常堆栈跟踪。 -
KafkaHeaders.DLT_EXCEPTION_MESSAGE
:异常消息。 -
KafkaHeaders.DLT_KEY_EXCEPTION_FQCN
:异常类名(仅限键反序列化错误)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE
:异常堆栈跟踪(仅限键反序列化错误)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE
:异常消息(仅限键反序列化错误)。 -
KafkaHeaders.DLT_ORIGINAL_TOPIC
:原始主题。 -
KafkaHeaders.DLT_ORIGINAL_PARTITION
:原始分区。 -
KafkaHeaders.DLT_ORIGINAL_OFFSET
:原始偏移量。 -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP
:原始时间戳。 -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE
:原始时间戳类型。 -
KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP
:处理记录失败的原始消费者组(自 2.8 版本起)。
键异常仅由 s 引起,因此没有 。DeserializationException
DLT_KEY_EXCEPTION_CAUSE_FQCN
有两种机制可以添加更多标头。
-
子类化 recoverer 并覆盖 - 调用并添加更多 headers。
createProducerRecord()
super.createProducerRecord()
-
提供 a 以接收 consumer 记录和异常,返回一个对象;从那里的 Headers 将被复制到最终的 producer 记录;另请参阅 管理死信记录标头。 用于设置 .
BiFunction
Headers
setHeadersFunction()
BiFunction
第二个版本更易于实现,但第一个版本具有更多信息,包括已组装的标准标头。
从版本 2.3 开始,当与 一起使用时,发布者会将死信 producer 记录中的 record 恢复为无法反序列化的原始值。
以前,为 null,用户代码必须从消息标头中解码 the。
此外,您还可以向发布者提供多个 s;例如,如果要发布 from a 以及使用与成功反序列化的记录不同的序列化程序的值,则可能需要这样做。
以下是使用使用 和 序列化程序的 s 配置发布者的示例:ErrorHandlingDeserializer
value()
value()
DeserializationException
KafkaTemplate
byte[]
DeserializationException
KafkaTemplate
String
byte[]
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
KafkaTemplate<?, ?> bytesTemplate) {
Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
templates.put(String.class, stringTemplate);
templates.put(byte[].class, bytesTemplate);
return new DeadLetterPublishingRecoverer(templates);
}
发布者使用映射键来查找适合即将发布的模板。
建议使用 A,以便按顺序检查键。value()
LinkedHashMap
发布值时,当有多个模板时,recoverer 将为类查找模板;如果不存在,则将使用 中的第一个模板。null
Void
values().iterator()
从 2.7 开始,您可以使用该方法,以便在消息发布失败时引发异常。
您还可以使用 设置超时,以验证发件人是否成功。setFailIfSendResultIsError
setWaitForSendResultTimeout
如果 recoverer 失败(引发异常),则失败的记录将包含在 seek 中。
从版本 2.5.5 开始,如果 recoverer 失败,则默认情况下将重置 ,并且重新交付将再次通过回退,然后再尝试恢复。
在早期版本中,不会重置 ,并在下次失败时重新尝试恢复。
若要还原到以前的行为,请将错误处理程序的属性设置为 。BackOff BackOff resetStateOnRecoveryFailure false |
从版本 2.6.3 开始,如果异常类型在失败之间发生变化,则设置为 并且重试序列将重新启动(包括选择新的,如果已配置)。
默认情况下,不考虑异常类型。resetStateOnExceptionChange
true
BackOff
从版本 2.3 开始,recoverer 也可以与 Kafka Streams 一起使用 - 有关更多信息,请参阅从反序列化异常中恢复。
这会在 headers 和 (使用 java 序列化) 中添加反序列化异常。
默认情况下,这些标头不会保留在发布到死信主题的邮件中。
从版本 2.7 开始,如果 key 和 value 都失败了反序列化,则两者的原始值都会填充到发送到 DLT 的记录中。ErrorHandlingDeserializer
ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER
ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER
如果传入的记录彼此依赖,但可能无序到达,那么将失败的记录重新发布到原始主题的尾部(多次)可能很有用,而不是将其直接发送到死信主题。 有关示例,请参阅此 Stack Overflow 问题。
以下错误处理程序配置将完全执行此操作:
@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
(rec, ex) -> {
org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
if (retries == null) {
retries = new RecordHeader("retries", new byte[] { 1 });
rec.headers().add(retries);
}
else {
retries.value()[0]++;
}
return retries.value()[0] > 5
? new TopicPartition("topic.DLT", rec.partition())
: new TopicPartition("topic", rec.partition());
}), new FixedBackOff(0L, 0L));
}
从版本 2.7 开始,recoverer 检查目标解析器选择的分区是否确实存在。
如果分区不存在,则 中的分区将设置为 ,允许 选择分区。
您可以通过将属性设置为 来禁用此检查。ProducerRecord
null
KafkaProducer
verifyPartition
false
管理死信记录标头
-
appendOriginalHeaders
(默认true
) -
stripPreviousExceptionHeaders
(自 2.8 版本起默认)true
Apache Kafka 支持多个同名的标头;要获取 “latest” 值,可以使用 ;要获取多个标头的迭代器,请使用 .headers.lastHeader(headerName)
headers.headers(headerName).iterator()
重复重新发布失败的记录时,这些标头可能会增长(并最终导致发布失败,因为 a );对于异常标头尤其如此,尤其是对于 Stack Trace 标头。RecordTooLargeException
使用这两个属性的原因是,虽然您可能希望只保留最后一个异常信息,但您可能希望保留记录在每次失败时传递的主题的历史记录。
appendOriginalHeaders
应用于所有名为 的标题,而应用于所有名为 的标题。ORIGINAL
stripPreviousExceptionHeaders
EXCEPTION
从版本 2.8.4 开始,您现在可以控制将哪些标准标头添加到输出记录中。
有关默认添加的(当前)10 个标准 Headers 的通用名称,请参见 (这些不是实际的 Headers 名称,只是一个抽象;实际的 Headers 名称由子类可以覆盖的方法设置)。enum HeadersToAdd
getHeaderNames()
要排除标头,请使用 method;例如,要禁止在标头中添加异常堆栈跟踪,请使用:excludeHeaders()
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);
此外,您还可以通过添加 ;这也会禁用所有标准异常标头。ExceptionHeadersCreator
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
kafkaHeaders.add(new RecordHeader(..., ...));
});
同样从版本 2.8.4 开始,您现在可以通过 method.
这允许应用其他函数,即使已经注册了另一个函数,例如,在使用非阻塞重试时。addHeadersFunction
另请参阅使用非阻塞重试的 Failure Header Management。
ExponentialBackOffWithMaxRetries
实现
Spring Framework 提供了许多实现。
默认情况下,将无限期重试;要在重试尝试一定次数后放弃,需要计算 .
从版本 2.7.3 开始, Spring for Apache Kafka 提供了 which 是一个子类,它接收属性并自动计算 ,这更方便一些。BackOff
ExponentialBackOff
maxElapsedTime
ExponentialBackOffWithMaxRetries
maxRetries
maxElapsedTime
@Bean
DefaultErrorHandler handler() {
ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
bo.setInitialInterval(1_000L);
bo.setMultiplier(2.0);
bo.setMaxInterval(10_000L);
return new DefaultErrorHandler(myRecoverer, bo);
}
这将在几秒钟后重试,然后再调用 recoverer。1, 2, 4, 8, 10, 10
4.1.22. JAAS 和 Kerberos
从版本 2.0 开始,添加了一个类来帮助进行 Kerberos 配置。
您可以使用所需的配置将此 bean 添加到应用程序上下文中。
下面的示例配置了这样的 bean:KafkaJaasLoginModuleInitializer
@Bean
public KafkaJaasLoginModuleInitializer jaasConfig() throws IOException {
KafkaJaasLoginModuleInitializer jaasConfig = new KafkaJaasLoginModuleInitializer();
jaasConfig.setControlFlag("REQUIRED");
Map<String, String> options = new HashMap<>();
options.put("useKeyTab", "true");
options.put("storeKey", "true");
options.put("keyTab", "/etc/security/keytabs/kafka_client.keytab");
options.put("principal", "[email protected]");
jaasConfig.setOptions(options);
return jaasConfig;
}
4.2. Apache Kafka Streams 支持
从版本 1.1.4 开始, Spring for Apache Kafka 为 Kafka Streams 提供了一流的支持。
要从 Spring 应用程序中使用它,jar 必须存在于 Classpath 中。
它是 Spring for Apache Kafka 项目的可选依赖项,不能传递下载。kafka-streams
4.2.1. 基础
参考 Apache Kafka Streams 文档建议了以下使用 API 的方法:
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.
StreamsBuilder builder = ...; // when using the Kafka Streams DSL
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;
KafkaStreams streams = new KafkaStreams(builder, config);
// Start the Kafka Streams instance
streams.start();
// Stop the Kafka Streams instance
streams.close();
所以,我们有两个主要组件:
-
StreamsBuilder
:使用 API 构建(或)实例。KStream
KTable
-
KafkaStreams
:管理这些实例的生命周期。
单个实例向实例公开的所有实例将同时启动和停止,即使它们具有不同的逻辑。
换句话说,由 a 定义的所有流都与单个生命周期控制相关联。
实例一旦被 关闭,就无法重新启动。
相反,必须创建一个新实例来重新启动流处理。KStream KafkaStreams StreamsBuilder StreamsBuilder KafkaStreams streams.close() KafkaStreams |
4.2.2. Spring 管理
为了从 Spring 应用程序上下文的角度简化 Kafka Streams 的使用,并通过容器使用生命周期管理,Spring for Apache Kafka 引入了 .
这是一个将单例实例公开为 bean 的实现。
下面的示例创建这样的 bean:StreamsBuilderFactoryBean
AbstractFactoryBean
StreamsBuilder
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
从版本 2.2 开始,流配置现在作为对象提供,而不是 .KafkaStreamsConfiguration StreamsConfig |
还实现了 管理内部实例的生命周期。
与 Kafka Streams API 类似,您必须在启动 .
这也适用于 Spring API for Kafka Streams。
因此,当您在 上使用 default 时,必须在刷新应用程序上下文之前声明 实例。
例如,可以是常规的 bean 定义,而使用 Kafka Streams API 不会产生任何影响。
以下示例显示了如何执行此操作:StreamsBuilderFactoryBean
SmartLifecycle
KafkaStreams
KStream
KafkaStreams
autoStartup = true
StreamsBuilderFactoryBean
KStream
StreamsBuilder
KStream
@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
// Fluent KStream API
return stream;
}
如果要手动控制生命周期(例如,通过某些条件停止和启动),则可以使用工厂 bean () 前缀直接引用 bean。
由于使用其内部实例,因此可以安全地停止并重新启动它。
在每个 上创建一个新的 。
如果您想单独控制实例的生命周期,您还可以考虑使用不同的实例。StreamsBuilderFactoryBean
&
StreamsBuilderFactoryBean
KafkaStreams
KafkaStreams
start()
StreamsBuilderFactoryBean
KStream
您还可以在 上指定 、 和 选项,这些选项将委派给内部实例。
此外,除了间接设置这些选项外,从版本 2.1.5 开始,您还可以使用回调接口来配置内部实例。
请注意,会覆盖 提供的选项。
如果需要直接执行某些操作,则可以使用 .
你可以按类型自动装配 bean,但你应该确保在 bean 定义中使用 full 类型,如下例所示:KafkaStreams.StateListener
Thread.UncaughtExceptionHandler
StateRestoreListener
StreamsBuilderFactoryBean
KafkaStreams
StreamsBuilderFactoryBean
KafkaStreamsCustomizer
KafkaStreams
KafkaStreamsCustomizer
StreamsBuilderFactoryBean
KafkaStreams
KafkaStreams
StreamsBuilderFactoryBean.getKafkaStreams()
StreamsBuilderFactoryBean
@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
或者,如果使用接口 bean 定义,则可以按名称添加 for injection。
以下示例显示了如何执行此操作:@Qualifier
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
从版本 2.4.1 开始,工厂 bean 具有 type 为 ;这允许在创建流之前自定义 (例如,添加 state store) 和/或 。infrastructureCustomizer
KafkaStreamsInfrastructureCustomizer
StreamsBuilder
Topology
public interface KafkaStreamsInfrastructureCustomizer {
void configureBuilder(StreamsBuilder builder);
void configureTopology(Topology topology);
}
提供默认的 no-op 实现,以避免在不需要一种方法时必须同时实现这两种方法。
A 用于需要应用多个定制器的情况。CompositeKafkaStreamsInfrastructureCustomizer
4.2.3. KafkaStreams 千分尺支持
在 2.5.3 版中引入,您可以配置 a 以自动为工厂 bean 管理的对象注册千分尺:KafkaStreamsMicrometerListener
KafkaStreams
streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
4.2.4. 流 JSON 序列化和反序列化
为了在以 JSON 格式读取或写入主题或状态存储时对数据进行序列化和反序列化,Spring for Apache Kafka 提供了一种使用 JSON 的实现,委托给 和 序列化、反序列化和消息转换中所述。
该实现通过其构造函数(target type 或 )提供相同的配置选项。
在以下示例中,我们使用 the 来序列化和反序列化 Kafka 流的有效负载(在需要实例的地方可以以类似的方式使用 the):JsonSerde
JsonSerializer
JsonDeserializer
JsonSerde
ObjectMapper
JsonSerde
Cat
JsonSerde
stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");
当以编程方式构造序列化器/反序列化器以在 producer/consumer 工厂中使用时,从版本 2.3 开始,你可以使用 Fluent API,这简化了配置。
stream.through(new JsonSerde<>(MyKeyType.class)
.forKeys()
.noTypeInfo(),
new JsonSerde<>(MyValueType.class)
.noTypeInfo(),
"myTypes");
4.2.5. 使用KafkaStreamBrancher
该类引入了一种更方便的方法,可以在 .KafkaStreamBrancher
KStream
请考虑以下示例,该示例不使用 :KafkaStreamBrancher
KStream<String, String>[] branches = builder.stream("source").branch(
(key, value) -> value.contains("A"),
(key, value) -> value.contains("B"),
(key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");
以下示例使用 :KafkaStreamBrancher
new KafkaStreamBrancher<String, String>()
.branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
.branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
//default branch should not necessarily be defined in the end of the chain!
.defaultBranch(ks -> ks.to("C"))
.onTopOf(builder.stream("source"));
//onTopOf method returns the provided stream so we can continue with method chaining
4.2.6. 配置
要配置 Kafka Streams 环境,需要一个实例。
有关所有可能的选项,请参阅 Apache Kafka 文档。StreamsBuilderFactoryBean
KafkaStreamsConfiguration
从版本 2.2 开始,流配置现在作为对象提供,而不是作为 .KafkaStreamsConfiguration StreamsConfig |
为了在大多数情况下避免样板代码,尤其是在开发微服务时, Spring for Apache Kafka 提供了注释,您应该将其放在类上。
您只需声明一个名为 .
名为 的 bean 将在应用程序上下文中自动声明。
您也可以声明和使用任何其他 bean。
您可以通过提供实现 .
如果有多个这样的 bean,它们将根据它们的 property 进行应用。@EnableKafkaStreams
@Configuration
KafkaStreamsConfiguration
defaultKafkaStreamsConfig
StreamsBuilderFactoryBean
defaultKafkaStreamsBuilder
StreamsBuilderFactoryBean
StreamsBuilderFactoryBeanConfigurer
Ordered.order
默认情况下,当工厂 Bean 停止时,将调用该方法。
从版本 2.1.2 开始,工厂 bean 具有额外的构造函数,采用具有属性的对象,以控制是否在期间调用该方法,还是在两者之间调用。
从版本 2.7 开始,默认值是永不清理本地状态。KafkaStreams.cleanUp()
CleanupConfig
cleanUp()
start()
stop()
4.2.7. 标头丰富器
版本 2.3 添加了 .
这可用于在流处理中添加标头;标头值是 SPEL 表达式;表达式求值的根对象有 3 个属性:HeaderEnricher
Transformer
-
context
- 的 ,允许访问当前记录元数据ProcessorContext
-
key
- 当前记录的 Key -
value
- 当前记录的值
表达式必须返回 a 或 a (将转换为 using )。byte[]
String
byte[]
UTF-8
要在流中使用扩充器:
.transform(() -> enricher)
转换器不改变 or ;它只是添加标题。key
value
如果您的流是多线程的,则每条记录都需要一个新实例。 |
.transform(() -> new HeaderEnricher<..., ...>(expressionMap))
下面是一个简单的示例,添加了一个 Literal 标头和一个变量:
Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("context.timestamp() + ' @' + context.offset()"));
HeaderEnricher<String, String> enricher = new HeaderEnricher<>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
.transform(() -> enricher)
.to(OUTPUT);
4.2.8.MessagingTransformer
版本 2.3 添加了这允许 Kafka Streams 拓扑与 Spring Messaging 组件(例如 Spring Integration 流)进行交互。
转换器需要 .MessagingTransformer
MessagingFunction
@FunctionalInterface
public interface MessagingFunction {
Message<?> exchange(Message<?> message);
}
Spring Integration 使用其 .
它还需要将键、值和元数据(包括 headers)与 Spring Messaging 相互转换。
有关更多信息,请参见 [从 KStream
调用 Spring 集成流]。GatewayProxyFactoryBean
MessagingMessageConverter
Message<?>
4.2.9. 从反序列化异常中恢复
版本 2.3 引入了 which 可以在发生反序列化异常时采取一些操作。
请参阅有关 的 Kafka 文档,其中 是实现。
配置了 implementation.
该框架提供了将失败的记录发送到死信主题。
有关此恢复程序的更多信息,请参阅 Publishing Dead-letter Records。RecoveringDeserializationExceptionHandler
DeserializationExceptionHandler
RecoveringDeserializationExceptionHandler
RecoveringDeserializationExceptionHandler
ConsumerRecordRecoverer
DeadLetterPublishingRecoverer
要配置恢复器,请将以下属性添加到您的 streams 配置中:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
...
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
RecoveringDeserializationExceptionHandler.class);
props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
...
return new KafkaStreamsConfiguration(props);
}
@Bean
public DeadLetterPublishingRecoverer recoverer() {
return new DeadLetterPublishingRecoverer(kafkaTemplate(),
(record, ex) -> new TopicPartition("recovererDLQ", -1));
}
当然,该 bean 可以是你自己的 实现。recoverer()
ConsumerRecordRecoverer
4.2.10. Kafka Streams 示例
以下示例结合了我们在本章中介绍的所有主题:
@Configuration
@EnableKafka
@EnableKafkaStreams
public static class KafkaStreamsConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return new KafkaStreamsConfiguration(props);
}
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> fb.setStateListener((newState, oldState) -> {
System.out.println("State transition from " + oldState + " to " + newState);
});
}
@Bean
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
stream
.mapValues((ValueMapper<String, String>) String::toUpperCase)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(1000)))
.reduce((String value1, String value2) -> value1 + value2,
Named.as("windowStore"))
.toStream()
.map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
.filter((i, s) -> s.length() > 40)
.to("streamingTopic2");
stream.print(Printed.toSysOut());
return stream;
}
}
4.3. 测试应用程序
该 jar 包含一些有用的实用程序,可帮助测试您的应用程序。spring-kafka-test
4.3.1. KafkaTestUtils
o.s.kafka.test.utils.KafkaTestUtils
提供了许多 static 帮助程序方法来使用记录、检索各种记录偏移量等。
有关完整详细信息,请参阅其 Javadocs。
4.3.2. JUnit
o.s.kafka.test.utils.KafkaTestUtils
还提供了一些静态方法来设置 producer 和 consumer 属性。
下面的清单显示了这些方法签名:
/**
* Set up test properties for an {@code <Integer, String>} consumer.
* @param group the group id.
* @param autoCommit the auto commit.
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
* @return the properties.
*/
public static Map<String, Object> consumerProps(String group, String autoCommit,
EmbeddedKafkaBroker embeddedKafka) { ... }
/**
* Set up test properties for an {@code <Integer, String>} producer.
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
* @return the properties.
*/
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) { ... }
从版本 2.5 开始,该方法将 设置为 。
这是因为,在大多数情况下,您希望使用者使用测试用例中发送的任何消息。
默认值是 which 意味着测试在使用者启动之前已经发送的消息将不会收到这些记录。
若要还原到以前的行为,请将属性设置为 after call the method。 使用嵌入式代理时,通常最佳做法是为每个测试使用不同的主题,以防止串扰。
如果由于某种原因无法做到这一点,请注意该方法的默认行为是在分配后将分配的分区查找到开头。
由于它无权访问使用者属性,因此您必须使用采用 boolean 参数的重载方法来查找末尾而不是开头。 |
提供了 的 JUnit 4 包装器,用于创建嵌入式 Kafka 和嵌入式 Zookeeper 服务器。
(有关与 JUnit 5 一起使用的信息,请参见@EmbeddedKafka Annotation)。
下面的清单显示了这些方法的签名:@Rule
EmbeddedKafkaBroker
@EmbeddedKafka
/**
* Create embedded Kafka brokers.
* @param count the number of brokers.
* @param controlledShutdown passed into TestUtils.createBrokerConfig.
* @param topics the topics to create (2 partitions per).
*/
public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) { ... }
/**
*
* Create embedded Kafka brokers.
* @param count the number of brokers.
* @param controlledShutdown passed into TestUtils.createBrokerConfig.
* @param partitions partitions per topic.
* @param topics the topics to create.
*/
public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) { ... }
该类有一个 utility 方法,允许您使用它创建的所有主题。
以下示例演示如何使用它:EmbeddedKafkaBroker
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(
consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);
它有一些实用程序方法可以从消费者那里获取结果。
下面的清单显示了这些方法签名:KafkaTestUtils
/**
* Poll the consumer, expecting a single record for the specified topic.
* @param consumer the consumer.
* @param topic the topic.
* @return the record.
* @throws org.junit.ComparisonFailure if exactly one record is not received.
*/
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) { ... }
/**
* Poll the consumer for records.
* @param consumer the consumer.
* @return the records.
*/
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) { ... }
以下示例演示如何使用:KafkaTestUtils
...
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic");
...
当嵌入式 Kafka 和嵌入式 Zookeeper 服务器由 启动时,系统属性 named 设置为 Kafka 代理的地址,系统属性 named 设置为 Zookeeper 的地址。
为此属性提供了方便的常量 ( 和 )。EmbeddedKafkaBroker
spring.embedded.kafka.brokers
spring.embedded.zookeeper.connect
EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS
EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT
使用 ,您可以为 Kafka 服务器提供其他属性。
有关可能的代理属性的更多信息,请参阅 Kafka Config。EmbeddedKafkaBroker.brokerProperties(Map<String, String>)
4.3.3. 配置 Topic
以下示例配置创建名为 AND 的主题(具有 5 个分区)、一个名为 (具有 10 个分区的主题)和一个名为 (具有 15 个分区) 的主题:cat
hat
thing1
thing2
public class MyTests {
@ClassRule
private static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, 5, "cat", "hat");
@Test
public void test() {
embeddedKafkaRule.getEmbeddedKafka()
.addTopics(new NewTopic("thing1", 10, (short) 1), new NewTopic("thing2", 15, (short) 1));
...
}
}
默认情况下,当出现问题时(例如添加已存在的主题)将引发异常。
版本 2.6 添加了该方法的新版本,该版本返回 ;key 是 topic name,值是 success 或 an for failure。addTopics
Map<String, Exception>
null
Exception
4.3.4. 对多个测试类使用相同的 broker
没有内置支持这样做,但您可以对多个测试类使用相同的代理,类似于以下内容:
public final class EmbeddedKafkaHolder {
private static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaBroker(1, false)
.brokerListProperty("spring.kafka.bootstrap-servers");
private static boolean started;
public static EmbeddedKafkaBroker getEmbeddedKafka() {
if (!started) {
try {
embeddedKafka.afterPropertiesSet();
}
catch (Exception e) {
throw new KafkaException("Embedded broker failed to start", e);
}
started = true;
}
return embeddedKafka;
}
private EmbeddedKafkaHolder() {
super();
}
}
这假定 Spring Boot 环境,并且嵌入式代理替换了 bootstrap servers 属性。
然后,在每个测试类中,您可以使用类似于以下内容的内容:
static {
EmbeddedKafkaHolder.getEmbeddedKafka().addTopics("topic1", "topic2");
}
private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();
如果不使用 Spring Boot,则可以使用 获取引导服务器。broker.getBrokersAsString()
前面的示例没有提供在所有测试完成后关闭代理的机制。
例如,如果您在 Gradle 守护进程中运行测试,这可能是一个问题。
在这种情况下,您不应该使用此技术,或者您应该在测试完成时使用一些东西来调用 。destroy() EmbeddedKafkaBroker |
4.3.5. @EmbeddedKafka 注解
我们通常建议您使用该规则,以避免在测试之间启动和停止代理(并为每个测试使用不同的主题)。
从版本 2.0 开始,如果使用 Spring 的测试应用程序上下文缓存,则还可以声明一个 bean,因此可以在多个测试类中使用单个代理。
为方便起见,我们提供了一个测试类级 Comments,用于注册 Bean。
以下示例演示如何使用它:@ClassRule
EmbeddedKafkaBroker
@EmbeddedKafka
EmbeddedKafkaBroker
@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1,
topics = {
KafkaStreamsTests.STREAMING_TOPIC1,
KafkaStreamsTests.STREAMING_TOPIC2 })
public class KafkaStreamsTests {
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Test
public void someTest() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, KafkaStreamsTests.STREAMING_TOPIC2);
ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
assertThat(replies.count()).isGreaterThanOrEqualTo(1);
}
@Configuration
@EnableKafkaStreams
public static class KafkaStreamsConfiguration {
@Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
private String brokerAddresses;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
return new KafkaStreamsConfiguration(props);
}
}
}
从版本 2.2.4 开始,您还可以使用注释来指定 Kafka ports 属性。@EmbeddedKafka
以下示例设置 support 属性占位符分辨率的 、 和 属性:topics
brokerProperties
brokerPropertiesLocation
@EmbeddedKafka
@TestPropertySource(locations = "classpath:/test.properties")
@EmbeddedKafka(topics = { "any-topic", "${kafka.topics.another-topic}" },
brokerProperties = { "log.dir=${kafka.broker.logs-dir}",
"listeners=PLAINTEXT://localhost:${kafka.broker.port}",
"auto.create.topics.enable=${kafka.broker.topics-enable:true}" },
brokerPropertiesLocation = "classpath:/broker.properties")
在前面的示例中,属性占位符 、 和 是从 Spring 解析的。
此外,代理属性是从 指定的类路径资源加载的。
为 URL 和资源中找到的任何属性占位符解析属性占位符。
由 中的 override 属性定义的属性。${kafka.topics.another-topic}
${kafka.broker.logs-dir}
${kafka.broker.port}
Environment
broker.properties
brokerPropertiesLocation
brokerPropertiesLocation
brokerProperties
brokerPropertiesLocation
您可以将注释与 JUnit 4 或 JUnit 5 一起使用。@EmbeddedKafka
4.3.6. 使用 JUnit5 进行@EmbeddedKafka注释
从版本 2.3 开始,有两种方法可以在 JUnit5 中使用 Comments。
当与 annotation 一起使用时,嵌入式代理将添加到 test 应用程序上下文中。
您可以在类或方法级别将代理自动连接到测试中,以获取代理地址列表。@EmbeddedKafka
@SpringJunitConfig
当不使用 spring test 上下文时,会创建一个 broker;该条件包括一个参数解析器,因此您可以在测试方法中访问代理...EmbdeddedKafkaCondition
@EmbeddedKafka
public class EmbeddedKafkaConditionTests {
@Test
public void test(EmbeddedKafkaBroker broker) {
String brokerList = broker.getBrokersAsString();
...
}
}
如果带 Comments 的类没有也被 Comments(或元 Comments),则将创建一个独立的(不是 Spring 测试上下文)代理。 并且是 meta 注释的,并且当这些 Comments 中的任何一个也存在时,将使用基于上下文的 broker。@EmbeddedBroker
ExtendedWith(SpringExtension.class)
@SpringJunitConfig
@SpringBootTest
当有可用的 Spring 测试应用程序上下文时,主题和代理属性可以包含属性占位符,只要在某处定义了该属性,这些占位符就会被解析。 如果没有可用的 Spring 上下文,则不会解析这些占位符。 |
4.3.7. 注解中的嵌入式代理@SpringBootTest
Spring Initializr 现在会自动将 test 作用域中的依赖项添加到项目配置中。spring-kafka-test
如果您的应用程序在 Kafka Binder 中使用,并且您希望使用嵌入式代理进行测试,则必须删除依赖项,因为它会将实际 Binder 替换为测试用例的测试 Binder。
如果您希望某些测试使用测试 Binder,而一些测试使用嵌入式代理,则使用真实 Binder 的测试需要通过在测试类中排除 Binder auto 配置来禁用测试 Binder。
以下示例显示了如何执行此操作:
|
在 Spring Boot 应用程序测试中,有几种方法可以使用嵌入式代理。
他们包括:
JUnit4 类规则
以下示例说明如何使用 JUnit4 类规则创建嵌入式代理:
@RunWith(SpringRunner.class)
@SpringBootTest
public class MyApplicationTests {
@ClassRule
public static EmbeddedKafkaRule broker = new EmbeddedKafkaRule(1,
false, "someTopic")
.brokerListProperty("spring.kafka.bootstrap-servers");
}
@Autowired
private KafkaTemplate<String, String> template;
@Test
public void test() {
...
}
}
请注意,由于这是一个 Spring Boot 应用程序,因此我们覆盖 broker list 属性以设置 Boot 的属性。
@EmbeddedKafka
注解或 BeanEmbeddedKafkaBroker
以下示例显示了如何使用 Annotation 创建嵌入式代理:@EmbeddedKafka
@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = "someTopic",
bootstrapServersProperty = "spring.kafka.bootstrap-servers")
public class MyApplicationTests {
@Autowired
private KafkaTemplate<String, String> template;
@Test
public void test() {
...
}
}
4.3.8. Hamcrest 匹配器
提供以下匹配器:o.s.kafka.test.hamcrest.KafkaMatchers
/**
* @param key the key
* @param <K> the type.
* @return a Matcher that matches the key in a consumer record.
*/
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) { ... }
/**
* @param value the value.
* @param <V> the type.
* @return a Matcher that matches the value in a consumer record.
*/
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) { ... }
/**
* @param partition the partition.
* @return a Matcher that matches the partition in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition) { ... }
/**
* Matcher testing the timestamp of a {@link ConsumerRecord} assuming the topic has been set with
* {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}.
*
* @param ts timestamp of the consumer record.
* @return a Matcher that matches the timestamp in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts) {
return hasTimestamp(TimestampType.CREATE_TIME, ts);
}
/**
* Matcher testing the timestamp of a {@link ConsumerRecord}
* @param type timestamp type of the record
* @param ts timestamp of the consumer record.
* @return a Matcher that matches the timestamp in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(TimestampType type, long ts) {
return new ConsumerRecordTimestampMatcher(type, ts);
}
4.3.9. AssertJ 条件
您可以使用以下 AssertJ 条件:
/**
* @param key the key
* @param <K> the type.
* @return a Condition that matches the key in a consumer record.
*/
public static <K> Condition<ConsumerRecord<K, ?>> key(K key) { ... }
/**
* @param value the value.
* @param <V> the type.
* @return a Condition that matches the value in a consumer record.
*/
public static <V> Condition<ConsumerRecord<?, V>> value(V value) { ... }
/**
* @param key the key.
* @param value the value.
* @param <K> the key type.
* @param <V> the value type.
* @return a Condition that matches the key in a consumer record.
* @since 2.2.12
*/
public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value) { ... }
/**
* @param partition the partition.
* @return a Condition that matches the partition in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> partition(int partition) { ... }
/**
* @param value the timestamp.
* @return a Condition that matches the timestamp value in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> timestamp(long value) {
return new ConsumerRecordTimestampCondition(TimestampType.CREATE_TIME, value);
}
/**
* @param type the type of timestamp
* @param value the timestamp.
* @return a Condition that matches the timestamp value in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> timestamp(TimestampType type, long value) {
return new ConsumerRecordTimestampCondition(type, value);
}
4.3.10. 示例
以下示例汇集了本章中介绍的大部分主题:
public class KafkaTemplateTests {
private static final String TEMPLATE_TOPIC = "templateTopic";
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TEMPLATE_TOPIC);
@Test
public void testTemplate() throws Exception {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",
embeddedKafka.getEmbeddedKafka());
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<Integer, String>(consumerProps);
ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProperties);
final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
container.setupMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> record) {
System.out.println(record);
records.add(record);
}
});
container.setBeanName("templateTests");
container.start();
ContainerTestUtils.waitForAssignment(container,
embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
Map<String, Object> producerProps =
KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
ProducerFactory<Integer, String> pf =
new DefaultKafkaProducerFactory<Integer, String>(producerProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(TEMPLATE_TOPIC);
template.sendDefault("foo");
assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
assertThat(received, hasKey(2));
assertThat(received, hasPartition(0));
assertThat(received, hasValue("bar"));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
assertThat(received, hasKey(2));
assertThat(received, hasPartition(0));
assertThat(received, hasValue("baz"));
}
}
前面的示例使用 Hamcrest 匹配程序。
使用 ,最后一部分类似于以下代码:AssertJ
assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
// using individual assertions
assertThat(received).has(key(2));
assertThat(received).has(value("bar"));
assertThat(received).has(partition(0));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
// using allOf()
assertThat(received).has(allOf(keyValue(2, "baz"), partition(0)));
4.4. 非阻塞重试
这是一项实验性功能,在删除实验性名称之前,通常的不中断性 API 更改规则不适用于此功能。 鼓励用户试用该功能,并通过 GitHub 问题或 GitHub 讨论提供反馈。 这仅与 API 有关;该功能被认为是完整且健壮的。 |
使用 Kafka 实现非阻塞重试/DLT 功能通常需要设置额外的主题并创建和配置相应的侦听器。
从 2.7 开始,Spring for Apache Kafka 通过注解和类提供了对此的支持,以简化引导。@RetryableTopic
RetryTopicConfiguration
4.4.1. 模式的工作原理
如果消息处理失败,则消息将转发到具有 back off 时间戳的重试主题。 然后,重试主题使用者会检查时间戳,如果时间戳未过期,则暂停该主题分区的消耗。 到期时,将恢复分区消耗,并再次使用消息。 如果消息处理再次失败,则消息将被转发到下一个重试主题,并重复该模式,直到处理成功或尝试用尽,并将消息发送到死信主题(如果已配置)。
举个例子,如果你有一个 “main-topic” 主题,并且想要设置非阻塞重试,指数退避为 1000 毫秒,最大尝试次数为 2 和 4,它将创建 main-topic-retry-1000、main-topic-retry-2000、main-topic-retry-4000 和 main-topic-dlt 主题并配置相应的使用者。 该框架还负责创建主题以及设置和配置侦听器。
使用此策略,您将失去 Kafka 对该主题的排序保证。 |
您可以设置自己喜欢的模式,但建议使用。AckMode RECORD |
目前,此功能不支持类级注释@KafkaListener |
4.4.2. Back Off Delay 精度
概述和保证
所有消息处理和回退都由使用者线程处理,因此,会尽最大努力保证延迟精度。 如果一条消息的处理时间比该使用者的下一条消息的回退周期长,则下一条消息的延迟将高于预期。 此外,对于短暂的延迟(大约 1 秒或更短),线程必须完成的维护工作(例如提交偏移量)可能会延迟消息处理的执行。 如果重试主题的使用者正在处理多个分区,则精度也会受到影响,因为我们依赖于从轮询中唤醒使用者并拥有完整的 pollTimeouts 来进行计时调整。
话虽如此,对于处理单个分区的使用者,在大多数情况下,消息的处理应该在确切的到期时间后 100 毫秒内进行。
可以保证消息在到期时间之前永远不会被处理。 |
调整 Delay Precision
消息的处理延迟精度取决于两个 : 和 。
这两个属性都将在重试主题和 dlt 中自动设置为该主题最小延迟值的四分之一,最小值为 250 毫秒,最大值为 5000 毫秒。
仅当属性具有默认值时,才会设置这些值 - 如果您自己更改了任一值,则不会覆盖您的更改。
这样,您可以根据需要调整重试主题的精度和性能。ContainerProperties
ContainerProperties.pollTimeout
ContainerProperties.idlePartitionEventInterval
ListenerContainerFactory
您可以为主主题和重试主题设置单独的实例 - 这样,您可以设置不同的设置以更好地满足您的需求,例如,为主主题设置较高的轮询超时设置,为重试主题设置较低的轮询超时设置。ListenerContainerFactory |
4.4.3. 配置
使用注释@RetryableTopic
要为带注释的方法配置重试主题和 dlt,您只需向其添加注释,Spring for Apache Kafka 将使用默认配置引导所有必要的主题和使用者。@KafkaListener
@RetryableTopic
@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
// ... message processing
}
您可以在同一类中指定一个方法来处理 dlt 消息,方法是使用注释对其进行注释。
如果未提供 DltHandler 方法,则会创建一个默认使用者,该使用者仅记录消耗。@DltHandler
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
如果未指定kafkaTemplate名称,则将查找具有name的 bean。
如果未找到 bean,则会引发异常。retryTopicDefaultKafkaTemplate |
使用 beanRetryTopicConfiguration
您还可以通过在带注释的类中创建 bean 来配置非阻塞重试支持。RetryTopicConfiguration
@Configuration
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.create(template);
}
这将为使用默认配置注释有 '@KafkaListener' 的方法中的所有主题创建重试主题和 dlt 以及相应的使用者。该实例是消息转发所必需的。KafkaTemplate
为了对如何处理每个主题的非阻塞重试进行更精细的控制,可以提供多个 bean。RetryTopicConfiguration
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(3000)
.maxAttempts(5)
.includeTopics("my-topic", "my-other-topic")
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1000, 2, 5000)
.maxAttempts(4)
.excludeTopics("my-topic", "my-other-topic")
.retryOn(MyException.class)
.create(template);
}
重试主题和 dlt 的使用者将被分配到一个使用者组,该使用者组的组 ID 是您在注释参数中提供的 ID 与主题后缀的组合。如果您不提供任何 URL,它们都将属于同一个组,并且对重试主题进行再平衡将导致对主主题进行不必要的再平衡。groupId @KafkaListener |
如果使用者配置了 ErrorHandlingDeserializer ,为了处理去序列化异常,重要的是使用 serializer 配置 the 及其 producer,该序列化器可以处理正常对象以及由反序列化异常导致的原始值。
模板的 generic value type 应为 。
一种方法是使用 ;示例如下:KafkaTemplate byte[] Object DelegatingByTypeSerializer |
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
多个 annotation 可以用于同一主题,无论是否手动分配分区以及非阻塞重试,但只有一个配置将用于给定主题。
最好使用单个 bean 来配置此类主题;如果多个 Comments 用于同一个主题,则所有 Comments 都应该具有相同的值,否则其中一个 Comments 将应用于该主题的所有侦听器,而其他 Annotation 的值将被忽略。@KafkaListener RetryTopicConfiguration @RetryableTopic |
4.4.4. 功能
大多数功能都可用于 Comments 和 Bean。@RetryableTopic
RetryTopicConfiguration
BackOff 配置
BackOff 配置依赖于项目中的接口。BackOffPolicy
Spring Retry
它包括:
-
固定退后
-
指数回退
-
Random Exponential Back Off(随机指数回退)
-
统一随机回退
-
无退缩
-
自定义回退
@RetryableTopic(attempts = 5,
backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(3000)
.maxAttempts(4)
.build();
}
你还可以提供 Spring Retry 接口的自定义实现:SleepingBackOffPolicy
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.customBackOff(new MyCustomBackOffPolicy())
.maxAttempts(5)
.build();
}
默认回退策略最多尝试 3 次,间隔为 1000 毫秒。FixedBackOffPolicy |
默认最大延迟为 30 秒。
如果您的回退策略需要值大于该值的延迟,请相应地调整 maxDelay 属性。ExponentialBackOffPolicy |
第一次尝试对 进行计数,因此,如果您提供的值为 4,则原始尝试将加上 3 次重试。maxAttempts maxAttempts |
单主题固定延迟重试
如果您使用的是固定延迟策略,例如 or,则可以使用单个主题来完成非阻塞重试。
本主题将以 provided 或 default 后缀为后缀,并且不会附加 index 或 delay 值。FixedBackOffPolicy
NoBackOffPolicy
@RetryableTopic(backoff = @Backoff(2000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(3000)
.maxAttempts(5)
.useSingleTopicForFixedDelays()
.build();
}
默认行为是为每次尝试创建单独的重试主题,并附加其索引值:retry-0、retry-1、... |
全局超时
您可以为重试过程设置全局超时。 如果达到该时间,则下次使用者引发异常时,消息将直接发送到 DLT,或者如果没有可用的 DLT,则直接结束处理。
@RetryableTopic(backoff = @Backoff(2000), timeout = 5000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(2000)
.timeoutAfter(5000)
.build();
}
默认值是没有设置超时,这也可以通过提供 -1 作为 timeout 值来实现。 |
异常分类器
您可以指定要重试的异常和不重试的异常。 您还可以将其设置为遍历原因以查找嵌套异常。
@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = true)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
throw new RuntimeException(new MyRetryException()); // Will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.notRetryOn(MyDontRetryException.class)
.create(template);
}
默认行为是对所有异常重试,而不是遍历原因。 |
从 2.8.3 开始,有一个致命异常的全局列表,这将导致记录被发送到 DLT 而无需任何重试。 有关致命异常的默认列表,请参见DefaultErrorHandler。 您可以通过以下方式在此列表中添加或删除例外:
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DefaultDestinationTopicResolver topicResolver(ApplicationContext applicationContext,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(clock, applicationContext);
ddtr.addNotRetryableExceptions(MyFatalException.class);
ddtr.removeNotRetryableException(ConversionException.class);
return ddtr;
}
要禁用致命异常的分类,请使用 中的方法清除默认列表。setClassifications DefaultDestinationTopicResolver |
包含和排除主题
您可以通过 .includeTopic(String topic)、.includeTopics(Collection<String> topics) .excludeTopic(String topic) 和 .excludeTopics(Collection<String> topics) 方法来决定 Bean 将处理和不处理哪些主题。RetryTopicConfiguration
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.includeTopics(List.of("my-included-topic", "my-other-included-topic"))
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.excludeTopic("my-excluded-topic")
.create(template);
}
默认行为是包含所有主题。 |
主题自动创建
除非另有指定,否则框架将使用 Bean 使用的 bean 自动创建所需的主题。
您可以指定分区数和用于创建主题的复制因子,并且可以关闭此功能。NewTopic
KafkaAdmin
请注意,如果您不使用 Spring Boot,则必须提供 KafkaAdmin bean 才能使用此功能。 |
@RetryableTopic(numPartitions = 2, replicationFactor = 3)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@RetryableTopic(autoCreateTopics = false)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.autoCreateTopicsWith(2, 3)
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.doNotAutoCreateRetryTopics()
.create(template);
}
默认情况下,主题是使用一个分区和复制因子 1 自动创建的。 |
失败标头管理
在考虑如何管理失败标头(原始标头和异常标头)时,框架会委托 来决定是附加还是替换标头。DeadLetterPublishingRecover
默认情况下,它显式设置为 .appendOriginalHeaders
false
stripPreviousExceptionHeaders
DeadLetterPublishingRecover
这意味着只有第一个 “original” 和最后一个 exception headers 会保留 default configuration。 这是为了避免在涉及许多重试步骤时创建过大的消息(例如,由于堆栈跟踪标头)。
有关更多信息,请参阅 管理死信记录标头 。
要重新配置框架以对这些属性使用不同的设置,请通过添加以下选项来替换标准 Bean:DeadLetterPublishingRecovererFactory
recovererCustomizer
@Bean(RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME)
DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver resolver) {
DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(resolver);
factory.setDeadLetterPublishingRecovererCustomizer(dlpr -> {
dlpr.appendOriginalHeaders(true);
dlpr.setStripPreviousExceptionHeaders(false);
});
return factory;
}
从版本 2.8.4 开始,如果您希望添加自定义 headers(除了工厂添加的重试信息 headers 之外,您还可以向工厂添加一个 -headersFunction
factory.setHeadersFunction((rec, ex) → { … })
4.4.5. 组合阻塞和非阻塞重试
从 2.8.4 开始,您可以将框架配置为同时使用阻塞和非阻塞重试。
例如,您可以有一组异常,这些异常也可能在下一条记录上触发错误,例如 ,因此您可以在将同一记录发送到重试主题或直接发送到 DLT 之前重试几次。DatabaseAccessException
要配置阻塞重试,您只需通过 bean 中的方法添加要重试的异常,如下所示。
默认策略为 ,有 9 次重试,并且它们之间没有延迟。
或者,您可以提供自己的 back off policy (退避策略)。addRetryableExceptions
ListenerContainerFactoryConfigurer
FixedBackOff
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
lcfc.setBlockingRetryableExceptions(MyBlockingRetryException.class, MyOtherBlockingRetryException.class);
lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
return lcfc;
}
如果需要进一步优化异常分类,可以通过该方法设置自己的分类,例如:Map
ListenerContainerFactoryConfigurer.setErrorHandlerCustomizer()
lcfc.setErrorHandlerCustomizer(ceh -> ((DefaultErrorHandler) ceh).setClassifications(myClassificationsMap, myDefaultValue));
结合全局可重试主题的致命异常分类,您可以为所需的任何行为配置框架,例如让某些异常同时触发阻塞和非阻塞重试,仅触发一种或另一种,或者直接进入 DLT 而不进行任何类型的重试。 |
下面是两个配置协同工作的示例:
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
lcfc.setBlockingRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class);
return lcfc;
}
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DefaultDestinationTopicResolver ddtr(ApplicationContext applicationContext,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(clock, applicationContext);
ddtr.addNotRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldSkipBothRetriesException.class);
return ddtr;
}
在此示例中:
-
ShouldRetryOnlyBlockingException.class
将仅通过阻止重试,如果所有重试都失败,将直接进入 DLT。 -
ShouldRetryViaBothException.class
将通过阻止重试,如果所有阻止重试都失败,则会转发到下一个重试主题,以进行另一组尝试。 -
ShouldSkipBothRetriesException.class
永远不会以任何方式重试,如果第一次处理尝试失败,则会直接转到 DLT。
请注意,阻止重试行为是允许列表 - 您添加确实要以这种方式重试的例外;而非阻塞重试分类是针对 FATAL 异常的,因此是 denylist - 您添加的异常不想执行非阻塞重试,而是直接发送到 DLT。 |
非阻塞异常分类行为还取决于特定主题的配置。 |
4.4.6. 主题命名
重试主题和 DLT 的命名方式是在主主题后加上提供的或默认值,并附加该主题的延迟或索引。
例子:
“my-topic” → “my-topic-retry-0”, “my-topic-retry-1”, ..., “my-topic-dlt”
“my-other-topic” → “my-topic-myRetrySuffix-1000”, “my-topic-myRetrySuffix-2000”, ..., “my-topic-myDltSuffix”。
重试主题和 Dlt 后缀
您可以指定 retry 和 dlt 主题将使用的后缀。
@RetryableTopic(retryTopicSuffix = "-my-retry-suffix", dltTopicSuffix = "-my-dlt-suffix")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.retryTopicSuffix("-my-retry-suffix")
.dltTopicSuffix("-my-dlt-suffix")
.create(template);
}
默认后缀为 “-retry” 和 “-dlt”,分别用于重试主题和 dlt。 |
附加主题的索引或延迟
您可以在后缀后附加主题的索引或 delay 值。
@RetryableTopic(topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.suffixTopicsWithIndexValues()
.create(template);
}
默认行为是使用延迟值作为后缀,但具有多个主题的固定延迟配置除外,在这种情况下,主题以主题的索引为后缀。 |
自定义命名策略
更复杂的命名策略可以通过注册实现 .默认实现是,可以通过以下方式注册不同的实现:RetryTopicNamesProviderFactory
SuffixingRetryTopicNamesProviderFactory
@Bean
public RetryTopicNamesProviderFactory myRetryNamingProviderFactory() {
return new CustomRetryTopicNamesProviderFactory();
}
例如,除了标准后缀之外,以下实现还为 retry/dl 主题名称添加了前缀:
public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {
@Override
public RetryTopicNamesProvider createRetryTopicNamesProvider(
DestinationTopic.Properties properties) {
if(properties.isMainEndpoint()) {
return new SuffixingRetryTopicNamesProvider(properties);
}
else {
return new SuffixingRetryTopicNamesProvider(properties) {
@Override
public String getTopicName(String topic) {
return "my-prefix-" + super.getTopicName(topic);
}
};
}
}
}
4.4.7. 分布式账本策略
该框架提供了一些使用 DLT 的策略。您可以提供 DLT 处理方法、使用默认日志记录方法或根本不使用 DLT。此外,您还可以选择在 DLT 处理失败时会发生什么情况。
DLT 加工方法
您可以指定用于处理主题的 Dlt 的方法,以及处理失败时的行为。
为此,您可以在带有 annotation 的类的方法中使用 annotation。
请注意,相同的方法将用于该类中的所有带 Comments 的方法。@DltHandler
@RetryableTopic
@RetryableTopic
@RetryableTopic
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
DLT 处理程序方法也可以通过 RetryTopicConfigurationBuilder.dltHandlerMethod(String, String) 方法提供,将应处理 DLT 消息的 Bean 名称和方法名称作为参数传递。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltProcessor("myCustomDltProcessor", "processDltMessage")
.create(template);
}
@Component
public class MyCustomDltProcessor {
private final MyDependency myDependency;
public MyCustomDltProcessor(MyDependency myDependency) {
this.myDependency = myDependency;
}
public void processDltMessage(MyPojo message) {
// ... message processing, persistence, etc
}
}
如果未提供 DLT 处理程序,则使用默认的 RetryTopicConfigurer.LoggingDltListenerHandlerMethod。 |
从版本 2.8 开始,如果你根本不想在这个应用程序中使用 DLT,包括通过默认处理程序(或者你希望推迟消耗),你可以控制 DLT 容器是否启动,独立于容器工厂的属性。autoStartup
使用批注时,将属性设置为 ;使用 Configuration Builder 时,请使用 。@RetryableTopic
autoStartDltHandler
false
.autoStartDltHandler(false)
您可以稍后通过 .KafkaListenerEndpointRegistry
DLT 失败行为
如果 DLT 处理失败,则有两种可能的行为可用:和 .ALWAYS_RETRY_ON_ERROR
FAIL_ON_ERROR
在前者中,记录被转发回 DLT 主题,因此它不会阻止其他 DLT 记录的处理。 在后者中,使用者结束执行而不转发消息。
@RetryableTopic(dltProcessingFailureStrategy =
DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltProcessor(MyCustomDltProcessor.class, "processDltMessage")
.doNotRetryOnDltFailure()
.create(template);
}
默认行为是 。ALWAYS_RETRY_ON_ERROR |
从版本 2.8.3 开始,如果记录导致引发致命异常,则不会将记录路由回 DLT。
例如 a 因为,通常,此类异常将始终被抛出。ALWAYS_RETRY_ON_ERROR DeserializationException |
被视为致命的异常包括:
-
DeserializationException
-
MessageConversionException
-
ConversionException
-
MethodArgumentResolutionException
-
NoSuchMethodException
-
ClassCastException
您可以使用 Bean 上的方法向此列表添加异常和从中删除异常。DestinationTopicResolver
有关更多信息,请参阅 Exception Classifier。
配置无 DLT
该框架还提供了不为主题配置 DLT 的可能性。 在这种情况下,在用尽重试后,处理将结束。
@RetryableTopic(dltProcessingFailureStrategy =
DltStrategy.NO_DLT)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.doNotConfigureDlt()
.create(template);
}
4.4.8. 指定 ListenerContainerFactory
默认情况下,RetryTopic 配置将使用 Comments 中提供的工厂,但您可以指定一个不同的工厂来创建重试主题和 dlt 侦听器容器。@KafkaListener
对于 Comments,您可以提供工厂的 Bean 名称,使用 Bean 您可以提供 Bean 名称或实例本身。@RetryableTopic
RetryTopicConfiguration
@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template,
ConcurrentKafkaListenerContainerFactory<Integer, MyPojo> factory) {
return RetryTopicConfigurationBuilder
.newInstance()
.listenerFactory(factory)
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.listenerFactory("my-retry-topic-factory")
.create(template);
}
从 2.8.3 开始,您可以对可重试和不可重试的主题使用相同的工厂。 |
如果需要将出厂配置行为恢复到 2.8.3 之前的版本,则可以替换标准 bean 并设置为 ,例如:RetryTopicConfigurer
useLegacyFactoryConfigurer
true
@Bean(name = RetryTopicInternalBeanNames.RETRY_TOPIC_CONFIGURER)
public RetryTopicConfigurer retryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor,
ListenerContainerFactoryResolver containerFactoryResolver,
ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer,
BeanFactory beanFactory,
RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) {
RetryTopicConfigurer retryTopicConfigurer = new RetryTopicConfigurer(destinationTopicProcessor, containerFactoryResolver, listenerContainerFactoryConfigurer, beanFactory, retryTopicNamesProviderFactory);
retryTopicConfigurer.useLegacyFactoryConfigurer(true);
return retryTopicConfigurer;
}
4.4.9. 更改 KafkaBackOffException 日志记录级别
当重试主题中的消息不应使用时,将引发 a。默认情况下,此类异常记录在 level 上,但您可以通过在 in a 类中设置错误处理程序定制器来更改此行为。KafkaBackOffException
DEBUG
ListenerContainerFactoryConfigurer
@Configuration
例如,要将日志记录级别更改为 WARN,您可以添加:
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer listenerContainer(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
ListenerContainerFactoryConfigurer configurer = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
configurer.setErrorHandlerCustomizer(commonErrorHandler -> ((DefaultErrorHandler) commonErrorHandler).setLogLevel(KafkaException.Level.WARN));
return configurer;
}