对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0! |
应用程序事件
以下 Spring 应用程序事件由侦听器容器及其使用者发布:
-
ConsumerStartingEvent
:在使用线程首次启动时发布,在它开始轮询之前。 -
ConsumerStartedEvent
:在使用者即将开始轮询时发布。 -
ConsumerFailedToStartEvent
:如果在容器属性中发布 NO,则发布。 此事件可能表示配置的任务执行程序没有足够的线程来支持使用它的容器及其并发性。 出现此情况时,还会记录错误消息。ConsumerStartingEvent
consumerStartTimeout
-
ListenerContainerIdleEvent
:在未收到任何消息时发布(如果已配置)。idleEventInterval
-
ListenerContainerNoLongerIdleEvent
:在之前发布 .ListenerContainerIdleEvent
-
ListenerContainerPartitionIdleEvent
:未从中的该分区收到任何消息时发布(如果已配置)。idlePartitionEventInterval
-
ListenerContainerPartitionNoLongerIdleEvent
:当从之前发布 .ListenerContainerPartitionIdleEvent
-
NonResponsiveConsumerEvent
:当使用者似乎在方法中被阻止时发布。poll
-
ConsumerPartitionPausedEvent
:当分区暂停时,由每个使用者发布。 -
ConsumerPartitionResumedEvent
:每个 Consumer 在恢复分区时发布的 -
ConsumerPausedEvent
:在容器暂停时由每个使用者发布。 -
ConsumerResumedEvent
:容器恢复时由每个使用者发布。 -
ConsumerStoppingEvent
:由每个使用者在停止前发布。 -
ConsumerStoppedEvent
:在 Consumer 关闭后发布。 请参见线程安全。 -
ConsumerRetryAuthEvent
:当消费者的身份验证或授权失败并正在重试时发布。 -
ConsumerRetryAuthSuccessfulEvent
:成功重试身份验证或授权时发布。只有在之前有 时才会发生。ConsumerRetryAuthEvent
-
ContainerStoppedEvent
:当所有使用者都已停止时发布。
默认情况下,应用程序上下文的事件 multicaster 在调用线程上调用事件侦听器。
如果将 multicaster 更改为使用异步执行程序,则当事件包含对使用者的引用时,不得调用任何方法。Consumer |
具有以下属性:ListenerContainerIdleEvent
-
source
:发布事件的侦听器容器实例。 -
container
:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
id
:侦听器 ID(或容器 Bean 名称)。 -
idleTime
:发布事件时容器处于空闲状态的时间。 -
topicPartitions
:在生成事件时为容器分配的主题和分区。 -
consumer
:对 Kafka 对象的引用。 例如,如果之前调用了使用者的方法,则在收到事件时可以调用该方法。Consumer
pause()
resume()
-
paused
:容器当前是否暂停。 有关更多信息,请参阅暂停和恢复侦听器容器。
具有相同的属性,但 和 除外。ListenerContainerNoLongerIdleEvent
idleTime
paused
具有以下属性:ListenerContainerPartitionIdleEvent
-
source
:发布事件的侦听器容器实例。 -
container
:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
id
:侦听器 ID(或容器 Bean 名称)。 -
idleTime
:发布事件时分区消耗处于空闲状态的时间。 -
topicPartition
:触发事件的主题和分区。 -
consumer
:对 Kafka 对象的引用。 例如,如果之前调用了使用者的方法,则在收到事件时可以调用该方法。Consumer
pause()
resume()
-
paused
:该使用者的分区使用当前是否暂停。 有关更多信息,请参阅暂停和恢复侦听器容器。
具有相同的属性,但 和 除外。ListenerContainerPartitionNoLongerIdleEvent
idleTime
paused
具有以下属性:NonResponsiveConsumerEvent
-
source
:发布事件的侦听器容器实例。 -
container
:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
id
:侦听器 ID(或容器 Bean 名称)。 -
timeSinceLastPoll
:容器上次调用 之前的时间。poll()
-
topicPartitions
:在生成事件时为容器分配的主题和分区。 -
consumer
:对 Kafka 对象的引用。 例如,如果之前调用了使用者的方法,则在收到事件时可以调用该方法。Consumer
pause()
resume()
-
paused
:容器当前是否暂停。 有关更多信息,请参阅暂停和恢复侦听器容器。
、 和 事件 具有以下属性:ConsumerPausedEvent
ConsumerResumedEvent
ConsumerStopping
-
source
:发布事件的侦听器容器实例。 -
container
:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
partitions
:涉及的实例。TopicPartition
事件的 , 具有以下属性:ConsumerPartitionPausedEvent
ConsumerPartitionResumedEvent
-
source
:发布事件的侦听器容器实例。 -
container
:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
partition
:涉及的实例。TopicPartition
该事件具有以下属性:ConsumerRetryAuthEvent
-
source
:发布事件的侦听器容器实例。 -
container
:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
reason
:-
AUTHENTICATION
- 由于身份验证异常,事件已发布。 -
AUTHORIZATION
- 事件因授权异常而发布。
-
、 和 事件 具有以下属性:ConsumerStartingEvent
ConsumerStartingEvent
ConsumerFailedToStartEvent
ConsumerStoppedEvent
ConsumerRetryAuthSuccessfulEvent
ContainerStoppedEvent
-
source
:发布事件的侦听器容器实例。 -
container
:侦听器容器或父侦听器容器(如果源容器是子容器)。
所有容器(无论是子容器还是父容器)都会发布 。
对于父容器,source 和 container 属性相同。ContainerStoppedEvent
此外,它还具有以下附加属性:ConsumerStoppedEvent
-
reason
:-
NORMAL
- 消费者正常停止 (容器已停止)。 -
ERROR
- A 被抛出。java.lang.Error
-
FENCED
- 事务生成者已屏蔽,容器属性为 .stopContainerWhenFenced
true
-
AUTH
- 引发了 OR 且 未配置。AuthenticationException
AuthorizationException
authExceptionRetryInterval
-
NO_OFFSET
- 分区没有偏移量,策略为 。auto.offset.reset
none
-
在出现此类情况后,您可以使用此事件重新启动容器:
if (event.getReason.equals(Reason.FENCED)) {
event.getSource(MessageListenerContainer.class).start();
}
检测空闲和无响应的使用者
虽然效率很高,但异步使用者的一个问题是检测它们何时处于空闲状态。 如果在一段时间内没有消息到达,您可能需要采取一些措施。
您可以将侦听器容器配置为在一段时间后没有消息传递时发布。
当容器处于空闲状态时,每毫秒发布一次事件。ListenerContainerIdleEvent
idleEventInterval
要配置此功能,请在容器上设置
以下示例显示了如何执行此操作:idleEventInterval
@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
...
containerProps.setIdleEventInterval(60000L);
...
KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
return container;
}
以下示例演示如何为 :idleEventInterval
@KafkaListener
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.getContainerProperties().setIdleEventInterval(60000L);
...
return factory;
}
在上述每种情况下,当容器处于空闲状态时,每分钟发布一次事件。
如果由于某种原因,consumer 方法没有退出,则不会收到任何消息,并且无法生成空闲事件(当无法访问代理时,这是早期版本的问题)。
在这种情况下,如果 poll 未在属性中返回,则容器会发布 。
默认情况下,每个容器中每 30 秒执行一次此检查。
您可以通过在配置侦听器容器时在 中设置 (默认 30 秒) 和 (默认 3.0) 属性来修改此行为。
这应该大于以避免由于争用条件而获得虚假事件。
接收此类事件可让您停止容器,从而唤醒使用者,使其可以停止。poll()
kafka-clients
NonResponsiveConsumerEvent
3x
pollTimeout
monitorInterval
noPollThreshold
ContainerProperties
noPollThreshold
1.0
从版本 2.6.2 开始,如果容器发布了 ,它将在随后收到记录时发布 。ListenerContainerIdleEvent
ListenerContainerNoLongerIdleEvent
事件消耗
您可以通过实现来捕获这些事件 — 通用侦听器或缩小范围以仅接收此特定事件的侦听器。
您还可以使用 Spring Framework 4.2 中引入的 。ApplicationListener
@EventListener
下一个示例将 和 合并为一个类。
您应该了解应用程序侦听器会获取所有容器的事件,因此,如果您想根据哪个容器处于空闲状态来执行特定操作,则可能需要检查侦听器 ID。
您也可以将 用于此目的。@KafkaListener
@EventListener
@EventListener
condition
有关事件属性的信息,请参阅应用程序事件。
该事件通常在使用者线程上发布,因此与对象交互是安全的。Consumer
以下示例同时使用 和 :@KafkaListener
@EventListener
public class Listener {
@KafkaListener(id = "qux", topics = "annotated")
public void listen4(@Payload String foo, Acknowledgment ack) {
...
}
@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void eventHandler(ListenerContainerIdleEvent event) {
...
}
}
事件侦听器查看所有容器的事件。
因此,在前面的示例中,我们根据侦听器 ID 缩小接收的事件范围。
由于为 创建的容器支持并发,因此实际容器的命名方式为 ,其中 是支持并发的每个实例的唯一值。
这就是我们在 CONDITION 中使用的原因。@KafkaListener id-n n startsWith |
如果您希望使用 idle 事件来停止 lister 容器,则不应在调用侦听器的线程上调用。
这样做会导致延迟和不必要的日志消息。
相反,您应该将事件移交给其他线程,然后该线程可以停止容器。
此外,如果容器实例是子容器,则不应使用容器实例。
您应该改为停止并发容器。container.stop() stop() |
空闲时的当前位置
注意,当检测到 idle 时,你可以通过在 Listener 中实现来获取当前位置。
见 in seek.ConsumerSeekAware
onIdleContainer()