对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0! |
配置主题
如果您定义了KafkaAdmin
bean 时,它可以自动向 broker 添加主题。
为此,您可以添加NewTopic
@Bean
对于每个主题添加到应用程序上下文。
版本 2.3 引入了一个新类TopicBuilder
使此类 bean 的创建更加方便。
以下示例显示了如何执行此作:
-
Java
-
Kotlin
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
public NewTopic topic3() {
return TopicBuilder.name("thing3")
.assignReplicas(0, List.of(0, 1))
.assignReplicas(1, List.of(1, 2))
.assignReplicas(2, List.of(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
fun admin() = KafkaAdmin(mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092"))
@Bean
fun topic1() =
TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build()
@Bean
fun topic2() =
TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build()
@Bean
fun topic3() =
TopicBuilder.name("thing3")
.assignReplicas(0, Arrays.asList(0, 1))
.assignReplicas(1, Arrays.asList(1, 2))
.assignReplicas(2, Arrays.asList(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build()
从版本 2.6 开始,您可以省略partitions()
和/或replicas()
代理默认值将应用于这些属性。
代理版本必须至少为 2.4.0 才能支持此功能 - 请参阅 KIP-464。
-
Java
-
Kotlin
@Bean
public NewTopic topic4() {
return TopicBuilder.name("defaultBoth")
.build();
}
@Bean
public NewTopic topic5() {
return TopicBuilder.name("defaultPart")
.replicas(1)
.build();
}
@Bean
public NewTopic topic6() {
return TopicBuilder.name("defaultRepl")
.partitions(3)
.build();
}
@Bean
fun topic4() = TopicBuilder.name("defaultBoth").build()
@Bean
fun topic5() = TopicBuilder.name("defaultPart").replicas(1).build()
@Bean
fun topic6() = TopicBuilder.name("defaultRepl").partitions(3).build()
从版本 2.7 开始,您可以声明多个NewTopic
s 的KafkaAdmin.NewTopics
bean 定义:
-
Java
-
Kotlin
@Bean
public KafkaAdmin.NewTopics topics456() {
return new NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build());
}
@Bean
fun topics456() = KafkaAdmin.NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build()
)
使用 Spring Boot 时,KafkaAdmin bean 会自动注册,因此您只需要NewTopic (和/或NewTopics ) @Bean s. |
默认情况下,如果代理不可用,则会记录一条消息,但会继续加载上下文。
您可以通过编程方式调用管理员的initialize()
方法稍后重试。
如果您希望这种情况被视为致命的,请将管理员的fatalIfBrokerNotAvailable
property 设置为true
.
然后,上下文无法初始化。
如果 broker 支持它(1.0.0 或更高版本),则如果发现现有主题的分区数少于NewTopic.numPartitions . |
从版本 2.7 开始,KafkaAdmin
提供在运行时创建和检查主题的方法。
-
createOrModifyTopics
-
describeTopics
要获得更高级的功能,您可以使用AdminClient
径直。
以下示例显示了如何执行此作:
@Autowired
private KafkaAdmin admin;
...
AdminClient client = AdminClient.create(admin.getConfigurationProperties());
...
client.close();
从版本 2.9.10、3.0.9 开始,您可以提供Predicate<NewTopic>
,这可用于确定特定的NewTopic
应考虑创建或修改 bean。
这很有用,例如,如果您有多个KafkaAdmin
实例指向不同的集群,并且您希望选择应由每个管理员创建或修改的主题。
admin.setCreateOrModifyTopic(nt -> !nt.name().equals("dontCreateThisOne"));