对于最新的稳定版本,请使用 Spring Integration 6.3.1Spring中文文档

对于最新的稳定版本,请使用 Spring Integration 6.3.1Spring中文文档

Spring Integration 2.1 引入了对 Redis 的支持:“一个开源的高级键值存储”。 这种支持以基于 Redis 的形式出现,以及 Redis 通过其 PUBLISH、SUBSCRIBEUNSUBSCRIBE 命令支持的发布-订阅消息传递适配器。MessageStoreSpring中文文档

您需要将此依赖项包含在项目中:Spring中文文档

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-redis</artifactId>
    <version>6.2.6</version>
</dependency>
compile "org.springframework.integration:spring-integration-redis:6.2.6"

您还需要包括 Redis 客户端依赖项,例如 Lettuce。Spring中文文档

要下载、安装和运行 Redis,请参阅 Redis 文档Spring中文文档

连接到 Redis

要开始与 Redis 交互,您首先需要连接到它。 Spring Integration 使用另一个 Spring 项目 Spring Data Redis 提供的支持,它提供了典型的 Spring 构造:和 . 这些抽象简化了与多个 Redis 客户端 Java API 的集成。 目前,Spring Data Redis 支持 JedisLettuceConnectionFactoryTemplateSpring中文文档

RedisConnectionFactory

要连接到 Redis,您可以使用接口的实现之一。 以下列表显示了接口定义:RedisConnectionFactorySpring中文文档

public interface RedisConnectionFactory extends PersistenceExceptionTranslator {

    /**
     * Provides a suitable connection for interacting with Redis.
     * @return connection for interacting with Redis.
     */
    RedisConnection getConnection();
}

以下示例演示如何在 Java 中创建一个:LettuceConnectionFactorySpring中文文档

LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet();

以下示例显示了如何在 Spring 的 XML 配置中创建一个:LettuceConnectionFactorySpring中文文档

<bean id="redisConnectionFactory"
    class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
    <property name="port" value="7379" />
</bean>

的实现提供了一组属性,例如端口和主机,您可以根据需要设置这些属性。 一旦你有了 的实例,你就可以创建一个 的实例,并用 注入 .RedisConnectionFactoryRedisConnectionFactoryRedisTemplateRedisConnectionFactorySpring中文文档

RedisTemplate

与 Spring 中的其他模板类(如 和 )一样,是一个简化 Redis 数据访问代码的帮助程序类。 有关及其变体(例如)的更多信息,请参阅 Spring Data Redis 文档JdbcTemplateJmsTemplateRedisTemplateRedisTemplateStringRedisTemplateSpring中文文档

以下示例演示如何在 Java 中创建实例:RedisTemplateSpring中文文档

RedisTemplate rt = new RedisTemplate<String, Object>();
rt.setConnectionFactory(redisConnectionFactory);

以下示例显示了如何在 Spring 的 XML 配置中创建实例:RedisTemplateSpring中文文档

<bean id="redisTemplate"
         class="org.springframework.data.redis.core.RedisTemplate">
    <property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>

使用 Redis 进行消息传递

简介中所述,Redis 通过其 、 和 命令提供对发布-订阅消息传递的支持。 与 JMS 和 AMQP 一样,Spring Integration 提供了消息通道和适配器,用于通过 Redis 发送和接收消息。PUBLISHSUBSCRIBEUNSUBSCRIBESpring中文文档

Redis 发布/订阅频道

与 JMS 类似,在某些情况下,生产者和使用者都打算成为同一应用程序的一部分,在同一进程中运行。 您可以使用一对入站和出站通道适配器来实现此目的。 但是,与Spring Integration的JMS支持一样,有一种更简单的方法可以解决此用例。 您可以创建发布-订阅频道,如以下示例所示:Spring中文文档

<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>

A 的行为与主 Spring Integration 命名空间中的普通元素非常相似。 它可以由任何终结点的属性引用。 不同之处在于,此通道由 Redis 主题名称提供支持:该属性指定的值。 但是,与 JMS 不同的是,此主题不必提前创建,甚至不必由 Redis 自动创建。 在 Redis 中,主题是扮演地址角色的简单值。 生产者和使用者可以使用与其主题名称相同的值进行通信。 对此通道的简单订阅意味着可以在生产终结点和使用终结点之间进行异步发布-订阅消息传递。 但是,与通过在简单的 Spring Integration 元素中添加元素创建的异步消息通道不同,消息不会存储在内存队列中。 相反,这些消息是通过 Redis 传递的,这使您可以依赖它对持久性和集群的支持,以及它与其他非 Java 平台的互操作性。publish-subscribe-channel<publish-subscribe-channel/>input-channeloutput-channelStringtopic-nameStringString<queue/><channel/>Spring中文文档

Redis 入站通道适配器

Redis 入站通道适配器 () 以与其他入站适配器相同的方式将传入的 Redis 消息改编为 Spring 消息。 它接收特定于平台的消息(在本例中为 Redis),并使用策略将它们转换为 Spring 消息。 以下示例演示如何配置 Redis 入站通道适配器:RedisInboundChannelAdapterMessageConverterSpring中文文档

<int-redis:inbound-channel-adapter id="redisAdapter"
       topics="thing1, thing2"
       channel="receiveChannel"
       error-channel="testErrorChannel"
       message-converter="testConverter" />

<bean id="redisConnectionFactory"
    class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
    <property name="port" value="7379" />
</bean>

<bean id="testConverter" class="things.something.SampleMessageConverter" />

前面的示例显示了 Redis 入站通道适配器的简单但完整的配置。 请注意,前面的配置依赖于熟悉的 Spring 范式,即自动发现某些 Bean。 在这种情况下,将隐式注入到适配器中。 您可以改用该属性显式指定它。redisConnectionFactoryconnection-factorySpring中文文档

另请注意,上述配置会向适配器注入自定义 . 该方法类似于 JMS,其中实例用于在 Redis 消息和 Spring Integration 消息有效负载之间进行转换。 默认值为 .MessageConverterMessageConverterSimpleMessageConverterSpring中文文档

入站适配器可以订阅多个主题名称,因此属性中的值集以逗号分隔。topicsSpring中文文档

从版本 3.0 开始,除了现有属性外,入站适配器现在还具有该属性。 此属性包含一组以逗号分隔的 Redis 主题模式。 有关 Redis 发布-订阅的更多信息,请参阅 Redis 发布/订阅topicstopic-patternsSpring中文文档

入站适配器可以使用 a 来反序列化 Redis 消息的正文。 的属性可以设置为空字符串,这会导致属性的值。 在这种情况下,Redis 消息的原始正文将作为消息负载提供。RedisSerializerserializer<int-redis:inbound-channel-adapter>nullRedisSerializerbyte[]Spring中文文档

从版本 5.0 开始,您可以使用 的属性向入站适配器提供实例。 此外,收到的 Spring Integration 消息现在具有指示已发布消息来源的标题:主题或模式。 您可以将此下游用于路由逻辑。Executortask-executor<int-redis:inbound-channel-adapter>RedisHeaders.MESSAGE_SOURCESpring中文文档

