RabbitMQ 不支持本机分区。Spring中文文档

有时,将数据发送到特定分区是有利的 - 例如,当您要严格排序消息处理时,特定客户的所有消息都应转到同一分区。Spring中文文档

通过将每个分区的队列绑定到目标交换来提供分区。RabbitMessageChannelBinderSpring中文文档

以下 Java 和 YAML 示例演示如何配置生产者:Spring中文文档

制作人
@SpringBootApplication
public class RabbitPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "abc1", "def1", "qux1",
            "abc2", "def2", "qux2",
            "abc3", "def3", "qux3",
            "abc4", "def4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public Supplier<Message<?>> generate() {
        return () -> {
            String value = data[RANDOM.nextInt(data.length)];
            System.out.println("Sending: " + value);
            return MessageBuilder.withPayload(value)
                    .setHeader("partitionKey", value)
                    .build();
        };
    }

}
application.yml
    spring:
      cloud:
        stream:
          bindings:
            generate-out-0:
              destination: partitioned.destination
              producer:
                partitioned: true
                partition-key-expression: headers['partitionKey']
                partition-count: 2
                required-groups:
                - myGroup

前面示例中的配置使用默认分区 ()。 这可能会也可能不会提供适当平衡的算法,具体取决于键值。 可以使用 or 属性覆盖此默认值。key.hashCode() % partitionCountpartitionSelectorExpressionpartitionSelectorClassSpring中文文档

仅当需要在部署生产者时预配使用者队列时,才需要该属性。 否则,发送到分区的任何消息都将丢失,直到部署相应的使用者。required-groupsSpring中文文档

前面示例中的配置使用默认分区 ()。 这可能会也可能不会提供适当平衡的算法,具体取决于键值。 可以使用 or 属性覆盖此默认值。key.hashCode() % partitionCountpartitionSelectorExpressionpartitionSelectorClassSpring中文文档

仅当需要在部署生产者时预配使用者队列时,才需要该属性。 否则,发送到分区的任何消息都将丢失,直到部署相应的使用者。required-groupsSpring中文文档

以下配置预配主题交换:Spring中文文档

零件交换

以下队列绑定到该交换:Spring中文文档

零件队列

以下绑定将队列关联到 Exchange:Spring中文文档

零件绑定

以下 Java 和 YAML 示例延续了前面的示例,并演示了如何配置使用者:Spring中文文档

消费者
@SpringBootApplication
public class RabbitPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public Consumer<Message<String>> listen() {
        return message -> {
            String queue =- message.getHeaders().get(AmqpHeaders.CONSUMER_QUEUE);
            System.out.println(in + " received from queue " + queue);
        };
    }

}
application.yml
    spring:
      cloud:
        stream:
          bindings:
            listen-in-0:
              destination: partitioned.destination
              group: myGroup
              consumer:
                partitioned: true
                instance-index: 0
不支持动态缩放。 每个分区必须至少有一个使用者。 使用者的用于指示使用哪个分区。 Cloud Foundry 等平台只能有一个实例具有 .RabbitMessageChannelBinderinstanceIndexinstanceIndex
不支持动态缩放。 每个分区必须至少有一个使用者。 使用者的用于指示使用哪个分区。 Cloud Foundry 等平台只能有一个实例具有 .RabbitMessageChannelBinderinstanceIndexinstanceIndex