参考指南

本指南介绍了 Spring Cloud Stream Binder 的 Apache Kafka 实现。 它包含有关其设计、使用和配置选项的信息,以及有关 Stream Cloud Stream 概念如何映射到 Apache Kafka 特定结构的信息。 此外,本指南还介绍了 Spring Cloud Stream 的 Kafka Streams 绑定能力。spring-doc.cn

1. Apache Kafka 活页夹

1.1. 用法

要使用 Apache Kafka Binder,您需要将依赖项添加到您的 Spring Cloud Stream 应用程序中,如以下示例中的 Maven 所示:spring-cloud-stream-binder-kafkaspring-doc.cn

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

或者,您也可以使用 Spring Cloud Stream Kafka Starter,如下面的 Maven 示例所示:spring-doc.cn

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

1.2. 概述

下图显示了 Apache Kafka Binder 如何运行的简化图:spring-doc.cn

kafka binder
图 1.Kafka 活页夹

Apache Kafka Binder 实现将每个目标映射到 Apache Kafka 主题。 使用者组直接映射到相同的 Apache Kafka 概念。 分区也直接映射到 Apache Kafka 分区。spring-doc.cn

该 Binder 当前使用 Apache Kafka version 。 此客户端可以与较旧的代理通信(请参阅 Kafka 文档),但某些功能可能不可用。 例如,对于早于 0.11.x.x 的版本,不支持本机标头。 此外,0.11.x.x 不支持该属性。kafka-clients2.3.1autoAddPartitionsspring-doc.cn

1.3. 配置选项

本节包含 Apache Kafka 活页夹使用的配置选项。spring-doc.cn

有关与 Binder 相关的常见配置选项和属性,请参阅核心文档spring-doc.cn

1.3.1. Kafka Binder 属性

spring.cloud.stream.kafka.binder.brokers

Kafka Binder 连接到的代理列表。spring-doc.cn

违约:。localhostspring-doc.cn

spring.cloud.stream.kafka.binder.defaultBrokerPort

brokers允许指定带或不带端口信息的主机(例如,)。 这将在 broker 列表中未配置端口时设置默认端口。host1,host2:port2spring-doc.cn

违约:。9092spring-doc.cn

spring.cloud.stream.kafka.binder.configuration

传递给 Binder 创建的所有客户端的客户端属性(生产者和使用者)的键/值映射。 由于这些属性由生产者和使用者都使用,因此应仅限于通用属性(例如,安全性设置)。 通过此配置提供的未知 Kafka 生产者或使用者属性将被过滤掉,不允许传播。 这里的 properties 取代了 boot 中设置的任何 properties。spring-doc.cn

默认值:空地图。spring-doc.cn

spring.cloud.stream.kafka.binder.consumerProperties

任意 Kafka 客户端使用者属性的键/值映射。 除了支持已知的 Kafka 使用者属性外,这里还允许未知的使用者属性。 此处的 properties 取代在 boot 和 上述 property 中设置的任何属性。configurationspring-doc.cn

默认值:空地图。spring-doc.cn

spring.cloud.stream.kafka.binder.headers

由 Binder 传输的自定义标头的列表。 仅当与版本< 0.11.0.0 的旧版应用程序 (⇐ 1.3.x) 通信时才需要。较新版本本身支持标头。kafka-clientsspring-doc.cn

默认值:空。spring-doc.cn

spring.cloud.stream.kafka.binder.healthTimeout的

等待获取分区信息的时间,以秒为单位。 如果此计时器过期,则运行状况报告为 down。spring-doc.cn

默认值:10。spring-doc.cn

spring.cloud.stream.kafka.binder.requiredAcks

代理上所需的 ack 数。 请参阅 Kafka 文档以了解 producer 属性。acksspring-doc.cn

违约:。1spring-doc.cn

spring.cloud.stream.kafka.binder.minPartitionCount

仅当设置了 或 时有效。 Binder 在其生成或使用数据的主题上配置的全局最小分区数。 它可以被 producer 的设置或 producer 的 settings 值(如果其中任何一个更大)取代。autoCreateTopicsautoAddPartitionspartitionCountinstanceCount * concurrencyspring-doc.cn

违约:。1spring-doc.cn

spring.cloud.stream.kafka.binder.producerProperties

任意 Kafka 客户端生产者属性的键/值映射。 除了支持已知的 Kafka 生产者属性外,这里还允许未知的生产者属性。 此处的 properties 取代在 boot 和 上述 property 中设置的任何属性。configurationspring-doc.cn

默认值:空地图。spring-doc.cn

spring.cloud.stream.kafka.binder.replicationFactor

自动创建的主题的复制因子(如果处于活动状态)。 可以在每个绑定上覆盖。autoCreateTopicsspring-doc.cn

违约:。1spring-doc.cn

spring.cloud.stream.kafka.binder.autoCreate主题

如果设置为 ,则 Binder 会自动创建新主题。 如果设置为 ,则 Binder 依赖于已配置的主题。 在后一种情况下,如果主题不存在,则 Binder 无法启动。truefalsespring-doc.cn

此设置独立于 broker 的设置,不会影响 broker 的设置。 如果服务器设置为自动创建主题,则可以使用默认代理设置将它们作为元数据检索请求的一部分创建。auto.create.topics.enable

违约:。truespring-doc.cn

spring.cloud.stream.kafka.binder.autoAddPartitions

如果设置为 ,则 Binders 会根据需要创建新分区。 如果设置为 ,则 Binder 依赖于已配置的主题的分区大小。 如果目标主题的分区计数小于预期值,则 Binder 无法启动。truefalsespring-doc.cn

违约:。falsespring-doc.cn

spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix

在 Binder 中启用事务。请参阅 Kafka 文档中的 Transactions 和文档中的 Transactions。 启用事务后,将忽略各个属性,并且所有生产者都使用这些属性。transaction.idspring-kafkaproducerspring.cloud.stream.kafka.binder.transaction.producer.*spring-doc.cn

默认 (无事务)nullspring-doc.cn

spring.cloud.stream.kafka.binder.transaction.producer.*

事务 Binder 中生成者的全局生产者属性。 请参阅 Kafka 生产者属性 以及所有 Binders 支持的常规生产者属性。spring.cloud.stream.kafka.binder.transaction.transactionIdPrefixspring-doc.cn

默认值:请参阅各个生产者属性。spring-doc.cn

spring.cloud.stream.kafka.binder.headerMapperBeanName

用于将 Headers 与 Kafka Headers 进行 Map 的 bean 名称。 例如,如果您希望在对 Headers 使用 JSON 反序列化的 bean 中自定义受信任的包,请使用此选项。 如果使用此属性无法将此自定义 Bean 提供给 Binder,则 Binder 将查找名称为 类型的 Header映射器 Bean,然后再回退到 Binder 创建的默认值。KafkaHeaderMapperspring-messagingBinderHeaderMapperBinderHeaderMapperkafkaBinderHeaderMapperBinderHeaderMapperBinderHeaderMapperspring-doc.cn

默认值:none。spring-doc.cn

1.3.2. Kafka Consumer 属性

为避免重复,Spring Cloud Stream 支持以 .spring.cloud.stream.kafka.default.consumer.<property>=<value>

以下属性仅适用于 Kafka 使用者,并且 必须以 为前缀。spring.cloud.stream.kafka.bindings.<channelName>.consumer.spring-doc.cn

admin.configuration 的

从版本 2.1.1 开始,此属性已被弃用,取而代之的是 ,并且在将来的版本中将删除对它的支持。topic.propertiesspring-doc.cn

admin.replicas-分配

从版本 2.1.1 开始,此属性已被弃用,取而代之的是 ,并且在将来的版本中将删除对它的支持。topic.replicas-assignmentspring-doc.cn

admin.replication-factor (管理员复制因子)

从版本 2.1.1 开始,此属性已被弃用,取而代之的是 ,并且在将来的版本中将删除对它的支持。topic.replication-factorspring-doc.cn

autoRebalance已启用

当 时,topic partitions 会在使用者组的成员之间自动重新平衡。 当 时,将为每个使用者分配一组基于 和 的固定分区。 这需要在每个启动的实例上适当设置 和 属性。 在这种情况下,属性的值通常必须大于 1。truefalsespring.cloud.stream.instanceCountspring.cloud.stream.instanceIndexspring.cloud.stream.instanceCountspring.cloud.stream.instanceIndexspring.cloud.stream.instanceCountspring-doc.cn

违约:。truespring-doc.cn

ackEachRecord 的

When is ,此设置指示在处理每条记录后是否提交偏移量。 默认情况下,在处理完 返回的 batch of records 中的所有记录后,将提交偏移量。 轮询返回的记录数可以通过 Kafka 属性进行控制,该属性通过 consumer 属性进行设置。 将此项设置为可能会导致性能下降,但这样做会降低发生故障时重新传送记录的可能性。 此外,请参阅 binder 属性,该属性也会影响提交偏移量的性能。autoCommitOffsettrueconsumer.poll()max.poll.recordsconfigurationtruerequiredAcksspring-doc.cn

违约:。falsespring-doc.cn

autoCommitOffset

是否在处理消息时自动提交偏移量。 如果设置为 ,则入站消息中存在具有 header 类型键的 Header。 应用程序可以使用此标头来确认消息。 有关详细信息,请参阅 examples 部分。 当此属性设置为 时,Kafka Binder 会将 ack 模式设置为 ,并且应用程序负责确认记录。 另请参阅 。falsekafka_acknowledgmentorg.springframework.kafka.support.Acknowledgmentfalseorg.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUALackEachRecordspring-doc.cn

违约:。truespring-doc.cn

autoCommitOnError

仅当设置为 时有效。 如果设置为 ,则禁止对导致错误的邮件进行自动提交,并仅对成功的邮件进行提交。它允许流在持续失败的情况下自动从上次成功处理的消息重播。 如果设置为 ,则始终自动提交(如果启用了自动提交)。 如果未设置(默认值),则它实际上具有相同的值,如果错误消息被发送到 DLQ,则自动提交错误消息,否则不提交错误消息。autoCommitOffsettruefalsetrueenableDlqspring-doc.cn

默认值:未设置。spring-doc.cn

resetOffsets

是否将 Consumer 的 Offset 重置为 startOffset 提供的值。 如果提供了 a,则必须为 false;参见使用 KafkaRebalanceListener。KafkaRebalanceListenerspring-doc.cn

违约:。falsespring-doc.cn

startOffset

新组的起始偏移量。 允许的值: 和 . 如果为使用者 'binding' (through) 显式设置了使用者组,则 'startOffset' 设置为 。否则,它将设置为使用者组。 另请参阅 (此列表的前面)。earliestlatestspring.cloud.stream.bindings.<channelName>.groupearliestlatestanonymousresetOffsetsspring-doc.cn

默认值:null(相当于 )。earliestspring-doc.cn

enableDlq

当设置为 true 时,它将为使用者启用 DLQ 行为。 默认情况下,导致错误的消息将转发到名为 . DLQ 主题名称可以通过设置属性来配置。 这为更常见的 Kafka 重放场景提供了一个替代选项,用于错误数量相对较少并且重放整个原始主题可能太麻烦的情况。 有关更多信息,请参阅 死信主题处理 。 从版本 2.0 开始,发送到 DLQ 主题的消息通过以下标头进行增强:、 和 as 。 默认情况下,失败的记录将发送到 DLQ 主题中与原始记录相同的分区号。 有关如何更改该行为,请参阅 Dead-Letter Topic Partition SelectiondestinationIsPatterntrue 时不允许。error.<destination>.<group>dlqNamex-original-topicx-exception-messagex-exception-stacktracebyte[]spring-doc.cn

违约:。falsespring-doc.cn

dlq分区

如果为 true,并且未设置此属性,则会创建一个分区数与主主题相同数量的死信主题。 通常,死信记录会发送到与原始记录相同的死信主题中的分区。 此行为可以更改;请参阅 Dead-Letter Topic Partition Selection。 如果此属性设置为且没有 bean,则所有死信记录都将写入 partition 。 如果此属性大于 ,则必须提供 bean。 请注意,实际分区计数受 Binder 属性的影响。enableDlq1DqlPartitionFunction01DlqPartitionFunctionminPartitionCountspring-doc.cn

违约:nonespring-doc.cn

配置

使用包含通用 Kafka 使用者属性的键/值对进行映射。 除了具有 Kafka 使用者属性之外,还可以在此处传递其他配置属性。 例如,应用程序所需的一些属性,如 .spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=barspring-doc.cn

