对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.4spring-doc.cn

对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.4spring-doc.cn

从版本 1.1.4 开始, Spring for Apache Kafka 为 Kafka Streams 提供了一流的支持。 要从 Spring 应用程序中使用它,jar 必须存在于 Classpath 中。 它是 Spring for Apache Kafka 项目的可选依赖项,不能传递下载。kafka-streamsspring-doc.cn

基本

参考 Apache Kafka Streams 文档建议了以下使用 API 的方法:spring-doc.cn

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.

StreamsBuilder builder = ...;  // when using the Kafka Streams DSL

// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;

KafkaStreams streams = new KafkaStreams(builder, config);

// Start the Kafka Streams instance
streams.start();

// Stop the Kafka Streams instance
streams.close();

所以,我们有两个主要组件:spring-doc.cn

  • StreamsBuilder:使用 API 构建(或)实例。KStreamKTablespring-doc.cn

  • KafkaStreams:管理这些实例的生命周期。spring-doc.cn

单个实例向实例公开的所有实例将同时启动和停止,即使它们具有不同的逻辑。 换句话说,由 a 定义的所有流都与单个生命周期控制相关联。 实例一旦被 关闭,就无法重新启动。 相反,必须创建一个新实例来重新启动流处理。KStreamKafkaStreamsStreamsBuilderStreamsBuilderKafkaStreamsstreams.close()KafkaStreams
单个实例向实例公开的所有实例将同时启动和停止,即使它们具有不同的逻辑。 换句话说,由 a 定义的所有流都与单个生命周期控制相关联。 实例一旦被 关闭,就无法重新启动。 相反,必须创建一个新实例来重新启动流处理。KStreamKafkaStreamsStreamsBuilderStreamsBuilderKafkaStreamsstreams.close()KafkaStreams

Spring 管理

为了从 Spring 应用程序上下文的角度简化 Kafka Streams 的使用,并通过容器使用生命周期管理,Spring for Apache Kafka 引入了 . 这是一个将单例实例公开为 bean 的实现。 下面的示例创建这样的 bean:StreamsBuilderFactoryBeanAbstractFactoryBeanStreamsBuilderspring-doc.cn

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
从版本 2.2 开始,流配置现在作为对象提供,而不是 .KafkaStreamsConfigurationStreamsConfig

还实现了 管理内部实例的生命周期。 与 Kafka Streams API 类似,您必须在启动 . 这也适用于 Spring API for Kafka Streams。 因此,当您在 上使用 default 时,必须在刷新应用程序上下文之前声明 实例。 例如,可以是常规的 bean 定义,而使用 Kafka Streams API 不会产生任何影响。 以下示例显示了如何执行此操作:StreamsBuilderFactoryBeanSmartLifecycleKafkaStreamsKStreamKafkaStreamsautoStartup = trueStreamsBuilderFactoryBeanKStreamStreamsBuilderKStreamspring-doc.cn

@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
    KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    // Fluent KStream API
    return stream;
}

如果要手动控制生命周期(例如,通过某些条件停止和启动),则可以使用工厂 bean () 前缀直接引用 bean。 由于使用其内部实例,因此可以安全地停止并再次重新启动它。 在每个 上创建一个新的 。 如果您想单独控制实例的生命周期,您还可以考虑使用不同的实例。StreamsBuilderFactoryBean&StreamsBuilderFactoryBeanKafkaStreamsKafkaStreamsstart()StreamsBuilderFactoryBeanKStreamspring-doc.cn

您还可以在 上指定 、 和 选项,这些选项将委派给内部实例。 此外,除了间接设置这些选项外,从版本 2.1.5 开始,您还可以使用回调接口来配置内部实例。 请注意,会覆盖 提供的选项。 如果需要直接执行某些操作,则可以使用 . 你可以按类型自动装配 bean,但你应该确保在 bean 定义中使用 full 类型,如下例所示:KafkaStreams.StateListenerThread.UncaughtExceptionHandlerStateRestoreListenerStreamsBuilderFactoryBeanKafkaStreamsStreamsBuilderFactoryBeanKafkaStreamsCustomizerKafkaStreamsKafkaStreamsCustomizerStreamsBuilderFactoryBeanKafkaStreamsKafkaStreamsStreamsBuilderFactoryBean.getKafkaStreams()StreamsBuilderFactoryBeanspring-doc.cn

