将依赖项添加到项目:Add the dependency to your project:spring-rabbit-streamSpring中文文档

马文
<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit-stream</artifactId>
  <version>3.1.6</version>
</dependency>
Gradle
compile 'org.springframework.amqp:spring-rabbit-stream:3.1.6'

您可以像往常一样,使用 Bean 来配置队列,并使用指定队列类型的方法。 例如:RabbitAdminQueueBuilder.stream()Spring中文文档

@Bean
Queue stream() {
    return QueueBuilder.durable("stream.queue1")
            .stream()
            .build();
}

但是,这仅在您还使用非流组件(如 或)时才有效,因为在打开 AMQP 连接时会触发管理员声明定义的 bean。 如果您的应用程序仅使用流组件,或者您希望使用高级流配置功能,则应改为配置:SimpleMessageListenerContainerDirectMessageListenerContainerStreamAdminSpring中文文档

@Bean
StreamAdmin streamAdmin(Environment env) {
    return new StreamAdmin(env, sc -> {
        sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
        sc.stream("stream.queue2").create();
    });
}

有关 .StreamCreatorSpring中文文档

发送消息

提供 (AMQP) 功能的子集。RabbitStreamTemplateRabbitTemplateSpring中文文档

RabbitStream操作
public interface RabbitStreamOperations extends AutoCloseable {

	CompletableFuture<Boolean> send(Message message);

	CompletableFuture<Boolean> convertAndSend(Object message);

	CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);

	CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);

	MessageBuilder messageBuilder();

	MessageConverter messageConverter();

	StreamMessageConverter streamMessageConverter();

	@Override
	void close() throws AmqpException;

}

该实现具有以下构造函数和属性:RabbitStreamTemplateSpring中文文档

RabbitStreamTemplate
public RabbitStreamTemplate(Environment environment, String streamName) {
}

public void setMessageConverter(MessageConverter messageConverter) {
}

public void setStreamConverter(StreamMessageConverter streamConverter) {
}

public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}

在将对象转换为 Spring AMQP 的方法中使用。MessageConverterconvertAndSendMessageSpring中文文档

用于从 Spring AMQP 转换为本机流。StreamMessageConverterMessageMessageSpring中文文档

您也可以直接发送原生流;使用提供对 的消息生成器的访问的方法。MessagemessageBuilder()ProducerSpring中文文档

提供了一种机制,用于在生成生成器之前对其进行自定义。ProducerCustomizerSpring中文文档

请参阅 Java 客户端文档,了解如何自定义 和 。EnvironmentProducerSpring中文文档

从版本 3.0 开始,方法返回类型不是 。CompletableFutureListenableFuture
从版本 3.0 开始,方法返回类型不是 。CompletableFutureListenableFuture

接收消息

异步消息接收由 (和 使用 时提供)。StreamListenerContainerStreamRabbitListenerContainerFactory@RabbitListenerSpring中文文档

侦听器容器需要一个流名称和一个流名称。EnvironmentSpring中文文档

您可以使用经典接口接收 Spring AMQP s,也可以使用新接口接收本机流:MessageMessageListenerMessageSpring中文文档

public interface StreamMessageListener extends MessageListener {

	void onStreamMessage(Message message, Context context);

}

有关支持的属性的信息,请参阅消息侦听器容器配置Spring中文文档

与模板类似,容器具有属性。ConsumerCustomizerSpring中文文档

请参阅 Java 客户端文档,了解如何自定义 和 。EnvironmentConsumerSpring中文文档

使用时,配置一个;此时,大多数属性(等)将被忽略。仅支持 、 和 。 此外,只能包含一个流名称。@RabbitListenerStreamRabbitListenerContainerFactory@RabbitListenerconcurrencyidqueuesautoStartupcontainerFactoryqueuesSpring中文文档

例子

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
    template.setProducerCustomizer((name, builder) -> builder.name("test"));
    return template;
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
    return new StreamRabbitListenerContainerFactory(env);
}