默认值:空地图。spring-doc.cn

dlq名称

用于接收错误消息的 DLQ 主题的名称。spring-doc.cn

默认值:null(如果未指定,则导致错误的邮件将转发到名为 ) 的主题。error.<destination>.<group>spring-doc.cn

dlqProducerProperties

使用此功能,可以设置特定于 DLQ 的创建者属性。 通过 kafka producer 属性提供的所有属性都可以通过此属性进行设置。 当 consumer 上启用了原生解码(即 useNativeDecoding: true)时,应用程序必须为 DLQ 提供相应的键/值序列化器。 这必须以 和 的形式提供。dlqProducerProperties.configuration.key.serializerdlqProducerProperties.configuration.value.serializerspring-doc.cn

默认值:默认 Kafka 生产者属性。spring-doc.cn

standardHeaders 的

指示入站通道适配器填充哪些标准头。 允许的值: 、 、 或 . 如果使用本机反序列化并且接收消息的第一个组件需要一个(例如配置为使用 JDBC 消息存储的聚合器),则很有用。noneidtimestampbothidspring-doc.cn

违约:nonespring-doc.cn

converterBean名称

实现 .在入站通道适配器中用于替换默认的 .RecordMessageConverterMessagingMessageConverterspring-doc.cn

违约:nullspring-doc.cn

idleEventInterval

指示最近未收到任何消息的事件之间的时间间隔(以毫秒为单位)。 使用 an 接收这些事件。 有关使用示例,请参阅示例:暂停和恢复使用者ApplicationListener<ListenerContainerIdleEvent>spring-doc.cn

违约:30000spring-doc.cn

destinationIsPattern

如果为 true,则目标将被视为代理用于匹配主题名称的正则表达式。 如果为 true,则不会预置主题,并且不允许使用主题,因为 Binder 在预置阶段不知道主题名称。 请注意,检测与模式匹配的新主题所花费的时间由 consumer 属性控制,该属性(在撰写本文时)默认为 300,000 毫秒(5 分钟)。 这可以使用上面的属性进行配置。PatternenableDlqmetadata.max.age.msconfigurationspring-doc.cn

违约:falsespring-doc.cn

topic.properties

预置新主题时使用的 Kafka 主题属性之一,例如Mapspring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0spring-doc.cn

默认值:none。spring-doc.cn

topic.replicas-assignment

副本分配的 Map<Integer、List<Integer>>,键是分区,值是分配。 在预置新主题时使用。 请参阅 jar 中的 Javadocs。NewTopickafka-clientsspring-doc.cn

默认值:none。spring-doc.cn

topic.replication-factor

预置主题时要使用的复制因子。覆盖 binder 范围的设置。 如果存在,则忽略。replicas-assignmentsspring-doc.cn

默认值:none (使用 binder 范围的默认值 1)。spring-doc.cn

轮询超时

用于轮询使用者中的轮询的超时。spring-doc.cn

默认值:5 秒。spring-doc.cn

事务管理器

用于覆盖此绑定的 Binder 事务管理器的 Bean 名称。 如果要将另一个事务与 Kafka 事务同步,通常需要使用 . 要实现记录的 Just Once 使用和生成,Consumer 和 Producer 绑定都必须使用相同的事务管理器进行配置。KafkaAwareTransactionManagerChainedKafkaTransactionManaagerspring-doc.cn

默认值:none。spring-doc.cn

1.3.3. 消费 Batch

从版本 3.0 开始,当设置为 时,轮询 Kafka 收到的所有记录都将作为 to 侦听器方法显示。 否则,将一次使用一条记录调用该方法。 批处理的大小由 Kafka 使用者属性 , , ;有关更多信息,请参阅 Kafka 文档。spring.cloud.stream.binding.<name>.consumer.batch-modetrueConsumerList<?>max.poll.recordsmin.fetch.bytesfetch.max.wait.msspring-doc.cn

使用批处理模式时,不支持在 Binder 中重试,因此将被覆盖为 1。 您可以配置 (使用 ) 以实现类似的功能,以便在 Binder 中重试。 您还可以使用 manual 和 call 来提交部分批处理的偏移量,并重新交付其余记录。 有关这些技术的更多信息,请参阅 Spring for Apache Kafka 文档maxAttemptsSeekToCurrentBatchErrorHandlerListenerContainerCustomizerAckModeAckowledgment.nack(index, sleep)

1.3.4. Kafka 生产者属性

为避免重复,Spring Cloud Stream 支持以 .spring.cloud.stream.kafka.default.producer.<property>=<value>

以下属性仅适用于 Kafka 生产者,并且 必须以 为前缀。spring.cloud.stream.kafka.bindings.<channelName>.producer.spring-doc.cn

admin.configuration 的

从版本 2.1.1 开始,此属性已被弃用,取而代之的是 ,并且在将来的版本中将删除对它的支持。topic.propertiesspring-doc.cn

admin.replicas-分配

从版本 2.1.1 开始,此属性已被弃用,取而代之的是 ,并且在将来的版本中将删除对它的支持。topic.replicas-assignmentspring-doc.cn

admin.replication-factor (管理员复制因子)

从版本 2.1.1 开始,此属性已被弃用,取而代之的是 ,并且在将来的版本中将删除对它的支持。topic.replication-factorspring-doc.cn

缓冲区大小

Kafka 创建者在发送之前尝试批处理的数据量的上限(以字节为单位)。spring-doc.cn

违约:。16384spring-doc.cn

同步

生产者是否同步。spring-doc.cn

违约:。falsespring-doc.cn

发送超时表达式

根据传出消息评估的 SpEL 表达式,用于评估启用同步发布时等待确认的时间,例如。 超时的值以毫秒为单位。 在 3.0 之前的版本中,除非使用本机编码,否则无法使用有效负载,因为在计算此表达式时,有效负载已经采用 . 现在,在转换有效负载之前计算表达式。headers['mySendTimeout']byte[]spring-doc.cn

违约:。nonespring-doc.cn

batch超时

在发送消息之前,创建者等待多长时间以允许更多消息在同一批次中累积。 (通常,创建者根本不等待,而只是发送在上一次发送过程中累积的所有消息。非零值可能会以延迟为代价增加吞吐量。spring-doc.cn

违约:。0spring-doc.cn

messageKey表达式

根据用于填充生成的 Kafka 消息的键的传出消息计算的 SPEL 表达式,例如 . 在 3.0 之前的版本中,除非使用本机编码,否则无法使用有效负载,因为在计算此表达式时,有效负载已经采用 . 现在,在转换有效负载之前计算表达式。headers['myKey']byte[]spring-doc.cn

违约:。nonespring-doc.cn

headerPatterns 的

一个以逗号分隔的简单模式列表,用于匹配要映射到 . 模式可以以通配符(星号)开头或结尾。 模式可以通过前缀 来否定。 匹配在第一个匹配项 (正或负) 之后停止。 例如,将传递但不会传递。 并且永远不会被映射。HeadersProducerRecord!!ask,as*ashaskidtimestampspring-doc.cn

默认值:(所有标头 - 除了 和*idtimestamp)spring-doc.cn

配置

Map 包含通用 Kafka 生产者属性的键/值对。spring-doc.cn

默认值:空地图。spring-doc.cn

topic.properties

预置新主题时使用的 Kafka 主题属性之一,例如Mapspring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0spring-doc.cn

topic.replicas-assignment

副本分配的 Map<Integer、List<Integer>>,键是分区,值是分配。 在预置新主题时使用。 请参阅 jar 中的 Javadocs。NewTopickafka-clientsspring-doc.cn

默认值:none。spring-doc.cn

topic.replication-factor

预置主题时要使用的复制因子。覆盖 binder 范围的设置。 如果存在,则忽略。replicas-assignmentsspring-doc.cn

默认值:none (使用 binder 范围的默认值 1)。spring-doc.cn

useTopicHeader

设置为 to 以使用出站消息中消息标头的值覆盖默认绑定目标(主题名称)。 如果标头不存在,则使用默认绑定目标。 违约:。trueKafkaHeaders.TOPICfalsespring-doc.cn

recordMetadataChannel

成功发送结果应发送到的 bean 名称;该 Bean 必须存在于应用程序上下文中。 发送到通道的消息是已发送的消息(转换后,如果有),带有一个附加 Headers 。 标头包含 Kafka 客户端提供的对象;它包括主题中写入记录的 partition 和 offset。MessageChannelKafkaHeaders.RECORD_METADATARecordMetadataspring-doc.cn

ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)spring-doc.cn

失败的发送将转到 producer 错误通道(如果已配置);请参见错误通道。 默认值:nullspring-doc.cn

Kafka Binders 使用创建者的设置作为提示来创建具有给定分区计数的主题(与 ,两者中的最大值是正在使用的值)。 在为 Binder 和应用程序配置时要小心,因为会使用较大的值。 如果已存在分区计数较小的主题,并且已禁用(默认值),则 Binder 无法启动。 如果已存在分区计数较小的主题并已启用,则会添加新分区。 如果已存在分区数大于最大值 ( 或 ) 的主题,则使用现有分区计数。partitionCountminPartitionCountminPartitionCountpartitionCountautoAddPartitionsautoAddPartitionsminPartitionCountpartitionCount
压缩

设置 producer 属性。 支持的值为 、 和 。 如果您将 jar 覆盖到 2.1.0(或更高版本),如 Spring for Apache Kafka 文档中所述,并希望使用压缩,请使用 .compression.typenonegzipsnappylz4kafka-clientszstdspring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstdspring-doc.cn

违约:。nonespring-doc.cn

事务管理器

用于覆盖此绑定的 Binder 事务管理器的 Bean 名称。 如果要将另一个事务与 Kafka 事务同步,通常需要使用 . 要实现记录的 Just Once 使用和生成,Consumer 和 Producer 绑定都必须使用相同的事务管理器进行配置。KafkaAwareTransactionManagerChainedKafkaTransactionManaagerspring-doc.cn

默认值:none。spring-doc.cn

1.3.5. 使用示例

在本节中,我们将展示上述属性在特定情况下的用法。spring-doc.cn

示例:将 autoCommitOffset 设置为 false 并依赖手动确认

此示例说明了如何在使用者应用程序中手动确认偏移量。spring-doc.cn

此示例要求 设置为 。 使用相应的 input channel name 作为示例。spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffsetfalsespring-doc.cn

@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {

 public static void main(String[] args) {
     SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
 }

 @StreamListener(Sink.INPUT)
 public void process(Message<?> message) {
     Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
     if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
     }
 }
}
示例:安全配置

Apache Kafka 0.9 支持客户端和代理之间的安全连接。 要利用此功能,请遵循 Apache Kafka 文档中的准则以及 Confluent 文档中的 Kafka 0.9 安全准则。 使用该选项可为 Binder 创建的所有客户端设置安全属性。spring.cloud.stream.kafka.binder.configurationspring-doc.cn

例如,要设置为 ,请设置以下属性:security.protocolSASL_SSLspring-doc.cn

spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL

所有其他安全属性都可以以类似的方式设置。spring-doc.cn

使用 Kerberos 时,请按照参考文档中的说明创建和引用 JAAS 配置。spring-doc.cn

Spring Cloud Stream 支持通过使用 JAAS 配置文件和使用 Spring Boot 属性将 JAAS 配置信息传递给应用程序。spring-doc.cn

使用 JAAS 配置文件

可以使用系统属性为 Spring Cloud Stream 应用程序设置 JAAS 和(可选)krb5 文件位置。 以下示例显示了如何使用 JAAS 配置文件启动具有 SASL 和 Kerberos 的 Spring Cloud Stream 应用程序:spring-doc.cn

 java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
   --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
使用 Spring Boot 属性

作为拥有 JAAS 配置文件的替代方法, Spring Cloud Stream 提供了一种机制,用于使用 Spring Boot 属性为 Spring Cloud Stream 应用程序设置 JAAS 配置。spring-doc.cn

以下属性可用于配置 Kafka 客户端的登录上下文:spring-doc.cn

spring.cloud.stream.kafka.binder.jaas.loginModule

登录模块名称。在正常情况下不需要设置。spring-doc.cn

违约:。com.sun.security.auth.module.Krb5LoginModulespring-doc.cn

spring.cloud.stream.kafka.binder.jaas.controlFlag

登录模块的控制标志。spring-doc.cn

违约:。requiredspring-doc.cn

