以下 Spring 应用程序事件由侦听器容器及其使用者发布:Spring中文文档

  • ConsumerStartingEvent:在消费者线程首次启动时发布,在开始轮询之前发布。Spring中文文档

  • ConsumerStartedEvent:在使用者即将开始轮询时发布。Spring中文文档

  • ConsumerFailedToStartEvent:如果容器属性中未发布,则已发布。 此事件可能表示配置的任务执行程序没有足够的线程来支持使用它的容器及其并发性。 发生这种情况时,还会记录一条错误消息。ConsumerStartingEventconsumerStartTimeoutSpring中文文档

  • ListenerContainerIdleEvent:在未收到任何消息时发布(如果已配置)。idleIntervalSpring中文文档

  • ListenerContainerNoLongerIdleEvent:在先前发布 .ListenerContainerIdleEventSpring中文文档

  • ListenerContainerPartitionIdleEvent:在未收到来自该分区的消息时发布(如果已配置)。idlePartitionEventIntervalSpring中文文档

  • ListenerContainerPartitionNoLongerIdleEvent:当从以前发布过 .ListenerContainerPartitionIdleEventSpring中文文档

  • NonResponsiveConsumerEvent:当使用者在方法中似乎被阻止时发布。pollSpring中文文档

  • ConsumerPartitionPausedEvent:当分区暂停时,由每个使用者发布。Spring中文文档

  • ConsumerPartitionResumedEvent:由每个使用者在恢复分区时发布。Spring中文文档

  • ConsumerPausedEvent:容器暂停时由每个使用者发布。Spring中文文档

  • ConsumerResumedEvent:由每个使用者在容器恢复时发布。Spring中文文档

  • ConsumerStoppingEvent:由每个消费者在停止前发布。Spring中文文档

  • ConsumerStoppedEvent:在使用者关闭后发布。 请参阅线程安全Spring中文文档

  • ConsumerRetryAuthEvent:当使用者的身份验证或授权失败且正在重试时发布。Spring中文文档

  • ConsumerRetryAuthSuccessfulEvent:在成功重试身份验证或授权时发布。只有在之前有过的情况下才会发生。ConsumerRetryAuthEventSpring中文文档

  • ContainerStoppedEvent:在所有使用者都停止时发布。Spring中文文档

默认情况下,应用程序上下文的事件多播程序在调用线程上调用事件侦听器。 如果将多播程序更改为使用异步执行程序,则当事件包含对使用者的引用时,不得调用任何方法。Consumer

具有以下属性:ListenerContainerIdleEventSpring中文文档

具有相同的属性,但 和 除外。ListenerContainerNoLongerIdleEventidleTimepausedSpring中文文档

具有以下属性:ListenerContainerPartitionIdleEventSpring中文文档

具有相同的属性,但 和 除外。ListenerContainerPartitionNoLongerIdleEventidleTimepausedSpring中文文档

具有以下属性:NonResponsiveConsumerEventSpring中文文档

、 和 事件具有以下属性:ConsumerPausedEventConsumerResumedEventConsumerStoppingSpring中文文档

事件具有以下属性:ConsumerPartitionPausedEventConsumerPartitionResumedEventSpring中文文档

该事件具有以下属性:ConsumerRetryAuthEventSpring中文文档

、 、 、 和 事件具有以下属性:ConsumerStartingEventConsumerStartedEventConsumerFailedToStartEventConsumerStoppedEventConsumerRetryAuthSuccessfulEventContainerStoppedEventSpring中文文档

所有容器(无论是子容器还是父容器)都发布 . 对于父容器,源和容器属性是相同的。ContainerStoppedEventSpring中文文档

此外,还具有以下附加属性:ConsumerStoppedEventSpring中文文档

在遇到此类情况后,可以使用此事件重新启动容器:Spring中文文档

if (event.getReason.equals(Reason.FENCED)) {
    event.getSource(MessageListenerContainer.class).start();
}
默认情况下,应用程序上下文的事件多播程序在调用线程上调用事件侦听器。 如果将多播程序更改为使用异步执行程序,则当事件包含对使用者的引用时,不得调用任何方法。Consumer

检测空闲和无响应的使用者

虽然效率很高,但异步使用者的一个问题是检测它们何时处于空闲状态。 如果一段时间内没有消息到达,您可能需要执行一些操作。Spring中文文档

