此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1Spring中文文档

此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1Spring中文文档

从版本 1.1.4 开始,Spring for Apache Kafka 为 Kafka Streams 提供了一流的支持。 要在 Spring 应用程序中使用它,jar 必须存在于类路径中。 它是 Spring for Apache Kafka 项目的可选依赖项,不会以可传递方式下载。kafka-streamsSpring中文文档

基本

参考 Apache Kafka Streams 文档建议了以下 API 使用方式:Spring中文文档

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.

StreamsBuilder builder = ...;  // when using the Kafka Streams DSL

// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;

KafkaStreams streams = new KafkaStreams(builder, config);

// Start the Kafka Streams instance
streams.start();

// Stop the Kafka Streams instance
streams.close();

因此,我们有两个主要组成部分:Spring中文文档

单个实例向实例公开的所有实例都会同时启动和停止,即使它们具有不同的逻辑。 换言之,由 a 定义的所有流都与单个生命周期控件相关联。 实例一旦被关闭,就无法重新启动。 相反,必须创建一个新实例来重新启动流处理。KStreamKafkaStreamsStreamsBuilderStreamsBuilderKafkaStreamsstreams.close()KafkaStreams
单个实例向实例公开的所有实例都会同时启动和停止,即使它们具有不同的逻辑。 换言之,由 a 定义的所有流都与单个生命周期控件相关联。 实例一旦被关闭,就无法重新启动。 相反,必须创建一个新实例来重新启动流处理。KStreamKafkaStreamsStreamsBuilderStreamsBuilderKafkaStreamsstreams.close()KafkaStreams

弹簧管理

为了从 Spring 应用程序上下文的角度简化 Kafka Streams 的使用,并通过容器使用生命周期管理,Spring for Apache Kafka 引入了 . 这是一个将单例实例公开为 Bean 的实现。 下面的示例创建这样的 Bean:StreamsBuilderFactoryBeanAbstractFactoryBeanStreamsBuilderSpring中文文档

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
从版本 2.2 开始,流配置现在以对象的形式提供,而不是 .KafkaStreamsConfigurationStreamsConfig

还实现了管理内部实例的生命周期。 与 Kafka Streams API 类似,您必须在启动 . 这也适用于 Spring API for Kafka Streams。 因此,在 上使用 default 时,必须在刷新应用程序上下文之前声明 上的实例。 例如,可以是常规的 Bean 定义,而使用 Kafka Streams API 时不会产生任何影响。 以下示例演示如何执行此操作:StreamsBuilderFactoryBeanSmartLifecycleKafkaStreamsKStreamKafkaStreamsautoStartup = trueStreamsBuilderFactoryBeanKStreamStreamsBuilderKStreamSpring中文文档

@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
    KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    // Fluent KStream API
    return stream;
}

如果要手动控制生命周期(例如,在某个条件下停止和启动),可以使用工厂 bean () 前缀直接引用 Bean。 由于使用其内部实例,因此可以安全地停止并重新启动它。 在每个 . 如果要单独控制实例的生命周期,还可以考虑使用不同的实例。StreamsBuilderFactoryBean&StreamsBuilderFactoryBeanKafkaStreamsKafkaStreamsstart()StreamsBuilderFactoryBeanKStreamSpring中文文档

您还可以在 上指定 、 和 选项,这些选项将委托给内部实例。 此外,从版本 2.1.5 开始,除了间接设置这些选项外,还可以使用回调接口来配置内部实例。 请注意,这将覆盖 提供的选项。 如果需要直接执行某些操作,可以使用 访问该内部实例。 您可以按类型自动连接 Bean,但应确保在 Bean 定义中使用完整类型,如以下示例所示:KafkaStreams.StateListenerThread.UncaughtExceptionHandlerStateRestoreListenerStreamsBuilderFactoryBeanKafkaStreamsStreamsBuilderFactoryBeanKafkaStreamsCustomizerKafkaStreamsKafkaStreamsCustomizerStreamsBuilderFactoryBeanKafkaStreamsKafkaStreamsStreamsBuilderFactoryBean.getKafkaStreams()StreamsBuilderFactoryBeanSpring中文文档

@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

或者,如果使用接口 Bean 定义,则可以按名称添加注入。 以下示例演示如何执行此操作:@QualifierSpring中文文档

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

从 2.4.1 版开始,工厂 Bean 具有 type 的新属性;这允许在创建流之前自定义(例如添加状态存储)和/或 。infrastructureCustomizerKafkaStreamsInfrastructureCustomizerStreamsBuilderTopologySpring中文文档

public interface KafkaStreamsInfrastructureCustomizer {