spring.cloud.stream.kafka.binder.jaas.options

使用包含登录模块选项的键/值对进行映射。spring-doc.cn

默认值:空地图。spring-doc.cn

以下示例显示了如何使用 Spring Boot 配置属性启动具有 SASL 和 Kerberos 的 Spring Cloud Stream 应用程序:spring-doc.cn

 java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.autoCreateTopics=false \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
   --spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
   --spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
   --spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
   --spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM

前面的示例表示以下 JAAS 文件的等效项:spring-doc.cn

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_client.keytab"
    principal="[email protected]";
};

如果代理上已存在所需的主题或将由管理员创建,则可以关闭自动创建,而只需要发送客户机 JAAS 属性。spring-doc.cn

不要在同一应用程序中混合使用 JAAS 配置文件和 Spring Boot 属性。 如果 system 属性已经存在,则 Spring Cloud Stream 将忽略 Spring Boot 属性。-Djava.security.auth.login.config
将 和 与 Kerberos 一起使用时要小心。 通常,应用程序可能会使用在 Kafka 和 Zookeeper 中没有管理权限的委托人。 因此,依赖 Spring Cloud Stream 创建/修改 Topic 可能会失败。 在安全环境中,我们强烈建议使用 Kafka 工具以管理方式创建主题和管理 ACL。autoCreateTopicsautoAddPartitions
示例:暂停和恢复使用者

如果您希望暂停消费但不会导致分区重新平衡,则可以暂停和恢复使用者。 通过将 as 参数添加到您的 . 要恢复,您需要一个 for instances。 事件的发布频率由属性控制。 由于使用者不是线程安全的,因此必须在调用线程上调用这些方法。Consumer@StreamListenerApplicationListenerListenerContainerIdleEventidleEventIntervalspring-doc.cn

以下简单应用程序演示如何暂停和恢复:spring-doc.cn

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@StreamListener(Sink.INPUT)
	public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
		System.out.println(in);
		consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
	}

	@Bean
	public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
		return event -> {
			System.out.println(event);
			if (event.getConsumer().paused().size() > 0) {
				event.getConsumer().resume(event.getConsumer().paused());
			}
		};
	}

}

1.4. 事务性 Binder

通过设置为非空值来启用事务,例如 . 在处理器应用程序中使用时,使用者启动事务;在 Consumer 线程上发送的任何记录都参与同一个事务。 当监听器正常退出时,监听器容器会将偏移量发送给事务并提交。 公共 producer 工厂用于使用 properties 配置的所有 producer 绑定;将忽略单个绑定 Kafka 生产者属性。spring.cloud.stream.kafka.binder.transaction.transactionIdPrefixtx-spring.cloud.stream.kafka.binder.transaction.producer.*spring-doc.cn

事务不支持正常的 Binder 重试(和死信),因为重试将在原始事务中运行,该事务可能会回滚,并且任何已发布的记录也将被回滚。 启用重试后(common 属性大于零),重试属性用于配置 a 以在容器级别启用重试。 同样,此功能不是在事务中发布死信记录,而是移动到侦听器容器,再次通过在主事务回滚后运行。maxAttemptsDefaultAfterRollbackProcessorDefaultAfterRollbackProcessor

如果您希望在源应用程序中使用事务,或者从某个任意线程中为仅限生产者事务(例如 方法),你必须获取对事务性生产者工厂的引用,并使用它定义一个 Bean。@ScheduledKafkaTransactionManagerspring-doc.cn

@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
        @Value("${unique.tx.id.per.instance}") String txId) {

    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
    KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
    tm.setTransactionId(txId)
    return tm;
}

请注意,我们使用 ;当只配置了一个 Binder 时,在第一个参数中使用。 如果配置了多个 Binder,请使用 Binder 名称获取引用。 一旦我们有了对 binder 的引用,我们就可以获取对 the 的引用并创建一个事务管理器。BinderFactorynullProducerFactoryspring-doc.cn

然后你会使用普通的 Spring 事务支持,例如 或 ,例如:TransactionTemplate@Transactionalspring-doc.cn

public static class Sender {

    @Transactional
    public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
        stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
    }

}

如果您希望将仅限生产者的事务与来自其他事务管理器的事务同步,请使用 .ChainedTransactionManagerspring-doc.cn

如果部署应用程序的多个实例,则每个实例都需要一个唯一的 .transactionIdPrefix

1.5. 错误通道

从版本 1.3 开始,Binder 无条件地将异常发送到每个 Consumer 目标的错误通道,并且还可以配置为将异步 producer send failures发送到 error 通道。 有关更多信息,请参见[spring-cloud-stream-overview-error-handling]。spring-doc.cn

的有效负载 for a send failure 是 a 具有属性:ErrorMessageKafkaSendFailureExceptionspring-doc.cn

不会自动处理生产者异常(例如发送到死信队列)。 您可以在自己的 Spring 集成流中使用这些异常。spring-doc.cn

1.6. Kafka 指标

Kafka Binder 模块公开了以下指标:spring-doc.cn

spring.cloud.stream.binder.kafka.offset:此指标表示给定使用者组尚未从给定 Binder 的主题中使用多少条消息。 提供的指标基于 Mircometer 指标库。该指标包含使用者组信息、主题以及已提交偏移量与主题上最新偏移量的实际滞后。 此指标对于向 PaaS 平台提供自动扩展反馈特别有用。spring-doc.cn

1.7. Tombstone Records(空记录值)

使用压缩主题时,具有值的记录(也称为逻辑删除记录)表示删除键。 要在方法中接收此类消息,必须将参数标记为不需要才能接收 value 参数。null@StreamListenernullspring-doc.cn

@StreamListener(Sink.INPUT)
public void in(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key,
               @Payload(required = false) Customer customer) {
    // customer is null if a tombstone record
    ...
}

1.8. 使用 KafkaRebalanceListener

应用程序可能希望在最初分配分区时将主题/分区查找到任意偏移量,或在消费者上执行其他操作。 从版本 2.1 开始,如果在应用程序上下文中提供单个 bean,它将被连接到所有 Kafka 使用者绑定中。KafkaRebalanceListenerspring-doc.cn

public interface KafkaBindingRebalanceListener {

	/**
	 * Invoked by the container before any pending offsets are committed.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 */
	default void onPartitionsRevokedBeforeCommit(String bindingName, Consumer<?, ?> consumer,
			Collection<TopicPartition> partitions) {

	}

	/**
	 * Invoked by the container after any pending offsets are committed.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 */
	default void onPartitionsRevokedAfterCommit(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {

	}

	/**
	 * Invoked when partitions are initially assigned or after a rebalance.
	 * Applications might only want to perform seek operations on an initial assignment.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 * @param initial true if this is the initial assignment.
	 */
	default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
			boolean initial) {

	}

}

不能将 consumer 属性设置为在提供再平衡侦听器时。resetOffsetstruespring-doc.cn

1.9. 死信主题处理

1.9.1. 死信主题分区选择

默认情况下,使用与原始记录相同的分区将记录发布到 Dead-Letter 主题。 这意味着 Dead-Letter 主题必须至少具有与原始记录一样多的分区。spring-doc.cn

要更改此行为,请将 implementation 作为 添加到 application context。 只能存在一个这样的 bean。 该函数随 consumer group、failed 和 exception 一起提供。 例如,如果您始终希望路由到分区 0,则可以使用:DlqPartitionFunction@BeanConsumerRecordspring-doc.cn

@Bean
public DlqPartitionFunction partitionFunction() {
    return (group, record, ex) -> 0;
}
如果将使用者绑定的属性设置为 1 (并且 Binder 的 is equal to ),则无需提供 ;框架将始终使用分区 0。 如果将使用者绑定的属性设置为大于(或 Binder 的 is greater than)的值,则必须提供 bean,即使分区计数与原始主题的分区计数相同。dlqPartitionsminPartitionCount1DlqPartitionFunctiondlqPartitions1minPartitionCount1DlqPartitionFunction

1.9.2. 处理死信主题中的记录

由于框架无法预测用户希望如何处理死信消息,因此它不提供任何标准机制来处理它们。 如果死信的原因是暂时的,您可能希望将消息路由回原始主题。 但是,如果问题是永久性问题,则可能会导致无限循环。 本主题中的示例 Spring Boot 应用程序是如何将这些消息路由回原始主题的示例,但它在三次尝试后将它们移动到 “parking lot” 主题。 该应用程序是另一个spring-cloud-stream应用程序,它从死信主题中读取。 当 5 秒内未收到任何消息时,它将终止。spring-doc.cn

这些示例假定原始目标是 ,使用者组是 。so8400outso8400spring-doc.cn

有几种策略需要考虑:spring-doc.cn

  • 请考虑仅在主应用程序未运行时运行 rerouting。 否则,暂时性错误的重试将很快用完。spring-doc.cn

  • 或者,使用两阶段方法:使用此应用程序路由到第三个主题,使用另一个应用程序从那里路由回主要主题。spring-doc.cn

以下代码清单显示了示例应用程序:spring-doc.cn

application.properties
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400

spring.cloud.stream.bindings.output.destination=so8400out

spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot

spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest

spring.cloud.stream.kafka.binder.headers=x-retries
应用
@SpringBootApplication
@EnableBinding(TwoOutputProcessor.class)
public class ReRouteDlqKApplication implements CommandLineRunner {

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) {
        SpringApplication.run(ReRouteDlqKApplication.class, args).close();
    }

    private final AtomicInteger processed = new AtomicInteger();

    @Autowired
    private MessageChannel parkingLot;

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Message<?> reRoute(Message<?> failed) {
        processed.incrementAndGet();
        Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
        if (retries == null) {
            System.out.println("First retry for " + failed);
            return MessageBuilder.fromMessage(failed)
                    .setHeader(X_RETRIES_HEADER, new Integer(1))
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build();
        }
        else if (retries.intValue() < 3) {
            System.out.println("Another retry for " + failed);
            return MessageBuilder.fromMessage(failed)
                    .setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1))
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build();
        }
        else {
            System.out.println("Retries exhausted for " + failed);
            parkingLot.send(MessageBuilder.fromMessage(failed)
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build());
        }
        return null;
    }

    @Override
    public void run(String... args) throws Exception {
        while (true) {
            int count = this.processed.get();
            Thread.sleep(5000);
            if (count == this.processed.get()) {
                System.out.println("Idle, terminating");
                return;
            }
        }
    }

    public interface TwoOutputProcessor extends Processor {

        @Output("parkingLot")
        MessageChannel parkingLot();

    }

}

1.10. 使用 Kafka Binder 进行分区

Apache Kafka 原生支持主题分区。spring-doc.cn

有时,将数据发送到特定分区是有利的 — 例如,当您想要严格排序消息处理时(特定客户的所有消息都应发送到同一分区)。spring-doc.cn

以下示例显示了如何配置生产者和使用者端:spring-doc.cn

@SpringBootApplication
@EnableBinding(Source.class)
public class KafkaPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "foo1", "bar1", "qux1",
            "foo2", "bar2", "qux2",
            "foo3", "bar3", "qux3",
            "foo4", "bar4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
    public Message<?> generate() {
        String value = data[RANDOM.nextInt(data.length)];
        System.out.println("Sending: " + value);
        return MessageBuilder.withPayload(value)
                .setHeader("partitionKey", value)
                .build();
    }

}
application.yml
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: partitioned.topic
          producer:
            partition-key-expression: headers['partitionKey']
            partition-count: 12
必须预置主题以具有足够的分区,以实现所有使用者组所需的并发性。 上述配置最多支持 12 个 Consumer 实例(如果有 2 个则为 6 个,如果并发数为 3 则为 4 个,依此类推)。 通常,最好 “过度预置” 分区,以允许将来使用者或并发性的增加。concurrency
前面的配置使用默认的分区 ()。 这可能会也可能不会提供适当平衡的算法,具体取决于键值。 您可以使用 or 属性覆盖此默认值。key.hashCode() % partitionCountpartitionSelectorExpressionpartitionSelectorClass

由于分区由 Kafka 原生处理,因此消费者端不需要特殊配置。 Kafka 跨实例分配分区。spring-doc.cn

Spring Spring Boot 应用程序侦听 Kafka 流并打印(到控制台)每条消息发送到的分区 ID:spring-doc.cn

@SpringBootApplication
@EnableBinding(Sink.class)
public class KafkaPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(@Payload String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        System.out.println(in + " received from partition " + partition);
    }

}
application.yml
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: partitioned.topic
          group: myGroup

