对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0spring-doc.cadn.net.cn

配置主题

如果您定义了KafkaAdminbean 时,它可以自动向 broker 添加主题。 为此,您可以添加NewTopic @Bean对于每个主题添加到应用程序上下文。 版本 2.3 引入了一个新类TopicBuilder使此类 bean 的创建更加方便。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic topic1() {
    return TopicBuilder.name("thing1")
            .partitions(10)
            .replicas(3)
            .compact()
            .build();
}

@Bean
public NewTopic topic2() {
    return TopicBuilder.name("thing2")
            .partitions(10)
            .replicas(3)
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}

@Bean
public NewTopic topic3() {
    return TopicBuilder.name("thing3")
            .assignReplicas(0, List.of(0, 1))
            .assignReplicas(1, List.of(1, 2))
            .assignReplicas(2, List.of(2, 0))
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}

从版本 2.6 开始,您可以省略partitions()和/或replicas()代理默认值将应用于这些属性。 代理版本必须至少为 2.4.0 才能支持此功能 - 请参阅 KIP-464spring-doc.cadn.net.cn

@Bean
public NewTopic topic4() {
    return TopicBuilder.name("defaultBoth")
            .build();
}

@Bean
public NewTopic topic5() {
    return TopicBuilder.name("defaultPart")
            .replicas(1)
            .build();
}

@Bean
public NewTopic topic6() {
    return TopicBuilder.name("defaultRepl")
            .partitions(3)
            .build();
}

从版本 2.7 开始,您可以声明多个NewTopics 的KafkaAdmin.NewTopicsbean 定义:spring-doc.cadn.net.cn

@Bean
public KafkaAdmin.NewTopics topics456() {
    return new NewTopics(
            TopicBuilder.name("defaultBoth")
                .build(),
            TopicBuilder.name("defaultPart")
                .replicas(1)
                .build(),
            TopicBuilder.name("defaultRepl")
                .partitions(3)
                .build());
}
使用 Spring Boot 时,KafkaAdminbean 会自动注册,因此您只需要NewTopic(和/或NewTopics) @Beans.

默认情况下,如果代理不可用,则会记录一条消息,但会继续加载上下文。 您可以通过编程方式调用管理员的initialize()方法稍后重试。 如果您希望这种情况被视为致命的,请将管理员的fatalIfBrokerNotAvailableproperty 设置为true. 然后,上下文无法初始化。spring-doc.cadn.net.cn

如果 broker 支持它(1.0.0 或更高版本),则如果发现现有主题的分区数少于NewTopic.numPartitions.

从版本 2.7 开始,KafkaAdmin提供在运行时创建和检查主题的方法。spring-doc.cadn.net.cn

要获得更高级的功能,您可以使用AdminClient径直。 以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Autowired
private KafkaAdmin admin;

...

    AdminClient client = AdminClient.create(admin.getConfigurationProperties());
    ...
    client.close();

从版本 2.9.10、3.0.9 开始,您可以提供Predicate<NewTopic>,这可用于确定特定的NewTopic应考虑创建或修改 bean。 这很有用,例如,如果您有多个KafkaAdmin实例指向不同的集群,并且您希望选择应由每个管理员创建或修改的主题。spring-doc.cadn.net.cn

admin.setCreateOrModifyTopic(nt -> !nt.name().equals("dontCreateThisOne"));

APP信息