Pub/Sub Messaging

Spring Data provides dedicated messaging integration for Redis, similar in functionality and naming to the JMS integration in Spring Framework.

Redis messaging can be roughly divided into two areas of functionality:

- Publication or production of messages
- Subscription or consumption of messages

This is an example of the pattern often called Publish/Subscribe (Pub/Sub for short). The RedisTemplate class is used for message production. For asynchronous reception similar to Java EE’s message-driven bean style, Spring Data provides a dedicated message listener container that is used to create Message-Driven POJOs (MDPs) and, for synchronous reception, the RedisConnection contract.

The org.springframework.data.redis.connection and org.springframework.data.redis.listener packages provide the core functionality for Redis messaging.

Publishing (Sending Messages)

To publish a message, you can use, as with the other operations, either the low-level [Reactive]RedisConnection or the high-level [Reactive]RedisOperations. Both entities offer the publish method, which accepts the message and the destination channel as arguments. While RedisConnection requires raw data (array of bytes), the [Reactive]RedisOperations lets arbitrary objects be passed in as messages, as shown in the following example:

Imperative:

// send message through connection
RedisConnection con = …
byte[] msg = …
byte[] channel = …
con.pubSubCommands().publish(channel, msg); // the channel comes first, then the message

// send message through RedisOperations
RedisOperations operations = …
Long numberOfClients = operations.convertAndSend("hello!", "world"); // channel "hello!", message "world"

Reactive:

// send message through connection
ReactiveRedisConnection con = …
ByteBuffer msg = …
ByteBuffer channel = …
con.pubSubCommands().publish(channel, msg); // the channel comes first, then the message

// send message through ReactiveRedisOperations
ReactiveRedisOperations operations = …
Mono<Long> numberOfClients = operations.convertAndSend("hello!", "world"); // channel "hello!", message "world"

Subscribing (Receiving Messages)

On the receiving side, one can subscribe to one or multiple channels either by naming them directly or by using pattern matching. The latter approach is quite useful, as it not only lets multiple subscriptions be created with one command but can also listen on channels not yet created at subscription time (as long as they match the pattern).
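To build intuition for which channel names a pattern subscription would cover, the following is a minimal sketch of glob-style matching in plain Java. It is not Redis's own matcher: the class name ChannelPatternSketch is hypothetical, and only the * and ? wildcards are handled (character classes and escapes are left out).

```java
import java.util.regex.Pattern;

/** Simplified illustration of glob-style channel matching; not Redis's actual matcher. */
final class ChannelPatternSketch {

    private ChannelPatternSketch() {
    }

    /**
     * Returns true if the channel name matches the glob pattern.
     * Only '*' (any run of characters) and '?' (exactly one character) are handled;
     * character classes and escapes are deliberately omitted from this sketch.
     */
    static boolean matches(String globPattern, String channel) {
        StringBuilder regex = new StringBuilder();
        for (char c : globPattern.toCharArray()) {
            if (c == '*') {
                regex.append(".*");
            } else if (c == '?') {
                regex.append('.');
            } else {
                // Quote every other character so it is matched literally.
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL).matcher(channel).matches();
    }
}
```

Under these assumptions, a pattern such as *room covers chatroom as well as any channel ending in room that is first published to later, which is why a pattern subscription can pick up channels that did not exist when the subscription was made.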

At the low level, RedisConnection offers the subscribe and pSubscribe methods, which map to the Redis commands for subscribing by channel or by pattern, respectively. Note that multiple channels or patterns can be used as arguments. To change the subscription of a connection or query whether it is listening, RedisConnection provides the getSubscription and isSubscribed methods.

Subscription commands in Spring Data Redis are blocking. That is, calling subscribe on a connection causes the current thread to block as it starts waiting for messages. The thread is released only if the subscription is canceled, which happens when another thread invokes unsubscribe or pUnsubscribe on the same connection. See “Message Listener Containers” (later in this document) for a solution to this problem.

As mentioned earlier, once subscribed, a connection starts waiting for messages. Only commands that add new subscriptions, modify existing subscriptions, and cancel existing subscriptions are allowed. Invoking anything other than subscribe, pSubscribe, unsubscribe, or pUnsubscribe throws an exception.

In order to subscribe to messages, one needs to implement the MessageListener callback. Each time a new message arrives, the callback gets invoked and the user code gets run by the onMessage method. The interface gives access not only to the actual message but also to the channel it has been received through and the pattern (if any) used by the subscription to match the channel. This information lets the callee differentiate between various messages not just by content but also by examining additional details.
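As an illustration of the callback described above, here is a minimal sketch of a MessageListener implementation. The class name RecordingMessageListener and the UTF-8 decoding are assumptions made for the example; real applications would typically decode with whatever serializer the publisher used.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;

/** Hypothetical listener that records every received message as text. */
class RecordingMessageListener implements MessageListener {

    private final List<String> received = new CopyOnWriteArrayList<>();

    @Override
    public void onMessage(Message message, byte[] pattern) {
        // Channel and body arrive as raw bytes; this sketch assumes UTF-8 text.
        String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        // The pattern may be null when the subscription named the channel directly.
        String matchedBy = (pattern != null) ? new String(pattern, StandardCharsets.UTF_8) : "(direct)";
        received.add(channel + " [" + matchedBy + "]: " + body);
    }

    List<String> received() {
        return received;
    }
}
```

Because the channel and the pattern are both passed to the callback, one listener instance can serve several subscriptions and still tell the messages apart.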

Message Listener Containers

Due to its blocking nature, low-level subscription is not attractive, as it requires connection and thread management for every single listener. To alleviate this problem, Spring Data offers RedisMessageListenerContainer, which does all the heavy lifting. If you are familiar with EJB and JMS, you should find the concepts familiar, as it is designed to be as close as possible to the support in Spring Framework and its message-driven POJOs (MDPs).

RedisMessageListenerContainer acts as a message listener container. It is used to receive messages from a Redis channel and drive the MessageListener instances that are injected into it. The listener container is responsible for all threading of message reception and dispatches into the listener for processing. A message listener container is the intermediary between an MDP and a messaging provider and takes care of registering to receive messages, resource acquisition and release, exception conversion, and the like. This lets you as an application developer write the (possibly complex) business logic associated with receiving a message (and reacting to it) and delegates boilerplate Redis infrastructure concerns to the framework.

A MessageListener can additionally implement SubscriptionListener to receive notifications upon subscription/unsubscribe confirmation. Listening to subscription notifications can be useful when synchronizing invocations.
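One way to use subscription notifications for synchronization is sketched below: a listener that lets callers wait until the first channel subscription has been confirmed. The class name AwaitableListener and the latch-based approach are assumptions for illustration, and the sketch relies on SubscriptionListener exposing onChannelSubscribed(byte[], long) as a default method.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.SubscriptionListener;

/** Hypothetical listener that lets callers wait for the first subscription confirmation. */
class AwaitableListener implements MessageListener, SubscriptionListener {

    private final CountDownLatch subscribed = new CountDownLatch(1);

    @Override
    public void onMessage(Message message, byte[] pattern) {
        // message handling elided
    }

    @Override
    public void onChannelSubscribed(byte[] channel, long count) {
        // Invoked once the channel subscription has been confirmed.
        subscribed.countDown();
    }

    /** Returns true if a subscription was confirmed within the timeout. */
    boolean awaitSubscribed(long timeout, TimeUnit unit) {
        try {
            return subscribed.await(timeout, unit);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for callers
            return false;
        }
    }
}
```

A test or startup routine could call awaitSubscribed before publishing, so that messages are not sent before the listener is ready to receive them.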

Furthermore, to minimize the application footprint, RedisMessageListenerContainer lets one connection and one thread be shared by multiple listeners even though they do not share a subscription. Thus, no matter how many listeners or channels an application tracks, the runtime cost remains the same throughout its lifetime. Moreover, the container allows runtime configuration changes so that you can add or remove listeners while an application is running without the need for a restart. Additionally, the container uses a lazy subscription approach, using a RedisConnection only when needed. If all the listeners are unsubscribed, cleanup is automatically performed, and the thread is released.

To help with the asynchronous nature of messages, the container requires a java.util.concurrent.Executor (or Spring’s TaskExecutor) for dispatching the messages. Depending on the load, the number of listeners, or the runtime environment, you should change or tweak the executor to better serve your needs. In particular, in managed environments (such as app servers), it is highly recommended to pick a proper TaskExecutor to take advantage of its runtime.
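A possible way to supply a dedicated executor is sketched below. The configuration class name, the bean names, and all pool sizes are illustrative assumptions rather than recommended values.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
class ListenerExecutorConfig {

    @Bean
    ThreadPoolTaskExecutor redisListenerExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Illustrative values only; tune them for your load and number of listeners.
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("redis-listener-");
        return executor;
    }

    @Bean
    RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory,
            ThreadPoolTaskExecutor redisListenerExecutor) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // Dispatch incoming messages to listeners on the dedicated pool.
        container.setTaskExecutor(redisListenerExecutor);
        return container;
    }
}
```

Naming the threads makes listener activity easy to spot in thread dumps and logs, which helps when deciding whether the pool needs resizing.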

The MessageListenerAdapter

The MessageListenerAdapter class is the final component in Spring’s asynchronous messaging support. In a nutshell, it lets you expose almost any class as an MDP (though there are some constraints).

Consider the following interface definition:

public interface MessageDelegate {
  void handleMessage(String message);
  void handleMessage(Map message);
  void handleMessage(byte[] message);
  void handleMessage(Serializable message);
  // pass the channel/pattern as well
  void handleMessage(Serializable message, String channel);
}

Notice that, although the interface does not extend the MessageListener interface, it can still be used as an MDP by using the MessageListenerAdapter class. Notice also how the various message handling methods are strongly typed according to the contents of the various Message types that they can receive and handle. In addition, the channel or pattern to which a message is sent can be passed in to the method as the second argument of type String:

public class DefaultMessageDelegate implements MessageDelegate {
  // implementation elided for clarity...
}

Notice how the above implementation of the MessageDelegate interface (the DefaultMessageDelegate class) has no Redis dependencies at all. It truly is a POJO that we make into an MDP with the following configuration:

@Configuration
class MyConfig {

  // …

  @Bean
  DefaultMessageDelegate listener() {
    return new DefaultMessageDelegate();
  }

  @Bean
  MessageListenerAdapter messageListenerAdapter(DefaultMessageDelegate listener) {
    return new MessageListenerAdapter(listener, "handleMessage");
  }

  @Bean
  RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageListenerAdapter listener) {

    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.addMessageListener(listener, ChannelTopic.of("chatroom"));
    return container;
  }
}

The same listener declared with the XML namespace:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:redis="http://www.springframework.org/schema/redis"
   xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/redis https://www.springframework.org/schema/redis/spring-redis.xsd">

<!-- the default ConnectionFactory -->
<redis:listener-container>
  <!-- the method attribute can be skipped as the default method name is "handleMessage" -->
  <redis:listener ref="listener" method="handleMessage" topic="chatroom" />
</redis:listener-container>

<bean id="listener" class="redisexample.DefaultMessageDelegate"/>
 ...
</beans>

Note: The listener topic can be either a channel (for example, topic="chatroom") or a pattern (for example, topic="*room").

The preceding example uses the Redis namespace to declare the message listener container and automatically register the POJOs as listeners. The full bean definition follows:

<bean id="messageListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
  <constructor-arg>
    <bean class="redisexample.DefaultMessageDelegate"/>
  </constructor-arg>
</bean>

<bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
  <property name="connectionFactory" ref="connectionFactory"/>
  <property name="messageListeners">
    <map>
      <entry key-ref="messageListener">
        <bean class="org.springframework.data.redis.listener.ChannelTopic">
          <constructor-arg value="chatroom"/>
        </bean>
      </entry>
    </map>
  </property>
</bean>

Each time a message is received, the adapter automatically and transparently performs translation (using the configured RedisSerializer) between the low-level format and the required object type. Any exception caused by the method invocation is caught and handled by the container (by default, exceptions get logged).
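The idea behind the adapter can be sketched in plain Java: decode the raw payload, then invoke a named method on a delegate through reflection. This is not the MessageListenerAdapter implementation. The names ReflectiveAdapterSketch and RecordingDelegate are hypothetical, UTF-8 decoding stands in for the configured RedisSerializer, and only a single String-argument method is supported.

```java
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Illustrative stand-in for the adapter idea; not the real MessageListenerAdapter. */
final class ReflectiveAdapterSketch {

    private final Object delegate;
    private final Method handler;

    ReflectiveAdapterSketch(Object delegate, String methodName) {
        this.delegate = delegate;
        try {
            // This sketch only looks for a public method taking a single String.
            this.handler = delegate.getClass().getMethod(methodName, String.class);
        } catch (NoSuchMethodException ex) {
            throw new IllegalArgumentException("No method " + methodName + "(String) on delegate", ex);
        }
    }

    void onMessage(byte[] body) {
        // Stand-in for deserialization with a configured serializer.
        String payload = new String(body, StandardCharsets.UTF_8);
        try {
            handler.invoke(delegate, payload);
        } catch (ReflectiveOperationException ex) {
            // Mirror the described default behaviour: report the failure and move on.
            System.err.println("Listener invocation failed: " + ex);
        }
    }
}

/** Hypothetical POJO delegate with no messaging dependencies. */
final class RecordingDelegate {

    final List<String> seen = new ArrayList<>();

    public void handleMessage(String message) {
        seen.add(message);
    }
}
```

The delegate never sees bytes or any messaging type, which is the property that lets an ordinary class act as an MDP.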

Reactive Message Listener Container

Spring Data offers ReactiveRedisMessageListenerContainer, which does all the heavy lifting of conversion and subscription state management on behalf of the user.

The message listener container itself does not require external threading resources. It uses the driver threads to publish messages.

ReactiveRedisConnectionFactory factory = …
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);

Flux<ChannelMessage<String, String>> stream = container.receive(ChannelTopic.of("my-channel"));

To await and ensure proper subscription, you can use the receiveLater method that returns a Mono<Flux<ChannelMessage>>. The resulting Mono completes with an inner publisher as a result of completing the subscription to the given topics. By intercepting onNext signals, you can synchronize server-side subscriptions.

ReactiveRedisConnectionFactory factory = …
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);

Mono<Flux<ChannelMessage<String, String>>> stream = container.receiveLater(ChannelTopic.of("my-channel"));

stream.doOnNext(inner -> {
        // notification hook when Redis subscriptions are synchronized with the server
    })
    .flatMapMany(Function.identity())
    .…;

Subscribing via template API

As mentioned above, you can directly use ReactiveRedisTemplate to subscribe to channels or patterns. This approach is straightforward but limited, because you lose the option to add subscriptions after the initial ones. Nevertheless, you can still control the message stream through the returned Flux by using, for example, take(Duration). When reading finishes, whether through completion, an error, or cancellation, all bound resources are freed again.

redisTemplate.listenToChannel("channel1", "channel2").doOnNext(msg -> {
    // message processing ...
}).subscribe();