您可以根据需要添加实例。 Kafka 重新平衡分区分配。 如果实例计数 (或 ) 超过分区数,则某些消费者处于空闲状态。instance count * concurrencyspring-doc.cn

2. Kafka Streams 活页夹

2.1. 用法

要使用 Kafka Streams Binders,您只需使用以下 maven 坐标将其添加到 Spring Cloud Stream 应用程序中:spring-doc.cn

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>

为 Kafka Streams Binder 引导新项目的一种快速方法是使用 Spring Initializr,然后选择“Cloud Streams”和“Spring for Kafka Streams”,如下所示spring-doc.cn

spring initializr kafka streams

2.2. 概述

Spring Cloud Stream 包括一个专门为 Apache Kafka Streams 绑定设计的 Binder 实现。 通过这种原生集成, Spring Cloud Stream“处理器”应用程序可以直接在核心业务逻辑中使用 Apache Kafka Streams API。spring-doc.cn

Kafka Streams Binder 实现建立在 Spring for Apache Kafka 项目提供的基础之上。spring-doc.cn

Kafka Streams Binder 为 Kafka Streams 中的三种主要类型(和 )提供了绑定功能。KStreamKTableGlobalKTablespring-doc.cn

Kafka Streams 应用程序通常遵循一个模型,在该模型中,从入站主题中读取记录,应用业务逻辑,然后将转换后的记录写入出站主题。 或者,也可以定义没有出站目标的 Processor 应用程序。spring-doc.cn

在以下部分中,我们将了解 Spring Cloud Stream 与 Kafka Streams 集成的详细信息。spring-doc.cn

2.3. 编程模型

当使用 Kafka Streams Binder 提供的编程模型时,高级 Streams DSL 以及更高级别和较低级别 Processor-API 的混合都可以用作选项。 当混合使用更高级别和较低级别的 API 时,这通常是通过调用 或 API 方法来实现的。transformprocessKStreamspring-doc.cn

2.3.1. 函数式风格

从 Spring Cloud Stream 开始,Kafka Streams Binder 允许使用 Java 8 中提供的函数式编程样式来设计和开发应用程序。 这意味着应用程序可以简洁地表示为类型为 或 的 lambda 表达式。3.0.0java.util.function.Functionjava.util.function.Consumerspring-doc.cn

让我们举一个非常基本的例子。spring-doc.cn

@SpringBootApplication
public class SimpleConsumerApplication {

    @Bean
    public java.util.function.Consumer<KStream<Object, String>> process() {

        return input ->
                input.foreach((key, value) -> {
                    System.out.println("Key: " + key + " Value: " + value);
                });
    }
}

尽管简单,但这是一个完整的独立 Spring Boot 应用程序,它利用 Kafka Streams 进行流处理。 这是一个没有出站绑定,只有一个入站绑定的使用者应用程序。 应用程序使用数据,它只是在标准输出上记录 key 和 value 中的信息。 该应用程序包含批注和标记为 . Bean 方法的类型是使用 进行参数化的。 然后,在实现中,我们返回一个 Consumer 对象,它本质上是一个 lambda 表达式。 在 lambda 表达式中,提供了用于处理数据的代码。KStreamSpringBootApplicationBeanjava.util.function.ConsumerKStreamspring-doc.cn

在此应用程序中,有一个类型为 的输入绑定。 Binders 使用 name 为应用程序创建此绑定,即函数 Bean 名称的名称后跟一个短划线字符 () 和文字,后跟另一个短划线,然后是参数的序号位置。 您可以使用此绑定名称来设置其他属性,例如 destination。 例如。KStreamprocess-in-0-inspring.cloud.stream.bindings.process-in-0.destination=my-topicspring-doc.cn

如果未在绑定上设置 destination 属性,则会创建一个与绑定同名的主题(如果应用程序有足够的权限),或者该主题应该已经可用。

一旦构建为 uber-jar(例如 ),您就可以运行上面的示例,如下所示。kstream-consumer-app.jarspring-doc.cn

java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic

这是另一个示例,其中它是一个同时具有输入和输出绑定的完整处理器。 这是一个经典的字数统计示例,其中应用程序从主题接收数据,然后在滚动时间窗口中计算每个单词的出现次数。spring-doc.cn

@SpringBootApplication
public class WordCountProcessorApplication {

  @Bean
  public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

    return input -> input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, value) -> new KeyValue<>(value, value))
                .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("word-counts-state-store"))
                .toStream()
                .map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))));
  }

	public static void main(String[] args) {
		SpringApplication.run(WordCountProcessorApplication.class, args);
	}
}

同样,这是一个完整的 Spring Boot 应用程序。这里与第一个应用程序的不同之处在于 bean 方法的类型为 . 的第一个参数化类型用于输入,第二个参数化类型用于输出。 在方法主体中,提供了一个 lambda 表达式,该表达式的类型为实现,并给出了实际的业务逻辑。 与前面讨论的基于 Consumer 的应用程序类似,此处的 input 绑定命名为 by default。对于输出,绑定名称也会自动设置为 .java.util.function.FunctionFunctionKStreamFunctionprocess-in-0process-out-0spring-doc.cn

一旦构建为 uber-jar(例如 ),您就可以运行上面的示例,如下所示。wordcount-processor.jarspring-doc.cn

java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts

此应用程序将使用来自 Kafka 主题的消息,并将计算结果发布到输出 主题。wordscountsspring-doc.cn

Spring Cloud Stream 将确保来自传入和传出主题的消息都自动绑定为 KStream 对象。作为开发人员,您可以专注于代码的业务方面,即编写逻辑 在处理器中是必需的。设置 Kafka Streams 基础架构所需的 Kafka Streams 特定配置 由框架自动处理。spring-doc.cn

我们上面看到的两个示例具有单个 input binding。在这两种情况下,绑定都接收来自单个主题的记录。 如果要将多个主题多路复用到单个绑定中,可以在下面提供逗号分隔的 Kafka 主题作为目标。KStreamKStreamspring-doc.cn

spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3spring-doc.cn

此外,如果要将主题与常规 exression 匹配,还可以提供主题模式作为目标。spring-doc.cn

spring.cloud.stream.bindings.process-in-0.destination=input.*spring-doc.cn

多个输入绑定

许多重要的 Kafka Streams 应用程序经常通过多个绑定使用来自多个主题的数据。 例如,一个主题被消费为 ,另一个主题被消费为 or 。 应用程序可能希望以表类型接收数据的原因有很多。 考虑一个用例,其中底层主题是通过数据库中的变更数据捕获 (CDC) 机制填充的,或者应用程序可能只关心下游处理的最新更新。 如果应用程序指定需要将数据绑定为 or ,则 Kafka Streams Binder 会将目标正确绑定到 or,并使它们可供应用程序操作。 我们将了解如何在 Kafka Streams Binder 中处理多个输入绑定的几种不同场景。KstreamKTableGlobalKTableKTableGlobalKTableKTableGlobalKTablespring-doc.cn

Kafka Streams Binder 中的 BiFunction

下面是一个示例,其中有两个 inputs 和一个 output。在这种情况下,应用程序可以利用 。java.util.function.BiFunctionspring-doc.cn

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
    return (userClicksStream, userRegionsTable) -> (userClicksStream
            .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
                            "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
            .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
                    regionWithClicks.getClicks()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .reduce(Long::sum)
            .toStream());
}

在这里,基本主题与前面的示例相同,但这里我们有两个输入。 Java 的支持用于将输入绑定到所需的目标。 Binder 为 Inputs 生成的默认绑定名称分别为 和 。默认输出绑定为 . 在此示例中,第一个参数 of 绑定为第一个输入的 a,第二个参数绑定为第二个输入的 a。BiFunctionprocess-in-0process-in-1process-out-0BiFunctionKStreamKTablespring-doc.cn

Kafka Streams Binder 中的 BiConsumer

如果有两个 inputs,但没有 outputs,在这种情况下,我们可以使用如下所示。java.util.function.BiConsumerspring-doc.cn

@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
    return (userClicksStream, userRegionsTable) -> {}
}
超出两个输入

如果您有两个以上的输入怎么办? 在某些情况下,您需要两个以上的输入。在这种情况下,Binders 允许您链接部分函数。 在函数式编程术语中,这种技术通常称为柯里化。 随着 Java 8 中添加了函数式编程支持,Java 现在允许您编写柯里化函数。 Spring Cloud Stream Kafka Streams Binder 可以利用此功能来启用多个 Importing 绑定。spring-doc.cn

让我们看一个例子。spring-doc.cn

@Bean
public Function<KStream<Long, Order>,
        Function<GlobalKTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

    return orders -> (
              customers -> (
                    products -> (
                        orders.join(customers,
                            (orderId, order) -> order.getCustomerId(),
                                (order, customer) -> new CustomerOrder(customer, order))
                                .join(products,
                                        (orderId, customerOrder) -> customerOrder
                                                .productId(),
                                        (customerOrder, product) -> {
                                            EnrichedOrder enrichedOrder = new EnrichedOrder();
                                            enrichedOrder.setProduct(product);
                                            enrichedOrder.setCustomer(customerOrder.customer);
                                            enrichedOrder.setOrder(customerOrder.order);
                                            return enrichedOrder;
                                        })
                        )
                )
    );
}

让我们看看上面介绍的绑定模型的详细信息。 在这个模型中,我们在入站上有 3 个部分应用的函数。我们把它们称为 和 。 如果我们在真正的数学函数的意义上扩展这些函数,它将如下所示: . 变量代表 ,变量代表 ,变量代表 。 第一个函数具有应用程序 () 的第一个输入绑定,其输出是函数 f(y)。 该函数具有应用程序 () 的第二个输入绑定,其输出是另一个函数 。 该函数的输入是应用程序 () 的第三个输入,其输出是应用程序的最终输出绑定。 来自三个部分函数(分别是 、 、)的输入在方法主体中可供您使用,用于将业务逻辑作为 lambda 表达式的一部分实施。f(x)f(y)f(z)f(x) → (fy) → f(z) → KStream<Long, EnrichedOrder>xKStream<Long, Order>yGlobalKTable<Long, Customer>zGlobalKTable<Long, Product>f(x)KStream<Long, Order>f(y)GlobalKTable<Long, Customer>f(z)f(z)GlobalKTable<Long, Product>KStream<Long, EnrichedOrder>KStreamGlobalKTableGlobalKTablespring-doc.cn

输入绑定分别命名为 和 。输出绑定命名为 。enrichOrder-in-0enrichOrder-in-1enrichOrder-in-2enrichOrder-out-0spring-doc.cn

使用柯里化函数,您几乎可以拥有任意数量的 Importing。但是,请记住,在 Java 中,除了较少数量的输入和部分应用的函数之外,任何其他内容都可能导致代码不可读。 因此,如果您的 Kafka Streams 应用程序需要的输入绑定数量超过合理较少的数量,并且您希望使用此函数模型,那么您可能需要重新考虑您的设计并适当地分解应用程序。spring-doc.cn

多个输出绑定

Kafka Streams 允许将出站数据写入多个主题。此功能在 Kafka Streams 中称为分支。 使用多个输出绑定时,您需要提供 KStream () 数组作为出站返回类型。KStream[]spring-doc.cn

下面是一个示例:spring-doc.cn

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {

    Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
    Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
    Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

    return input -> input
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
            .groupBy((key, value) -> value)
            .windowedBy(TimeWindows.of(5000))
            .count(Materialized.as("WordCounts-branch"))
            .toStream()
            .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
                    new Date(key.window().start()), new Date(key.window().end()))))
            .branch(isEnglish, isFrench, isSpanish);
}

编程模型保持不变,但出站参数化类型为 。 默认输出绑定名称分别为 、 、 。 Binders 生成三个 output binding 的原因是因为它检测返回的数组的长度。KStream[]process-out-0process-out-1process-out-2KStreamspring-doc.cn

Kafka Streams 的基于函数的编程样式摘要

总之,下表显示了可在函数范例中使用的各种选项。spring-doc.cn

输入数量 输出数量 要使用的组件

1spring-doc.cn

0spring-doc.cn

java.util.function.Consumerspring-doc.cn

2spring-doc.cn

0spring-doc.cn

java.util.function.BiConsumerspring-doc.cn

1spring-doc.cn

1..nspring-doc.cn

java.util.function.Functionspring-doc.cn

2spring-doc.cn

