对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0! |
集装箱工厂
如 @KafkaListener
Annotation 中所述,a 用于为带 Comments 的方法创建容器。ConcurrentKafkaListenerContainerFactory
从版本 2.2 开始,您可以使用同一工厂创建任何 .
如果要创建多个具有相似属性的容器,或者希望使用一些外部配置的工厂,例如 Spring Boot 自动配置提供的工厂,这可能很有用。
创建容器后,您可以进一步修改其属性,其中许多属性是使用 设置的。
以下示例配置了一个 :ConcurrentMessageListenerContainer
container.getContainerProperties()
ConcurrentMessageListenerContainer
@Bean
public ConcurrentMessageListenerContainer<String, String>(
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> container =
factory.createContainer("topic1", "topic2");
container.setMessageListener(m -> { ... } );
return container;
}
以这种方式创建的容器不会添加到终端节点注册表中。
应将它们创建为定义,以便将它们注册到应用程序上下文中。@Bean |
从版本 2.3.4 开始,您可以在创建和配置每个容器后向工厂添加 以进一步配置每个容器。ContainerCustomizer
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setContainerCustomizer(container -> { /* customize the container */ });
return factory;
}
从版本 3.1 开始,还可以通过在 KafkaListener 注释上指定“ContainerPostProcessor”的 bean 名称,在单个侦听器上应用相同类型的自定义。
@Bean
public ContainerPostProcessor<String, String, AbstractMessageListenerContainer<String, String>> customContainerPostProcessor() {
return container -> { /* customize the container */ };
}
...
@KafkaListener(..., containerPostProcessor="customContainerPostProcessor", ...)