@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

或者,如果使用接口 bean 定义,则可以按名称添加 for injection。 以下示例显示了如何执行此操作:@Qualifierspring-doc.cn

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

从版本 2.4.1 开始,工厂 bean 具有 type 为 ;这允许在创建流之前自定义 (例如,添加 state store) 和/或 。infrastructureCustomizerKafkaStreamsInfrastructureCustomizerStreamsBuilderTopologyspring-doc.cn

public interface KafkaStreamsInfrastructureCustomizer {

    void configureBuilder(StreamsBuilder builder);

    void configureTopology(Topology topology);

}

提供默认的 no-op 实现,以避免在不需要一种方法时必须同时实现这两种方法。spring-doc.cn

A 用于需要应用多个定制器的情况。CompositeKafkaStreamsInfrastructureCustomizerspring-doc.cn

从版本 2.2 开始,流配置现在作为对象提供,而不是 .KafkaStreamsConfigurationStreamsConfig

KafkaStreams Micrometer 支持

在 2.5.3 版中引入,您可以配置 a 以自动为工厂 bean 管理的对象注册千分尺:KafkaStreamsMicrometerListenerKafkaStreamsspring-doc.cn

streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
        Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));

Streams JSON 序列化和反序列化

为了在以 JSON 格式读取或写入主题或状态存储时对数据进行序列化和反序列化,Spring for Apache Kafka 提供了一种使用 JSON 的实现,委托给 和 序列化、反序列化和消息转换中所述。 该实现通过其构造函数(target type 或 )提供相同的配置选项。 在以下示例中,我们使用 the 来序列化和反序列化 Kafka 流的有效负载(在需要实例的地方可以以类似的方式使用 the):JsonSerdeJsonSerializerJsonDeserializerJsonSerdeObjectMapperJsonSerdeCatJsonSerdespring-doc.cn

stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");

当以编程方式构造序列化器/反序列化器以在 producer/consumer 工厂中使用时,从版本 2.3 开始,你可以使用 Fluent API,这简化了配置。spring-doc.cn

stream.through(
    new JsonSerde<>(MyKeyType.class)
        .forKeys()
        .noTypeInfo(),
    new JsonSerde<>(MyValueType.class)
        .noTypeInfo(),
    "myTypes");

KafkaStreamBrancher

该类引入了一种更方便的方法,可以在 .KafkaStreamBrancherKStreamspring-doc.cn

请考虑以下示例,该示例不使用 :KafkaStreamBrancherspring-doc.cn

KStream<String, String>[] branches = builder.stream("source").branch(
        (key, value) -> value.contains("A"),
        (key, value) -> value.contains("B"),
        (key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");

以下示例使用 :KafkaStreamBrancherspring-doc.cn

new KafkaStreamBrancher<String, String>()
        .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
        .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
        //default branch should not necessarily be defined in the end of the chain!
        .defaultBranch(ks -> ks.to("C"))
        .onTopOf(builder.stream("source"));
        //onTopOf method returns the provided stream so we can continue with method chaining

配置

要配置 Kafka Streams 环境,需要一个实例。 有关所有可能的选项,请参阅 Apache Kafka 文档StreamsBuilderFactoryBeanKafkaStreamsConfigurationspring-doc.cn

从版本 2.2 开始,流配置现在作为对象提供,而不是作为 .KafkaStreamsConfigurationStreamsConfig

为了在大多数情况下避免样板代码,尤其是在开发微服务时, Spring for Apache Kafka 提供了注释,您应该将其放在类上。 您只需声明一个名为 . 名为 的 bean 将在应用程序上下文中自动声明。 您也可以声明和使用任何其他 bean。 您可以通过提供实现 . 如果有多个这样的 bean,它们将根据它们的 property 进行应用。@EnableKafkaStreams@ConfigurationKafkaStreamsConfigurationdefaultKafkaStreamsConfigStreamsBuilderFactoryBeandefaultKafkaStreamsBuilderStreamsBuilderFactoryBeanStreamsBuilderFactoryBeanConfigurerOrdered.orderspring-doc.cn

默认情况下,当工厂 Bean 停止时,将调用该方法。 从版本 2.1.2 开始,工厂 bean 具有额外的构造函数,采用具有属性的对象,以控制是否在期间调用该方法,还是在两者之间调用。 从版本 2.7 开始,默认值是永不清理本地状态。KafkaStreams.cleanUp()CleanupConfigcleanUp()start()stop()spring-doc.cn

从版本 2.2 开始,流配置现在作为对象提供,而不是作为 .KafkaStreamsConfigurationStreamsConfig

标头扩充器

版本 3.0 添加了 ;提供与 deprecated 相同的功能,后者实现了 deprecated 接口。 这可用于在流处理中添加标头;标头值是 SPEL 表达式;表达式求值的根对象有 3 个属性:HeaderEnricherProcessorContextualProcessorHeaderEnricherTransformerspring-doc.cn

  • record- 的 (、、、org.apache.kafka.streams.processor.api.Recordkeyvaluetimestampheaders)spring-doc.cn

  • key- 当前记录的 Keyspring-doc.cn

  • value- 当前记录的值spring-doc.cn

  • context- 的 ,允许访问当前记录元数据ProcessorContextspring-doc.cn

表达式必须返回 a 或 a (将转换为 using )。byte[]Stringbyte[]UTF-8spring-doc.cn

要在流中使用扩充器:spring-doc.cn

.process(() -> new HeaderEnricherProcessor(expressions))

处理器不会更改 或 ;它只是添加标题。keyvaluespring-doc.cn

每条记录都需要一个新实例。
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))