1..nspring-doc.cn

java.util.function.BiFunctionspring-doc.cn

>= 3spring-doc.cn

0..nspring-doc.cn

使用柯里化函数spring-doc.cn

  • 如果此表中有多个输出,则类型将变为 。KStream[]spring-doc.cn

2.3.2. 命令式编程模型。

尽管上面概述的函数式编程模型是首选方法,但如果您愿意,您仍然可以使用基于经典的方法。StreamListenerspring-doc.cn

以下是一些示例。spring-doc.cn

以下是使用 的字数统计示例的等效示例。StreamListenerspring-doc.cn

@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
public class WordCountProcessorApplication {

    @StreamListener("input")
    @SendTo("output")
    public KStream<?, WordCount> process(KStream<?, String> input) {
        return input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("WordCounts-multi"))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
    }

    public static void main(String[] args) {
        SpringApplication.run(WordCountProcessorApplication.class, args);
    }

如你所见,这有点冗长,因为你需要提供和其他额外的注释,比如 and 才能使其成为一个完整的应用程序。 是指定包含绑定的 Binding 接口的位置。 在本例中,我们使用的是具有以下 Contract 的 stock binding 接口。EnableBindingStreamListenerSendToEnableBindingKafkaStreamsProcessorspring-doc.cn

public interface KafkaStreamsProcessor {

	@Input("input")
	KStream<?, ?> input();

	@Output("output")
	KStream<?, ?> output();

}

Binder 将为输入和输出创建绑定,因为你正在使用包含这些声明的绑定接口。KStreamKStreamspring-doc.cn

除了函数式样式中提供的编程模型的明显差异之外,这里需要提到的一点是,绑定名称是您在绑定接口中指定的名称。 例如,在上面的应用程序中,由于我们使用的是 ,因此绑定名称是 和 。 Binding 属性需要使用这些名称。例如 ,等等。 请记住,这与函数式样式有根本的不同,因为 Binders 会为应用程序生成绑定名称。 这是因为应用程序没有在功能模型中使用 .KafkaStreamsProcessorinputoutputspring.cloud.stream.bindings.input.destinationspring.cloud.stream.bindings.output.destinationEnableBindingspring-doc.cn

这是另一个 sink 示例,其中有两个 inputs。spring-doc.cn

@EnableBinding(KStreamKTableBinding.class)
.....
.....
@StreamListener
public void process(@Input("inputStream") KStream<String, PlayEvent> playEvents,
                    @Input("inputTable") KTable<Long, Song> songTable) {
                    ....
                    ....
}

interface KStreamKTableBinding {

    @Input("inputStream")
    KStream<?, ?> inputStream();

    @Input("inputTable")
    KTable<?, ?> inputTable();
}

以下是我们上面看到的相同基于处理器的等效项。StreamListenerBiFunctionspring-doc.cn

@EnableBinding(KStreamKTableBinding.class)
....
....

@StreamListener
@SendTo("output")
public KStream<String, Long> process(@Input("input") KStream<String, Long> userClicksStream,
                                     @Input("inputTable") KTable<String, String> userRegionsTable) {
....
....
}

interface KStreamKTableBinding extends KafkaStreamsProcessor {

    @Input("inputX")
    KTable<?, ?> inputTable();
}

最后,这里相当于具有三个 inputs 和 curried 函数的应用程序。StreamListenerspring-doc.cn

@EnableBinding(CustomGlobalKTableProcessor.class)
...
...
    @StreamListener
    @SendTo("output")
    public KStream<Long, EnrichedOrder> process(
            @Input("input-1") KStream<Long, Order> ordersStream,
            @Input("input-"2) GlobalKTable<Long, Customer> customers,
            @Input("input-3") GlobalKTable<Long, Product> products) {

        KStream<Long, CustomerOrder> customerOrdersStream = ordersStream.join(
                customers, (orderId, order) -> order.getCustomerId(),
                (order, customer) -> new CustomerOrder(customer, order));

        return customerOrdersStream.join(products,
                (orderId, customerOrder) -> customerOrder.productId(),
                (customerOrder, product) -> {
                    EnrichedOrder enrichedOrder = new EnrichedOrder();
                    enrichedOrder.setProduct(product);
                    enrichedOrder.setCustomer(customerOrder.customer);
                    enrichedOrder.setOrder(customerOrder.order);
                    return enrichedOrder;
                });
        }

    interface CustomGlobalKTableProcessor {

            @Input("input-1")
            KStream<?, ?> input1();

            @Input("input-2")
            GlobalKTable<?, ?> input2();

            @Input("input-3")
            GlobalKTable<?, ?> input3();

            @Output("output")
            KStream<?, ?> output();
    }

你可能会注意到,上面两个例子更加冗长,因为除了 provide 之外,你还需要编写自己的自定义绑定接口。 使用功能模型,您可以避免所有这些仪式细节。EnableBindingspring-doc.cn

在我们继续查看 Kafka Streams Binder 提供的通用编程模型之前,这里是多个输出绑定的版本。StreamListenerspring-doc.cn

EnableBinding(KStreamProcessorWithBranches.class)
public static class WordCountProcessorApplication {

    @Autowired
    private TimeWindows timeWindows;

    @StreamListener("input")
    @SendTo({"output1","output2","output3"})
    public KStream<?, WordCount>[] process(KStream<Object, String> input) {

			Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
			Predicate<Object, WordCount> isFrench =  (k, v) -> v.word.equals("french");
			Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

			return input
					.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
					.groupBy((key, value) -> value)
					.windowedBy(timeWindows)
					.count(Materialized.as("WordCounts-1"))
					.toStream()
					.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))))
					.branch(isEnglish, isFrench, isSpanish);
    }

    interface KStreamProcessorWithBranches {

    		@Input("input")
    		KStream<?, ?> input();

    		@Output("output1")
    		KStream<?, ?> output1();

    		@Output("output2")
    		KStream<?, ?> output2();

    		@Output("output3")
    		KStream<?, ?> output3();
    	}
}

回顾一下,我们已经回顾了使用 Kafka Streams Binder 时的各种编程模型选择。spring-doc.cn

Binder 为 和 input 提供绑定功能。 和 bindings 仅在 input 上可用。 Binder 支持 .KStreamKTableGlobalKTableKTableGlobalKTableKStreamspring-doc.cn

Kafka Streams Binder 编程模型的结果是,Binder 为您提供了使用全功能编程模型或使用基于命令式方法的灵活性。StreamListenerspring-doc.cn

2.4. 编程模型的辅助设备

2.4.1. 单个应用程序中的多个 Kafka Streams 处理器

Binder 允许在单个 Spring Cloud Stream 应用程序中拥有多个 Kafka Streams 处理器。 您可以按以下方式进行申请。spring-doc.cn

@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
   ...
}

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
   ...
}

在这种情况下,Binder 将创建 3 个具有不同应用程序 ID 的单独 Kafka Streams 对象(更多内容见下文)。 但是,如果应用程序中有多个处理器,则必须告诉 Spring Cloud Stream,需要激活哪些功能。 以下是激活功能的方法。spring-doc.cn

spring.cloud.stream.function.definition: process;anotherProcess;yetAnotherProcessspring-doc.cn

如果您希望某些功能不立即激活,您可以将其从此列表中删除。spring-doc.cn

当您在同一应用程序中拥有单个 Kafka Streams 处理器和其他类型的 bean 时,这些 bean 通过不同的 Binders(例如,基于常规 Kafka Message Channel Binders 的函数 Bean)进行处理时,也是如此Functionspring-doc.cn

2.4.2. Kafka Streams 应用程序 ID

应用程序 ID 是您需要为 Kafka Streams 应用程序提供的必需属性。 Spring Cloud Stream Kafka Streams Binder 允许您以多种方式配置此应用程序 ID。spring-doc.cn

如果您只有一个处理器或在应用程序中,则可以使用以下属性在 Binder 级别进行设置:StreamListenerspring-doc.cn

spring.cloud.stream.kafka.streams.binder.applicationId.spring-doc.cn

为方便起见,如果您只有一个处理器,您还可以用作 the property 来委托应用程序 ID。spring.application.namespring-doc.cn

如果应用程序中有多个 Kafka Streams 处理器,则需要为每个处理器设置应用程序 ID。 对于函数模型,您可以将其作为属性附加到每个函数。spring-doc.cn

例如,假设您有以下功能。spring-doc.cn

@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
   ...
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

然后,您可以使用以下 Binder 级别属性为每个应用程序设置应用程序 ID。spring-doc.cn

spring.cloud.stream.kafka.streams.binder.functions.process.applicationIdspring-doc.cn

spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationIdspring-doc.cn

对于 ,您需要在处理器上的第一个 input binding 上设置此项。StreamListenerspring-doc.cn

例如,假设您有以下两个基于处理器的处理器。StreamListenerspring-doc.cn

@StreamListener
@SendTo("output")
public KStream<String, String> process(@Input("input") <KStream<Object, String>> input) {
   ...
}

@StreamListener
@SendTo("anotherOutput")
public KStream<String, String> anotherProcess(@Input("anotherInput") <KStream<Object, String>> input) {
   ...
}

然后,您必须使用以下 binding 属性为此设置应用程序 ID。spring-doc.cn

spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationIdspring-doc.cn

spring.cloud.stream.kafka.streams.bindings.anotherInput.consumer.applicationIdspring-doc.cn

对于基于函数的模型,这种在绑定级别设置应用程序 ID 的方法也适用。 但是,如果您使用的是函数模型,则在上面看到的 binder 级别设置每个函数要容易得多。spring-doc.cn

对于生产部署,强烈建议通过配置显式指定应用程序 ID。 如果您要自动扩展应用程序,这一点尤其重要,在这种情况下,您需要确保使用相同的应用程序 ID 部署每个实例。spring-doc.cn

如果应用程序未提供应用程序 ID,则在这种情况下,Binder 将为您自动生成静态应用程序 ID。 这在开发方案中很方便,因为它避免了显式提供应用程序 ID 的需要。 以这种方式生成的应用程序 ID 在应用程序重启时将是静态的。 在函数模型的情况下,生成的应用程序 ID 将是函数 bean 名称,后跟文字 ,例如,如果是函数 bean 名称。 在 的情况下,生成的应用程序 ID 将使用包含类名称,后跟方法名称,后跟文字 ,而不是使用函数 bean 名称。applicationIDprocess-applicationIDprocessStreamListenerapplicationIdspring-doc.cn

应用程序 ID 设置摘要
  • 默认情况下,Binder 将为每个函数或方法自动生成应用程序 ID。StreamListenerspring-doc.cn

  • 如果只有一个处理器,则可以使用 或 。spring.kafka.streams.applicationIdspring.application.namespring.cloud.stream.kafka.streams.binder.applicationIdspring-doc.cn

  • 如果您有多个处理器,则可以使用属性 - 为每个函数设置应用程序 ID。 在 的情况下,可以使用 来完成此操作,假设输入绑定名称为 。spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationIdStreamListenerspring.cloud.stream.kafka.streams.bindings.input.applicationIdinputspring-doc.cn

2.4.3. 使用函数式样式覆盖 Binder 生成的默认绑定名称

默认情况下,当使用函数式样式时,Binders 使用上面讨论的策略来生成绑定名称,即 <function-bean-name>-<in>|<out>-[0..n],例如 process-in-0、process-out-0 等。 如果要覆盖这些绑定名称,可以通过指定以下属性来实现。spring-doc.cn

spring.cloud.stream.function.bindings.<default binding name>.默认绑定名称是 Binder 生成的原始绑定名称。spring-doc.cn

例如,假设您有这个函数。spring-doc.cn

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}

Binder 将生成名称为 、 和 的绑定。 现在,如果你想把它们完全改成别的,也许是更多特定于域的绑定名称,那么你可以按如下方式进行。process-in-0process-in-1process-out-0spring-doc.cn

springc.cloud.stream.function.bindings.process-in-0=usersspring-doc.cn

springc.cloud.stream.function.bindings.process-in-0=regionsspring-doc.cn

spring.cloud.stream.function.bindings.process-out-0=clicksspring-doc.cn

之后,您必须在这些新绑定名称上设置所有绑定级别属性。spring-doc.cn

请记住,对于上述函数式编程模型,在大多数情况下,遵守默认绑定名称是有意义的。 您可能仍然希望进行此覆盖的唯一原因是,当您拥有大量配置属性并且希望将绑定映射到对域更友好的内容时。spring-doc.cn

