版本 2.4 引入了对 RabbitMQ Stream Plugin 的 Java 客户端的初始支持。
-
RabbitStreamTemplate
-
StreamListenerContainer
将依赖项添加到您的项目中:spring-rabbit-stream
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-stream</artifactId>
<version>3.1.6</version>
</dependency>
compile 'org.springframework.amqp:spring-rabbit-stream:3.1.6'
您可以像往常一样预置队列,使用 Bean 使用 该方法指定队列类型。
例如:RabbitAdmin
QueueBuilder.stream()
@Bean
Queue stream() {
return QueueBuilder.durable("stream.queue1")
.stream()
.build();
}
但是,这仅在您还使用非流组件(例如 or )时才有效,因为在打开 AMQP 连接时,会触发 admin 声明定义的 bean。
如果您的应用程序仅使用流组件,或者您希望使用高级流配置功能,则应改为配置:SimpleMessageListenerContainer
DirectMessageListenerContainer
StreamAdmin
@Bean
StreamAdmin streamAdmin(Environment env) {
return new StreamAdmin(env, sc -> {
sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
sc.stream("stream.queue2").create();
});
}
请参阅 RabbitMQ 文档,以了解有关 .StreamCreator
发送消息
它提供了 (AMQP) 功能的子集。RabbitStreamTemplate
RabbitTemplate
public interface RabbitStreamOperations extends AutoCloseable {
CompletableFuture<Boolean> send(Message message);
CompletableFuture<Boolean> convertAndSend(Object message);
CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);
CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);
MessageBuilder messageBuilder();
MessageConverter messageConverter();
StreamMessageConverter streamMessageConverter();
@Override
void close() throws AmqpException;
}
该实现具有以下构造函数和属性:RabbitStreamTemplate
public RabbitStreamTemplate(Environment environment, String streamName) {
}
public void setMessageConverter(MessageConverter messageConverter) {
}
public void setStreamConverter(StreamMessageConverter streamConverter) {
}
public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}
在方法中用于将对象转换为 Spring AMQP 。MessageConverter
convertAndSend
Message
这用于从 Spring AMQP 转换为 native stream 。StreamMessageConverter
Message
Message
您也可以直接发送本机流 s;使用该方法提供对 消息生成器的访问。Message
messageBuilder()
Producer
这提供了一种在构建 producer 之前对其进行自定义的机制。ProducerCustomizer
请参阅 Java 客户端文档,了解如何自定义 和 。Environment
Producer
从版本 3.0 开始,方法返回类型不是 .CompletableFuture ListenableFuture |
从版本 3.0 开始,方法返回类型不是 .CompletableFuture ListenableFuture |
接收消息
异步消息接收由 (以及使用 时) 提供。StreamListenerContainer
StreamRabbitListenerContainerFactory
@RabbitListener
侦听器容器需要一个流名称以及一个流名称。Environment
您可以使用经典接收 Spring AMQP ,也可以使用新接口接收本机流:Message
MessageListener
Message
public interface StreamMessageListener extends MessageListener {
void onStreamMessage(Message message, Context context);
}
有关支持的属性的信息,请参阅 Message Listener Container Configuration 。
与模板类似,容器具有 property。ConsumerCustomizer
请参阅 Java 客户端文档,了解如何自定义 和 。Environment
Consumer
使用 时,请配置 ;此时,大多数属性 (等) 将被忽略。仅支持 、 和 。
此外,只能包含一个流名称。@RabbitListener
StreamRabbitListenerContainerFactory
@RabbitListener
concurrency
id
queues
autoStartup
containerFactory
queues
例子
@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
template.setProducerCustomizer((name, builder) -> builder.name("test"));
return template;
}
@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
return new StreamRabbitListenerContainerFactory(env);
}
@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
...
}
@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
factory.setNativeListener(true);
factory.setConsumerCustomizer((id, builder) -> {
builder.name("myConsumer")
.offset(OffsetSpecification.first())
.manualTrackingStrategy();
});
return factory;
}
@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
...
context.storeOffset();
}
@Bean
Queue stream() {
return QueueBuilder.durable("test.stream.queue1")
.stream()
.build();
}
@Bean
Queue stream() {
return QueueBuilder.durable("test.stream.queue2")
.stream()
.build();
}
版本 2.4.5 将属性添加到 (及其工厂) 中。
还提供了一个新的工厂 Bean 来创建无状态重试拦截器,该拦截器在使用原始流消息时具有可选的使用。adviceChain
StreamListenerContainer
StreamMessageRecoverer
@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
StreamRetryOperationsInterceptorFactoryBean rfb =
new StreamRetryOperationsInterceptorFactoryBean();
rfb.setRetryOperations(retryTemplate);
rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
...
});
return rfb;
}
此容器不支持有状态重试。 |
此容器不支持有状态重试。 |
超级流
Super Stream 是分区流的抽象概念,通过将多个流队列绑定到具有参数的 exchange 来实现。x-super-stream: true
供应
为方便起见,可以通过定义 type 为 的单个 bean 来供应 super 流。SuperStream
@Bean
SuperStream superStream() {
return new SuperStream("my.super.stream", 3);
}
检测到此 bean 并将声明 exchange () 和 3 个队列(分区) - 其中 是 、 、 、 绑定的路由键等于 。RabbitAdmin
my.super.stream
my.super-stream-n
n
0
1
2
n
如果您还希望通过 AMQP 向 Exchange 发布信息,则可以提供自定义路由密钥:
@Bean
SuperStream superStream() {
return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
.mapToObj(j -> "rk-" + j)
.collect(Collectors.toList()));
}
键的数量必须等于分区的数量。
使用 SuperStream 进行制作
您必须将 a 添加到 :superStreamRoutingFunction
RabbitStreamTemplate
@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
template.setSuperStreamRouting(message -> {
// some logic to return a String for the client's hashing algorithm
});
return template;
}
您还可以使用 .RabbitTemplate
使用具有单个活跃使用者的 Super Streams
在侦听器容器上调用该方法,在超级流上启用单个活跃的 consumer。superStream
@Bean
StreamListenerContainer container(Environment env, String name) {
StreamListenerContainer container = new StreamListenerContainer(env);
container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
container.setupMessageListener(msg -> {
...
});
container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
return container;
}
此时,当并发数大于 1 时,实际并发数由 ;要实现完全并发,请将环境设置为 1。
请参阅配置环境。Environment maxConsumersByConnection |
此时,当并发数大于 1 时,实际并发数由 ;要实现完全并发,请将环境设置为 1。
请参阅配置环境。Environment maxConsumersByConnection |
千分尺观察
从版本 3.0.5 开始,现在支持对 和 流侦听器容器使用 Micrometer 进行观察。
该容器现在还支持 Micrometer 计时器(未启用观察时)。RabbitStreamTemplate
在每个组件上设置以启用观察;这将禁用 Micrometer Timers,因为计时器现在将随每个观测一起管理。
使用带注释的侦听器时,请在容器工厂上设置。observationEnabled
observationEnabled
有关更多信息,请参阅 Micrometer Tracing 。
要向计时器/跟踪添加标签,请分别配置自定义或模板或侦听器容器。RabbitStreamTemplateObservationConvention
RabbitStreamListenerObservationConvention
默认实施会添加 template observations 的标签和 containers 的标签。name
listener.id
您可以子类化 or or 提供全新的实现。DefaultRabbitStreamTemplateObservationConvention
DefaultStreamRabbitListenerObservationConvention
有关更多详细信息,请参阅千分尺观测文档。