您可以将侦听器容器配置为在一段时间后没有消息传递时发布。 当容器处于空闲状态时,每毫秒发布一个事件。ListenerContainerIdleEventidleEventIntervalSpring中文文档

若要配置此功能,请在容器上设置。 以下示例演示如何执行此操作:idleEventIntervalSpring中文文档

@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    ...
    containerProps.setIdleEventInterval(60000L);
    ...
    KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
    return container;
}

以下示例演示如何设置 for :idleEventInterval@KafkaListenerSpring中文文档

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.getContainerProperties().setIdleEventInterval(60000L);
    ...
    return factory;
}

在每种情况下,当容器处于空闲状态时,每分钟发布一次事件。Spring中文文档

如果由于某种原因,使用者方法未退出,则不会收到任何消息,并且无法生成空闲事件(这是无法访问代理时早期版本的问题)。 在这种情况下,如果轮询未在属性中返回,则容器会发布。 默认情况下,此检查在每个容器中每 30 秒执行一次。 您可以通过在配置侦听器容器时设置(默认为 30 秒)和(默认为 3.0)属性来修改此行为。 应大于以避免由于争用条件而产生虚假事件。 接收此类事件后,您可以停止容器,从而唤醒使用者,使其可以停止。poll()kafka-clientsNonResponsiveConsumerEvent3xpollTimeoutmonitorIntervalnoPollThresholdContainerPropertiesnoPollThreshold1.0Spring中文文档

从版本 2.6.2 开始,如果容器已发布 ,它将在随后收到记录时发布 。ListenerContainerIdleEventListenerContainerNoLongerIdleEventSpring中文文档

事件消耗

您可以通过实现来捕获这些事件 - 一个普通的侦听器或一个缩小到仅接收此特定事件的侦听器。 您还可以使用 Spring Framework 4.2 中引入的 。ApplicationListener@EventListenerSpring中文文档

下一个示例合并到一个类中。 您应该了解应用程序侦听器会获取所有容器的事件,因此,如果要根据哪个容器处于空闲状态执行特定操作,则可能需要检查侦听器 ID。 您也可以将 's 用于此目的。@KafkaListener@EventListener@EventListenerconditionSpring中文文档

有关事件属性的信息,请参阅应用程序事件Spring中文文档

该事件通常发布在使用者线程上,因此可以安全地与对象交互。ConsumerSpring中文文档

以下示例同时使用 和 :@KafkaListener@EventListenerSpring中文文档

public class Listener {

    @KafkaListener(id = "qux", topics = "annotated")
    public void listen4(@Payload String foo, Acknowledgment ack) {
        ...
    }

    @EventListener(condition = "event.listenerId.startsWith('qux-')")
    public void eventHandler(ListenerContainerIdleEvent event) {
        ...
    }

}
事件侦听器可以查看所有容器的事件。 因此,在前面的示例中,我们根据侦听器 ID 缩小接收到的事件范围。 由于为支持并发而创建的容器,因此实际容器的名称为 其中 是支持并发的每个实例的唯一值。 这就是我们在条件中使用的原因。@KafkaListenerid-nnstartsWith
如果希望使用 idle 事件停止 lister 容器,则不应调用调用侦听器的线程。 这样做会导致延迟和不必要的日志消息。 相反,应将事件移交给其他线程,然后该线程可以停止容器。 此外,如果容器实例是子容器,则不应使用容器实例。 应改为停止并发容器。container.stop()stop()

空闲时的当前位置

请注意,当检测到空闲时,您可以通过在侦听器中实现来获取当前位置。 在搜索中查看。ConsumerSeekAwareonIdleContainer()Spring中文文档

事件侦听器可以查看所有容器的事件。 因此,在前面的示例中,我们根据侦听器 ID 缩小接收到的事件范围。 由于为支持并发而创建的容器,因此实际容器的名称为 其中 是支持并发的每个实例的唯一值。 这就是我们在条件中使用的原因。@KafkaListenerid-nnstartsWith
如果希望使用 idle 事件停止 lister 容器,则不应调用调用侦听器的线程。 这样做会导致延迟和不必要的日志消息。 相反,应将事件移交给其他线程,然后该线程可以停止容器。 此外,如果容器实例是子容器,则不应使用容器实例。 应改为停止并发容器。container.stop()stop()