此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4! |
分区
Spring Cloud Stream 支持在给定应用程序的多个实例之间对数据进行分区。 在分区方案中,物理通信介质(例如代理主题)被视为结构化为多个分区。 一个或多个创建者应用程序实例将数据发送到多个使用者应用程序实例,并确保由共同特征标识的数据由同一使用者实例处理。
Spring Cloud Stream 为以统一的方式实现分区处理用例提供了一个通用的抽象。 因此,无论代理本身是否自然分区(例如 Kafka)或是否(例如 RabbitMQ),都可以使用分区。
分区是有状态处理中的一个关键概念,其中确保所有相关数据一起处理至关重要(出于性能或一致性原因)。 例如,在时间窗口平均计算示例中,来自任何给定传感器的所有测量值都必须由同一应用程序实例处理,这一点很重要。
要设置分区处理方案,必须同时配置数据生成端和数据使用端。 |
Spring Cloud Stream 中的分区包括两个任务:
配置用于分区的输出绑定
您可以通过设置一个且仅一个 or 属性及其属性来配置输出绑定以发送分区数据。partitionKeyExpression
partitionKeyExtractorName
partitionCount
例如,以下是有效且典型的配置:
spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=headers.id spring.cloud.stream.bindings.func-out-0.producer.partitionCount=5
根据该示例配置,使用以下逻辑将数据发送到目标分区。
分区键的值是根据 .
这是一个 SPEL 表达式,根据出站消息(在前面的示例中,它是 from 消息标头的值)进行评估,用于提取分区键。partitionKeyExpression
partitionKeyExpression
id
如果 SPEL 表达式不足以满足您的需要,则可以改为通过提供 Bean 的实现并将其配置为 Bean(通过使用 Comments)来计算分区键值。
如果在 Application Context 中有多个 bean 类型可用,则可以通过使用属性指定其名称来进一步过滤它,如以下示例所示:org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy
@Bean
org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy
partitionKeyExtractorName
--spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExtractorName=customPartitionKeyExtractor
--spring.cloud.stream.bindings.func-out-0.producer.partitionCount=5
. . .
@Bean
public CustomPartitionKeyExtractorClass customPartitionKeyExtractor() {
return new CustomPartitionKeyExtractorClass();
}
在早期版本的 Spring Cloud Stream 中,您可以通过设置属性来指定 的实现。
从版本 3.0 开始,此属性已删除。org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass |
计算消息键后,分区选择过程会将目标分区确定为 和 之间的值。
适用于大多数方案的默认计算基于以下公式:。
这可以在绑定上进行自定义,方法是将 SPEL 表达式设置为针对“键”(通过属性)进行求值,或者将 的实现配置为 bean(通过使用 @Bean 注释)。
与 ,当 Application Context 中有多个此类型的 bean 可用时,您可以使用该属性进一步过滤它,如以下示例所示:0
partitionCount - 1
key.hashCode() % partitionCount
partitionSelectorExpression
org.springframework.cloud.stream.binder.PartitionSelectorStrategy
PartitionKeyExtractorStrategy
spring.cloud.stream.bindings.output.producer.partitionSelectorName
--spring.cloud.stream.bindings.func-out-0.producer.partitionSelectorName=customPartitionSelector
. . .
@Bean
public CustomPartitionSelectorClass customPartitionSelector() {
return new CustomPartitionSelectorClass();
}
在早期版本的 Spring Cloud Stream 中,您可以通过设置属性来指定 的实现。
从版本 3.0 开始,此属性已删除。org.springframework.cloud.stream.binder.PartitionSelectorStrategy spring.cloud.stream.bindings.output.producer.partitionSelectorClass |
配置用于分区的输入绑定
输入绑定(使用 binding name )配置为通过设置其属性以及应用程序本身的 和 属性来接收分区数据,如以下示例所示:uppercase-in-0
partitioned
instanceIndex
instanceCount
spring.cloud.stream.bindings.uppercase-in-0.consumer.partitioned=true spring.cloud.stream.instanceIndex=3 spring.cloud.stream.instanceCount=5
该值表示应在其之间对数据进行分区的应用程序实例的总数。
这必须是多个实例中的唯一值,值介于 和 之间。
实例索引可帮助每个应用程序实例识别它从中接收数据的唯一分区。
使用本身不支持分区的技术的 Binders 需要它。
例如,使用 RabbitMQ 时,每个分区都有一个队列,队列名称包含实例索引。
使用 Kafka 时,如果为 (默认),Kafka 负责在实例之间分配分区,并且这些属性不是必需的。
如果设置为 false,则 Binder 使用 和 来确定实例订阅的分区(您的分区数必须至少与实例数一样多)。
Binder 分配分区而不是 Kafka。
如果您希望特定分区的消息始终发送到同一实例,这可能很有用。
当 Binder 配置需要它们时,正确设置这两个值非常重要,以确保所有数据都被使用,并且应用程序实例接收互斥的数据集。instanceCount
instanceIndex
0
instanceCount - 1
autoRebalanceEnabled
true
autoRebalanceEnabled
instanceCount
instanceIndex
虽然使用多个实例进行分区数据处理的场景在独立情况下设置可能很复杂,但 Spring Cloud Dataflow 可以通过正确填充输入和输出值并让您依赖运行时基础设施提供有关实例索引和实例计数的信息来显着简化该过程。