2.4.4. 设置 bootstrap 服务器配置

运行 Kafka Streams 应用程序时,您必须提供 Kafka 代理服务器信息。 如果您不提供此信息,则 Binder 会期望您以默认 . 如果不是这种情况,那么您需要覆盖它。有几种方法可以做到这一点。localhost:9092spring-doc.cn

  • 使用 boot 属性 -spring.kafka.bootstrapServersspring-doc.cn

  • Binder 级别属性 -spring.cloud.stream.kafka.streams.binder.brokersspring-doc.cn

当涉及到 Binder 级别属性时,是否使用通过常规 Kafka Binder - 提供的 broker 属性并不重要。 Kafka Streams Binder 将首先检查是否设置了 Kafka Streams Binder 特定的代理属性 (),如果未找到,则查找 .spring.cloud.stream.kafka.binder.brokersspring.cloud.stream.kafka.streams.binder.brokersspring.cloud.stream.kafka.binder.brokersspring-doc.cn

2.5. 记录序列化和反序列化

Kafka Streams Binder 允许您以两种方式序列化和反序列化记录。 一个是 Kafka 提供的原生序列化和反序列化工具,另一个是 Spring Cloud Stream 框架的消息转换能力。 让我们看看一些细节。spring-doc.cn

2.5.1. 入站反序列化

键总是使用本机 Serdes 反序列化。spring-doc.cn

对于值,默认情况下,入站上的反序列化由 Kafka 本机执行。 请注意,这是对以前版本的 Kafka Streams Binder 默认行为的重大更改,其中反序列化是由框架完成的。spring-doc.cn

Kafka Streams Binder 将尝试通过查看 或 的类型签名来推断匹配的类型。 这是它与 Serdes 匹配的顺序。Serdejava.util.function.Function|ConsumerStreamListenerspring-doc.cn

  • 如果应用程序提供了 type 的 bean,并且返回类型使用传入键或值类型的实际类型进行参数化,则它将使用它进行入站反序列化。 例如,如果应用程序中有以下内容,则 Binder 会检测到 Importing 的值类型与在 Bean 上参数化的类型匹配。 它将使用它进行入站反序列化。SerdeSerdeKStreamSerdespring-doc.cn

@Bean
public Serde<Foo() customSerde{
 ...
}

@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
  • 接下来,它查看类型,并查看它们是否是 Kafka Streams 公开的类型之一。如果是这样,请使用它们。 以下是 Binder 将尝试从 Kafka Streams 匹配的 Serde 类型。spring-doc.cn

    Integer, Long, Short, Double, Float, byte[], UUID and String.
  • 如果 Kafka Streams 提供的任何 Serdes 都不匹配类型,那么它将使用 Spring Kafka 提供的 JsonSerde。在这种情况下,Binder 假定类型是 JSON 友好的。 如果您有多个值对象作为输入,这非常有用,因为 Binder 会在内部推断它们以更正 Java 类型。 在回退到之前,Binders 会默认检查它是否可以与传入的 KStream 类型匹配。JsonSerdeSerde`s set in the Kafka Streams configuration to see if it is a `Serdespring-doc.cn

如果上述策略均无效,则应用程序必须通过配置提供 'Serde'。 这可以通过两种方式进行配置 - binding 或 default。spring-doc.cn

首先,Binder 将查看是否在绑定级别提供了 a。 例如,如果您有以下处理器,Serdespring-doc.cn

@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}

然后,您可以使用以下内容提供绑定级别:Serdespring-doc.cn

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果为每个输入绑定提供 as abover,则这将具有更高的优先级,并且 Binder 将远离任何推理。SerdeSerde

如果您希望将默认键/值 Serdes 用于入站反序列化,则可以在 Binder 级别执行此操作。spring-doc.cn

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde

如果您不想要 Kafka 提供的原生解码,可以依赖 Spring Cloud Stream 提供的消息转换功能。 由于原生解码是默认的,为了让 Spring Cloud Stream 反序列化入站值对象,你需要显式禁用原生解码。spring-doc.cn

例如,如果你有与上述相同的 BiFunction 处理器,那么你需要单独禁用所有输入的原生解码。否则,本机解码仍将应用于您未禁用的那些。spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: falsespring-doc.cn

默认情况下,Spring Cloud Stream 将用作内容类型并使用适当的 json 消息转换器。 您可以通过使用以下属性和适当的 Bean 来使用自定义消息转换器。application/jsonMessageConverterspring-doc.cn

spring.cloud.stream.bindings.process-in-0.contentType

2.5.2. 出站序列化

出站序列化几乎遵循与上述入站反序列化相同的规则。 与入站反序列化一样,与以前版本的 Spring Cloud Stream 相比,一个主要变化是出站上的序列化由 Kafka 本地处理。 在 Binder 的 3.0 版本之前,这是由框架本身完成的。spring-doc.cn

出站上的键始终由 Kafka 使用由 Binder 推断的匹配项进行序列化。 如果它无法推断出键的类型,则需要使用 configuration 指定。Serdespring-doc.cn

值 serdes 是使用用于入站反序列化的相同规则推断的。 首先,它进行匹配以查看出站类型是否来自应用程序中提供的 bean。 如果不匹配,它会检查它是否与 Kafka 公开的匹配项匹配,例如 - 、 和 。 如果这不起作用,那么它就会回退到 Spring Kafka 项目提供的,但首先查看默认配置以查看是否有匹配项。 请记住,所有这些都对应用程序是透明的。 如果这些都不起作用,则用户必须提供 to use by configuration。SerdeIntegerLongShortDoubleFloatbyte[]UUIDStringJsonSerdeSerdeSerdespring-doc.cn

假设您使用的是与上述相同的处理器。然后,您可以按如下方式配置出站键/值 Serdes。BiFunctionspring-doc.cn

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

如果 Serde 推理失败,并且未提供绑定级别 Serdes,则 Binders 将回退到 ,但请查看默认 Serdes 进行匹配。JsonSerdespring-doc.cn

默认 serdes 的配置方式与上面相同,在 deserialization 下描述。spring-doc.cn

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde spring.cloud.stream.kafka.streams.binder.configuration.default.value.serdespring-doc.cn

如果您的应用程序使用分支功能并具有多个输出绑定,则必须为每个绑定配置这些绑定。 同样,如果 Binder 能够推断类型,则无需执行此配置。Serdespring-doc.cn

如果您不需要 Kafka 提供的原生编码,但想使用框架提供的消息转换,则需要显式禁用原生编码,因为原生编码是默认的。 例如,如果你有与上述相同的 BiFunction 处理器,那么你需要在分支的情况下单独禁用所有输出的原生编码。否则,本机编码仍将应用于您未禁用的那些。spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: falsespring-doc.cn

当 Spring Cloud Stream 完成转换时,默认情况下,它将用作内容类型并使用适当的 json 消息转换器。 您可以通过使用以下属性和相应的 Bean 来使用自定义消息转换器。application/jsonMessageConverterspring-doc.cn

spring.cloud.stream.bindings.process-out-0.contentType

当禁用本机编码/解码时, binder 不会像本机 Serdes 那样进行任何推理。 应用程序需要显式提供所有配置选项。 因此,在编写 Spring Cloud Stream Kafka Streams 应用程序时,通常建议保留默认的反序列化/序列化选项,并坚持使用 Kafka Streams 提供的原生反/序列化。 您必须使用框架提供的消息转换功能的一种情况是,当上游生产者使用特定的序列化策略时。 在这种情况下,您需要使用匹配的反序列化策略,因为本机机制可能会失败。 当依赖默认机制时,应用程序必须确保 Binders 有办法使用适当的 正确映射入站和出站,否则可能会失败。SerdeSerdespring-doc.cn

值得一提的是,上面概述的数据取消/序列化方法仅适用于处理器的边缘,即 - 入站和出站。 您的业务逻辑可能仍需要调用明确需要对象的 Kafka Streams API。 这些仍然是应用程序的责任,必须由开发人员进行相应的处理。Serdespring-doc.cn

2.6. 错误处理

Apache Kafka Streams 提供了本机处理反序列化错误异常的功能。 有关此支持的详细信息,请参阅此处。 Apache Kafka Streams 提供了两种开箱即用的反序列化异常处理程序 - 和 . 顾名思义,前者将记录错误并继续处理下一条记录,而后者将记录错误并失败。 是默认的反序列化异常处理程序。LogAndContinueExceptionHandlerLogAndFailExceptionHandlerLogAndFailExceptionHandlerspring-doc.cn

2.6.1. 在 Binder 中处理反序列化异常

Kafka Streams Binder 允许使用以下属性指定上述反序列化异常处理程序。spring-doc.cn

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail

除了上述两个反序列化异常处理程序之外,Binder 还提供了第三个处理程序,用于将错误记录(毒丸)发送到 DLQ(死信队列)主题。 以下是启用此 DLQ 异常处理程序的方法。spring-doc.cn

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq

当设置了上述属性时,所有 disserialization error 中的记录都会自动发送到 DLQ 主题。spring-doc.cn

您可以设置发布 DLQ 消息的主题名称,如下所示。spring-doc.cn

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)

如果设置了此选项,则错误记录将发送到主题 。 如果未设置,则它将创建一个名为 . 例如,如果绑定的目标主题是 ,应用程序 ID 是 ,则默认 DLQ 主题是 。 如果您打算启用 DLQ,则始终建议为每个输入绑定显式创建一个 DLQ 主题。custom-dlqerror.<input-topic-name>.<application-id>inputTopicprocess-applicationIderror.inputTopic.process-applicationIdspring-doc.cn

2.6.2. 每个输入使用者绑定的 DLQ

该属性适用于整个应用程序。 这意味着,如果同一应用程序中有多个函数或方法,则此属性将应用于所有函数或方法。 但是,如果单个处理器中有多个处理器或多个输入绑定,则可以使用 Binders 为每个输入使用者绑定提供的更精细的 DLQ 控件。spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandlerStreamListenerspring-doc.cn

如果您有以下处理器,spring-doc.cn

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}

并且您只想在第一个输入绑定上启用 DLQ,在第二个绑定上启用 logAndSkip,那么您可以在使用者上执行此操作,如下所示。spring-doc.cn

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: logAndSkipspring-doc.cn

以这种方式设置反序列化异常处理程序的优先级高于在 Binder 级别设置。spring-doc.cn

2.6.3. DLQ 分区

默认情况下,使用与原始记录相同的分区将记录发布到 Dead-Letter 主题。 这意味着 Dead-Letter 主题必须至少具有与原始记录一样多的分区。spring-doc.cn

要更改此行为,请将 implementation 作为 添加到 application context。 只能存在一个这样的 bean。 该函数提供 Consumer Group(大多数情况下与应用程序 ID 相同)、failed 和 exception。 例如,如果您始终希望路由到分区 0,则可以使用:DlqPartitionFunction@BeanConsumerRecordspring-doc.cn

@Bean
public DlqPartitionFunction partitionFunction() {
    return (group, record, ex) -> 0;
}
如果将使用者绑定的属性设置为 1 (并且 Binder 的 is equal to ),则无需提供 ;框架将始终使用分区 0。 如果将使用者绑定的属性设置为大于(或 Binder 的 is greater than)的值,则必须提供 bean,即使分区计数与原始主题的分区计数相同。dlqPartitionsminPartitionCount1DlqPartitionFunctiondlqPartitions1minPartitionCount1DlqPartitionFunction

在 Kafka Streams Binder 中使用异常处理功能时,需要记住几点。spring-doc.cn

  • 该属性适用于整个应用程序。 这意味着,如果同一应用程序中有多个函数或方法,则此属性将应用于所有函数或方法。spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandlerStreamListenerspring-doc.cn

  • 反序列化的异常处理与本机反序列化和框架提供的消息转换一致。spring-doc.cn

2.6.4. 在 Binder 中处理 production 异常

与上述对反序列化异常处理程序的支持不同,Binders 不提供此类处理 生产异常的一类机制。 但是,您仍然可以使用定制器配置生产异常处理程序,您可以在下面的后续部分中找到更多详细信息。StreamsBuilderFactoryBeanspring-doc.cn

2.7. 状态存储

当使用高级 DSL 时,Kafka Streams 会自动创建状态存储,并进行适当的调用以触发状态存储。spring-doc.cn

如果要将传入绑定具体化为命名状态存储,则可以使用以下策略来实现。KTablespring-doc.cn