    void configureBuilder(StreamsBuilder builder);

    void configureTopology(Topology topology);

}

提供了默认的无操作实现,以避免在不需要时必须实现这两种方法。Spring中文文档

当您需要应用多个定制器时,将提供 A。CompositeKafkaStreamsInfrastructureCustomizerSpring中文文档

从版本 2.2 开始,流配置现在以对象的形式提供,而不是 .KafkaStreamsConfigurationStreamsConfig

KafkaStreams 千分尺支持

在 V2.5.3 中引入,您可以配置 a 以自动注册由工厂 Bean 管理的对象的千分尺:KafkaStreamsMicrometerListenerKafkaStreamsSpring中文文档

streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
        Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));

流 JSON 序列化和反序列化

为了在以 JSON 格式读取或写入主题或状态存储时序列化和反序列化数据,Spring for Apache Kafka 提供了一个使用 JSON 的实现,委托给序列化、反序列化和消息转换中所述的 和。 该实现通过其构造函数(目标类型或 )提供相同的配置选项。 在以下示例中,我们使用 to 序列化和反序列化 Kafka 流的有效负载(只要需要实例,就可以以类似的方式使用 ):JsonSerdeJsonSerializerJsonDeserializerJsonSerdeObjectMapperJsonSerdeCatJsonSerdeSpring中文文档

stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");

从版本 2.3 开始,以编程方式构造序列化程序/反序列化程序以在生产者/使用者工厂中使用时,可以使用 Fluent API,从而简化配置。Spring中文文档

stream.through(
    new JsonSerde<>(MyKeyType.class)
        .forKeys()
        .noTypeInfo(),
    new JsonSerde<>(MyValueType.class)
        .noTypeInfo(),
    "myTypes");

KafkaStreamBrancher

该类引入了一种更方便的方法,用于在 之上构建条件分支。KafkaStreamBrancherKStreamSpring中文文档

请考虑以下不使用 :KafkaStreamBrancherSpring中文文档

KStream<String, String>[] branches = builder.stream("source").branch(
        (key, value) -> value.contains("A"),
        (key, value) -> value.contains("B"),
        (key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");

以下示例使用:KafkaStreamBrancherSpring中文文档

new KafkaStreamBrancher<String, String>()
        .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
        .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
        //default branch should not necessarily be defined in the end of the chain!
        .defaultBranch(ks -> ks.to("C"))
        .onTopOf(builder.stream("source"));
        //onTopOf method returns the provided stream so we can continue with method chaining

配置

要配置 Kafka Streams 环境,需要一个实例。 有关所有可能的选项,请参阅 Apache Kafka 文档StreamsBuilderFactoryBeanKafkaStreamsConfigurationSpring中文文档

从版本 2.2 开始,流配置现在以对象的形式提供,而不是以 .KafkaStreamsConfigurationStreamsConfig

为了避免在大多数情况下使用样板代码,尤其是在开发微服务时,Spring for Apache Kafka 提供了注释,您应该将其放在类上。 您只需要声明一个名为 的 Bean 即可。 在应用程序上下文中自动声明名为 的 Bean。 您也可以声明和使用任何其他 bean。 您可以通过提供实现 的 Bean 来执行该 Bean 的其他定制。 如果有多个这样的豆子,它们将根据它们的属性进行应用。@EnableKafkaStreams@ConfigurationKafkaStreamsConfigurationdefaultKafkaStreamsConfigStreamsBuilderFactoryBeandefaultKafkaStreamsBuilderStreamsBuilderFactoryBeanStreamsBuilderFactoryBeanConfigurerOrdered.orderSpring中文文档

清理和停止配置

当工厂停止时,使用 2 个参数调用:KafkaStreams.close()Spring中文文档

  • closeTimeout :等待线程关闭的时间(默认设置为 10 秒)。可以使用 进行配置。DEFAULT_CLOSE_TIMEOUTStreamsBuilderFactoryBean.setCloseTimeout()Spring中文文档

  • leaveGroupOnClose :触发来自组的使用者离职呼叫(默认为 )。可以使用 进行配置。falseStreamsBuilderFactoryBean.setLeaveGroupOnClose()Spring中文文档

缺省情况下,当工厂 Bean 停止时,将调用该方法。 从 V2.1.2 开始,工厂 Bean 具有其他构造函数,采用具有属性的对象来控制方法是在期间调用还是两者都不调用。 从版本 2.7 开始,默认设置是从不清理本地状态。KafkaStreams.cleanUp()CleanupConfigcleanUp()start()stop()Spring中文文档

从版本 2.2 开始,流配置现在以对象的形式提供,而不是以 .KafkaStreamsConfigurationStreamsConfig

标头 Enricher

版本 3.0 添加了 ;提供与实现已弃用接口的已弃用接口相同的功能。 这可用于在流处理中添加标头;标头值是 SpEL 表达式;表达式计算的根对象具有 3 个属性:HeaderEnricherProcessorContextualProcessorHeaderEnricherTransformerSpring中文文档

表达式必须返回 a 或 a(将转换为 using )。byte[]Stringbyte[]UTF-8Spring中文文档

要在流中使用 enricher,请执行以下操作:Spring中文文档

.process(() -> new HeaderEnricherProcessor(expressions))

处理器不会更改 或 ;它只是添加标题。keyvalueSpring中文文档

每条记录都需要一个新实例。
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))

