对于最新的稳定版本,请使用 Spring Integration 6.3.4! |
对于最新的稳定版本,请使用 Spring Integration 6.3.4! |
Spring Integration 2.1 引入了对 Redis 的支持:“一个开源的高级键值存储”。
这种支持以基于 Redis 以及发布-订阅消息收发适配器的形式出现,Redis 通过其 PUBLISH、
SUBSCRIBE
和 UNSUBSCRIBE
命令支持这些适配器。MessageStore
您需要将此依赖项包含在您的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
<version>6.2.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-redis:6.2.9"
您还需要包含 Redis 客户端依赖项,例如 Lettuce。
要下载、安装和运行 Redis,请参阅 Redis 文档。
连接到 Redis
要开始与 Redis 交互,您首先需要连接到它。
Spring 集成使用另一个 Spring 项目 Spring Data Redis 提供的支持,该项目提供了典型的 Spring 结构:和 。
这些抽象简化了与多个 Redis 客户端 Java API 的集成。
目前,Spring Data Redis 支持 Jedis 和 Lettuce。ConnectionFactory
Template
用RedisConnectionFactory
要连接到 Redis,您可以使用接口的实现之一。
下面的清单显示了接口定义:RedisConnectionFactory
public interface RedisConnectionFactory extends PersistenceExceptionTranslator {
/**
* Provides a suitable connection for interacting with Redis.
* @return connection for interacting with Redis.
*/
RedisConnection getConnection();
}
下面的示例展示了如何在 Java 中创建一个:LettuceConnectionFactory
LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet();
下面的示例展示了如何在 Spring 的 XML 配置中创建:LettuceConnectionFactory
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
的实现提供了一组属性,例如 port 和 host,您可以根据需要设置这些属性。
拥有 的实例后,可以创建 的实例并将其注入 .RedisConnectionFactory
RedisConnectionFactory
RedisTemplate
RedisConnectionFactory
用RedisTemplate
与 Spring 中的其他模板类(例如 和 )一样,它是一个简化 Redis 数据访问代码的辅助类。
有关及其变体(例如 )的更多信息,请参阅 Spring Data Redis 文档。JdbcTemplate
JmsTemplate
RedisTemplate
RedisTemplate
StringRedisTemplate
以下示例演示如何在 Java 中创建 的实例:RedisTemplate
RedisTemplate rt = new RedisTemplate<String, Object>();
rt.setConnectionFactory(redisConnectionFactory);
下面的示例展示了如何在 Spring 的 XML 配置中创建 的实例:RedisTemplate
<bean id="redisTemplate"
class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
使用 Redis 进行消息收发
如简介中所述,Redis 通过其 、 和 命令提供对发布-订阅消息传递的支持。
与 JMS 和 AMQP 一样, Spring 集成提供了消息通道和适配器,用于通过 Redis 发送和接收消息。PUBLISH
SUBSCRIBE
UNSUBSCRIBE
Redis 发布/订阅频道
与 JMS 类似,在某些情况下,生产者和使用者都应该成为同一应用程序的一部分,运行在同一个进程中。 您可以通过使用一对入站和出站通道适配器来实现此目的。 但是,与 Spring Integration 的 JMS 支持一样,有一种更简单的方法可以解决此用例。 您可以创建 publish-subscribe 通道,如下例所示:
<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>
A 的行为很像 Spring Integration 主命名空间中的普通元素。
它可以被任何端点的 和 属性 引用。
区别在于,此通道由 Redis 主题名称提供支持:由属性指定的值。
但是,与 JMS 不同的是,此主题不必提前创建,甚至不必由 Redis 自动创建。
在 Redis 中,主题是充当地址的简单值。
创建者和使用者可以使用与其主题名称相同的值进行通信。
对此通道的简单订阅意味着可以在生产终端节点和使用终端节点之间进行异步发布-订阅消息传递。
但是,与通过在简单的 Spring Integration 元素中添加元素创建的异步消息通道不同,消息不存储在内存中队列中。
相反,这些消息是通过 Redis 传递的,这让您能够依赖它对持久性和集群的支持,以及它与其他非 Java 平台的互操作性。publish-subscribe-channel
<publish-subscribe-channel/>
input-channel
output-channel
String
topic-name
String
String
<queue/>
<channel/>
Redis 入站通道适配器
Redis 入站通道适配器 () 以与其他入站适配器相同的方式将传入的 Redis 消息改编为 Spring 消息。
它接收特定于平台的消息(在本例中为 Redis),并使用策略将其转换为 Spring 消息。
以下示例显示如何配置 Redis 入站通道适配器:RedisInboundChannelAdapter
MessageConverter
<int-redis:inbound-channel-adapter id="redisAdapter"
topics="thing1, thing2"
channel="receiveChannel"
error-channel="testErrorChannel"
message-converter="testConverter" />
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
前面的示例显示了 Redis 入站通道适配器的一个简单但完整的配置。
请注意,前面的配置依赖于熟悉的 Spring 自动发现某些 bean 的范例。
在这种情况下, the 被隐式注入到适配器中。
您可以改用 attribute 来显式指定它。redisConnectionFactory
connection-factory
另请注意,前面的配置会向适配器注入自定义 .
该方法类似于 JMS,其中实例用于在 Redis 消息和 Spring Integration 消息有效负载之间进行转换。
默认值为 .MessageConverter
MessageConverter
SimpleMessageConverter
入站适配器可以订阅多个主题名称,因此属性中的值集以逗号分隔。topics
从版本 3.0 开始,入站适配器除了 existing 属性外,现在还具有 attribute.
此属性包含一组以逗号分隔的 Redis 主题模式。
有关 Redis 发布-订阅的更多信息,请参阅 Redis Pub/Sub。topics
topic-patterns
入站适配器可以使用 a 反序列化 Redis 消息的正文。
的 attribute 可以设置为空字符串,这将产生 property 的值。
在这种情况下,Redis 消息的原始正文作为消息负载提供。RedisSerializer
serializer
<int-redis:inbound-channel-adapter>
null
RedisSerializer
byte[]
从版本 5.0 开始,您可以使用 .
此外,收到的 Spring 集成消息现在具有 Headers 来指示已发布消息的来源:topic 或 pattern。
您可以将此下游用于路由逻辑。Executor
task-executor
<int-redis:inbound-channel-adapter>
RedisHeaders.MESSAGE_SOURCE
Redis 出站通道适配器
Redis 出站通道适配器以与其他出站适配器相同的方式将传出的 Spring Integration 消息改编为 Redis 消息。
它接收 Spring 集成消息,并使用策略将它们转换为特定于平台的消息(在本例中为 Redis)。
以下示例显示如何配置 Redis 出站通道适配器:MessageConverter
<int-redis:outbound-channel-adapter id="outboundAdapter"
channel="sendChannel"
topic="thing1"
message-converter="testConverter"/>
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379"/>
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
该配置与 Redis 入站通道适配器类似。
适配器被隐式注入 a ,该 bean 名称定义为 。
此示例还包括可选的(和自定义的)(bean)。RedisConnectionFactory
redisConnectionFactory
MessageConverter
testConverter
从 Spring Integration 3.0 开始,它提供了该属性的替代方法:你可以使用该属性来确定运行时消息的 Redis 主题。
这些属性是互斥的。<int-redis:outbound-channel-adapter>
topic
topic-expression
Redis 队列入站通道适配器
Spring Integration 3.0 引入了一个队列入站通道适配器,用于从 Redis 列表中“弹出”消息。 默认情况下,它使用 “right pop”,但你可以将其配置为使用 “left pop” 来代替。 适配器是消息驱动的。 它使用内部侦听器线程,不使用 Poller。
以下清单显示了 的所有可用属性:queue-inbound-channel-adapter
<int-redis:queue-inbound-channel-adapter id="" (1)
channel="" (2)
auto-startup="" (3)
phase="" (4)
connection-factory="" (5)
queue="" (6)
error-channel="" (7)
serializer="" (8)
receive-timeout="" (9)
recovery-interval="" (10)
expect-message="" (11)
task-executor="" (12)
right-pop=""/> (13)
1 | 组件 Bean 名称。
如果未提供该属性,则会在应用程序上下文中创建并注册 a,并将此属性作为 Bean 名称。
在这种情况下,端点本身使用 bean 名称 plus 注册。
(如果 Bean 名称为 ,则端点注册为 .)channel DirectChannel id id .adapter thing1 thing1.adapter |
2 | 要从此 Endpoint 向其发送实例的 。MessageChannel Message |
3 | 一个属性,用于指定此端点是否应在应用程序上下文启动后自动启动。
它默认为 .SmartLifecycle true |
4 | 一个属性,用于指定此终端节点的启动阶段。
它默认为 .SmartLifecycle 0 |
5 | 对 Bean 的引用。
它默认为 .RedisConnectionFactory redisConnectionFactory |
6 | Redis 列表的名称,在其上执行基于队列的“pop”操作以获取 Redis 消息。 |
7 | 当从终端节点的侦听任务收到异常时,将实例发送到该对象。
默认情况下,底层使用应用程序上下文中的默认值。MessageChannel ErrorMessage MessagePublishingErrorHandler errorChannel |
8 | Bean 引用。
它可以是一个空字符串,这意味着 'no serializer'。
在这种情况下,入站 Redis 消息中的原始消息将作为有效负载发送到 。
默认情况下,它是一个 .RedisSerializer byte[] channel Message JdkSerializationRedisSerializer |
9 | “pop”操作等待队列中的 Redis 消息的超时时间(以毫秒为单位)。 默认值为 1 秒。 |
10 | 在重新启动侦听器任务之前,侦听器任务在 'pop' 操作出现异常后应休眠的时间(以毫秒为单位)。 |
11 | 指定此终端节点是否期望 Redis 队列中的数据包含整个实例。
如果此属性设置为 ,则 不能为空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。
其默认值为 。Message true serializer false |
12 | 对 Spring (或标准 JDK 1.5+ )bean 的引用。
它用于底层侦听任务。
它默认为 .TaskExecutor Executor SimpleAsyncTaskExecutor |
13 | 指定此终端节点应使用 “right pop” (when ) 还是 “left pop” (when ) 从 Redis 列表中读取消息。
如果 ,则 Redis 列表在与默认 Redis 队列出站通道适配器一起使用时充当队列。
将其设置为与使用 “right push” 写入列表的软件一起使用,或实现类似堆栈的消息顺序。
其默认值为 。
从 4.3 版本开始。true false true FIFO false true |
必须配置多个线程进行处理;否则,在出现错误后尝试重新启动侦听器任务时,可能会出现死锁。
这可用于处理这些错误,以避免重新启动,但最好不要将应用程序暴露在可能的死锁情况中。
有关可能的实现,请参阅 Spring Framework Reference Manual。task-executor RedisQueueMessageDrivenEndpoint errorChannel TaskExecutor |
Redis 队列出站通道适配器
Spring Integration 3.0 引入了一个队列出站通道适配器,用于从 Spring Integration 消息“推送”到 Redis 列表。
默认情况下,它使用 “left push”,但您可以将其配置为使用 “right push” 来代替。
以下清单显示了 Redis 的所有可用属性:queue-outbound-channel-adapter
<int-redis:queue-outbound-channel-adapter id="" (1)
channel="" (2)
connection-factory="" (3)
queue="" (4)
queue-expression="" (5)
serializer="" (6)
extract-payload="" (7)
left-push=""/> (8)
1 | 组件 Bean 名称。
如果未提供该属性,则会在应用程序上下文中创建并注册 a,并将此属性作为 Bean 名称。
在这种情况下,端点使用 bean 名称 plus 注册。
(如果 Bean 名称为 ,则端点注册为 .)channel DirectChannel id id .adapter thing1 thing1.adapter |
2 | 此终端节点从中接收实例的 URL。MessageChannel Message |
3 | 对 Bean 的引用。
它默认为 .RedisConnectionFactory redisConnectionFactory |
4 | Redis 列表的名称,在其上执行基于队列的“推送”操作以发送 Redis 消息。
此属性与 互斥。queue-expression |
5 | 用于确定 Redis 列表名称的 SPEL。
它使用运行时的传入作为变量。
此属性与 互斥。Expression Message #root queue |
6 | 一个 Bean 引用。
它默认为 .
但是,对于有效负载,如果未提供引用,则使用 a。RedisSerializer JdkSerializationRedisSerializer String StringRedisSerializer serializer |
7 | 指定此终端节点是应仅将负载发送还是将整个负载发送到 Redis 队列。
它默认为 .Message true |
8 | 指定此终端节点应使用 “left push” (when ) 还是 “right push” (when ) 将消息写入 Redis 列表。
如果 ,则 Redis 列表在与默认 Redis 队列入站通道适配器一起使用时充当队列。
将其设置为与使用 “left pop” 从列表中读取的软件一起使用,或实现类似堆栈的消息顺序。
它默认为 .
从 4.3 版本开始。true false true FIFO false true |
Redis 应用程序事件
从 Spring Integration 3.0 开始,Redis 模块提供了 的实现,而 .
它封装了 Redis 操作中的异常(终端节点是事件的“源”)。
例如,在捕获操作异常后发出这些事件。
例外情况可以是任何泛型或 .
使用 处理这些事件有助于确定后台 Redis 任务的问题并执行管理操作。IntegrationEvent
org.springframework.context.ApplicationEvent
RedisExceptionEvent
<int-redis:queue-inbound-channel-adapter/>
BoundListOperations.rightPop
org.springframework.data.redis.RedisSystemException
org.springframework.data.redis.RedisConnectionFailureException
<int-event:inbound-channel-adapter/>
1 | 组件 Bean 名称。
如果未提供该属性,则会在应用程序上下文中创建并注册 a,并将此属性作为 Bean 名称。
在这种情况下,端点本身使用 bean 名称 plus 注册。
(如果 Bean 名称为 ,则端点注册为 .)channel DirectChannel id id .adapter thing1 thing1.adapter |
2 | 要从此 Endpoint 向其发送实例的 。MessageChannel Message |
3 | 一个属性,用于指定此端点是否应在应用程序上下文启动后自动启动。
它默认为 .SmartLifecycle true |
4 | 一个属性,用于指定此终端节点的启动阶段。
它默认为 .SmartLifecycle 0 |
5 | 对 Bean 的引用。
它默认为 .RedisConnectionFactory redisConnectionFactory |
6 | Redis 列表的名称,在其上执行基于队列的“pop”操作以获取 Redis 消息。 |
7 | 当从终端节点的侦听任务收到异常时,将实例发送到该对象。
默认情况下,底层使用应用程序上下文中的默认值。MessageChannel ErrorMessage MessagePublishingErrorHandler errorChannel |
8 | Bean 引用。
它可以是一个空字符串,这意味着 'no serializer'。
在这种情况下,入站 Redis 消息中的原始消息将作为有效负载发送到 。
默认情况下,它是一个 .RedisSerializer byte[] channel Message JdkSerializationRedisSerializer |
9 | “pop”操作等待队列中的 Redis 消息的超时时间(以毫秒为单位)。 默认值为 1 秒。 |
10 | 在重新启动侦听器任务之前,侦听器任务在 'pop' 操作出现异常后应休眠的时间(以毫秒为单位)。 |
11 | 指定此终端节点是否期望 Redis 队列中的数据包含整个实例。
如果此属性设置为 ,则 不能为空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。
其默认值为 。Message true serializer false |
12 | 对 Spring (或标准 JDK 1.5+ )bean 的引用。
它用于底层侦听任务。
它默认为 .TaskExecutor Executor SimpleAsyncTaskExecutor |
13 | 指定此终端节点应使用 “right pop” (when ) 还是 “left pop” (when ) 从 Redis 列表中读取消息。
如果 ,则 Redis 列表在与默认 Redis 队列出站通道适配器一起使用时充当队列。
将其设置为与使用 “right push” 写入列表的软件一起使用,或实现类似堆栈的消息顺序。
其默认值为 。
从 4.3 版本开始。true false true FIFO false true |
必须配置多个线程进行处理;否则,在出现错误后尝试重新启动侦听器任务时,可能会出现死锁。
这可用于处理这些错误,以避免重新启动,但最好不要将应用程序暴露在可能的死锁情况中。
有关可能的实现,请参阅 Spring Framework Reference Manual。task-executor RedisQueueMessageDrivenEndpoint errorChannel TaskExecutor |
1 | 组件 Bean 名称。
如果未提供该属性,则会在应用程序上下文中创建并注册 a,并将此属性作为 Bean 名称。
在这种情况下,端点使用 bean 名称 plus 注册。
(如果 Bean 名称为 ,则端点注册为 .)channel DirectChannel id id .adapter thing1 thing1.adapter |
2 | 此终端节点从中接收实例的 URL。MessageChannel Message |
3 | 对 Bean 的引用。
它默认为 .RedisConnectionFactory redisConnectionFactory |
4 | Redis 列表的名称,在其上执行基于队列的“推送”操作以发送 Redis 消息。
此属性与 互斥。queue-expression |
5 | 用于确定 Redis 列表名称的 SPEL。
它使用运行时的传入作为变量。
此属性与 互斥。Expression Message #root queue |
6 | 一个 Bean 引用。
它默认为 .
但是,对于有效负载,如果未提供引用,则使用 a。RedisSerializer JdkSerializationRedisSerializer String StringRedisSerializer serializer |
7 | 指定此终端节点是应仅将负载发送还是将整个负载发送到 Redis 队列。
它默认为 .Message true |
8 | 指定此终端节点应使用 “left push” (when ) 还是 “right push” (when ) 将消息写入 Redis 列表。
如果 ,则 Redis 列表在与默认 Redis 队列入站通道适配器一起使用时充当队列。
将其设置为与使用 “left pop” 从列表中读取的软件一起使用,或实现类似堆栈的消息顺序。
它默认为 .
从 4.3 版本开始。true false true FIFO false true |
Redis 消息存储
如 Enterprise Integration Patterns (EIP) 一书中所述,消息存储允许您保留消息。
当考虑可靠性时,当处理具有缓冲消息功能的组件(聚合器、resequencer 等)时,这可能很有用。
在 Spring 集成中,该策略还为索赔检查模式提供了基础,EIP 中也对此进行了描述。MessageStore
Spring 集成的 Redis 模块提供了 .
以下示例显示了如何将其与聚合器一起使用:RedisMessageStore
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="redisMessageStore"/>
前面的示例是一个 bean 配置,它需要 a 作为构造函数参数。RedisConnectionFactory
默认情况下,使用 Java 序列化来序列化消息。
但是,如果要使用不同的序列化技术(如 JSON),可以通过设置 .RedisMessageStore
valueSerializer
RedisMessageStore
从版本 4.3.10 开始,框架分别为实例和实例 — 和 提供 Jackson 序列化器和反序列化器实现。
它们必须使用 .
此外,您应该在 上设置为每个序列化的复杂对象添加类型信息(如果您信任源)。
然后在反序列化期间使用该类型信息。
该框架提供了一个名为 的实用程序方法,该方法已随前面提到的所有属性和序列化程序一起提供。
此实用程序方法附带一个参数,用于限制 Java 包进行反序列化以避免安全漏洞。
默认的受信任软件包:、、、 .
要在 中管理 JSON 序列化,必须以类似于以下示例的方式对其进行配置:Message
MessageHeaders
MessageJacksonDeserializer
MessageHeadersJacksonSerializer
SimpleModule
ObjectMapper
enableDefaultTyping
ObjectMapper
JacksonJsonUtils.messagingAwareMapper()
trustedPackages
java.util
java.lang
org.springframework.messaging.support
org.springframework.integration.support
org.springframework.integration.message
org.springframework.integration.store
RedisMessageStore
RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
store.setValueSerializer(serializer);
从版本 4.3.12 开始,支持允许区分同一 Redis 服务器上的 store 实例的选项。RedisMessageStore
prefix
Redis 通道消息存储
前面显示的内容将每个组维护为单个键(组 ID)下的一个值。
虽然您可以使用它来支持持久性,但为此目的提供了专门的 (从版本 4.0 开始)。
此存储使用 for each channel、发送消息时和接收消息时。
默认情况下,此存储区还使用 JDK 序列化,但您可以修改值序列化器,如前所述。RedisMessageStore
QueueChannel
RedisChannelMessageStore
LIST
LPUSH
RPOP
我们建议使用此存储支持通道,而不是使用常规的 .
以下示例定义了一个 Redis 消息存储,并在具有队列的通道中使用它:RedisMessageStore
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="redisMessageStore"/>
<int:channel>
用于存储数据的键的格式为:(在前面的示例中为 )。<storeBeanName>:<channelId>
redisMessageStore:somePersistentQueueChannel
此外,还提供了一个子类。
当您将此函数与 一起使用时,消息将按 (FIFO) 优先级顺序接收。
它使用标准标头并支持优先级值 ()。
具有其他优先级的消息(和没有优先级的消息)将按 FIFO 顺序检索,位于具有优先级的任何消息之后。RedisChannelPriorityMessageStore
QueueChannel
IntegrationMessageHeaderAccessor.PRIORITY
0 - 9
这些存储仅实现而不实现 .
它们只能用于支持.BasicMessageGroupStore MessageGroupStore QueueChannel |
这些存储仅实现而不实现 .
它们只能用于支持.BasicMessageGroupStore MessageGroupStore QueueChannel |
Redis 元数据存储
Spring Integration 3.0 引入了一个新的基于 Redis 的 MetadataStore
(参见元数据存储)实现。
您可以使用 来维护跨应用程序重启的状态。
您可以将此新实现与适配器一起使用,例如:RedisMetadataStore
MetadataStore
MetadataStore
要指示这些适配器使用 new ,请声明一个名为 .
Feed 入站通道适配器和 Feed 入站通道适配器都会自动选取并使用声明的 .
下面的示例展示了如何声明这样的 bean:RedisMetadataStore
metadataStore
RedisMetadataStore
<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore">
<constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
它由 RedisProperties
提供支持。
与它的交互使用 BoundHashOperations
,而这反过来又需要 for the entire store.
在 的情况下,它扮演着区域的角色,这在分布式环境中当多个应用程序使用同一个 Redis 服务器时非常有用。
默认情况下,此参数的值为 .RedisMetadataStore
key
Properties
MetadataStore
key
key
MetaData
从版本 4.0 开始,此 store 实现了 ,使其在多个应用程序实例之间可靠地共享,其中只允许一个实例存储或修改键的值。ConcurrentMetadataStore
您不能将 (例如,在 ) 与 Redis 集群一起使用,因为当前不支持 atomicity 命令。RedisMetadataStore.replace() AbstractPersistentAcceptOnceFileListFilter WATCH |
您不能将 (例如,在 ) 与 Redis 集群一起使用,因为当前不支持 atomicity 命令。RedisMetadataStore.replace() AbstractPersistentAcceptOnceFileListFilter WATCH |
Redis Store 入站通道适配器
Redis 存储入站通道适配器是一个轮询使用者,它从 Redis 集合中读取数据并将其作为有效负载发送。
以下示例显示如何配置 Redis store 入站通道适配器:Message
<int-redis:store-inbound-channel-adapter id="listAdapter"
connection-factory="redisConnectionFactory"
key="myCollection"
channel="redisChannel"
collection-type="LIST" >
<int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter>
前面的示例显示了如何使用元素配置 Redis 存储入站通道适配器,并为各种属性提供值,例如:store-inbound-channel-adapter
-
key
或 :正在使用的集合的键的名称。key-expression
-
collection-type
:此适配器支持的集合类型的枚举。 支持的集合包括 、 、 、 和 。LIST
SET
ZSET
PROPERTIES
MAP
-
connection-factory
:对 实例的引用。o.s.data.redis.connection.RedisConnectionFactory
-
redis-template
:对 实例的引用。o.s.data.redis.core.RedisTemplate
-
所有其他入站适配器中通用的其他属性(例如 'channel')。
不能同时设置 和 。redis-template connection-factory |
默认情况下,适配器使用 .
这将对 key、values、hash keys 和 hash values 使用实例。
如果您的 Redis 存储包含使用其他技术序列化的对象,则必须提供具有适当序列化程序的 configured。
例如,如果使用设置为 的 Redis 存储出站适配器写入存储,则必须提供如下配置:
对键和哈希键使用序列化程序,对值和哈希值使用默认 JDK 序列化序列化程序。 |
因为它具有 的 Literal 值,所以前面的示例相对简单且静态。
有时,您可能需要根据某些条件在运行时更改 key 的值。
为此,请改用,其中提供的表达式可以是任何有效的 SPEL 表达式。key
key-expression
此外,您可能希望对从 Redis 集合中读取的成功处理的数据执行一些后处理。
例如,您可能希望在处理完值后移动或删除该值。
你可以通过使用 Spring Integration 2.2 中添加的事务同步功能来实现这一点。
以下示例使用 and 事务同步:key-expression
<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization"
connection-factory="redisConnectionFactory"
key-expression="'presidents'"
channel="otherRedisChannel"
auto-startup="false"
collection-type="ZSET">
<int:poller fixed-rate="1000" max-messages-per-poll="2">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-redis:store-inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.removeByScore(18, 18)"/>
</int:transaction-synchronization-factory>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
你可以使用 element 将你的 Poller 声明为事务性。
此元素可以引用真实的事务管理器(例如,如果流的某个其他部分调用 JDBC)。
如果您没有“真正的”事务,则可以使用 ,它是 Spring 的实现,并且可以在没有实际事务时使用 Redis 适配器的事务同步功能。transactional
o.s.i.transaction.PseudoTransactionManager
PlatformTransactionManager
这不会使 Redis 活动本身成为事务性活动。 它允许在成功 (提交) 之前或之后或失败 (回滚) 执行操作同步。 |
一旦你的 Poller 是事务性的,你就可以在元素上设置一个 the 的实例。 创建 的实例。
为方便起见,我们公开了默认的 SpEL-based ,它允许您配置 SpEL 表达式,其执行与事务协调(同步)。
支持 before-commit、after-commit 和 after-rollback 的表达式,以及发送评估结果(如果有)的通道(每种事件一个)。
对于每个子元素,您可以指定 and 属性。
如果仅存在该属性,则收到的消息将作为特定同步方案的一部分发送到该位置。
如果仅存在属性且表达式的结果是非空值,则会生成一条将结果作为有效负载的消息,并将其发送到默认通道 () 并显示在日志中(在级别)。
如果您希望评估结果转到特定频道,请添加属性。
如果表达式的结果是 null 或 void,则不会生成任何消息。o.s.i.transaction.TransactionSynchronizationFactory
transactional
TransactionSynchronizationFactory
TransactionSynchronization
TransactionSynchronizationFactory
expression
channel
channel
expression
NullChannel
DEBUG
channel
该 将添加一个属性,其中包含绑定到 transaction 的实例,该实例可从实现访问。RedisStoreMessageSource
store
RedisStore
IntegrationResourceHolder
TransactionSynchronizationProcessor
有关事务同步的更多信息,请参阅事务同步。
不能同时设置 和 。redis-template connection-factory |
默认情况下,适配器使用 .
这将对 key、values、hash keys 和 hash values 使用实例。
如果您的 Redis 存储包含使用其他技术序列化的对象,则必须提供具有适当序列化程序的 configured。
例如,如果使用设置为 的 Redis 存储出站适配器写入存储,则必须提供如下配置:
对键和哈希键使用序列化程序,对值和哈希值使用默认 JDK 序列化序列化程序。 |
这不会使 Redis 活动本身成为事务性活动。 它允许在成功 (提交) 之前或之后或失败 (回滚) 执行操作同步。 |
RedisStore 出站通道适配器
RedisStore 出站通道适配器允许您将消息有效负载写入 Redis 集合,如下例所示:
<int-redis:store-outbound-channel-adapter id="redisListAdapter"
collection-type="LIST"
channel="requestChannel"
key="myCollection" />
前面的配置:Redis 使用 element.
它为各种属性提供值,例如:store-inbound-channel-adapter
-
key
或 :正在使用的集合的键的名称。key-expression
-
extract-payload-elements
:如果设置为(默认)并且有效负载是“多值”对象(即 a 或 a )的实例,则使用 “addAll” 和 “putAll” 语义存储它。 否则,如果设置为 ,则有效负载将存储为单个条目,而不管其类型如何。 如果有效负载不是“多值”对象的实例,则忽略此属性的值,并且有效负载始终存储为单个条目。true
Collection
Map
false
-
collection-type
:此适配器支持的类型的枚举。 支持的集合包括 、 、 、 和 。Collection
LIST
SET
ZSET
PROPERTIES
MAP
-
map-key-expression
:返回要存储的条目的键名称的 SPEL 表达式。 仅当 is 或 且 'extract-payload-elements' 为 false 时,它才适用。collection-type
MAP
PROPERTIES
-
connection-factory
:对 实例的引用。o.s.data.redis.connection.RedisConnectionFactory
-
redis-template
:对 实例的引用。o.s.data.redis.core.RedisTemplate
-
所有其他入站适配器中通用的其他属性(例如 'channel')。
不能同时设置 和 。redis-template connection-factory |
默认情况下,适配器使用 .
这将对 key、values、hash keys 和 hash values 使用实例。
但是,如果设置为 ,则将使用具有键和哈希键实例以及值和哈希值的实例 s 的 a。
使用 JDK 序列化程序时,请务必了解 Java 序列化用于所有值,而不管该值是否实际上是集合。
如果您需要对值的序列化进行更多控制,请考虑提供自己的值,而不是依赖这些默认值。StringRedisTemplate StringRedisSerializer extract-payload-elements false RedisTemplate StringRedisSerializer JdkSerializationRedisSerializer RedisTemplate |
由于它具有 the 和其他属性的 Literals 值,因此前面的示例相对简单且静态。
有时,您可能需要在运行时根据某些条件动态更改值。
为此,请使用它们的等效项(、 、 等),其中提供的表达式可以是任何有效的 SPEL 表达式。key
-expression
key-expression
map-key-expression
不能同时设置 和 。redis-template connection-factory |
默认情况下,适配器使用 .
这将对 key、values、hash keys 和 hash values 使用实例。
但是,如果设置为 ,则将使用具有键和哈希键实例以及值和哈希值的实例 s 的 a。
使用 JDK 序列化程序时,请务必了解 Java 序列化用于所有值,而不管该值是否实际上是集合。
如果您需要对值的序列化进行更多控制,请考虑提供自己的值,而不是依赖这些默认值。StringRedisTemplate StringRedisSerializer extract-payload-elements false RedisTemplate StringRedisSerializer JdkSerializationRedisSerializer RedisTemplate |
Redis 出站命令网关
Spring 集成 4.0 引入了 Redis 命令网关,允许您使用通用方法执行任何标准 Redis 命令。
以下清单显示了 Redis 出站网关的可用属性:RedisConnection#execute
<int-redis:outbound-gateway
request-channel="" (1)
reply-channel="" (2)
requires-reply="" (3)
reply-timeout="" (4)
connection-factory="" (5)
redis-template="" (6)
arguments-serializer="" (7)
command-expression="" (8)
argument-expressions="" (9)
use-command-variable="" (10)
arguments-strategy="" /> (11)
1 | 此终端节点从中接收实例的 URL。MessageChannel Message |
2 | 此端点发送回复实例的位置。MessageChannel Message |
3 | 指定此出站网关是否必须返回非 null 值。
它默认为 .
当 Redis 返回值时,将引发 A。true ReplyRequiredException null |
4 | 在发送回复消息之前等待的超时 (以毫秒为单位)。 它通常应用于基于队列的有限回复通道。 |
5 | 对 Bean 的引用。
它默认为 .
它与 'redis-template' 属性互斥。RedisConnectionFactory redisConnectionFactory |
6 | 对 Bean 的引用。
它与 'connection-factory' 属性互斥。RedisTemplate |
7 | 对 实例的引用。
如有必要,它用于将每个 command 参数序列化为 byte[]。org.springframework.data.redis.serializer.RedisSerializer |
8 | 返回命令键的 SpEL 表达式。
它默认为消息标头。
它的计算结果不得为 。redis_command null |
9 | 以逗号分隔的 SpEL 表达式,计算为命令参数。
与 属性互斥。
如果两者都不提供,则 the 将用作命令参数。
参数表达式的计算结果可以为 'null' 以支持可变数量的参数。arguments-strategy payload |
10 | 一个标志,用于指定评估的 Redis 命令字符串是否在 when 配置的表达式评估上下文中作为变量可用。
否则,将忽略此属性。boolean #cmd o.s.i.redis.outbound.ExpressionArgumentsStrategy argument-expressions |
11 | 对 实例的引用。
它与 attribute 互斥。
如果两者都不提供,则 the 将用作命令参数。o.s.i.redis.outbound.ArgumentsStrategy argument-expressions payload |
您可以使用 as a common component 来执行任何所需的 Redis 操作。
以下示例显示如何从 Redis 原子序数获取递增的值:<int-redis:outbound-gateway>
<int-redis:outbound-gateway request-channel="requestChannel"
reply-channel="replyChannel"
command-expression="'INCR'"/>
有效负载应具有 name ,该名称可由 bean 定义提供。Message
redisCounter
org.springframework.data.redis.support.atomic.RedisAtomicInteger
该方法具有 generic 作为其返回类型。
实际结果取决于命令类型。
例如,返回一个 .
有关命令、其参数和结果类型的更多信息,请参阅 Redis 规范。RedisConnection#execute
Object
MGET
List<byte[]>
1 | 此终端节点从中接收实例的 URL。MessageChannel Message |
2 | 此端点发送回复实例的位置。MessageChannel Message |
3 | 指定此出站网关是否必须返回非 null 值。
它默认为 .
当 Redis 返回值时,将引发 A。true ReplyRequiredException null |
4 | 在发送回复消息之前等待的超时 (以毫秒为单位)。 它通常应用于基于队列的有限回复通道。 |
5 | 对 Bean 的引用。
它默认为 .
它与 'redis-template' 属性互斥。RedisConnectionFactory redisConnectionFactory |
6 | 对 Bean 的引用。
它与 'connection-factory' 属性互斥。RedisTemplate |
7 | 对 实例的引用。
如有必要,它用于将每个 command 参数序列化为 byte[]。org.springframework.data.redis.serializer.RedisSerializer |
8 | 返回命令键的 SpEL 表达式。
它默认为消息标头。
它的计算结果不得为 。redis_command null |
9 | 以逗号分隔的 SpEL 表达式,计算为命令参数。
与 属性互斥。
如果两者都不提供,则 the 将用作命令参数。
参数表达式的计算结果可以为 'null' 以支持可变数量的参数。arguments-strategy payload |
10 | 一个标志,用于指定评估的 Redis 命令字符串是否在 when 配置的表达式评估上下文中作为变量可用。
否则,将忽略此属性。boolean #cmd o.s.i.redis.outbound.ExpressionArgumentsStrategy argument-expressions |
11 | 对 实例的引用。
它与 attribute 互斥。
如果两者都不提供,则 the 将用作命令参数。o.s.i.redis.outbound.ArgumentsStrategy argument-expressions payload |
Redis 队列出站网关
Spring 集成引入了 Redis 队列出站网关来执行请求和回复场景。
它将对话推送到提供的 ,将以该值作为键推送到 Redis 列表,并等待键为 plus 的 Redis 列表的回复。
每次交互使用不同的 UUID。
以下清单显示了 Redis 出站网关的可用属性:UUID
queue
UUID
UUID
.reply
<int-redis:queue-outbound-gateway
request-channel="" (1)
reply-channel="" (2)
requires-reply="" (3)
reply-timeout="" (4)
connection-factory="" (5)
queue="" (6)
order="" (7)
serializer="" (8)
extract-payload=""/> (9)
1 | 此终端节点从中接收实例的 URL。MessageChannel Message |
2 | 此端点发送回复实例的位置。MessageChannel Message |
3 | 指定此出站网关是否必须返回非 null 值。
此值为默认值。
否则,当 Redis 返回值时,将引发 a。false ReplyRequiredException null |
4 | 在发送回复消息之前等待的超时 (以毫秒为单位)。 它通常应用于基于队列的有限回复通道。 |
5 | 对 Bean 的引用。
它默认为 .
它与 'redis-template' 属性互斥。RedisConnectionFactory redisConnectionFactory |
6 | 出站网关向其发送对话的 Redis 列表的名称。UUID |
7 | 注册多个网关时此出站网关的顺序。 |
8 | Bean 引用。
它可以是一个空字符串,这意味着 “no serializer”。
在这种情况下,入站 Redis 消息中的原始消息将作为有效负载发送到 。
默认情况下,它是一个 .RedisSerializer byte[] channel Message JdkSerializationRedisSerializer |
9 | 指定此终端节点是否期望 Redis 队列中的数据包含整个实例。
如果此属性设置为 ,则 不能为空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。Message true serializer |
1 | 此终端节点从中接收实例的 URL。MessageChannel Message |
2 | 此端点发送回复实例的位置。MessageChannel Message |
3 | 指定此出站网关是否必须返回非 null 值。
此值为默认值。
否则,当 Redis 返回值时,将引发 a。false ReplyRequiredException null |
4 | 在发送回复消息之前等待的超时 (以毫秒为单位)。 它通常应用于基于队列的有限回复通道。 |
5 | 对 Bean 的引用。
它默认为 .
它与 'redis-template' 属性互斥。RedisConnectionFactory redisConnectionFactory |
6 | 出站网关向其发送对话的 Redis 列表的名称。UUID |
7 | 注册多个网关时此出站网关的顺序。 |
8 | Bean 引用。
它可以是一个空字符串,这意味着 “no serializer”。
在这种情况下,入站 Redis 消息中的原始消息将作为有效负载发送到 。
默认情况下,它是一个 .RedisSerializer byte[] channel Message JdkSerializationRedisSerializer |
9 | 指定此终端节点是否期望 Redis 队列中的数据包含整个实例。
如果此属性设置为 ,则 不能为空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。Message true serializer |
Redis 队列入站网关
Spring Integration 4.1 引入了 Redis 队列入站网关来执行请求和回复场景。
它从提供的 中弹出一个对话,从 Redis 列表中弹出以该值作为其键的值,并使用键 plus 将回复推送到 Redis 列表。
以下清单显示了 Redis 队列入站网关的可用属性:UUID
queue
UUID
UUID
.reply
<int-redis:queue-inbound-gateway
request-channel="" (1)
reply-channel="" (2)
executor="" (3)
reply-timeout="" (4)
connection-factory="" (5)
queue="" (6)
order="" (7)
serializer="" (8)
receive-timeout="" (9)
expect-message="" (10)
recovery-interval=""/> (11)
1 | 此终端节点发送从 Redis 数据创建的实例的位置。MessageChannel Message |
2 | 此终端节点等待回复实例的 from。
可选 - 标头仍在使用中。MessageChannel Message replyChannel |
3 | 对 Spring (或标准 JDK )bean 的引用。
它用于底层侦听任务。
它默认为 .TaskExecutor Executor SimpleAsyncTaskExecutor |
4 | 在发送回复消息之前等待的超时 (以毫秒为单位)。 它通常应用于基于队列的有限回复通道。 |
5 | 对 Bean 的引用。
它默认为 .
它与 'redis-template' 属性互斥。RedisConnectionFactory redisConnectionFactory |
6 | 对话的 Redis 列表的名称。UUID |
7 | 注册多个网关时此入站网关的顺序。 |
8 | Bean 引用。
它可以是一个空字符串,这意味着 “no serializer”。
在这种情况下,入站 Redis 消息中的原始消息将作为有效负载发送到 。
它默认为 .
(请注意,在 4.3 版之前的版本中,默认情况下它是 a.
要恢复该行为,请提供对 a 的引用)。RedisSerializer byte[] channel Message JdkSerializationRedisSerializer StringRedisSerializer StringRedisSerializer |
9 | 等待接收消息的超时 (以毫秒为单位)。 它通常应用于基于队列的有限请求通道。 |
10 | 指定此终端节点是否期望 Redis 队列中的数据包含整个实例。
如果此属性设置为 ,则 不能为空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。Message true serializer |
11 | 在重新启动侦听器任务之前,侦听器任务在 “right pop” 操作出现异常后应休眠的时间(以毫秒为单位)。 |
必须配置多个线程进行处理;否则,在出现错误后尝试重新启动侦听器任务时,可能会出现死锁。
这可用于处理这些错误,以避免重新启动,但最好不要将应用程序暴露在可能的死锁情况中。
有关可能的实现,请参阅 Spring Framework Reference Manual。task-executor RedisQueueMessageDrivenEndpoint errorChannel TaskExecutor |
1 | 此终端节点发送从 Redis 数据创建的实例的位置。MessageChannel Message |
2 | 此终端节点等待回复实例的 from。
可选 - 标头仍在使用中。MessageChannel Message replyChannel |
3 | 对 Spring (或标准 JDK )bean 的引用。
它用于底层侦听任务。
它默认为 .TaskExecutor Executor SimpleAsyncTaskExecutor |
4 | 在发送回复消息之前等待的超时 (以毫秒为单位)。 它通常应用于基于队列的有限回复通道。 |
5 | 对 Bean 的引用。
它默认为 .
它与 'redis-template' 属性互斥。RedisConnectionFactory redisConnectionFactory |
6 | 对话的 Redis 列表的名称。UUID |
7 | 注册多个网关时此入站网关的顺序。 |
8 | Bean 引用。
它可以是一个空字符串,这意味着 “no serializer”。
在这种情况下,入站 Redis 消息中的原始消息将作为有效负载发送到 。
它默认为 .
(请注意,在 4.3 版之前的版本中,默认情况下它是 a.
要恢复该行为,请提供对 a 的引用)。RedisSerializer byte[] channel Message JdkSerializationRedisSerializer StringRedisSerializer StringRedisSerializer |
9 | 等待接收消息的超时 (以毫秒为单位)。 它通常应用于基于队列的有限请求通道。 |
10 | 指定此终端节点是否期望 Redis 队列中的数据包含整个实例。
如果此属性设置为 ,则 不能为空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。Message true serializer |
11 | 在重新启动侦听器任务之前,侦听器任务在 “right pop” 操作出现异常后应休眠的时间(以毫秒为单位)。 |
必须配置多个线程进行处理;否则,在出现错误后尝试重新启动侦听器任务时,可能会出现死锁。
这可用于处理这些错误,以避免重新启动,但最好不要将应用程序暴露在可能的死锁情况中。
有关可能的实现,请参阅 Spring Framework Reference Manual。task-executor RedisQueueMessageDrivenEndpoint errorChannel TaskExecutor |
Redis 流出站频道适配器
Spring 集成 5.4 引入了反应式 Redis 流出站通道适配器,用于将 Message 有效负载写入 Redis 流。
出站通道适配器用于将 a 添加到流中。
以下示例显示了如何使用 Redis Stream Outbound Channel Adapter 的 Java 配置和 Service 类。ReactiveStreamOperations.add(…)
Record
@Bean
@ServiceActivator(inputChannel = "messageChannel")
public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey"); (1)
reactiveStreamMessageHandler.setSerializationContext(serializationContext); (2)
reactiveStreamMessageHandler.setHashMapper(hashMapper); (3)
reactiveStreamMessageHandler.setExtractPayload(true); (4)
return reactiveStreamMessageHandler;
}
1 | 构造 using 和 stream name 的实例以添加记录。
另一个构造函数变体基于 SpEL 表达式,用于根据请求消息评估流键。ReactiveRedisStreamMessageHandler ReactiveRedisConnectionFactory |
2 | 设置 a 用于在添加到流之前序列化记录键和值。RedisSerializationContext |
3 | Set 提供 Java 类型和 Redis 哈希/映射之间的协定。HashMapper |
4 | 如果为 'true',则通道适配器将从请求消息中提取要添加的流记录的有效负载。
或者使用整个消息作为值。
它默认为 .true |
1 | 构造 using 和 stream name 的实例以添加记录。
另一个构造函数变体基于 SpEL 表达式,用于根据请求消息评估流键。ReactiveRedisStreamMessageHandler ReactiveRedisConnectionFactory |
2 | 设置 a 用于在添加到流之前序列化记录键和值。RedisSerializationContext |
3 | Set 提供 Java 类型和 Redis 哈希/映射之间的协定。HashMapper |
4 | 如果为 'true',则通道适配器将从请求消息中提取要添加的流记录的有效负载。
或者使用整个消息作为值。
它默认为 .true |
Redis Stream 入站频道适配器
Spring 集成 5.4 引入了反应式流入站通道适配器,用于从 Redis 流中读取消息。
入站通道适配器使用 OR BASED on 自动确认标志从 Redis 流中读取记录。
以下示例显示如何对 Redis Stream Inbound Channel Adapter 使用 Java 配置。StreamReceiver.receive(…)
StreamReceiver.receiveAutoAck()
@Bean
public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageProducer messageProducer =
new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey"); (1)
messageProducer.setStreamReceiverOptions( (2)
StreamReceiver.StreamReceiverOptions.builder()
.pollTimeout(Duration.ofMillis(100))
.build());
messageProducer.setAutoStartup(true); (3)
messageProducer.setAutoAck(false); (4)
messageProducer.setCreateConsumerGroup(true); (5)
messageProducer.setConsumerGroup("my-group"); (6)
messageProducer.setConsumerName("my-consumer"); (7)
messageProducer.setOutputChannel(fromRedisStreamChannel); (8)
messageProducer.setReadOffset(ReadOffset.latest()); (9)
messageProducer.extractPayload(true); (10)
return messageProducer;
}
1 | 构造 using 和 stream key 实例来读取记录。ReactiveRedisStreamMessageProducer ReactiveRedisConnectionFactory |
2 | A 使用反应式基础设施使用 redis 流。StreamReceiver.StreamReceiverOptions |
3 | 一个属性,用于指定此端点是否应在应用程序上下文启动后自动启动。
它默认为 .
如果 ,则应手动启动 。SmartLifecycle true false RedisStreamMessageProducer messageProducer.start() |
4 | 如果 ,则不会自动确认收到的邮件。
消息的确认将推迟到客户端使用消息。
它默认为 .false true |
5 | 如果为 ,将创建一个消费组。
在创建消费组时,也会创建 stream(如果尚不存在)。
Consumer Group 跟踪消息送达并区分 Consumer
它默认为 .true false |
6 | 设置 Consumer Group name。 它默认为定义的 bean 名称。 |
7 | 设置 Consumer name。
将消息读取为来自组 。my-consumer my-group |
8 | 从此终端节点向其发送消息的消息通道。 |
9 | 定义要读取消息的偏移量。
它默认为 .ReadOffset.latest() |
10 | 如果为“true”,则通道适配器将从 中提取有效载荷值。
否则,整体将用作有效负载。
它默认为 .Record Record true |
如果设置为 ,Redis 驱动程序不会自动确认 Redis Stream 中的 Redis Stream,而是将标头添加到消息中,以实例作为值生成。
每当根据此类记录为消息完成业务逻辑时,目标集成流负责调用其回调。
即使在反序列化期间发生异常并已配置,也需要类似的 logic。
因此,目标错误处理程序必须决定 ack 或 nack 此类失败的消息。
除了 之外,还会将这些标头填充到消息中以生成: 、 和 。autoAck
false
Record
IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
SimpleAcknowledgment
acknowledge()
errorChannel
IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
ReactiveRedisStreamMessageProducer
RedisHeaders.STREAM_KEY
RedisHeaders.STREAM_MESSAGE_ID
RedisHeaders.CONSUMER_GROUP
RedisHeaders.CONSUMER
从版本 5.5 开始,您可以在 上显式配置选项,包括新引入的函数,如果 Redis Stream 使用者在发生反序列化错误时应继续轮询,则需要该函数。
default 函数向 error 通道(如果提供)发送一条消息,并可能确认失败的消息,如上所述。
所有这些都与外部提供的 .StreamReceiver.StreamReceiverOptionsBuilder
ReactiveRedisStreamMessageProducer
onErrorResume
StreamReceiver.StreamReceiverOptionsBuilder
StreamReceiver.StreamReceiverOptions
1 | 构造 using 和 stream key 实例来读取记录。ReactiveRedisStreamMessageProducer ReactiveRedisConnectionFactory |
2 | A 使用反应式基础设施使用 redis 流。StreamReceiver.StreamReceiverOptions |
3 | 一个属性,用于指定此端点是否应在应用程序上下文启动后自动启动。
它默认为 .
如果 ,则应手动启动 。SmartLifecycle true false RedisStreamMessageProducer messageProducer.start() |
4 | 如果 ,则不会自动确认收到的邮件。
消息的确认将推迟到客户端使用消息。
它默认为 .false true |
5 | 如果为 ,将创建一个消费组。
在创建消费组时,也会创建 stream(如果尚不存在)。
Consumer Group 跟踪消息送达并区分 Consumer
它默认为 .true false |
6 | 设置 Consumer Group name。 它默认为定义的 bean 名称。 |
7 | 设置 Consumer name。
将消息读取为来自组 。my-consumer my-group |
8 | 从此终端节点向其发送消息的消息通道。 |
9 | 定义要读取消息的偏移量。
它默认为 .ReadOffset.latest() |
10 | 如果为“true”,则通道适配器将从 中提取有效载荷值。
否则,整体将用作有效负载。
它默认为 .Record Record true |
Redis Lock 注册表
Spring Integration 4.0 引入了 .
某些组件(例如,aggregator 和 resequencer)使用从实例获取的锁来确保一次只有一个线程操作一个组。
在单个组件中执行此功能。
现在,您可以在这些组件上配置外部锁注册表。
当您将其与共享 一起使用时,可以使用 跨多个应用程序实例提供此功能,以便一次只有一个实例可以操作该组。RedisLockRegistry
LockRegistry
DefaultLockRegistry
MessageGroupStore
RedisLockRegistry
当本地线程释放锁时,另一个本地线程通常可以立即获取该锁。 如果锁由使用其他注册表实例的线程释放,则可能需要长达 100 毫秒的时间才能获取锁。
为避免“挂起”锁(当服务器发生故障时),此注册表中的锁将在默认 60 秒后过期,但您可以在注册表上配置此值。 锁的持有时间通常要短得多。
由于密钥可能会过期,因此尝试解锁过期的锁会导致引发异常。 但是,受此类锁定保护的资源可能已泄露,因此应将此类异常视为严重异常。 您应该将过期时间设置得足够大,以防止出现这种情况,但要设置得足够低,以便在服务器发生故障后,可以在合理的时间内恢复锁定。 |
从版本 5.0 开始, implements ,它删除上次获取的锁以及当前未锁定的锁。RedisLockRegistry
ExpirableLockRegistry
age
从版本 5.5.6 开始,支持通过 自动清理 redisLocks 的缓存。
有关更多信息,请参阅其 JavaDocs。RedisLockRegistry
RedisLockRegistry.locks
RedisLockRegistry.setCacheCapacity()
从版本 5.5.13 开始,它提供了一个选项来确定应该在哪种模式下进行 Redis 锁采集:RedisLockRegistry
setRedisLockType(RedisLockType)
-
RedisLockType.SPIN_LOCK
- 通过周期性循环(100ms)来获取锁,检查是否可以获取锁。 违约。 -
RedisLockType.PUB_SUB_LOCK
- 锁由 redis 发布-订阅获取。
发布-订阅是首选模式 - 客户端 Redis 服务器之间的网络颤动更少,性能更高 - 当订阅在其他进程中收到解锁通知时,会立即获取锁。 但是,Redis 不支持主/副本连接中的发布-订阅(例如在 AWS ElastiCache 环境中),因此选择忙旋转模式作为默认模式,以使注册表在任何环境中工作。
由于密钥可能会过期,因此尝试解锁过期的锁会导致引发异常。 但是,受此类锁定保护的资源可能已泄露,因此应将此类异常视为严重异常。 您应该将过期时间设置得足够大,以防止出现这种情况,但要设置得足够低,以便在服务器发生故障后,可以在合理的时间内恢复锁定。 |