假设您有以下函数。spring-doc.cn

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
   ...
}

然后,通过设置以下属性,传入的数据将被具体化到命名的状态存储中。KTablespring-doc.cn

spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store

您可以在应用程序中将自定义状态存储定义为 bean,这些 bean 将被 Binder 检测并添加到 Kafka Streams 构建器中。 尤其是在使用处理器 API 时,需要手动注册状态存储。 为此,您可以在应用程序中将 StateStore 创建为 Bean。 以下是定义此类 bean 的示例。spring-doc.cn

@Bean
public StoreBuilder myStore() {
    return Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
            Serdes.Long());
}

@Bean
public StoreBuilder otherStore() {
    return Stores.windowStoreBuilder(
            Stores.persistentWindowStore("other-store",
                    1L, 3, 3L, false), Serdes.Long(),
            Serdes.Long());
}

然后,应用程序可以直接访问这些状态存储。spring-doc.cn

在引导过程中,上述 bean 将由 Binder 处理并传递给 Streams 构建器对象。spring-doc.cn

访问 state store:spring-doc.cn

Processor<Object, Product>() {

    WindowStore<Object, String> state;

    @Override
    public void init(ProcessorContext processorContext) {
        state = (WindowStore)processorContext.getStateStore("mystate");
    }
    ...
}

这在注册全局 state store 时不起作用。 要注册全局状态存储,请参阅以下有关自定义的部分。StreamsBuilderFactoryBeanspring-doc.cn

2.8. 交互式查询

Kafka Streams Binder API 公开了一个调用的类,用于以交互方式查询状态存储。 您可以在应用程序中将其作为 Spring bean 进行访问。从应用程序访问此 bean 的一种简单方法是访问 bean。InteractiveQueryServiceautowirespring-doc.cn

@Autowired
private InteractiveQueryService interactiveQueryService;

一旦你获得了对这个 bean 的访问权限,那么你就可以查询你感兴趣的特定状态存储了。见下文。spring-doc.cn

ReadOnlyKeyValueStore<Object, Object> keyValueStore =
						interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());

在启动期间,用于检索存储的上述方法调用可能会失败。 例如,它可能仍处于初始化 state store 的过程中。 在这种情况下,重试此操作将非常有用。 Kafka Streams Binder 提供了一种简单的重试机制来适应这种情况。spring-doc.cn

以下是可用于控制此重试的两个属性。spring-doc.cn

  • spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - 默认值为 .1spring-doc.cn

  • spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - 默认值为毫秒。1000spring-doc.cn

如果有多个 kafka streams 应用程序实例正在运行,则在以交互方式查询它们之前,您需要确定哪个应用程序实例托管您正在查询的特定密钥。 API 提供了用于识别主机信息的方法。InteractiveQueryServicespring-doc.cn

为此,您必须按如下方式配置属性:application.serverspring-doc.cn

spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>

以下是一些代码片段:spring-doc.cn

org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
						key, keySerializer);

if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {

    //query from the store that is locally available
}
else {
    //query from the remote host
}

2.9. 健康指标

运行状况指示器需要 dependency 。对于 maven,请使用:spring-boot-starter-actuatorspring-doc.cn

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

Spring Cloud Stream Kafka Streams Binder 提供了一个健康指示器来检查底层流线程的状态。 Spring Cloud Stream 定义了一个属性来启用健康指示器。请参阅 Spring Cloud Stream 文档management.health.binders.enabledspring-doc.cn

运行状况指示器为每个流线程的元数据提供以下详细信息:spring-doc.cn

默认情况下,只有全局状态可见( 或 )。要显示详细信息,必须将该属性设置为 或 。 有关运行状况信息的更多详细信息,请参阅 Spring Boot Actuator 文档UPDOWNmanagement.endpoint.health.show-detailsALWAYSWHEN_AUTHORIZEDspring-doc.cn

运行状况指示器的状态是注册的所有 Kafka 线程是否都处于该状态。UPRUNNING

由于 Kafka Streams Binder (和 )中有三个单独的 Binders,它们都将报告运行状况。 启用 后,报告的某些信息可能是冗余的。KStreamKTableGlobalKTableshow-detailsspring-doc.cn

当同一应用程序中存在多个 Kafka Streams 处理器时,将报告所有处理器的运行状况检查,并按 Kafka Streams 的应用程序 ID 进行分类。spring-doc.cn

2.10. 访问 Kafka Streams 指标

Spring Cloud Stream Kafka Streams Binder 提供了一种基本机制,用于访问通过 Micrometer 导出的 Kafka Streams 指标。 Binder 将 Passport 将可用的 Kafka Streams 指标导出到此计量注册表。 导出的指标来自使用者、创建者、管理员客户端和流本身。MeterRegistryKafkaStreams#metrics()spring-doc.cn

Binder 导出的指标的格式为:指标组名称,后跟一个点,然后是实际的指标名称。 原始指标信息中的所有破折号都将替换为点。spring-doc.cn

例如,度量组中的度量名称在千分尺注册表中为 . 同样,度量 from 也可用作 .network-io-totalconsumer-metricsconsumer.metrics.network.io.totalcommit-totalstream-metricsstream.metrics.commit.totalspring-doc.cn

如果同一应用程序中有多个 Kafka Streams 处理器,则指标名称前面将加上 Kafka Streams 的相应应用程序 ID。 在这种情况下,应用程序 ID 将保持原样,即不会将破折号转换为点等。 例如,如果第一个处理器的应用程序 ID 为 ,则度量值组中的度量名称在千分尺注册表中显示为 。processor-1network-io-totalconsumer-metricsprocessor-1.consumer.metrics.network.io.totalspring-doc.cn

您可以通过编程方式访问应用程序中的 Micrometer,然后迭代可用的仪表,也可以使用 Spring Boot actuator 通过 REST 端点访问指标。 通过 Boot Actuator 端点访问时,请确保将 添加到属性 . 然后,您可以访问以获取所有可用指标的列表,然后可以通过相同的 URI () 单独访问这些指标。MeterRegistrymetricsmanagement.endpoints.web.exposure.include/acutator/metrics/actuator/metrics/<metric-name>spring-doc.cn

除了通过 , 提供的信息级别指标之外的任何内容(例如调试级别指标)仍然只能在将 设置为 后通过 JMX 使用。 默认情况下,Kafka Streams 将此级别设置为 。有关更多详细信息,请参阅 Kafka Streams 文档中的此部分。 在未来的发行版中,Binder 可能支持通过 Micrometer 导出这些 DEBUG 级别度量。KafkaStreams#metrics()metrics.recording.levelDEBUGINFOspring-doc.cn

2.11. 混合高级 DSL 和低级处理器 API

Kafka Streams 提供两种 API 变体。 它有一个更高级别的 DSL,类似于 API,您可以在其中链接许多函数式程序员可能熟悉的各种操作。 Kafka Streams 还提供对低级处理器 API 的访问。 处理器 API 虽然非常强大,并且能够在较低级别控制事物,但本质上是必不可少的。 Kafka Streams Binders for Spring Cloud Stream 允许您使用高级 DSL 或混合使用 DSL 和处理器 API。 混合使用这两种变体为您提供了很多选项来控制应用程序中的各种用例。 应用程序可以使用 or 方法 API 调用来访问处理器 API。transformprocessspring-doc.cn

下面我们来看看如何使用 API 在 Spring Cloud Stream 应用程序中结合 DSL 和处理器 API。processspring-doc.cn

@Bean
public Consumer<KStream<Object, String>> process() {
    return input ->
        input.process(() -> new Processor<Object, String>() {
            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
               this.context = context;
            }

            @Override
            public void process(Object key, String value) {
                //business logic
            }

            @Override
            public void close() {

        });
}

下面是一个使用 API 的示例。transformspring-doc.cn

@Bean
public Consumer<KStream<Object, String>> process() {
    return (input, a) ->
        input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
            @Override
            public void init(ProcessorContext context) {

            }

            @Override
            public void close() {

            }

            @Override
            public KeyValue<Object, String> transform(Object key, String value) {
                // business logic - return transformed KStream;
            }
        });
}

API 方法调用是终端操作,而 API 是非终端操作,并为您提供潜在的转换,您可以使用该转换继续使用 DSL 或处理器 API 进行进一步处理。processtransformKStreamspring-doc.cn

2.12. 出站的分区支持

Kafka Streams 处理器通常会将处理后的输出发送到出站 Kafka 主题中。 如果出站主题已分区,并且处理器需要将传出数据发送到特定分区,则应用程序需要提供 type 为 的 bean。 有关更多详细信息,请参阅 StreamPartitioner。 让我们看一些例子。StreamPartitionerspring-doc.cn

这是我们已经多次看到的同一个处理器,spring-doc.cn

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

    ...
}

以下是输出绑定目标:spring-doc.cn

spring.cloud.stream.bindings.process-out-0.destination: outputTopic

如果主题有 4 个分区,如果您不提供分区策略,Kafka Streams 将使用默认分区策略,根据特定用例,这可能不是您想要的结果。 假设您希望将匹配到的任何键发送到分区 0、分区 1、分区 2,并将其他所有内容发送到分区 3。 这是您需要在应用程序中执行的操作。outputTopicspringcloudstreamspring-doc.cn

@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
    return (t, k, v, n) -> {
        if (k.equals("spring")) {
            return 0;
        }
        else if (k.equals("cloud")) {
            return 1;
        }
        else if (k.equals("stream")) {
            return 2;
        }
        else {
            return 3;
        }
    };
}

这是一个基本的实现,但是,您可以访问记录的键/值、主题名称和分区总数。 因此,如果需要,您可以实施复杂的分区策略。spring-doc.cn

您还需要提供此 bean 名称以及应用程序配置。spring-doc.cn

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner

应用程序中的每个输出主题都需要像这样单独配置。spring-doc.cn

2.13. StreamsBuilderFactoryBean 定制器

通常需要自定义创建对象的 。 基于 Spring Kafka 提供的底层支持,binder 允许您自定义 . 您可以使用 来自定义 本身。 然后,一旦您通过此定制器访问 ,您就可以使用 自定义相应的 。 这两个定制器都是 Spring for Apache Kafka 项目的一部分。StreamsBuilderFactoryBeanKafkaStreamsStreamsBuilderFactoryBeanStreamsBuilderFactoryBeanCustomizerStreamsBuilderFactoryBeanStreamsBuilderFactoryBeanKafkaStreamsKafkaStreamsCustomzierspring-doc.cn

以下是使用 .StreamsBuilderFactoryBeanCustomizerspring-doc.cn

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return sfb -> sfb.setStateListener((newState, oldState) -> {
         //Do some action here!
    });
}

以上显示了您可以执行的操作来自定义 . 您基本上可以从中调用任何可用的 mutation 操作来自定义它。 此定制器将在工厂 Bean 启动之前由 Binders 调用。StreamsBuilderFactoryBeanStreamsBuilderFactoryBeanspring-doc.cn

访问 后,您还可以自定义基础对象。 这是执行此操作的蓝图。StreamsBuilderFactoryBeanKafkaStreamsspring-doc.cn

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
            @Override
            public void customize(KafkaStreams kafkaStreams) {
                kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                });
            }
        });
    };
}

KafkaStreamsCustomizer将在底层开始之前被 right 调用。StreamsBuilderFactoryBeabnKafkaStreamsspring-doc.cn

整个应用程序中只能有一个。 那么,我们如何考虑多个 Kafka Streams 处理器,因为它们每个处理器都由单独的对象备份呢? 在这种情况下,如果这些处理器的自定义需要不同,则应用程序需要根据应用程序 ID 应用一些过滤器。StreamsBuilderFactoryBeanCustomizerStreamsBuilderFactoryBeanspring-doc.cn

例如,spring-doc.cn

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {

    return factoryBean -> {
        if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
                .equals("processor1-application-id")) {
            factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
                @Override
                public void customize(KafkaStreams kafkaStreams) {
                    kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                    });
                }
            });
        }
    };

2.13.1. 使用 Customizer 注册全局状态存储

如上所述,Binders 没有提供将全局状态存储注册为功能的第一类方法。 为此,您需要使用定制器。 这是如何做到这一点的。spring-doc.cn

@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
    return fb -> {
        try {
            final StreamsBuilder streamsBuilder = fb.getObject();
            streamsBuilder.addGlobalStore(...);
        }
        catch (Exception e) {

        }
    };
}

