对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0spring-doc.cn

暂停和恢复侦听器容器

版本 2.1.3 向侦听器容器添加了 and 方法。 以前,您可以在 a 中暂停使用者,然后通过侦听 a 来恢复它,它提供对对象的访问。 虽然您可以使用事件侦听器在空闲容器中暂停使用者,但在某些情况下,这不是线程安全的,因为无法保证在使用者线程上调用事件侦听器。 要安全地暂停和恢复使用者,您应该在侦听器容器上使用 and 方法。 A 在下一个 ;a 在 current 返回后立即生效。 当容器暂停时,它会继续向使用者发送,从而避免在使用组管理时进行再平衡,但它不会检索任何记录。 有关更多信息,请参阅 Kafka 文档。pause()resume()ConsumerAwareMessageListenerListenerContainerIdleEventConsumerpauseresumepause()poll()resume()poll()poll()spring-doc.cn

从版本 2.1.5 开始,您可以调用 以查看是否已调用。 但是,使用者可能实际上尚未暂停。 如果所有实例实际上都已暂停,则返回 true。isPauseRequested()pause()isConsumerPaused()Consumerspring-doc.cn

此外(也是从 2.1.5 开始),实例以容器作为属性和属性中涉及的实例发布。ConsumerPausedEventConsumerResumedEventsourceTopicPartitionpartitionsspring-doc.cn

从版本 2.9 开始,新的 container property 设置为true时,会导致暂停在处理当前记录后生效。 默认情况下,暂停在处理了上一次轮询中的所有记录后生效。 请参阅 [pauseImmediate]。pauseImmediatespring-doc.cn

以下简单的 Spring Boot 应用程序通过使用容器注册表来获取对方法容器的引用并暂停或恢复其使用者以及接收相应的事件:@KafkaListenerspring-doc.cn

@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args).close();
    }

    @Override
    public void onApplicationEvent(KafkaEvent event) {
        System.out.println(event);
    }

    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
            KafkaTemplate<String, String> template) {
        return args -> {
            template.send("pause.resume.topic", "thing1");
            Thread.sleep(10_000);
            System.out.println("pausing");
            registry.getListenerContainer("pause.resume").pause();
            Thread.sleep(10_000);
            template.send("pause.resume.topic", "thing2");
            Thread.sleep(10_000);
            System.out.println("resuming");
            registry.getListenerContainer("pause.resume").resume();
            Thread.sleep(10_000);
        };
    }

    @KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("pause.resume.topic")
            .partitions(2)
            .replicas(1)
            .build();
    }

}

下面的清单显示了前面示例的结果:spring-doc.cn

partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2