Redis 出站通道适配器

Redis 出站通道适配器以与其他出站适配器相同的方式将传出的 Spring Integration 消息改编为 Redis 消息。 它接收 Spring Integration 消息,并使用策略将它们转换为特定于平台的消息(在本例中为 Redis)。 以下示例演示如何配置 Redis 出站通道适配器:MessageConverterSpring中文文档

<int-redis:outbound-channel-adapter id="outboundAdapter"
    channel="sendChannel"
    topic="thing1"
    message-converter="testConverter"/>

<bean id="redisConnectionFactory"
    class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
    <property name="port" value="7379"/>
</bean>

<bean id="testConverter" class="things.something.SampleMessageConverter" />

该配置与 Redis 入站通道适配器相似。 适配器隐式注入 ,该 定义为 作为其 Bean 名称。 此示例还包括可选的(和自定义的)(bean)。RedisConnectionFactoryredisConnectionFactoryMessageConvertertestConverterSpring中文文档

从 Spring Integration 3.0 开始,该属性提供了该属性的替代方法:您可以使用该属性在运行时确定消息的 Redis 主题。 这些属性是互斥的。<int-redis:outbound-channel-adapter>topictopic-expressionSpring中文文档

Redis 队列入站通道适配器

Spring Integration 3.0 引入了一个队列入站通道适配器,用于从 Redis 列表中“弹出”消息。 默认情况下,它使用“右弹出”,但您可以将其配置为使用“左弹出”。 适配器是消息驱动的。 它使用内部侦听器线程,不使用轮询器。Spring中文文档

以下列表显示了 : 的所有可用属性:queue-inbound-channel-adapterSpring中文文档

<int-redis:queue-inbound-channel-adapter id=""  (1)
                    channel=""  (2)
                    auto-startup=""  (3)
                    phase=""  (4)
                    connection-factory=""  (5)
                    queue=""  (6)
                    error-channel=""  (7)
                    serializer=""  (8)
                    receive-timeout=""  (9)
                    recovery-interval=""  (10)
                    expect-message=""  (11)
                    task-executor=""  (12)
                    right-pop=""/>  (13)
