此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1Spring中文文档

此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1Spring中文文档

如果在应用程序上下文中定义 Bean,它可以自动向代理添加主题。 为此,您可以将 for each topic 添加到应用程序上下文中。 2.3 版引入了一个新类,使创建此类 Bean 更加方便。 以下示例演示如何执行此操作:KafkaAdminNewTopic@BeanTopicBuilderSpring中文文档

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic topic1() {
    return TopicBuilder.name("thing1")
            .partitions(10)
            .replicas(3)
            .compact()
            .build();
}

@Bean
public NewTopic topic2() {
    return TopicBuilder.name("thing2")
            .partitions(10)
            .replicas(3)
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}

@Bean
public NewTopic topic3() {
    return TopicBuilder.name("thing3")
            .assignReplicas(0, List.of(0, 1))
            .assignReplicas(1, List.of(1, 2))
            .assignReplicas(2, List.of(2, 0))
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}
@Bean
fun admin() = KafkaAdmin(mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092"))

@Bean
fun topic1() =
    TopicBuilder.name("thing1")
        .partitions(10)
        .replicas(3)
        .compact()
        .build()

@Bean
fun topic2() =
    TopicBuilder.name("thing2")
        .partitions(10)
        .replicas(3)
        .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
        .build()

@Bean
fun topic3() =
    TopicBuilder.name("thing3")
        .assignReplicas(0, Arrays.asList(0, 1))
        .assignReplicas(1, Arrays.asList(1, 2))
        .assignReplicas(2, Arrays.asList(2, 0))
        .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
        .build()

从 V2.6 开始,您可以省略和/或,代理缺省值将应用于这些属性。 代理版本必须至少为 2.4.0 才能支持此功能 - 请参阅 KIP-464partitions()replicas()Spring中文文档

@Bean
public NewTopic topic4() {
    return TopicBuilder.name("defaultBoth")
            .build();
}

@Bean
public NewTopic topic5() {
    return TopicBuilder.name("defaultPart")
            .replicas(1)
            .build();
}

@Bean
public NewTopic topic6() {
    return TopicBuilder.name("defaultRepl")
            .partitions(3)
            .build();
}
@Bean
fun topic4() = TopicBuilder.name("defaultBoth").build()

@Bean
fun topic5() = TopicBuilder.name("defaultPart").replicas(1).build()

@Bean
fun topic6() = TopicBuilder.name("defaultRepl").partitions(3).build()

从 V2.7 开始,您可以在单个 Bean 定义中声明多个 s:NewTopicKafkaAdmin.NewTopicsSpring中文文档

@Bean
public KafkaAdmin.NewTopics topics456() {
    return new NewTopics(
            TopicBuilder.name("defaultBoth")
                .build(),
            TopicBuilder.name("defaultPart")
                .replicas(1)
                .build(),
            TopicBuilder.name("defaultRepl")
                .partitions(3)
                .build());
}
@Bean
fun topics456() = KafkaAdmin.NewTopics(
    TopicBuilder.name("defaultBoth")
        .build(),
    TopicBuilder.name("defaultPart")
        .replicas(1)
        .build(),
    TopicBuilder.name("defaultRepl")
        .partitions(3)
        .build()
)
使用 Spring Boot 时,bean 会自动注册,因此您只需要 (和/或 ) s。KafkaAdminNewTopicNewTopics@Bean
使用 Spring Boot 时,bean 会自动注册,因此您只需要 (和/或 ) s。KafkaAdminNewTopicNewTopics@Bean

缺省情况下,如果代理不可用,那么会记录一条消息,但上下文会继续加载。 可以通过编程方式调用管理员的方法,以便稍后重试。 如果希望将此情况视为致命条件,请将管理员的属性设置为 。 然后,上下文无法初始化。initialize()fatalIfBrokerNotAvailabletrueSpring中文文档

如果代理支持它(1.0.0 或更高版本),则如果发现现有主题的分区少于 .NewTopic.numPartitions
如果代理支持它(1.0.0 或更高版本),则如果发现现有主题的分区少于 .NewTopic.numPartitions

从版本 2.7 开始,提供了在运行时创建和检查主题的方法。KafkaAdminSpring中文文档

对于更高级的功能,您可以直接使用 。 以下示例演示如何执行此操作:AdminClientSpring中文文档

@Autowired
private KafkaAdmin admin;

...

    AdminClient client = AdminClient.create(admin.getConfigurationProperties());
    ...
    client.close();

从版本 2.9.10 和 3.0.9 开始,您可以提供一个 用于确定是否应考虑创建或修改特定 Bean。 例如,如果您有多个实例指向不同的集群,并且您希望选择应由每个管理员创建或修改的主题,这将非常有用。Predicate<NewTopic>NewTopicKafkaAdminSpring中文文档

admin.setCreateOrModifyTopic(nt -> !nt.name().equals("dontCreateThisOne"));