下面是一个简单的示例,添加了一个 Literal 标头和一个变量:spring-doc.cn

Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
        .process(() -> supplier)
        .to(OUTPUT);
每条记录都需要一个新实例。

MessagingProcessor

版本 3.0 添加了 的扩展,提供了与实现已弃用接口的已弃用版本相同的功能。 这允许 Kafka Streams 拓扑与 Spring Messaging 组件(例如 Spring Integration 流)进行交互。 转换器需要 .MessagingProcessorContextualProcessorMessagingTransformerTransformerMessagingFunctionspring-doc.cn

@FunctionalInterface
public interface MessagingFunction {

    Message<?> exchange(Message<?> message);

}

Spring Integration 使用其 . 它还需要将键、值和元数据(包括 headers)与 Spring Messaging 相互转换。 有关更多信息,请参见 [从 KStream 调用 Spring 集成流]。GatewayProxyFactoryBeanMessagingMessageConverterMessage<?>spring-doc.cn

从反序列化异常中恢复

版本 2.3 引入了 which 可以在发生反序列化异常时采取一些操作。 请参阅有关 的 Kafka 文档,其中 是实现。 配置了 implementation. 该框架提供了将失败的记录发送到死信主题。 有关此恢复程序的更多信息,请参阅 Publishing Dead-letter RecordsRecoveringDeserializationExceptionHandlerDeserializationExceptionHandlerRecoveringDeserializationExceptionHandlerRecoveringDeserializationExceptionHandlerConsumerRecordRecovererDeadLetterPublishingRecovererspring-doc.cn

要配置恢复器,请将以下属性添加到您的 streams 配置中:spring-doc.cn

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            RecoveringDeserializationExceptionHandler.class);
    props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
    ...
    return new KafkaStreamsConfiguration(props);
}

@Bean
public DeadLetterPublishingRecoverer recoverer() {
    return new DeadLetterPublishingRecoverer(kafkaTemplate(),
            (record, ex) -> new TopicPartition("recovererDLQ", -1));
}

当然,该 bean 可以是你自己的 实现。recoverer()ConsumerRecordRecovererspring-doc.cn

Kafka Streams 示例

以下示例结合了我们在本章中介绍的所有主题:spring-doc.cn

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public StreamsBuilderFactoryBeanConfigurer configurer() {
        return fb -> fb.setStateListener((newState, oldState) -> {
            System.out.println("State transition from " + oldState + " to " + newState);
        });
    }

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
        KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
        stream
                .mapValues((ValueMapper<String, String>) String::toUpperCase)
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMillis(1_000)))
                .reduce((String value1, String value2) -> value1 + value2,
                		Named.as("windowStore"))
                .toStream()
                .map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
                .filter((i, s) -> s.length() > 40)
                .to("streamingTopic2");

        stream.print(Printed.toSysOut());

        return stream;
    }

}