1 组件 Bean 名称。 如果未提供该属性,那么将在应用程序上下文中创建并注册此属性作为 Bean 名称。 在本例中,端点本身是使用 Bean name plus 注册的。 (如果 Bean 名称为 ,则端点注册为 。channelDirectChannelidid.adapterthing1thing1.adapter
2 要从此终端节点将实例发送到的实例。MessageChannelMessage
3 一个属性,用于指定此终结点是否应在应用程序上下文启动后自动启动。 它默认为 。SmartLifecycletrue
4 用于指定启动此终结点的阶段的属性。 它默认为 。SmartLifecycle0
5 对 Bean 的引用。 它默认为 。RedisConnectionFactoryredisConnectionFactory
6 执行基于队列的“pop”操作以获取 Redis 消息的 Redis 列表的名称。
7 当终端节点的侦听任务收到异常时,要将实例发送到的实例。 默认情况下,基础使用应用程序上下文中的默认值。MessageChannelErrorMessageMessagePublishingErrorHandlererrorChannel
8 Bean 引用。 它可以是空字符串,表示“无序列化程序”。 在这种情况下,来自入站 Redis 消息的原始消息将作为有效负载发送到 。 默认情况下,它是一个 .RedisSerializerbyte[]channelMessageJdkSerializationRedisSerializer
9 “pop”操作等待队列中的 Redis 消息的超时时间(以毫秒为单位)。 默认值为 1 秒。
10 在重新启动侦听器任务之前,侦听器任务在“pop”操作异常后应休眠的时间(以毫秒为单位)。
11 指定此终端节点是否期望 Redis 队列中的数据包含整个实例。 如果此属性设置为 ,则不能为空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。 其默认值为 。Messagetrueserializerfalse
12 对 Spring(或标准 JDK 1.5+)bean 的引用。 它用于基础侦听任务。 它默认为 .TaskExecutorExecutorSimpleAsyncTaskExecutor
13 指定此终结点是否应使用“right pop”(当)或“left pop”(何时)从 Redis 列表中读取消息。 如果 ,则 Redis 列表在与默认 Redis 队列出站通道适配器一起使用时充当队列。 将其设置为与通过“右键推送”写入列表的软件一起使用,或实现类似堆栈的消息顺序。 其默认值为 。 从版本 4.3 开始。truefalsetrueFIFOfalsetrue
必须配置多个线程进行处理;否则,在出现错误后尝试重新启动侦听器任务时,可能会出现死锁。 可用于处理这些错误,以避免重新启动,但最好不要将应用程序暴露在可能的死锁情况中。 有关可能的实现,请参阅 Spring Framework 参考手册task-executorRedisQueueMessageDrivenEndpointerrorChannelTaskExecutor

Redis 队列出站通道适配器

Spring Integration 3.0 引入了一个队列出站通道适配器,用于从 Spring Integration 消息“推送”到 Redis 列表。 默认情况下,它使用“左推”,但您可以将其配置为使用“右推”。 以下列表显示了 Redis 的所有可用属性:queue-outbound-channel-adapterSpring中文文档

<int-redis:queue-outbound-channel-adapter id=""  (1)
                    channel=""  (2)
                    connection-factory=""  (3)
                    queue=""  (4)
                    queue-expression=""  (5)
                    serializer=""  (6)
                    extract-payload=""  (7)
                    left-push=""/>  (8)
1 组件 Bean 名称。 如果未提供该属性,那么将在应用程序上下文中创建并注册此属性作为 Bean 名称。 在本例中,终端节点的注册 Bean 名称为 plus 。 (如果 Bean 名称为 ,则端点注册为 。channelDirectChannelidid.adapterthing1thing1.adapter
2 此终端节点从中接收实例。MessageChannelMessage
3 对 Bean 的引用。 它默认为 。RedisConnectionFactoryredisConnectionFactory
4 执行基于队列的“推送”操作以发送 Redis 消息的 Redis 列表的名称。 此属性与 互斥。queue-expression
5 用于确定 Redis 列表名称的 SpEL。 它使用运行时的传入作为变量。 此属性与 互斥。ExpressionMessage#rootqueue
6 Bean 引用。 它默认为 . 但是,对于有效负载,如果未提供参考,则使用 a。RedisSerializerJdkSerializationRedisSerializerStringStringRedisSerializerserializer
7 指定此终端节点是仅向 Redis 队列发送有效负载还是将整个负载发送。 它默认为 。Messagetrue
8 指定此终结点应使用“左推”(当)还是“右推”(何时)将消息写入 Redis 列表。 如果 ,则 Redis 列表在与默认 Redis 队列入站通道适配器一起使用时充当队列。 将其设置为与使用“左弹出”从列表中读取的软件一起使用,或实现类似堆栈的消息顺序。 它默认为 。 从版本 4.3 开始。truefalsetrueFIFOfalsetrue

Redis 应用程序事件

从 Spring Integration 3.0 开始,Redis 模块提供了 的实现,而 又是 . 封装了 Redis 操作的异常(终端节点是事件的“源”)。 例如,在捕获操作中的异常后发出这些事件。 例外可以是任何泛型或 . 使用 an 处理这些事件对于确定后台 Redis 任务的问题和采取管理操作非常有用。IntegrationEventorg.springframework.context.ApplicationEventRedisExceptionEvent<int-redis:queue-inbound-channel-adapter/>BoundListOperations.rightPoporg.springframework.data.redis.RedisSystemExceptionorg.springframework.data.redis.RedisConnectionFailureException<int-event:inbound-channel-adapter/>Spring中文文档

1 组件 Bean 名称。 如果未提供该属性,那么将在应用程序上下文中创建并注册此属性作为 Bean 名称。 在本例中,端点本身是使用 Bean name plus 注册的。 (如果 Bean 名称为 ,则端点注册为 。channelDirectChannelidid.adapterthing1thing1.adapter
2 要从此终端节点将实例发送到的实例。MessageChannelMessage
3 一个属性,用于指定此终结点是否应在应用程序上下文启动后自动启动。 它默认为 。SmartLifecycletrue
4 用于指定启动此终结点的阶段的属性。 它默认为 。SmartLifecycle0
5 对 Bean 的引用。 它默认为 。RedisConnectionFactoryredisConnectionFactory
6 执行基于队列的“pop”操作以获取 Redis 消息的 Redis 列表的名称。
7 当终端节点的侦听任务收到异常时,要将实例发送到的实例。 默认情况下,基础使用应用程序上下文中的默认值。MessageChannelErrorMessageMessagePublishingErrorHandlererrorChannel
8 Bean 引用。 它可以是空字符串,表示“无序列化程序”。 在这种情况下,来自入站 Redis 消息的原始消息将作为有效负载发送到 。 默认情况下,它是一个 .RedisSerializerbyte[]channelMessageJdkSerializationRedisSerializer
9 “pop”操作等待队列中的 Redis 消息的超时时间(以毫秒为单位)。 默认值为 1 秒。
10 在重新启动侦听器任务之前,侦听器任务在“pop”操作异常后应休眠的时间(以毫秒为单位)。
11 指定此终端节点是否期望 Redis 队列中的数据包含整个实例。 如果此属性设置为 ,则不能为空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。 其默认值为 。Messagetrueserializerfalse
12 对 Spring(或标准 JDK 1.5+)bean 的引用。 它用于基础侦听任务。 它默认为 .TaskExecutorExecutorSimpleAsyncTaskExecutor
13 指定此终结点是否应使用“right pop”(当)或“left pop”(何时)从 Redis 列表中读取消息。 如果 ,则 Redis 列表在与默认 Redis 队列出站通道适配器一起使用时充当队列。 将其设置为与通过“右键推送”写入列表的软件一起使用,或实现类似堆栈的消息顺序。 其默认值为 。 从版本 4.3 开始。truefalsetrueFIFOfalsetrue
必须配置多个线程进行处理;否则,在出现错误后尝试重新启动侦听器任务时,可能会出现死锁。 可用于处理这些错误,以避免重新启动,但最好不要将应用程序暴露在可能的死锁情况中。 有关可能的实现,请参阅 Spring Framework 参考手册task-executorRedisQueueMessageDrivenEndpointerrorChannelTaskExecutor
1 组件 Bean 名称。 如果未提供该属性,那么将在应用程序上下文中创建并注册此属性作为 Bean 名称。 在本例中,终端节点的注册 Bean 名称为 plus 。 (如果 Bean 名称为 ,则端点注册为 。channelDirectChannelidid.adapterthing1thing1.adapter
2 此终端节点从中接收实例。MessageChannelMessage
3 对 Bean 的引用。 它默认为 。RedisConnectionFactoryredisConnectionFactory
4 执行基于队列的“推送”操作以发送 Redis 消息的 Redis 列表的名称。 此属性与 互斥。queue-expression
5 用于确定 Redis 列表名称的 SpEL。 它使用运行时的传入作为变量。 此属性与 互斥。ExpressionMessage#rootqueue
6 Bean 引用。 它默认为 . 但是,对于有效负载,如果未提供参考,则使用 a。RedisSerializerJdkSerializationRedisSerializerStringStringRedisSerializerserializer
7 指定此终端节点是仅向 Redis 队列发送有效负载还是将整个负载发送。 它默认为 。Messagetrue
8 指定此终结点应使用“左推”(当)还是“右推”(何时)将消息写入 Redis 列表。 如果 ,则 Redis 列表在与默认 Redis 队列入站通道适配器一起使用时充当队列。 将其设置为与使用“左弹出”从列表中读取的软件一起使用,或实现类似堆栈的消息顺序。 它默认为 。 从版本 4.3 开始。truefalsetrueFIFOfalsetrue

Redis 消息存储

《企业集成模式 (EIP)》一书中所述,消息存储库允许您持久化消息。 当可靠性受到关注时,处理能够缓冲消息的组件(聚合器、重排序器等)时,这可能很有用。 在 Spring Integration 中,该策略还为声明检查模式提供了基础,这在 EIP 中也有描述。MessageStoreSpring中文文档

Spring Integration 的 Redis 模块提供了 . 以下示例演示如何将其与聚合器一起使用:RedisMessageStoreSpring中文文档

<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore">
    <constructor-arg ref="redisConnectionFactory"/>
</bean>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
         message-store="redisMessageStore"/>

前面的示例是一个 Bean 配置,它需要 a 作为构造函数参数。RedisConnectionFactorySpring中文文档

缺省情况下,使用 Java 序列化来序列化消息。 但是,如果要使用不同的序列化技术(如 JSON),可以通过设置 .RedisMessageStorevalueSerializerRedisMessageStoreSpring中文文档

从版本 4.3.10 开始,该框架分别为实例和实例提供了 Jackson 序列化程序和反序列化程序实现。 它们必须配置为 . 此外,还应设置 为每个序列化的复杂对象添加类型信息(如果您信任源)。 然后,在反序列化过程中使用该类型信息。 该框架提供了一个名为 的实用工具方法,该方法已提供前面提到的所有属性和序列化程序。 此实用程序方法附带了限制 Java 包进行反序列化的参数,以避免安全漏洞。 默认受信任的包:、、、、、。 若要在 中管理 中的 JSON 序列化,必须以类似于以下示例的方式对其进行配置:MessageMessageHeadersMessageJacksonDeserializerMessageHeadersJacksonSerializerSimpleModuleObjectMapperenableDefaultTypingObjectMapperJacksonJsonUtils.messagingAwareMapper()trustedPackagesjava.utiljava.langorg.springframework.messaging.supportorg.springframework.integration.supportorg.springframework.integration.messageorg.springframework.integration.storeRedisMessageStoreSpring中文文档

RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
store.setValueSerializer(serializer);

从版本 4.3.12 开始,支持允许区分同一 Redis 服务器上的存储实例的选项。RedisMessageStoreprefixSpring中文文档

Redis 通道消息存储

前面显示的将每个组维护为单个键(组 ID)下的值。 虽然您可以使用它来支持持久性,但为此目的提供了专用(从 4.0 版开始)。 此存储在发送消息和接收消息时对每个通道使用 a。 默认情况下,此存储还使用 JDK 序列化,但您可以修改值序列化程序,如前所述RedisMessageStoreQueueChannelRedisChannelMessageStoreLISTLPUSHRPOPSpring中文文档

我们建议使用此商店支持渠道,而不是使用一般的 . 以下示例定义了一个 Redis 消息存储库,并在具有队列的通道中使用它:RedisMessageStoreSpring中文文档

<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
	<constructor-arg ref="redisConnectionFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="redisMessageStore"/>
<int:channel>

用于存储数据的键的格式为:(在前面的示例中为 )。<storeBeanName>:<channelId>redisMessageStore:somePersistentQueueChannelSpring中文文档

此外,还提供了一个子类。 当您将其与 一起使用时,消息将按 (FIFO) 优先级顺序接收。 它使用标准标头并支持优先级值 ()。 具有其他优先级的消息(和没有优先级的消息)将按 FIFO 顺序在任何具有优先级的消息之后检索。RedisChannelPriorityMessageStoreQueueChannelIntegrationMessageHeaderAccessor.PRIORITY0 - 9Spring中文文档

这些商店仅实现和不实现。 它们只能用于支持 .BasicMessageGroupStoreMessageGroupStoreQueueChannel
这些商店仅实现和不实现。 它们只能用于支持 .BasicMessageGroupStoreMessageGroupStoreQueueChannel

Redis 元数据存储

Spring Integration 3.0 引入了一个新的基于 Redis 的 MetadataStore(请参阅元数据存储)实现。 可以使用 来维护跨应用程序重新启动的状态。 您可以将此新实现与适配器一起使用,例如:RedisMetadataStoreMetadataStoreMetadataStoreSpring中文文档

要指示这些适配器使用新的 ,请声明一个名为 的 Spring bean。 Feed 入站通道适配器和 Feed 入站通道适配器都会自动拾取并使用声明的 . 下面的示例演示如何声明这样的 Bean:RedisMetadataStoremetadataStoreRedisMetadataStoreSpring中文文档

<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore">
    <constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>

RedisProperties 提供支持。 与它的交互使用 BoundHashOperations,而 BoundHashOperations 又需要整个存储。 在 的情况下,它扮演一个区域的角色,这在分布式环境中很有用,当多个应用程序使用同一个 Redis 服务器时。 默认情况下,它的值为 。RedisMetadataStorekeyPropertiesMetadataStorekeykeyMetaDataSpring中文文档

从版本 4.0 开始,此存储实现了 ,允许它在多个应用程序实例之间可靠地共享,其中只允许一个实例存储或修改密钥的值。ConcurrentMetadataStoreSpring中文文档

您不能将 (例如,在 中 )与 Redis 集群一起使用,因为当前不支持 atomicity 命令。RedisMetadataStore.replace()AbstractPersistentAcceptOnceFileListFilterWATCH
您不能将 (例如,在 中 )与 Redis 集群一起使用,因为当前不支持 atomicity 命令。RedisMetadataStore.replace()AbstractPersistentAcceptOnceFileListFilterWATCH

Redis Store 入站通道适配器

Redis 存储入站通道适配器是一个轮询使用者,它从 Redis 集合中读取数据并将其作为有效负载发送。 以下示例演示如何配置 Redis 存储入站通道适配器:MessageSpring中文文档

<int-redis:store-inbound-channel-adapter id="listAdapter"
    connection-factory="redisConnectionFactory"
    key="myCollection"
    channel="redisChannel"
    collection-type="LIST" >
    <int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter>

前面的示例演示如何使用元素配置 Redis 存储入站通道适配器,并为各种属性提供值,例如:store-inbound-channel-adapterSpring中文文档

  • key或 :正在使用的集合的密钥的名称。key-expressionSpring中文文档

  • collection-type:此适配器支持的集合类型的枚举。 支持的集合包括 、 、 和 。LISTSETZSETPROPERTIESMAPSpring中文文档

  • connection-factory:对 的实例的引用。o.s.data.redis.connection.RedisConnectionFactorySpring中文文档

  • redis-template:对 的实例的引用。o.s.data.redis.core.RedisTemplateSpring中文文档

  • 所有其他入站适配器通用的其他属性(例如“通道”)。Spring中文文档

不能同时设置 和 。redis-templateconnection-factory

默认情况下,适配器使用 . 这将实例用于键、值、哈希键和哈希值。 如果您的 Redis 存储包含使用其他技术序列化的对象,则必须提供配置了适当序列化程序的对象。 例如,如果使用设置为 的 Redis 存储出站适配器写入存储,则必须提供配置如下的存储:StringRedisTemplateStringRedisSerializerRedisTemplateextract-payload-elementsfalseRedisTemplateSpring中文文档

<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
    <property name="connectionFactory" ref="redisConnectionFactory"/>
    <property name="keySerializer">
        <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
    </property>
    <property name="hashKeySerializer">
        <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
    </property>
</bean>

对键和哈希键使用序列化程序,对值和哈希值使用默认的 JDK 序列化序列化器。RedisTemplateStringSpring中文文档

因为它具有 的文本值,所以前面的示例相对简单且静态。 有时,您可能需要根据某些条件在运行时更改密钥的值。 为此,请改用,其中提供的表达式可以是任何有效的 SpEL 表达式。keykey-expressionSpring中文文档

此外,您可能希望对从 Redis 集合中读取的成功处理的数据执行一些后处理。 例如,您可能希望在处理值后移动或删除该值。 您可以使用 Spring Integration 2.2 中添加的事务同步功能来执行此操作。 以下示例使用 and 事务同步:key-expressionSpring中文文档

<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization"
        connection-factory="redisConnectionFactory"
        key-expression="'presidents'"
        channel="otherRedisChannel"
        auto-startup="false"
        collection-type="ZSET">
            <int:poller fixed-rate="1000" max-messages-per-poll="2">
                <int:transactional synchronization-factory="syncFactory"/>
            </int:poller>
</int-redis:store-inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
	<int:after-commit expression="payload.removeByScore(18, 18)"/>
</int:transaction-synchronization-factory>

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>

您可以使用元素将轮询器声明为事务性轮询器。 此元素可以引用实际的事务管理器(例如,如果流的其他部分调用 JDBC)。 如果你没有“真实”事务,你可以使用 ,它是 Spring 的实现,可以在没有实际事务时使用 Redis 适配器的事务同步功能。transactionalo.s.i.transaction.PseudoTransactionManagerPlatformTransactionManagerSpring中文文档

这不会使 Redis 活动本身成为事务性活动。 它允许在成功(提交)或失败(回滚)之前或之后执行操作的同步。

轮询器是事务性的,就可以设置 on 元素的实例。 创建 . 为方便起见,我们公开了默认的基于 SpEL 的 ,它允许您配置 SpEL 表达式,其执行与事务协调(同步)。 支持提交前、提交后和回滚后的表达式,以及发送评估结果(如果有)的通道(每种事件一个)。 对于每个子元素,您可以指定和属性。 如果仅存在该属性,则接收到的消息将作为特定同步方案的一部分发送到该位置。 如果仅存在该属性,并且表达式的结果是非 null 值,则会生成一条消息,并将结果作为有效负载发送到默认通道 () 并显示在日志中(在级别上)。 如果希望评估结果转到特定通道,请添加属性。 如果表达式的结果为 null 或 void,则不会生成任何消息。o.s.i.transaction.TransactionSynchronizationFactorytransactionalTransactionSynchronizationFactoryTransactionSynchronizationTransactionSynchronizationFactoryexpressionchannelchannelexpressionNullChannelDEBUGchannelSpring中文文档

该添加一个属性,其中包含绑定到事务的实例,该实例可以从实现访问。RedisStoreMessageSourcestoreRedisStoreIntegrationResourceHolderTransactionSynchronizationProcessorSpring中文文档

有关事务同步的详细信息,请参阅事务同步Spring中文文档

不能同时设置 和 。redis-templateconnection-factory

默认情况下,适配器使用 . 这将实例用于键、值、哈希键和哈希值。 如果您的 Redis 存储包含使用其他技术序列化的对象,则必须提供配置了适当序列化程序的对象。 例如,如果使用设置为 的 Redis 存储出站适配器写入存储,则必须提供配置如下的存储:StringRedisTemplateStringRedisSerializerRedisTemplateextract-payload-elementsfalseRedisTemplateSpring中文文档

<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
    <property name="connectionFactory" ref="redisConnectionFactory"/>
    <property name="keySerializer">
        <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
    </property>
    <property name="hashKeySerializer">
        <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
    </property>
</bean>

对键和哈希键使用序列化程序,对值和哈希值使用默认的 JDK 序列化序列化器。RedisTemplateStringSpring中文文档

这不会使 Redis 活动本身成为事务性活动。 它允许在成功(提交)或失败(回滚)之前或之后执行操作的同步。

RedisStore 出站通道适配器

RedisStore 出站通道适配器允许您将消息负载写入 Redis 集合,如以下示例所示:Spring中文文档

<int-redis:store-outbound-channel-adapter id="redisListAdapter"
          collection-type="LIST"
          channel="requestChannel"
          key="myCollection" />

前面的配置是 Redis 使用元素存储出站通道适配器。 它为各种属性提供值,例如:store-inbound-channel-adapterSpring中文文档

  • key或 :正在使用的集合的密钥的名称。key-expressionSpring中文文档

  • extract-payload-elements:如果设置为(默认值)并且有效负载是“多值”对象(即 a 或 a)的实例,则使用“addAll”和“putAll”语义存储它。 否则,如果设置为 ,则有效负载将存储为单个条目,而不管其类型如何。 如果有效负载不是“多值”对象的实例,则忽略此属性的值,并且有效负载始终存储为单个条目。trueCollectionMapfalseSpring中文文档

  • collection-type:此适配器支持的类型的枚举。 支持的集合包括 、 、 和 。CollectionLISTSETZSETPROPERTIESMAPSpring中文文档

  • map-key-expression:SpEL 表达式,用于返回正在存储的条目的键的名称。 仅当 is 或 和 'extract-payload-elements' 为 false 时,它才适用。collection-typeMAPPROPERTIESSpring中文文档

  • connection-factory:对 的实例的引用。o.s.data.redis.connection.RedisConnectionFactorySpring中文文档

  • redis-template:对 的实例的引用。o.s.data.redis.core.RedisTemplateSpring中文文档

  • 所有其他入站适配器通用的其他属性(例如“通道”)。Spring中文文档

不能同时设置 和 。redis-templateconnection-factory
默认情况下,适配器使用 . 这将实例用于键、值、哈希键和哈希值。 但是,如果设置为 ,则将使用具有键和哈希键实例以及值和哈希值实例 s 的实例。 使用 JDK 序列化程序时,必须了解 Java 序列化用于所有值,而不管该值是否实际是集合。 如果需要对值的序列化进行更多控制,请考虑提供自己的值,而不是依赖这些默认值。StringRedisTemplateStringRedisSerializerextract-payload-elementsfalseRedisTemplateStringRedisSerializerJdkSerializationRedisSerializerRedisTemplate

由于它具有 和其他属性的文本值,因此前面的示例相对简单且静态。 有时,您可能需要根据某些条件在运行时动态更改值。 为此,请使用它们的等效项(、 等),其中提供的表达式可以是任何有效的 SpEL 表达式。key-expressionkey-expressionmap-key-expressionSpring中文文档

不能同时设置 和 。redis-templateconnection-factory
默认情况下,适配器使用 . 这将实例用于键、值、哈希键和哈希值。 但是,如果设置为 ,则将使用具有键和哈希键实例以及值和哈希值实例 s 的实例。 使用 JDK 序列化程序时,必须了解 Java 序列化用于所有值,而不管该值是否实际是集合。 如果需要对值的序列化进行更多控制,请考虑提供自己的值,而不是依赖这些默认值。StringRedisTemplateStringRedisSerializerextract-payload-elementsfalseRedisTemplateStringRedisSerializerJdkSerializationRedisSerializerRedisTemplate

Redis 出站命令网关

Spring Integration 4.0 引入了 Redis 命令网关,允许您使用泛型方法执行任何标准 Redis 命令。 以下列表显示了 Redis 出站网关的可用属性:RedisConnection#executeSpring中文文档

<int-redis:outbound-gateway
        request-channel=""  (1)
        reply-channel=""  (2)
        requires-reply=""  (3)
        reply-timeout=""  (4)
        connection-factory=""  (5)
        redis-template=""  (6)
        arguments-serializer=""  (7)
        command-expression=""  (8)
        argument-expressions=""  (9)
        use-command-variable=""  (10)
        arguments-strategy="" /> (11)
1 此终端节点从中接收实例。MessageChannelMessage
2 此终结点发送应答实例的位置。MessageChannelMessage
3 指定此出站网关是否必须返回非 null 值。 它默认为 。 当 Redis 返回一个值时,将抛出 A。trueReplyRequiredExceptionnull
4 等待回复消息发送前的超时(以毫秒为单位)。 它通常适用于基于队列的有限回复通道。
5 对 Bean 的引用。 它默认为 。 它与“redis-template”属性是互斥的。RedisConnectionFactoryredisConnectionFactory
6 对 Bean 的引用。 它与“connection-factory”属性是互斥的。RedisTemplate
7 对 的实例的引用。 如有必要,它用于将每个命令参数序列化为 byte[]。org.springframework.data.redis.serializer.RedisSerializer
8 返回命令键的 SpEL 表达式。 它默认为消息标头。 它不能计算为 。redis_commandnull
9 以逗号分隔的 SpEL 表达式,这些表达式被计算为命令参数。 与属性互斥。 如果两者都不提供任何属性,则 将用作命令参数。 参数表达式的计算结果可以为“null”,以支持可变数量的参数。arguments-strategypayload
10 一个标志,用于指定是否将评估的 Redis 命令字符串作为 when 配置的表达式评估上下文中的变量提供。 否则,将忽略此属性。boolean#cmdo.s.i.redis.outbound.ExpressionArgumentsStrategyargument-expressions
11 对 的实例的引用。 它与属性是互斥的。 如果两者都不提供任何属性,则 将用作命令参数。o.s.i.redis.outbound.ArgumentsStrategyargument-expressionspayload

您可以将其用作通用组件来执行任何所需的 Redis 操作。 以下示例演示如何从 Redis 原子序数中获取递增值:<int-redis:outbound-gateway>Spring中文文档

<int-redis:outbound-gateway request-channel="requestChannel"
    reply-channel="replyChannel"
    command-expression="'INCR'"/>

有效负载的名称应为 ,该名称可由 Bean 定义提供。MessageredisCounterorg.springframework.data.redis.support.atomic.RedisAtomicIntegerSpring中文文档

该方法的返回类型为泛型。 实际结果取决于命令类型。 例如,返回一个 . 有关命令、其参数和结果类型的更多信息,请参阅 Redis 规范RedisConnection#executeObjectMGETList<byte[]>Spring中文文档

1 此终端节点从中接收实例。MessageChannelMessage
2 此终结点发送应答实例的位置。MessageChannelMessage
3 指定此出站网关是否必须返回非 null 值。 它默认为 。 当 Redis 返回一个值时,将抛出 A。trueReplyRequiredExceptionnull
4 等待回复消息发送前的超时(以毫秒为单位)。 它通常适用于基于队列的有限回复通道。
5 对 Bean 的引用。 它默认为 。 它与“redis-template”属性是互斥的。RedisConnectionFactoryredisConnectionFactory
6 对 Bean 的引用。 它与“connection-factory”属性是互斥的。RedisTemplate
7 对 的实例的引用。 如有必要,它用于将每个命令参数序列化为 byte[]。org.springframework.data.redis.serializer.RedisSerializer
8 返回命令键的 SpEL 表达式。 它默认为消息标头。 它不能计算为 。redis_commandnull
9 以逗号分隔的 SpEL 表达式,这些表达式被计算为命令参数。 与属性互斥。 如果两者都不提供任何属性,则 将用作命令参数。 参数表达式的计算结果可以为“null”,以支持可变数量的参数。arguments-strategypayload
10 一个标志,用于指定是否将评估的 Redis 命令字符串作为 when 配置的表达式评估上下文中的变量提供。 否则,将忽略此属性。boolean#cmdo.s.i.redis.outbound.ExpressionArgumentsStrategyargument-expressions
11 对 的实例的引用。 它与属性是互斥的。 如果两者都不提供任何属性,则 将用作命令参数。o.s.i.redis.outbound.ArgumentsStrategyargument-expressionspayload

Redis 队列出站网关

Spring Integration 引入了 Redis 队列出站网关来执行请求和回复场景。 它将会话推送到提供的 ,将 value 作为其键推送到 Redis 列表,并等待键为 plus 的 Redis 列表的回复。 每个交互都使用不同的 UUID。 以下列表显示了 Redis 出站网关的可用属性:UUIDqueueUUIDUUID.replySpring中文文档

<int-redis:queue-outbound-gateway
        request-channel=""  (1)
        reply-channel=""  (2)
        requires-reply=""  (3)
        reply-timeout=""  (4)
        connection-factory=""  (5)
        queue=""  (6)
        order=""  (7)
        serializer=""  (8)
        extract-payload=""/>  (9)
1 此终端节点从中接收实例。MessageChannelMessage
2 此终结点发送应答实例的位置。MessageChannelMessage
3 指定此出站网关是否必须返回非 null 值。 默认情况下,此值为值。 否则,当 Redis 返回值时,将抛出 a。falseReplyRequiredExceptionnull
4 等待回复消息发送前的超时(以毫秒为单位)。 它通常适用于基于队列的有限回复通道。
5 对 Bean 的引用。 它默认为 。 它与“redis-template”属性是互斥的。RedisConnectionFactoryredisConnectionFactory
6 出站网关向其发送会话的 Redis 列表的名称。UUID
7 注册多个网关时此出站网关的顺序。
8 Bean 引用。 它可以是一个空字符串,表示“无序列化程序”。 在这种情况下,来自入站 Redis 消息的原始消息将作为有效负载发送到 。 默认情况下,它是一个 .RedisSerializerbyte[]channelMessageJdkSerializationRedisSerializer
9 指定此终端节点是否期望 Redis 队列中的数据包含整个实例。 如果此属性设置为 ,则不能为空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。Messagetrueserializer
1 此终端节点从中接收实例。MessageChannelMessage
2 此终结点发送应答实例的位置。MessageChannelMessage
3 指定此出站网关是否必须返回非 null 值。 默认情况下,此值为值。 否则,当 Redis 返回值时,将抛出 a。falseReplyRequiredExceptionnull
4 等待回复消息发送前的超时(以毫秒为单位)。 它通常适用于基于队列的有限回复通道。
5 对 Bean 的引用。 它默认为 。 它与“redis-template”属性是互斥的。RedisConnectionFactoryredisConnectionFactory
6 出站网关向其发送会话的 Redis 列表的名称。UUID
7 注册多个网关时此出站网关的顺序。
8 Bean 引用。 它可以是一个空字符串,表示“无序列化程序”。 在这种情况下,来自入站 Redis 消息的原始消息将作为有效负载发送到 。 默认情况下,它是一个 .RedisSerializerbyte[]channelMessageJdkSerializationRedisSerializer
9 指定此终端节点是否期望 Redis 队列中的数据包含整个实例。 如果此属性设置为 ,则不能为空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。Messagetrueserializer

Redis 队列入站网关

Spring Integration 4.1 引入了 Redis 队列入站网关来执行请求和回复场景。 它从提供的会话中弹出一个会话,从 Redis 列表中弹出以 that 作为其键的值,并使用键 plus 将回复推送到 Redis 列表。 以下列表显示了 Redis 队列入站网关的可用属性:UUIDqueueUUIDUUID.replySpring中文文档

<int-redis:queue-inbound-gateway
        request-channel=""  (1)
        reply-channel=""  (2)
        executor=""  (3)
        reply-timeout=""  (4)
        connection-factory=""  (5)
        queue=""  (6)
        order=""  (7)
        serializer=""  (8)
        receive-timeout=""  (9)
        expect-message=""  (10)
        recovery-interval=""/>  (11)
1 此终端节点发送从 Redis 数据创建的实例的位置。MessageChannelMessage
2 此终结点从中等待回复实例的位置。 可选 - 标头仍在使用中。MessageChannelMessagereplyChannel
3 对 Spring(或标准 JDK)bean 的引用。 它用于基础侦听任务。 它默认为 .TaskExecutorExecutorSimpleAsyncTaskExecutor
4 等待回复消息发送前的超时(以毫秒为单位)。 它通常适用于基于队列的有限回复通道。
5 对 Bean 的引用。 它默认为 。 它与“redis-template”属性是互斥的。RedisConnectionFactoryredisConnectionFactory
6 对话的 Redis 列表的名称。UUID
7 注册多个网关时此入站网关的顺序。
8 Bean 引用。 它可以是一个空字符串,表示“无序列化程序”。 在这种情况下,来自入站 Redis 消息的原始消息将作为有效负载发送到 。 它默认为 . (请注意,在 4.3 版之前的版本中,默认情况下它是 a。 若要还原该行为,请提供对 的引用)。RedisSerializerbyte[]channelMessageJdkSerializationRedisSerializerStringRedisSerializerStringRedisSerializer
9 等待获取接收消息的超时(以毫秒为单位)。 它通常适用于基于队列的有限请求通道。
10 指定此终端节点是否期望 Redis 队列中的数据包含整个实例。 如果此属性设置为 ,则不能为空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。Messagetrueserializer
11 在重新启动侦听器任务之前,侦听器任务在“右弹出”操作异常后应休眠的时间(以毫秒为单位)。
必须配置多个线程进行处理;否则,在出现错误后尝试重新启动侦听器任务时,可能会出现死锁。 可用于处理这些错误,以避免重新启动,但最好不要将应用程序暴露在可能的死锁情况中。 有关可能的实现,请参阅 Spring Framework 参考手册task-executorRedisQueueMessageDrivenEndpointerrorChannelTaskExecutor
1 此终端节点发送从 Redis 数据创建的实例的位置。MessageChannelMessage
2 此终结点从中等待回复实例的位置。 可选 - 标头仍在使用中。MessageChannelMessagereplyChannel
3 对 Spring(或标准 JDK)bean 的引用。 它用于基础侦听任务。 它默认为 .TaskExecutorExecutorSimpleAsyncTaskExecutor
4 等待回复消息发送前的超时(以毫秒为单位)。 它通常适用于基于队列的有限回复通道。
5 对 Bean 的引用。 它默认为 。 它与“redis-template”属性是互斥的。RedisConnectionFactoryredisConnectionFactory
6 对话的 Redis 列表的名称。UUID
7 注册多个网关时此入站网关的顺序。
8 Bean 引用。 它可以是一个空字符串,表示“无序列化程序”。 在这种情况下,来自入站 Redis 消息的原始消息将作为有效负载发送到 。 它默认为 . (请注意,在 4.3 版之前的版本中,默认情况下它是 a。 若要还原该行为,请提供对 的引用)。RedisSerializerbyte[]channelMessageJdkSerializationRedisSerializerStringRedisSerializerStringRedisSerializer
9 等待获取接收消息的超时(以毫秒为单位)。 它通常适用于基于队列的有限请求通道。
10 指定此终端节点是否期望 Redis 队列中的数据包含整个实例。 如果此属性设置为 ,则不能为空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。Messagetrueserializer
11 在重新启动侦听器任务之前,侦听器任务在“右弹出”操作异常后应休眠的时间(以毫秒为单位)。
必须配置多个线程进行处理;否则,在出现错误后尝试重新启动侦听器任务时,可能会出现死锁。 可用于处理这些错误,以避免重新启动,但最好不要将应用程序暴露在可能的死锁情况中。 有关可能的实现,请参阅 Spring Framework 参考手册task-executorRedisQueueMessageDrivenEndpointerrorChannelTaskExecutor

Redis 流出站通道适配器

Spring Integration 5.4 引入了反应式 Redis 流出站通道适配器,用于将消息有效负载写入 Redis 流。 出站通道适配器用于向流添加 a。 以下示例演示如何对 Redis 流出站通道适配器使用 Java 配置和服务类。ReactiveStreamOperations.add(…​)RecordSpring中文文档

@Bean
@ServiceActivator(inputChannel = "messageChannel")
public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
        ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
    ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
        new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey"); (1)
    reactiveStreamMessageHandler.setSerializationContext(serializationContext); (2)
    reactiveStreamMessageHandler.setHashMapper(hashMapper); (3)
    reactiveStreamMessageHandler.setExtractPayload(true); (4)
    return reactiveStreamMessageHandler;
}
1 构造 using 和 stream name 的实例以添加记录。 另一个构造函数变体基于 SpEL 表达式,用于根据请求消息评估流键。ReactiveRedisStreamMessageHandlerReactiveRedisConnectionFactory
2 设置 用于在添加到流之前序列化记录键和值。RedisSerializationContext
3 set,它提供 Java 类型和 Redis 哈希/映射之间的协定。HashMapper
4 如果为“true”,则通道适配器将从请求消息中提取有效负载,以便添加要添加的流记录。 或者将整个消息用作值。 它默认为 。true
1 构造 using 和 stream name 的实例以添加记录。 另一个构造函数变体基于 SpEL 表达式,用于根据请求消息评估流键。ReactiveRedisStreamMessageHandlerReactiveRedisConnectionFactory
2 设置 用于在添加到流之前序列化记录键和值。RedisSerializationContext
3 set,它提供 Java 类型和 Redis 哈希/映射之间的协定。HashMapper
4 如果为“true”,则通道适配器将从请求消息中提取有效负载,以便添加要添加的流记录。 或者将整个消息用作值。 它默认为 。true

Redis Stream 入站通道适配器

Spring Integration 5.4 引入了 Reactive Stream 入站通道适配器,用于从 Redis Stream 读取消息。 入站通道适配器使用或基于自动确认标志从 Redis 流中读取记录。 以下示例说明如何使用 Redis 流入站通道适配器的 Java 配置。StreamReceiver.receive(…​)StreamReceiver.receiveAutoAck()Spring中文文档

@Bean
public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(
       ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageProducer messageProducer =
            new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey"); (1)
    messageProducer.setStreamReceiverOptions( (2)
                StreamReceiver.StreamReceiverOptions.builder()
                      .pollTimeout(Duration.ofMillis(100))
                      .build());
    messageProducer.setAutoStartup(true); (3)
    messageProducer.setAutoAck(false); (4)
    messageProducer.setCreateConsumerGroup(true); (5)
    messageProducer.setConsumerGroup("my-group"); (6)
    messageProducer.setConsumerName("my-consumer"); (7)
    messageProducer.setOutputChannel(fromRedisStreamChannel); (8)
    messageProducer.setReadOffset(ReadOffset.latest()); (9)
    messageProducer.extractPayload(true); (10)
    return messageProducer;
}
1 构造一个使用 和 stream key 读取记录的实例。ReactiveRedisStreamMessageProducerReactiveRedisConnectionFactory
2 A 使用反应式基础设施来使用 redis 流。StreamReceiver.StreamReceiverOptions
3 一个属性,用于指定此终结点是否应在应用程序上下文启动后自动启动。 它默认为 。 如果 ,应该手动启动。SmartLifecycletruefalseRedisStreamMessageProducermessageProducer.start()
4 如果 ,则不会自动确认收到的消息。 消息的确认将推迟到使用客户端的消息。 它默认为 。falsetrue
5 如果 ,将创建一个使用者组。 在创建消费者组期间,也会创建流(如果尚不存在)。 消费者组跟踪消息传递并区分消费者。 它默认为 。truefalse
6 设置使用者组名称。 它默认为定义的 Bean 名称。
7 设置使用者名称。 从组 中读取消息。my-consumermy-group
8 要从此终结点向其发送消息的消息通道。
9 定义要读取消息的偏移量。 它默认为 。ReadOffset.latest()
10 如果为 'true',通道适配器将从 中提取有效负载值。 否则,整体将用作有效负载。 它默认为 。RecordRecordtrue

如果设置为 ,则 Redis 驱动程序不会自动确认 In Redis Stream,而是将标头添加到消息中,以实例作为值生成。 每当基于此类记录的消息完成业务逻辑时,目标集成流负责调用其回调。 即使在反序列化期间发生异常并进行了配置,也需要类似的逻辑。 因此,目标错误处理程序必须决定确认或确认此类失败消息。 除了 ,还会将这些标头填充到要生成的消息中:、 和 。autoAckfalseRecordIntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACKSimpleAcknowledgmentacknowledge()errorChannelIntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACKReactiveRedisStreamMessageProducerRedisHeaders.STREAM_KEYRedisHeaders.STREAM_MESSAGE_IDRedisHeaders.CONSUMER_GROUPRedisHeaders.CONSUMERSpring中文文档

从版本 5.5 开始,您可以在 上显式配置选项,包括新引入的函数,如果 Redis Stream 使用者在发生反序列化错误时应继续轮询,则需要该函数。 默认函数向错误通道发送一条消息(如果提供),并可能确认失败的消息,如上所述。 所有这些都是相互排斥的,外部提供.StreamReceiver.StreamReceiverOptionsBuilderReactiveRedisStreamMessageProduceronErrorResumeStreamReceiver.StreamReceiverOptionsBuilderStreamReceiver.StreamReceiverOptionsSpring中文文档

1 构造一个使用 和 stream key 读取记录的实例。ReactiveRedisStreamMessageProducerReactiveRedisConnectionFactory
2 A 使用反应式基础设施来使用 redis 流。StreamReceiver.StreamReceiverOptions
3 一个属性,用于指定此终结点是否应在应用程序上下文启动后自动启动。 它默认为 。 如果 ,应该手动启动。SmartLifecycletruefalseRedisStreamMessageProducermessageProducer.start()
4 如果 ,则不会自动确认收到的消息。 消息的确认将推迟到使用客户端的消息。 它默认为 。falsetrue
5 如果 ,将创建一个使用者组。 在创建消费者组期间,也会创建流(如果尚不存在)。 消费者组跟踪消息传递并区分消费者。 它默认为 。truefalse
6 设置使用者组名称。 它默认为定义的 Bean 名称。
7 设置使用者名称。 从组 中读取消息。my-consumermy-group
8 要从此终结点向其发送消息的消息通道。
9 定义要读取消息的偏移量。 它默认为 。ReadOffset.latest()
10 如果为 'true',通道适配器将从 中提取有效负载值。 否则,整体将用作有效负载。 它默认为 。RecordRecordtrue

Redis 锁注册表

Spring Integration 4.0 引入了 . 某些组件(例如,聚合器和重排序器)使用从实例获取的锁来确保一次只有一个线程操作一个组。 在单个组件中执行此功能。 您现在可以在这些组件上配置外部锁注册表。 当您将它与 共享 一起使用时,您可以使用 跨多个应用程序实例提供此功能,以便一次只能有一个实例操作该组。RedisLockRegistryLockRegistryDefaultLockRegistryMessageGroupStoreRedisLockRegistrySpring中文文档

当一个锁被本地线程释放时,另一个本地线程通常可以立即获取该锁。 如果线程使用不同的注册表实例释放锁,则最多可能需要 100 毫秒才能获取锁。Spring中文文档

为了避免“挂起”锁(当服务器发生故障时),此注册表中的锁在默认 60 秒后过期,但您可以在注册表上配置此值。 锁的保持时间通常要短得多。Spring中文文档

由于密钥可能会过期,因此尝试解锁过期的锁会导致引发异常。 但是,受此类锁定保护的资源可能已受到损害,因此应将此类异常视为严重。 应将过期值设置为足够大的值以防止出现这种情况,但应将其设置得足够低,以便在服务器发生故障后可以在合理的时间内恢复锁定。

从版本 5.0 开始,实现 ,它删除了上次获取的锁,而不是以前获得的锁,并且当前未锁定的锁。RedisLockRegistryExpirableLockRegistryageSpring中文文档

从版本 5.5.6 开始,支持自动清理缓存,以便通过 . 有关更多信息,请参阅其 JavaDocs。RedisLockRegistryRedisLockRegistry.locksRedisLockRegistry.setCacheCapacity()Spring中文文档

从版本 5.5.13 开始,公开了一个选项,用于确定 Redis 锁获取应以何种模式进行:RedisLockRegistrysetRedisLockType(RedisLockType)Spring中文文档

  • RedisLockType.SPIN_LOCK- 通过周期性循环(100ms)获取锁,检查是否可以获取锁。 违约。Spring中文文档

  • RedisLockType.PUB_SUB_LOCK- 该锁是通过 redis pub-sub 订阅获得的。Spring中文文档

pub-sub 是首选模式 - 客户端 Redis 服务器之间的网络聊天更少,性能更高 - 当订阅收到有关在其他进程中解锁的通知时,会立即获取锁定。 但是,Redis 不支持主/副本连接中的发布订阅(例如在 AWS ElastiCache 环境中),因此默认选择繁忙旋转模式以使注册表在任何环境中工作。Spring中文文档

由于密钥可能会过期,因此尝试解锁过期的锁会导致引发异常。 但是,受此类锁定保护的资源可能已受到损害,因此应将此类异常视为严重。 应将过期值设置为足够大的值以防止出现这种情况,但应将其设置得足够低,以便在服务器发生故障后可以在合理的时间内恢复锁定。