Spring Cloud Stream 支持在给定应用程序的多个实例之间对数据进行分区。 在分区场景中,物理通信介质(如代理主题)被视为结构化为多个分区。 一个或多个生产者应用程序实例将数据发送到多个使用者应用程序实例,并确保由共同特征标识的数据由同一个使用者实例处理。Spring中文文档

Spring Cloud Stream 提供了一个通用的抽象,用于以统一的方式实现分区处理用例。 因此,无论代理本身是否自然分区(例如,Kafka)或不进行分区(例如,RabbitMQ),都可以使用分区。Spring中文文档

SCSt 分区
图 1.Spring Cloud Stream 分区

分区是有状态处理中的一个关键概念,其中(出于性能或一致性原因)确保所有相关数据一起处理至关重要。 例如,在时间窗口平均计算示例中,来自任何给定传感器的所有测量值都必须由同一应用程序实例处理。Spring中文文档

若要设置分区处理方案,必须同时配置数据生成端和数据使用端。

Spring Cloud Stream 中的分区由两个任务组成:Spring中文文档

若要设置分区处理方案,必须同时配置数据生成端和数据使用端。

配置分区的输出绑定

可以通过设置一个且仅一个 or 属性及其属性来配置输出绑定以发送分区数据。partitionKeyExpressionpartitionKeyExtractorNamepartitionCountSpring中文文档

例如,以下是有效且典型的配置:Spring中文文档

spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=headers.id
spring.cloud.stream.bindings.func-out-0.producer.partitionCount=5

根据该示例配置,使用以下逻辑将数据发送到目标分区。Spring中文文档

分区键的值是针对发送到分区输出绑定的每条消息计算的。 这是一个 SpEL 表达式,它根据用于提取分区键的出站消息(在前面的示例中是 from 消息标头的值)进行评估。partitionKeyExpressionpartitionKeyExpressionidSpring中文文档

如果 SpEL 表达式不足以满足您的需求,则可以通过提供 Bean 的实现并将其配置为 Bean(通过使用注释)来计算分区键值。 如果应用程序上下文中有多个类型的 Bean,则可以通过使用属性指定其名称来进一步筛选它,如以下示例所示:org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy@Beanorg.springframework.cloud.stream.binder.PartitionKeyExtractorStrategypartitionKeyExtractorNameSpring中文文档

--spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExtractorName=customPartitionKeyExtractor
--spring.cloud.stream.bindings.func-out-0.producer.partitionCount=5
. . .
@Bean
public CustomPartitionKeyExtractorClass customPartitionKeyExtractor() {
    return new CustomPartitionKeyExtractorClass();
}
在早期版本的 Spring Cloud Stream 中,可以通过设置属性来指定实现。 从版本 3.0 开始,将删除此属性。org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategyspring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass

计算消息键后,分区选择过程将目标分区确定为介于 和 之间的值。 默认计算(适用于大多数方案)基于以下公式:。 这可以在绑定上自定义,方法是设置要针对“键”计算的 SpEL 表达式(通过属性),或者通过配置 as bean 的实现(通过使用 @Bean 注释)。 与 类似,当 Application Context 中有多个此类型的 Bean 可用时,可以使用该属性进一步对其进行筛选,如以下示例所示:0partitionCount - 1key.hashCode() % partitionCountpartitionSelectorExpressionorg.springframework.cloud.stream.binder.PartitionSelectorStrategyPartitionKeyExtractorStrategyspring.cloud.stream.bindings.output.producer.partitionSelectorNameSpring中文文档

--spring.cloud.stream.bindings.func-out-0.producer.partitionSelectorName=customPartitionSelector
. . .
@Bean
public CustomPartitionSelectorClass customPartitionSelector() {
    return new CustomPartitionSelectorClass();
}
在早期版本的 Spring Cloud Stream 中,您可以通过设置属性来指定实现。 从版本 3.0 开始,将删除此属性。org.springframework.cloud.stream.binder.PartitionSelectorStrategyspring.cloud.stream.bindings.output.producer.partitionSelectorClass
在早期版本的 Spring Cloud Stream 中,可以通过设置属性来指定实现。 从版本 3.0 开始,将删除此属性。org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategyspring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass
在早期版本的 Spring Cloud Stream 中,您可以通过设置属性来指定实现。 从版本 3.0 开始,将删除此属性。org.springframework.cloud.stream.binder.PartitionSelectorStrategyspring.cloud.stream.bindings.output.producer.partitionSelectorClass

配置用于分区的输入绑定

输入绑定(绑定名称)配置为通过设置其属性以及应用程序本身的 and 属性来接收分区数据,如以下示例所示:uppercase-in-0partitionedinstanceIndexinstanceCountSpring中文文档

spring.cloud.stream.bindings.uppercase-in-0.consumer.partitioned=true
spring.cloud.stream.instanceIndex=3
spring.cloud.stream.instanceCount=5

该值表示应在数据之间进行分区的应用程序实例的总数。 必须是跨多个实例的唯一值,其值介于 和 之间。 实例索引可帮助每个应用程序实例标识从中接收数据的唯一分区。 使用不支持本机分区的技术的活页夹需要它。 例如,使用 RabbitMQ,每个分区都有一个队列,队列名称包含实例索引。 使用 Kafka,if is(默认),Kafka 负责在实例之间分配分区,这些属性不是必需的。 如果设置为 false,则绑定程序使用 和 来确定实例订阅的分区(您必须至少具有与实例一样多的分区)。 活页夹分配分区而不是 Kafka。 如果您希望特定分区的消息始终转到同一实例,这可能很有用。 当绑定配置需要它们时,必须正确设置这两个值,以确保使用所有数据,并且应用程序实例接收互斥数据集。instanceCountinstanceIndex0instanceCount - 1autoRebalanceEnabledtrueautoRebalanceEnabledinstanceCountinstanceIndexSpring中文文档

虽然在独立情况下使用多个实例进行分区数据处理的场景可能很复杂,但 Spring Cloud Dataflow 可以通过正确填充输入和输出值并让您依赖运行时基础结构来提供有关实例索引和实例计数的信息来显着简化该过程。Spring中文文档

测试

Spring Cloud Stream 支持在不连接到消息传递系统的情况下测试微服务应用程序。Spring中文文档