下面是一个简单的示例,添加一个文本标头和一个变量:Spring中文文档

Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
        .process(() -> supplier)
        .to(OUTPUT);
每条记录都需要一个新实例。

MessagingProcessor

版本 3.0 添加了 的扩展,提供与实现已弃用接口的已弃用接口相同的功能。 这允许 Kafka Streams 拓扑与 Spring Messaging 组件(如 Spring Integration 流)进行交互。 变压器需要实现 。MessagingProcessorContextualProcessorMessagingTransformerTransformerMessagingFunctionSpring中文文档

@FunctionalInterface
public interface MessagingFunction {

    Message<?> exchange(Message<?> message);

}

Spring Integration 使用其 . 它还需要将键、值和元数据(包括标头)转换为Spring Messaging/从Spring Messaging转换。 有关更多信息,请参见 [从 KStream 调用 Spring 集成流]。GatewayProxyFactoryBeanMessagingMessageConverterMessage<?>Spring中文文档

从反序列化异常中恢复

版本 2.3 引入了在发生反序列化异常时可以执行某些操作的功能。 请参阅 的 Kafka 文档,其中 是实现。 配置了实现。 该框架提供了将失败的记录发送到死信主题的 WHICH 。 有关此恢复器的详细信息,请参阅发布死信记录RecoveringDeserializationExceptionHandlerDeserializationExceptionHandlerRecoveringDeserializationExceptionHandlerRecoveringDeserializationExceptionHandlerConsumerRecordRecovererDeadLetterPublishingRecovererSpring中文文档

要配置恢复器,请将以下属性添加到流配置中:Spring中文文档

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            RecoveringDeserializationExceptionHandler.class);
    props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
    ...
    return new KafkaStreamsConfiguration(props);
}

@Bean
public DeadLetterPublishingRecoverer recoverer() {
    return new DeadLetterPublishingRecoverer(kafkaTemplate(),
            (record, ex) -> new TopicPartition("recovererDLQ", -1));
}

当然,bean 也可以是你自己的实现。recoverer()ConsumerRecordRecovererSpring中文文档

交互式查询支持

从 3.2 版开始,Spring for Apache Kafka 提供了在 Kafka Streams 中进行交互式查询所需的基本功能。 交互式查询在有状态 Kafka Streams 应用程序中非常有用,因为它们提供了一种持续查询应用程序中的有状态存储的方法。 因此,如果应用程序想要实现所考虑的系统的当前视图,交互式查询提供了一种实现此目的的方法。 若要了解有关交互式查询的详细信息,请参阅此文章。 Spring 中对 Apache Kafka 的支持以一个名为 API 的 API 为中心,该 API 是 Kafka Streams 库中交互式查询 API 的外观。 应用程序可以将此服务的实例创建为 Bean,然后使用它按其名称检索状态存储。KafkaStreamsInteractiveQueryServiceSpring中文文档

以下代码片段显示了一个示例。Spring中文文档

@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    return kafkaStreamsInteractiveQueryService;
}

假设 Kafka Streams 应用程序有一个名为 的状态存储,则可以通过 API 检索该存储,如下所示。app-storeKafkStreamsInteractiveQuerySpring中文文档

@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

ReadOnlyKeyValueStore<Object, Object>  appStore = interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());

一旦应用程序获得对状态存储的访问权限,它就可以从中查询键值信息。Spring中文文档

在这种情况下,应用程序使用的状态存储是只读键值存储。 Kafka Streams 应用程序可以使用其他类型的状态存储。 例如,如果应用程序更喜欢查询基于窗口的存储,则可以在 Kafka Streams 应用程序业务逻辑中构建该存储,然后稍后检索它。 因此,用于检索可查询存储的 API 具有通用存储类型签名,以便最终用户可以分配正确的类型。KafkaStreamsInteractiveQueryServiceSpring中文文档

