Spring Data 为 Redis 提供了专用的消息集成,在功能和命名上类似于 Spring Framework 中的 JMS 集成。Spring中文文档

Redis 消息传递大致可以分为两个功能领域:Spring中文文档

这是通常称为 Publish/Sub(简称 Pub/Sub)的模式的一个示例。该类用于消息生成。对于类似于 Java EE 的消息驱动 Bean 样式的异步接收,Spring Data 提供了一个专用的消息侦听器容器,用于创建消息驱动的 POJO (MDP),以及用于同步接收的协定。RedisTemplateRedisConnectionSpring中文文档

和 包为 Redis 消息传递提供核心功能。org.springframework.data.redis.connectionorg.springframework.data.redis.listenerSpring中文文档

发布(发送消息)

若要发布消息,可以像其他操作一样使用低级别或高级 .这两个实体都提供该方法,该方法接受消息和目标通道作为参数。虽然需要原始数据(字节数组),但允许任意对象作为消息传入,如以下示例所示:[Reactive]RedisConnection[Reactive]RedisOperationspublishRedisConnection[Reactive]RedisOperationsSpring中文文档

// send message through connection
RedisConnection con = …
byte[] msg = …
byte[] channel = …
con.pubSubCommands().publish(msg, channel);

// send message through RedisOperations
RedisOperations operations = …
Long numberOfClients = operations.convertAndSend("hello!", "world");
// send message through connection
ReactiveRedisConnection con = …
ByteBuffer[] msg = …
ByteBuffer[] channel = …
con.pubSubCommands().publish(msg, channel);

// send message through ReactiveRedisOperations
ReactiveRedisOperations operations = …
Mono<Long> numberOfClients = operations.convertAndSend("hello!", "world");

订阅(接收消息)

在接收端,可以通过直接命名或使用模式匹配来订阅一个或多个频道。后一种方法非常有用,因为它不仅允许使用一个命令创建多个订阅,还可以收听订阅时尚未创建的频道(只要它们与模式匹配)。Spring中文文档

在低级别,提供 and 方法,用于映射 Redis 命令,以便分别按通道或模式进行订阅。请注意,多个通道或模式可以用作参数。若要更改连接的订阅或查询连接是否正在侦听,请提供 和 方法。RedisConnectionsubscribepSubscribeRedisConnectiongetSubscriptionisSubscribedSpring中文文档

Spring Data Redis 中的订阅命令被阻止。也就是说,在连接上调用 subscribe 会导致当前线程在开始等待消息时阻塞。仅当订阅被取消时,才会释放线程,当另一个线程调用或在同一连接上时,就会发生这种情况。有关此问题的解决方案,请参阅“消息侦听器容器”(在本文档后面)。unsubscribepUnsubscribe

如前所述,一旦订阅,连接就会开始等待消息。仅允许添加新订阅、修改现有订阅和取消现有订阅的命令。调用除 、 、 以外的任何内容,或引发异常。subscribepSubscribeunsubscribepUnsubscribeSpring中文文档

为了订阅消息,需要实现回调。每次有新消息到达时,都会调用回调,并由该方法运行用户代码。该接口不仅可以访问实际消息,还可以访问接收消息的通道以及订阅用于匹配通道的模式(如果有)。此信息使被调用者不仅可以通过内容来区分各种消息,还可以检查其他详细信息。MessageListeneronMessageSpring中文文档

消息侦听器容器

由于其阻塞性质,低级别订阅没有吸引力,因为它需要对每个侦听器进行连接和线程管理。为了缓解这个问题,Spring Data 提供了 ,它可以完成所有繁重的工作。如果您熟悉 EJB 和 JMS,您应该会发现这些概念很熟悉,因为它被设计为尽可能接近 Spring Framework 及其消息驱动的 POJO (MDP) 中的支持。RedisMessageListenerContainerSpring中文文档

RedisMessageListenerContainer充当消息侦听器容器。它用于从 Redis 通道接收消息并驱动注入其中的实例。侦听器容器负责将消息接收和调度到侦听器进行处理的所有线程。消息侦听器容器是 MDP 和消息传递提供者之间的中介,负责注册以接收消息、资源获取和发布、异常转换等。这样一来,作为应用程序开发人员,您可以编写与接收消息(并做出反应)相关的(可能很复杂)业务逻辑,并将样板 Redis 基础架构关注点委托给框架。MessageListenerSpring中文文档

A还可以实现在订阅/取消订阅确认时接收通知。在同步调用时,侦听订阅通知非常有用。MessageListenerSubscriptionListenerSpring中文文档

此外,为了最大程度地减少应用程序占用空间,允许一个连接和一个线程由多个侦听器共享,即使它们不共享订阅。因此,无论应用程序跟踪多少侦听器或通道,运行时成本在其整个生命周期内都保持不变。此外,该容器允许运行时配置更改,以便您可以在应用程序运行时添加或删除侦听器,而无需重新启动。此外,容器使用延迟订阅方法,仅在需要时使用。如果所有侦听器都已取消订阅,则会自动执行清理,并释放线程。RedisMessageListenerContainerRedisConnectionSpring中文文档

为了帮助解决消息的异步性质,容器需要一个(或Spring的)来调度消息。根据负载、侦听器数量或运行时环境,您应该更改或调整执行程序以更好地满足您的需求。特别是,在托管环境(如应用服务器)中,强烈建议选择适当的环境来利用其运行时。java.util.concurrent.ExecutorTaskExecutorTaskExecutorSpring中文文档

