从版本 1.1.4 开始, Spring for Apache Kafka 为 Kafka Streams 提供了一流的支持。
要从 Spring 应用程序中使用它,jar 必须存在于 Classpath 中。
它是 Spring for Apache Kafka 项目的可选依赖项,不能传递下载。kafka-streams
基本
参考 Apache Kafka Streams 文档建议了以下使用 API 的方法:
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.
StreamsBuilder builder = ...; // when using the Kafka Streams DSL
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;
KafkaStreams streams = new KafkaStreams(builder, config);
// Start the Kafka Streams instance
streams.start();
// Stop the Kafka Streams instance
streams.close();
所以,我们有两个主要组件:
-
StreamsBuilder
:使用 API 构建(或)实例。KStream
KTable
-
KafkaStreams
:管理这些实例的生命周期。
单个实例向实例公开的所有实例将同时启动和停止,即使它们具有不同的逻辑。
换句话说,由 a 定义的所有流都与单个生命周期控制相关联。
实例一旦被 关闭,就无法重新启动。
相反,必须创建一个新实例来重新启动流处理。KStream KafkaStreams StreamsBuilder StreamsBuilder KafkaStreams streams.close() KafkaStreams |
单个实例向实例公开的所有实例将同时启动和停止,即使它们具有不同的逻辑。
换句话说,由 a 定义的所有流都与单个生命周期控制相关联。
实例一旦被 关闭,就无法重新启动。
相反,必须创建一个新实例来重新启动流处理。KStream KafkaStreams StreamsBuilder StreamsBuilder KafkaStreams streams.close() KafkaStreams |
Spring 管理
为了从 Spring 应用程序上下文的角度简化 Kafka Streams 的使用,并通过容器使用生命周期管理,Spring for Apache Kafka 引入了 .
这是一个将单例实例公开为 bean 的实现。
下面的示例创建这样的 bean:StreamsBuilderFactoryBean
AbstractFactoryBean
StreamsBuilder
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
从版本 2.2 开始,流配置现在作为对象提供,而不是 .KafkaStreamsConfiguration StreamsConfig |
还实现了 管理内部实例的生命周期。
与 Kafka Streams API 类似,您必须在启动 .
这也适用于 Spring API for Kafka Streams。
因此,当您在 上使用 default 时,必须在刷新应用程序上下文之前声明 实例。
例如,可以是常规的 bean 定义,而使用 Kafka Streams API 不会产生任何影响。
以下示例显示了如何执行此操作:StreamsBuilderFactoryBean
SmartLifecycle
KafkaStreams
KStream
KafkaStreams
autoStartup = true
StreamsBuilderFactoryBean
KStream
StreamsBuilder
KStream
@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
// Fluent KStream API
return stream;
}
如果要手动控制生命周期(例如,通过某些条件停止和启动),则可以使用工厂 bean () 前缀直接引用 bean。
由于使用其内部实例,因此可以安全地停止并再次重新启动它。
在每个 上创建一个新的 。
如果您想单独控制实例的生命周期,您还可以考虑使用不同的实例。StreamsBuilderFactoryBean
&
StreamsBuilderFactoryBean
KafkaStreams
KafkaStreams
start()
StreamsBuilderFactoryBean
KStream
您还可以在 上指定 、 和 选项,这些选项将委派给内部实例。
此外,除了间接设置这些选项外,从版本 2.1.5 开始,您还可以使用回调接口来配置内部实例。
请注意,会覆盖 提供的选项。
如果需要直接执行某些操作,则可以使用 .
你可以按类型自动装配 bean,但你应该确保在 bean 定义中使用 full 类型,如下例所示:KafkaStreams.StateListener
Thread.UncaughtExceptionHandler
StateRestoreListener
StreamsBuilderFactoryBean
KafkaStreams
StreamsBuilderFactoryBean
KafkaStreamsCustomizer
KafkaStreams
KafkaStreamsCustomizer
StreamsBuilderFactoryBean
KafkaStreams
KafkaStreams
StreamsBuilderFactoryBean.getKafkaStreams()
StreamsBuilderFactoryBean
@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
或者,如果使用接口 bean 定义,则可以按名称添加 for injection。
以下示例显示了如何执行此操作:@Qualifier
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
从版本 2.4.1 开始,工厂 bean 具有 type 为 ;这允许在创建流之前自定义 (例如,添加 state store) 和/或 。infrastructureCustomizer
KafkaStreamsInfrastructureCustomizer
StreamsBuilder
Topology
public interface KafkaStreamsInfrastructureCustomizer {
void configureBuilder(StreamsBuilder builder);
void configureTopology(Topology topology);
}
提供默认的 no-op 实现,以避免在不需要一种方法时必须同时实现这两种方法。
A 用于需要应用多个定制器的情况。CompositeKafkaStreamsInfrastructureCustomizer
从版本 2.2 开始,流配置现在作为对象提供,而不是 .KafkaStreamsConfiguration StreamsConfig |
KafkaStreams Micrometer 支持
在 2.5.3 版中引入,您可以配置 a 以自动为工厂 bean 管理的对象注册千分尺:KafkaStreamsMicrometerListener
KafkaStreams
streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
Streams JSON 序列化和反序列化
为了在以 JSON 格式读取或写入主题或状态存储时对数据进行序列化和反序列化,Spring for Apache Kafka 提供了一种使用 JSON 的实现,委托给 和 序列化、反序列化和消息转换中所述。
该实现通过其构造函数(target type 或 )提供相同的配置选项。
在以下示例中,我们使用 the 来序列化和反序列化 Kafka 流的有效负载(在需要实例的地方可以以类似的方式使用 the):JsonSerde
JsonSerializer
JsonDeserializer
JsonSerde
ObjectMapper
JsonSerde
Cat
JsonSerde
stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");
当以编程方式构造序列化器/反序列化器以在 producer/consumer 工厂中使用时,从版本 2.3 开始,你可以使用 Fluent API,这简化了配置。
stream.through(
new JsonSerde<>(MyKeyType.class)
.forKeys()
.noTypeInfo(),
new JsonSerde<>(MyValueType.class)
.noTypeInfo(),
"myTypes");
用KafkaStreamBrancher
该类引入了一种更方便的方法,可以在 .KafkaStreamBrancher
KStream
请考虑以下示例,该示例不使用 :KafkaStreamBrancher
KStream<String, String>[] branches = builder.stream("source").branch(
(key, value) -> value.contains("A"),
(key, value) -> value.contains("B"),
(key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");
以下示例使用 :KafkaStreamBrancher
new KafkaStreamBrancher<String, String>()
.branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
.branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
//default branch should not necessarily be defined in the end of the chain!
.defaultBranch(ks -> ks.to("C"))
.onTopOf(builder.stream("source"));
//onTopOf method returns the provided stream so we can continue with method chaining
配置
要配置 Kafka Streams 环境,需要一个实例。
有关所有可能的选项,请参阅 Apache Kafka 文档。StreamsBuilderFactoryBean
KafkaStreamsConfiguration
从版本 2.2 开始,流配置现在作为对象提供,而不是作为 .KafkaStreamsConfiguration StreamsConfig |
为了在大多数情况下避免样板代码,尤其是在开发微服务时, Spring for Apache Kafka 提供了注释,您应该将其放在类上。
您只需声明一个名为 .
名为 的 bean 将在应用程序上下文中自动声明。
您也可以声明和使用任何其他 bean。
您可以通过提供实现 .
如果有多个这样的 bean,它们将根据它们的 property 进行应用。@EnableKafkaStreams
@Configuration
KafkaStreamsConfiguration
defaultKafkaStreamsConfig
StreamsBuilderFactoryBean
defaultKafkaStreamsBuilder
StreamsBuilderFactoryBean
StreamsBuilderFactoryBeanConfigurer
Ordered.order
清理和停止配置
当工厂停止时,使用 2 个参数调用 :KafkaStreams.close()
-
closeTimeout :等待线程关闭的时间(默认设置为 10 秒)。可以使用 进行配置。
DEFAULT_CLOSE_TIMEOUT
StreamsBuilderFactoryBean.setCloseTimeout()
-
leaveGroupOnClose :触发来自组的使用者离开调用(默认为 )。可以使用 进行配置。
false
StreamsBuilderFactoryBean.setLeaveGroupOnClose()
默认情况下,当工厂 Bean 停止时,将调用该方法。
从版本 2.1.2 开始,工厂 bean 具有额外的构造函数,采用具有属性的对象,以控制是否在期间调用该方法,还是在两者之间调用。
从版本 2.7 开始,默认值是永不清理本地状态。KafkaStreams.cleanUp()
CleanupConfig
cleanUp()
start()
stop()
从版本 2.2 开始,流配置现在作为对象提供,而不是作为 .KafkaStreamsConfiguration StreamsConfig |
标头扩充器
版本 3.0 添加了 ;提供与 deprecated 相同的功能,后者实现了 deprecated 接口。
这可用于在流处理中添加标头;标头值是 SPEL 表达式;表达式求值的根对象有 3 个属性:HeaderEnricherProcessor
ContextualProcessor
HeaderEnricher
Transformer
-
record
- 的 (、、、org.apache.kafka.streams.processor.api.Record
key
value
timestamp
headers
) -
key
- 当前记录的 Key -
value
- 当前记录的值 -
context
- 的 ,允许访问当前记录元数据ProcessorContext
表达式必须返回 a 或 a (将转换为 using )。byte[]
String
byte[]
UTF-8
要在流中使用扩充器:
.process(() -> new HeaderEnricherProcessor(expressions))
处理器不会更改 或 ;它只是添加标题。key
value
每条记录都需要一个新实例。 |
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))
下面是一个简单的示例,添加了一个 Literal 标头和一个变量:
Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
.process(() -> supplier)
.to(OUTPUT);
每条记录都需要一个新实例。 |
MessagingProcessor
版本 3.0 添加了 的扩展,提供了与实现已弃用接口的已弃用版本相同的功能。
这允许 Kafka Streams 拓扑与 Spring Messaging 组件(例如 Spring Integration 流)进行交互。
转换器需要 .MessagingProcessor
ContextualProcessor
MessagingTransformer
Transformer
MessagingFunction
@FunctionalInterface
public interface MessagingFunction {
Message<?> exchange(Message<?> message);
}
Spring Integration 使用其 .
它还需要将键、值和元数据(包括 headers)与 Spring Messaging 相互转换。
有关更多信息,请参见 [从 KStream
调用 Spring 集成流]。GatewayProxyFactoryBean
MessagingMessageConverter
Message<?>
从反序列化异常中恢复
版本 2.3 引入了 which 可以在发生反序列化异常时采取一些操作。
请参阅有关 的 Kafka 文档,其中 是实现。
配置了 implementation.
该框架提供了将失败的记录发送到死信主题。
有关此恢复程序的更多信息,请参阅 Publishing Dead-letter Records。RecoveringDeserializationExceptionHandler
DeserializationExceptionHandler
RecoveringDeserializationExceptionHandler
RecoveringDeserializationExceptionHandler
ConsumerRecordRecoverer
DeadLetterPublishingRecoverer
要配置恢复器,请将以下属性添加到您的 streams 配置中:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
...
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
RecoveringDeserializationExceptionHandler.class);
props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
...
return new KafkaStreamsConfiguration(props);
}
@Bean
public DeadLetterPublishingRecoverer recoverer() {
return new DeadLetterPublishingRecoverer(kafkaTemplate(),
(record, ex) -> new TopicPartition("recovererDLQ", -1));
}
当然,该 bean 可以是你自己的 实现。recoverer()
ConsumerRecordRecoverer
交互式查询支持
从版本 3.2 开始, Spring for Apache Kafka 提供了在 Kafka Streams 中进行交互式查询所需的基本工具。
交互式查询在有状态 Kafka Streams 应用程序中非常有用,因为它们提供了一种持续查询应用程序中有状态存储的方法。
因此,如果应用程序想要实现所考虑的系统的当前视图,交互式查询提供了一种实现此目的的方法。
要了解有关交互式查询的更多信息,请参阅此文。
Spring 中对 Apache Kafka 的支持以一个名为 API 为中心,该 API 是 Kafka Streams 库中交互式查询 API 的门面。
应用程序可以将此服务的实例创建为 bean,然后使用它来按其名称检索状态存储。KafkaStreamsInteractiveQueryService
以下代码片段显示了一个示例。
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
return kafkaStreamsInteractiveQueryService;
}
假设 Kafka Streams 应用程序具有一个名为 的状态存储,则可以通过 API 检索该存储,如下所示。app-store
KafkStreamsInteractiveQuery
@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;
ReadOnlyKeyValueStore<Object, Object> appStore = interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());
一旦应用程序获得对状态存储的访问权限,它就可以从中查询键值信息。
在这种情况下,应用程序使用的状态存储是只读键值存储。
Kafka Streams 应用程序还可以使用其他类型的状态存储。
例如,如果应用程序更喜欢查询基于窗口的存储,它可以在 Kafka Streams 应用程序业务逻辑中构建该存储,然后检索它。
因此,用于检索可查询存储的 API 具有泛型存储类型签名,以便最终用户可以分配适当的类型。KafkaStreamsInteractiveQueryService
下面是来自 API 的类型签名。
public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType)
调用此方法时,用户可以指定请求正确的 state store 类型,就像我们在上面的例子中所做的那样。
重试状态存储检索
尝试使用 检索状态存储时,由于各种原因,可能会找不到状态存储。
如果这些原因是暂时的,则提供一个选项,通过允许注入 custom 来重试检索状态存储。
默认情况下,用于的最大尝试次数为 3 次,固定回退为 1 秒。KafkaStreamsInteractiveQueryService
KafkaStreamsInteractiveQueryService
RetryTemplate
RetryTemmplate
KafkaStreamsInteractiveQueryService
下面介绍如何将自定义注入到最大尝试次数为 10 次。RetryTemmplate
KafkaStreamsInteractiveQueryService
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
retryTemplate.setRetryPolicy(retryPolicy);
kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
return kafkaStreamsInteractiveQueryService;
}
查询远程状态存储
上面显示的用于检索状态存储的 API 适用于本地可用的键值状态存储。
在生产设置中,Kafka Streams 应用程序很可能根据分区数量进行分布。
如果一个主题有四个分区,并且有四个相同的 Kafka Streams 处理器实例正在运行,则每个实例可能负责处理该主题的单个分区。
在这种情况下,调用 might not give the correct result that an instance is looking ,尽管它可能会返回一个有效的存储。
假设具有四个分区的主题包含有关各种键的数据,并且单个分区始终负责特定键。
如果调用的实例正在查找有关此实例不托管的键的信息,则它不会收到任何数据。
这是因为当前的 Kafka Streams 实例对此密钥一无所知。
要解决此问题,调用实例首先需要确保它们具有托管特定密钥的 Kafka Streams 处理器实例的主机信息。
这可以从任何 Kafka Streams 实例中检索,如下所示。retrieveQueryableStore
retrieveQueryableStore
retrieveQueryableStore
application.id
@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;
HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());
在上面的示例代码中,调用实例正在从名为 的状态存储中查询特定键。
API 还需要相应的密钥序列化器,在本例中为 .
Kafka Streams 会查看同一实例下的所有实例,并尝试查找托管此特定键的实例。
找到后,它将该主机信息作为对象返回。12345
app-store
IntegerSerializer
application.id
HostInfo
API 如下所示:
public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer)
当以这样的分布式方式使用相同的 Kafka Streams 处理器的多个实例时,应用程序应该提供一个 RPC 层,可以通过 RPC 端点(如 REST 端点)查询状态存储。
有关此内容的更多详细信息,请参阅此文章。
使用 Spring for Apache Kafka 时,使用 spring-web 技术添加基于 Spring 的 REST 端点非常容易。
一旦有了 REST 端点,就可以使用它从任何 Kafka Streams 实例查询状态存储,前提是该实例知道密钥的托管位置。application.id
HostInfo
如果托管实例的密钥是当前实例,则应用程序不需要调用 RPC 机制,而是进行 JVM 内调用。
但是,问题在于应用程序可能不知道进行调用的实例是托管密钥的位置,因为特定服务器可能会因使用者重新平衡而丢失分区。
为了解决这个问题,提供了一个方便的 API,用于通过返回当前 .
其思路是,应用程序可以首先获取有关密钥保存位置的信息,然后将 与有关当前实例的信息进行比较。
如果数据匹配,则可以通过 继续进行简单的 JVM 调用,否则请选择 RPC 选项。KafkaStreamsInteractiveQueryService
getCurrentKafkaStreamsApplicationHostInfo()
HostInfo
HostInfo
HostInfo
retrieveQueryableStore
Kafka Streams 示例
以下示例结合了我们在本章中介绍的各种主题:
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return new KafkaStreamsConfiguration(props);
}
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> fb.setStateListener((newState, oldState) -> {
System.out.println("State transition from " + oldState + " to " + newState);
});
}
@Bean
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
stream
.mapValues((ValueMapper<String, String>) String::toUpperCase)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(1_000)))
.reduce((String value1, String value2) -> value1 + value2,
Named.as("windowStore"))
.toStream()
.map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
.filter((i, s) -> s.length() > 40)
.to("streamingTopic2");
stream.print(Printed.toSysOut());
return stream;
}
}