下面是 API 中的类型签名。Spring中文文档

public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType)

调用此方法时,用户可以指定请求正确的状态存储类型,就像我们在上面的示例中所做的那样。Spring中文文档

重试状态存储检索

尝试使用 检索状态存储时,可能会由于各种原因找不到状态存储。 如果这些原因是暂时的,则提供一个选项,通过允许注入自定义 . 默认情况下,中使用的 that 使用最大尝试次数为 3 次,固定退避时间为 1 秒。KafkaStreamsInteractiveQueryServiceKafkaStreamsInteractiveQueryServiceRetryTemplateRetryTemmplateKafkaStreamsInteractiveQueryServiceSpring中文文档

以下是如何以最大尝试次数为 10 次将自定义注入其中的方法。RetryTemmplateKafkaStreamsInteractiveQueryServiceSpring中文文档

@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
    RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
    retryTemplate.setRetryPolicy(retryPolicy);
    kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
    return kafkaStreamsInteractiveQueryService;
}

查询远程状态存储

上面显示的用于检索状态存储的 API - 适用于本地可用的键值状态存储。 在生产设置中,Kafka Streams 应用程序最有可能根据分区数进行分发。 如果一个主题有四个分区,并且有四个实例正在运行同一个 Kafka Streams 处理器,则每个实例可能负责处理该主题中的单个分区。 在这种情况下,调用可能不会给出实例正在查找的正确结果,尽管它可能会返回有效的存储。 假设具有四个分区的主题具有有关各种键的数据,并且单个分区始终负责特定键。 如果正在调用的实例正在查找有关此实例未托管的密钥的信息,则它不会收到任何数据。 这是因为当前的 Kafka Streams 实例对此密钥一无所知。 若要解决此问题,调用实例首先需要确保它们具有托管特定密钥的 Kafka Streams 处理器实例的主机信息。 这可以从任何 Kafka Streams 实例中检索,如下所示。retrieveQueryableStoreretrieveQueryableStoreretrieveQueryableStoreapplication.idSpring中文文档

@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());

在上面的示例代码中,调用实例正在从名为 的状态存储中查询特定键。 API 还需要相应的密钥序列化程序,在本例中为 . Kafka Streams 会查看同一下的所有实例,并尝试查找哪个实例托管此特定键, 找到后,它将以对象的形式返回该主机信息。12345app-storeIntegerSerializerapplication.idHostInfoSpring中文文档

这是 API 的样子:Spring中文文档

public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer)

当以这样的分布式方式使用相同的 Kafka Streams 处理器的多个实例时,应用程序应该提供一个 RPC 层,在该层中可以通过 RPC 端点(如 REST 端点)查询状态存储。 有关这方面的更多详细信息,请参阅此文章。 使用 Spring for Apache Kafka 时,使用 spring-web 技术添加基于 Spring 的 REST 端点非常容易。 一旦有了 REST 端点,就可以使用它从任何 Kafka Streams 实例查询状态存储,前提是实例知道密钥的托管位置。application.idHostInfoSpring中文文档

如果托管实例的密钥是当前实例,则应用程序不需要调用 RPC 机制,而是进行 JVM 内调用。 但是,问题在于应用程序可能不知道进行调用的实例是托管密钥的位置,因为特定服务器可能会由于使用者重新平衡而丢失分区。 要解决此问题,请提供一个方便的 API,用于通过返回当前 . 这个想法是应用程序可以首先获取有关密钥保存位置的信息,然后将其与有关当前实例的信息进行比较。 如果数据匹配,则可以通过 继续执行简单的 JVM 调用,否则使用 RPC 选项。KafkaStreamsInteractiveQueryServicegetCurrentKafkaStreamsApplicationHostInfo()HostInfoHostInfoHostInforetrieveQueryableStoreSpring中文文档

Kafka 流示例

以下示例结合了本章中介绍的各种主题:Spring中文文档

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public StreamsBuilderFactoryBeanConfigurer configurer() {
        return fb -> fb.setStateListener((newState, oldState) -> {
            System.out.println("State transition from " + oldState + " to " + newState);
        });
    }

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
        KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
        stream
                .mapValues((ValueMapper<String, String>) String::toUpperCase)
                .groupByKey()
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(1_000)))
                .reduce((String value1, String value2) -> value1 + value2,
                		Named.as("windowStore"))
                .toStream()
                .map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
                .filter((i, s) -> s.length() > 40)
                .to("streamingTopic2");

        stream.print(Printed.toSysOut());

        return stream;
    }

}