The MessageListenerAdapter

该类是 Spring 异步消息支持的最后一个组件。简而言之,它允许您将几乎任何类公开为 MDP(尽管有一些限制)。MessageListenerAdapterSpring中文文档

请考虑以下接口定义:Spring中文文档

public interface MessageDelegate {
  void handleMessage(String message);
  void handleMessage(Map message);
  void handleMessage(byte[] message);
  void handleMessage(Serializable message);
  // pass the channel/pattern as well
  void handleMessage(Serializable message, String channel);
 }

请注意,尽管接口不扩展接口,但仍可以通过使用类将其用作 MDP。另请注意,如何根据各种消息处理方法可以接收和处理的各种类型的内容对它们进行强类型化。此外,消息发送到的通道或模式可以作为 type 的第二个参数传递到方法中:MessageListenerMessageListenerAdapterMessageStringSpring中文文档

public class DefaultMessageDelegate implements MessageDelegate {
  // implementation elided for clarity...
}

请注意,上面的接口实现(上面的类)根本没有 Redis 依赖项。它确实是一个 POJO,我们将其制作成具有以下配置的 MDP:MessageDelegateDefaultMessageDelegateSpring中文文档

@Configuration
class MyConfig {

  // …

  @Bean
  DefaultMessageDelegate listener() {
    return new DefaultMessageDelegate();
  }

  @Bean
  MessageListenerAdapter messageListenerAdapter(DefaultMessageDelegate listener) {
    return new MessageListenerAdapter(listener, "handleMessage");
  }

  @Bean
  RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageListenerAdapter listener) {

    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.addMessageListener(listener, ChannelTopic.of("chatroom"));
    return container;
  }
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:redis="http://www.springframework.org/schema/redis"
   xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/redis https://www.springframework.org/schema/redis/spring-redis.xsd">

<!-- the default ConnectionFactory -->
<redis:listener-container>
  <!-- the method attribute can be skipped as the default method name is "handleMessage" -->
  <redis:listener ref="listener" method="handleMessage" topic="chatroom" />
</redis:listener-container>

<bean id="listener" class="redisexample.DefaultMessageDelegate"/>
 ...
</beans>
侦听器主题可以是通道(例如,)或模式(例如,topic="chatroom"topic="*room")

前面的示例使用 Redis 命名空间声明消息监听器容器,并自动将 POJO 注册为监听器。成熟的豆子定义如下:Spring中文文档

<bean id="messageListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
  <constructor-arg>
    <bean class="redisexample.DefaultMessageDelegate"/>
  </constructor-arg>
</bean>

<bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
  <property name="connectionFactory" ref="connectionFactory"/>
  <property name="messageListeners">
    <map>
      <entry key-ref="messageListener">
        <bean class="org.springframework.data.redis.listener.ChannelTopic">
          <constructor-arg value="chatroom"/>
        </bean>
      </entry>
    </map>
  </property>
</bean>

每次收到消息时,适配器都会自动且透明地在低级格式和所需对象类型之间执行转换(使用配置的)。由方法调用引起的任何异常都会由容器捕获和处理(默认情况下,会记录异常)。RedisSerializerSpring中文文档

Spring Data Redis 中的订阅命令被阻止。也就是说,在连接上调用 subscribe 会导致当前线程在开始等待消息时阻塞。仅当订阅被取消时,才会释放线程,当另一个线程调用或在同一连接上时,就会发生这种情况。有关此问题的解决方案,请参阅“消息侦听器容器”(在本文档后面)。unsubscribepUnsubscribe
侦听器主题可以是通道(例如,)或模式(例如,topic="chatroom"topic="*room")

反应式消息侦听器容器

Spring Data 提供代表用户完成转换和订阅状态管理的所有繁重工作。ReactiveRedisMessageListenerContainerSpring中文文档

消息侦听器容器本身不需要外部线程处理资源。它使用驱动程序线程发布消息。Spring中文文档

ReactiveRedisConnectionFactory factory = …
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);

Flux<ChannelMessage<String, String>> stream = container.receive(ChannelTopic.of("my-channel"));

若要等待并确保正确订阅,可以使用返回 . 由于完成对给定主题的订阅,生成的将完成内部发布者。通过拦截信号,可以同步服务器端订阅。receiveLaterMono<Flux<ChannelMessage>>MonoonNextSpring中文文档

ReactiveRedisConnectionFactory factory = …
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);

Mono<Flux<ChannelMessage<String, String>>> stream = container.receiveLater(ChannelTopic.of("my-channel"));

stream.doOnNext(inner -> // notification hook when Redis subscriptions are synchronized with the server)
    .flatMapMany(Function.identity())
    .…;

通过模板 API 订阅

如上所述,您可以直接用于订阅频道/模式。这种方法 提供了一个简单但有限的解决方案,因为您失去了在初始订阅后添加订阅的选项 的。尽管如此,您仍然可以通过返回的消息流来控制消息流,例如。.什么时候 完成读取,在错误或取消时,所有绑定的资源将再次释放。ReactiveRedisTemplateFluxtake(Duration)Spring中文文档

redisTemplate.listenToChannel("channel1", "channel2").doOnNext(msg -> {
    // message processing ...
}).subscribe();