同样,如果您有多个处理器,则需要使用上述应用程序 ID 过滤掉其他对象,从而将全局状态存储附加到右侧。StreamsBuilderStreamsBuilderFactoryBeanspring-doc.cn

2.13.2. 使用 customizer 注册生产异常处理程序

在错误处理部分中,我们指出了 Binder 没有提供处理生产异常的第一类方法。 尽管如此,您仍然可以使用定制器来注册生产异常处理程序。见下文。StreamsBuilderFacotryBeanspring-doc.cn

@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
    return fb -> {
        fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                            CustomProductionExceptionHandler.class);
    };
}

同样,如果您有多个处理器,则可能需要针对正确的 进行适当设置。 您还可以使用 configuration 属性添加此类 production 异常处理程序(有关更多信息,请参见下文),但如果您选择使用编程方法,则这是一个选项。StreamsBuilderFactoryBeanspring-doc.cn

2.14. 时间戳提取器

Kafka Streams 允许您根据各种时间戳概念控制使用者记录的处理。 默认情况下,Kafka Streams 会提取嵌入在使用者记录中的时间戳元数据。 您可以通过为每个 input binding 提供不同的实现来更改此默认行为。 以下是有关如何执行此操作的一些详细信息。TimestampExtractorspring-doc.cn

@Bean
public Function<KStream<Long, Order>,
        Function<KTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, Order>>>> process() {
    return orderStream ->
            customers ->
                products -> orderStream;
}

@Bean
public TimestampExtractor timestampExtractor() {
    return new WallclockTimestampExtractor();
}

然后,您为每个使用者绑定设置上述 bean 名称。TimestampExtractorspring-doc.cn

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.timestampExtractorBeanName=timestampExtractor"

如果您跳过用于设置自定义时间戳提取器的输入使用者绑定,则该使用者将使用默认设置。spring-doc.cn

2.15. 基于 Kafka Streams 的 Binder 和常规 Kafka Binder 的多 Binder

您可以有一个应用程序,其中同时具有基于常规 Kafka Binder 的函数/使用者/供应商和基于 Kafka Streams 的处理器。 但是,您不能在单个函数或使用者中混合使用它们。spring-doc.cn

下面是一个示例,其中同一应用程序中有两个基于 Binder 的组件。spring-doc.cn

@Bean
public Function<String, String> process() {
    return s -> s;
}

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {

    return input -> input;
}

以下是配置中的相关部分:spring-doc.cn

spring.cloud.stream.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar

如果你有与上述相同的应用程序,但正在处理两个不同的 Kafka 集群,那么事情就会变得有点复杂,例如,常规处理器同时作用于 Kafka 集群 1 和集群 2(从集群 1 接收数据并发送到集群 2),而 Kafka Streams 处理器作用于 Kafka 集群 2。 然后,您必须使用 Spring Cloud Stream 提供的多 Binder 工具。processspring-doc.cn

以下是您的配置在这种情况下可能会发生的变化。spring-doc.cn

# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster

spring.cloud.stream.function.definition=process;kstreamProcess

# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2

# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3

注意上述配置。 我们有两种 Binders,但总共有 3 个 Binder,第一个是基于集群 1 () 的常规 Kafka Binder,然后是另一个基于集群 2 () 的 Kafka Binder,最后是 () 。 应用程序中的第一个处理器从两个 Binders 都基于常规 Kafka Binder 但不同的集群接收数据并发布到其中。 第二个处理器是 Kafka Streams 处理器,它使用的数据与 是相同的集群,但 Binder 类型不同。kafka1kafka2kstreamkafka3kafka1kafka2kafka3kafka2spring-doc.cn

由于 Kafka Streams 系列绑定器中提供了三种不同的 Binder 类型:- 和 - 如果您的应用程序具有基于这些 Binder 中的任何一个的多个绑定,则需要将其显式提供为 Binder 类型。kstreamktableglobalktablespring-doc.cn

例如,如果您有如下处理器,spring-doc.cn

@Bean
public Function<KStream<Long, Order>,
        Function<KTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

    ...
}

然后,必须在多 Binder 场景中配置此 ID,如下所示。 请注意,只有当您有一个真正的多 Binder 方案,其中有多个处理器在单个应用程序中处理多个集群时,才需要这样做。 在这种情况下,需要显式地为 Binders 提供 bindings,以区别于其他处理器的 Binder 类型和集群。spring-doc.cn

spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}

spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1  #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2  #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3  #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream

# rest of the configuration is omitted.

2.16. 状态清理

默认情况下,该方法在绑定停止时调用。 请参阅 Spring Kafka 文档。 要修改此行为,只需将单个(配置为在启动、停止或两者都不清理)添加到应用程序上下文中;该 bean 将被检测到并连接到工厂 bean。Kafkastreams.cleanup()CleanupConfig@Beanspring-doc.cn

2.17. Kafka Streams 拓扑可视化

Kafka Streams Binder 提供了以下 Actuator 终端节点,用于检索拓扑描述,您可以使用外部工具可视化拓扑。spring-doc.cn

/actuator/topologyspring-doc.cn

/actuator/topology/<applicaiton-id of the processor>spring-doc.cn

您需要包含来自 Spring Boot 的 actuator 和 Web 依赖项才能访问这些端点。 此外,您还需要添加到 property。 默认情况下,终端节点处于禁用状态。topologymanagement.endpoints.web.exposure.includetopologyspring-doc.cn

2.18. 配置选项

本节包含 Kafka Streams Binders 使用的配置选项。spring-doc.cn

有关与 Binder 相关的常见配置选项和属性,请参阅核心文档spring-doc.cn

2.18.1. Kafka Streams Binder 属性

以下属性在 Binder 级别可用,并且必须以spring.cloud.stream.kafka.streams.binder.spring-doc.cn

配置

Map 包含与 Apache Kafka Streams API 相关的属性的键/值对。 此属性必须以 为前缀。 以下是使用此属性的一些示例。spring.cloud.stream.kafka.streams.binder.spring-doc.cn

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000

有关可能进入 streams 配置的所有属性的更多信息,请参阅 Apache Kafka Streams 文档中的 JavaDocs。 您可以设置的所有配置都可以通过它进行设置。 使用此属性时,它适用于整个应用程序,因为这是 Binder 级别属性。 如果应用程序中有多个处理器,则所有处理器都将获取这些属性。 对于像这样的属性,这将成为问题,因此您必须仔细检查如何使用此 Binder 级别属性映射 中的属性。StreamsConfigStreamsConfigapplication.idStreamsConfigconfigurationspring-doc.cn

functions.<function-bean-name>.applicationId

仅适用于功能式处理器。 这可用于设置应用程序中每个函数的应用程序 ID。 在有多个函数的情况下,这是设置应用程序 ID 的便捷方法。spring-doc.cn

functions.<function-bean-name>.configuration

仅适用于功能式处理器。 Map 包含与 Apache Kafka Streams API 相关的属性的键/值对。 这类似于上面描述的 binder 级别属性,但此级别的属性仅限于命名函数。 当您有多个处理器并且想要根据特定功能限制对配置的访问时,您可能希望使用它。 所有属性都可以在此处使用。configurationconfigurationStreamsConfigspring-doc.cn

经纪人

代理 URLspring-doc.cn

违约:localhostspring-doc.cn

zk节点

Zookeeper 网址spring-doc.cn

违约:localhostspring-doc.cn

deserializationExceptionHandler

反序列化错误处理程序类型。 此处理程序在 Binder 级别应用,因此应用于应用程序中的所有 input 绑定。 有一种方法可以在 Consumer 绑定级别以更精细的方式控制它。 可能的值为 - 或logAndContinuelogAndFailsendToDlqspring-doc.cn

违约:logAndFailspring-doc.cn

应用程序 ID

在 Binder 级别全局设置 Kafka Streams 应用程序 application.id 的便捷方法。 如果应用程序包含多个函数或方法,则应以不同的方式设置应用程序 ID。 请参阅上文,其中详细讨论了设置应用程序 ID。StreamListenerspring-doc.cn

Default:application 将生成一个静态应用程序 ID。有关更多详细信息,请参阅应用程序 ID 部分。spring-doc.cn

stateStoreRetry.maxAttempts

尝试连接到状态存储的最大尝试次数。spring-doc.cn

默认值:1spring-doc.cn

stateStoreRetry.backoffPeriod

重试时尝试连接到 state store 时的 Backoff period。spring-doc.cn

默认值:1000 毫秒spring-doc.cn

2.18.2. Kafka Streams 生产者属性

以下属性仅适用于 Kafka Streams 创建者,并且必须以 为前缀为方便,如果有多个输出绑定,并且它们都需要一个通用值,则可以使用 prefix 进行配置。spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.spring.cloud.stream.kafka.streams.default.producer.spring-doc.cn

keySerde

要使用的 Key Serdespring-doc.cn

默认:请参阅上面关于消息取消/序列化的讨论spring-doc.cn

valueSerde

value serde to usespring-doc.cn

默认:请参阅上面关于消息取消/序列化的讨论spring-doc.cn

useNativeEncoding

启用/禁用本机编码的标志spring-doc.cn

违约:。truespring-doc.cn

streamPartitionerBeanName 中: 要在使用者处使用的自定义出站分区器 bean 名称。 应用程序可以提供 custom 作为 Spring bean,并且可以将此 bean 的名称提供给生产者使用,而不是默认名称。StreamPartitionerspring-doc.cn

+ Default:请参阅上面关于出站分区支持的讨论。spring-doc.cn

2.18.3. Kafka Streams 消费者属性

以下属性可供 Kafka Streams 使用者使用,并且必须以 为前缀为方便,如果有多个输入绑定,并且它们都需要一个通用值,则可以使用 prefix 进行配置 。spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.spring.cloud.stream.kafka.streams.default.consumer.spring-doc.cn

应用程序 ID

设置 application.id per input binding。这仅适用于基于函数的处理器,对于基于函数的处理器,请参阅上面概述的其他方法。StreamListenerspring-doc.cn

默认值:见上文。spring-doc.cn

keySerde

要使用的 Key Serdespring-doc.cn

默认:请参阅上面关于消息取消/序列化的讨论spring-doc.cn

valueSerde

value serde to usespring-doc.cn

默认:请参阅上面关于消息取消/序列化的讨论spring-doc.cn

materialized作为

state store 在使用传入的 KTable 类型时实现spring-doc.cn

违约:。nonespring-doc.cn

useNative解码

启用/禁用本机解码的标志spring-doc.cn

违约:。truespring-doc.cn

dlq名称

DLQ 主题名称。spring-doc.cn

默认值:请参阅上面对错误处理和 DLQ 的讨论。spring-doc.cn

startOffset

如果没有要消耗的已提交偏移量,则从 offset 开始。 这主要在使用者第一次使用某个主题时使用。 Kafka Streams 用作默认策略,而 Binder 使用相同的默认策略。 这可以重写为使用此属性。earliestlatestspring-doc.cn

违约:。earliestspring-doc.cn

注意:对使用者使用不会对 Kafka Streams Binder 产生任何影响。 与基于消息通道的 Binder 不同,Kafka Streams Binder 不会按需开始或结束。resetOffsetsspring-doc.cn

deserializationExceptionHandler

反序列化错误处理程序类型。 此处理程序按使用者绑定应用,而不是前面描述的 Binder 级别属性。 可能的值为 - 或logAndContinuelogAndFailsendToDlqspring-doc.cn

违约:logAndFailspring-doc.cn

时间戳ExtractorBeanName

要在使用者处使用的特定时间戳提取器 bean 名称。 应用程序可以作为 Spring bean 提供,并且可以将此 bean 的名称提供给使用者使用,而不是默认名称。TimestampExtractorspring-doc.cn

Default:请参阅上面关于时间戳提取器的讨论。spring-doc.cn

2.18.4. 关于并发的特别说明

在 Kafka Streams 中,您可以使用该属性控制处理器可以创建的线程数。 为此,您可以使用上面在 binder、functions、producer 或 consumer 级别下描述的各种选项来完成。 您还可以使用核心 Spring Cloud Stream 提供的属性来实现此目的。 使用时,您需要在消费者上使用它。 当您在 function 或 中有多个 input bindings 时,请在第一个 input 绑定上设置此项。 例如,当设置 时,它将被 Binder 翻译为 。num.stream.threadsconfigurationconcurrencyStreamListenerspring.cloud.stream.bindings.process-in-0.consumer.concurrencynum.stream.threadsspring-doc.cn