@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
    ...
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
    StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
    factory.setNativeListener(true);
    factory.setConsumerCustomizer((id, builder) -> {
        builder.name("myConsumer")
                .offset(OffsetSpecification.first())
                .manualTrackingStrategy();
    });
    return factory;
}

@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
    ...
    context.storeOffset();
}

@Bean
Queue stream() {
    return QueueBuilder.durable("test.stream.queue1")
            .stream()
            .build();
}

@Bean
Queue stream() {
    return QueueBuilder.durable("test.stream.queue2")
            .stream()
            .build();
}

版本 2.4.5 将该属性添加到(及其工厂)。 还提供了一个新的工厂 Bean,用于创建无状态重试拦截器,该拦截器具有可选功能,可在使用原始流消息时使用。adviceChainStreamListenerContainerStreamMessageRecovererSpring中文文档

@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
    StreamRetryOperationsInterceptorFactoryBean rfb =
            new StreamRetryOperationsInterceptorFactoryBean();
    rfb.setRetryOperations(retryTemplate);
    rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
        ...
    });
    return rfb;
}
此容器不支持有状态重试。
此容器不支持有状态重试。

超级流

超级流是分区流的抽象概念,通过将多个流队列绑定到具有参数的交换来实现。x-super-stream: trueSpring中文文档

供应

为方便起见,可以通过定义类型为 的单个 Bean 来置备超级流。SuperStreamSpring中文文档

@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3);
}

检测到此 bean,并将声明 exchange () 和 3 个队列(分区) - 其中 是 、 、 ,绑定的路由键等于 。RabbitAdminmy.super.streammy.super-stream-nn012nSpring中文文档

如果您还希望通过 AMQP 发布到交换,则可以提供自定义路由密钥:Spring中文文档

@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
					.mapToObj(j -> "rk-" + j)
					.collect(Collectors.toList()));
}

键数必须等于分区数。Spring中文文档

制作到 SuperStream

您必须将 a 添加到:superStreamRoutingFunctionRabbitStreamTemplateSpring中文文档

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
    template.setSuperStreamRouting(message -> {
        // some logic to return a String for the client's hashing algorithm
    });
    return template;
}

您还可以使用 .RabbitTemplateSpring中文文档

使用具有单个活动使用者的超级流

在侦听器容器上调用该方法,以在超级流上启用单个活动使用者。superStreamSpring中文文档

@Bean
StreamListenerContainer container(Environment env, String name) {
    StreamListenerContainer container = new StreamListenerContainer(env);
    container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
    container.setupMessageListener(msg -> {
        ...
    });
    container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
    return container;
}
此时,当并发性大于 1 时,实际并发量由 ;若要实现完全并发,请将环境设置为 1。 请参阅配置环境EnvironmentmaxConsumersByConnection
此时,当并发性大于 1 时,实际并发量由 ;若要实现完全并发,请将环境设置为 1。 请参阅配置环境EnvironmentmaxConsumersByConnection

千分尺观察

从版本 3.0.5 开始,现在支持将 Micrometer 用于 和 流侦听器容器。 该容器现在还支持千分尺计时器(当未启用观察时)。RabbitStreamTemplateSpring中文文档

在每个组件上设置以启用观察;这将禁用千分尺计时器,因为计时器现在将针对每次观测进行管理。 使用带注释的侦听器时,请在容器工厂上设置。observationEnabledobservationEnabledSpring中文文档

有关详细信息,请参阅千分尺跟踪Spring中文文档

要将标记添加到计时器/跟踪,请分别配置自定义或模板或侦听器容器。RabbitStreamTemplateObservationConventionRabbitStreamListenerObservationConventionSpring中文文档

默认实现会添加模板观察任务的标记和容器的标记。namelistener.idSpring中文文档

您可以子类化或提供全新的实现。DefaultRabbitStreamTemplateObservationConventionDefaultStreamRabbitListenerObservationConventionSpring中文文档

有关详细信息,请参阅千分尺观察文档Spring中文文档