对于最新的稳定版本,请使用 Spring Data Redis 3.3.1! |
对于最新的稳定版本,请使用 Spring Data Redis 3.3.1! |
Redis Streams 以抽象方法对日志数据结构进行建模。通常,日志是仅追加的数据结构,从一开始就在随机位置使用,或者通过流式传输新消息来使用。
在 Redis 参考文档中了解有关 Redis Streams 的更多信息。 |
Redis Streams 大致可以分为两个功能领域:
-
追加记录
-
使用记录
尽管此模式与 Pub/Sub 有相似之处,但主要区别在于消息的持久性及其使用方式。
虽然 Pub/Sub 依赖于暂时性消息的广播(即,如果您不收听,就会错过消息),但 Redis Stream 使用持久的、仅追加的数据类型,该类型会保留消息,直到流被修剪。使用的另一个区别是 Pub/Sub 注册服务器端订阅。Redis 将到达的消息推送到客户端,而 Redis Streams 需要主动轮询。
和 包为 Redis Streams 提供核心功能。org.springframework.data.redis.connection
org.springframework.data.redis.stream
在 Redis 参考文档中了解有关 Redis Streams 的更多信息。 |
附加
若要发送记录,可以像其他操作一样使用低级别或高级 。这两个实体都提供 () 方法,该方法接受记录和目标流作为参数。虽然需要原始数据(字节数组),但允许任意对象作为记录传入,如以下示例所示:RedisConnection
StreamOperations
add
xAdd
RedisConnection
StreamOperations
// append message through connection
RedisConnection con = …
byte[] stream = …
ByteRecord record = StreamRecords.rawBytes(…).withStreamKey(stream);
con.xAdd(record);
// append message through RedisTemplate
RedisTemplate template = …
StringRecord record = StreamRecords.string(…).withStreamKey("my-stream");
template.opsForStream().add(record);
流记录携带 键值元组作为其有效负载。将记录追加到流将返回可用作进一步参考的 。Map
RecordId
消费
在消费端,可以使用一个或多个流。Redis Streams 提供读取命令,允许从已知流内容内和流端之外的任意位置(随机访问)使用流,以使用新的流记录。
在低级别,提供 和 方法,用于映射 Redis 命令,分别用于在使用者组内读取和读取。请注意,多个流可以用作参数。RedisConnection
xRead
xReadGroup
Redis 中的订阅命令可能会被阻止。也就是说,调用连接会导致当前线程在开始等待消息时阻塞。仅当 read 命令超时或收到消息时,才会释放线程。xRead |
若要使用流消息,可以在应用程序代码中轮询消息,也可以通过消息侦听器容器使用两种异步接收之一,即命令式或反应式。每次有新记录到达时,容器都会通知应用程序代码。
同步接收
虽然流使用通常与异步处理相关联,但也可以同步使用消息。重载方法提供此功能。在同步接收期间,调用线程可能会阻塞,直到消息可用。该属性指定接收方在放弃等待消息之前应等待多长时间。StreamOperations.read(…)
StreamReadOptions.block
// Read message through RedisTemplate
RedisTemplate template = …
List<MapRecord<K, HK, HV>> messages = template.opsForStream().read(StreamReadOptions.empty().count(2),
StreamOffset.latest("my-stream"));
List<MapRecord<K, HK, HV>> messages = template.opsForStream().read(Consumer.from("my-group", "my-consumer"),
StreamReadOptions.empty().count(2),
StreamOffset.create("my-stream", ReadOffset.lastConsumed()))
通过消息侦听器容器进行异步接收
由于其阻塞性质,低级别轮询没有吸引力,因为它需要对每个使用者进行连接和线程管理。为了缓解这个问题,Spring Data 提供了消息侦听器,它可以完成所有繁重的工作。如果您熟悉 EJB 和 JMS,您应该会发现这些概念很熟悉,因为它被设计为尽可能接近 Spring Framework 及其消息驱动的 POJO (MDP) 中的支持。
Spring Data 附带了两种针对所用编程模型量身定制的实现:
-
StreamMessageListenerContainer
充当命令式编程模型的消息侦听器容器。它用于使用 Redis 流中的记录并驱动注入其中的实例。StreamListener
-
StreamReceiver
提供消息侦听器的反应式变体。它用于将来自 Redis 流的消息作为潜在的无限流使用,并通过 .Flux
StreamMessageListenerContainer
并负责将消息接收和调度到侦听器进行处理的所有线程。消息侦听器容器/接收方是 MDP 和消息传递提供者之间的中介,负责注册以接收消息、资源获取和发布、异常转换等。这样一来,作为应用程序开发人员,您可以编写与接收消息(并做出反应)相关的(可能很复杂)业务逻辑,并将样板 Redis 基础架构关注点委托给框架。StreamReceiver
这两个容器都允许更改运行时配置,以便可以在应用程序运行时添加或删除订阅,而无需重新启动。此外,容器使用延迟订阅方法,仅在需要时使用。如果所有侦听器都已取消订阅,它会自动执行清理,并释放线程。RedisConnection
祈使的StreamMessageListenerContainer
与 EJB 世界中的消息驱动 Bean (MDB) 类似,流驱动 POJO (SDP) 充当流消息的接收器。SDP 的一个限制是它必须实现接口。另请注意,如果您的 POJO 在多个线程上接收消息,请务必确保您的实现是线程安全的。org.springframework.data.redis.stream.StreamListener
class ExampleStreamListener implements StreamListener<String, MapRecord<String, String, String>> {
@Override
public void onMessage(MapRecord<String, String, String> message) {
System.out.println("MessageId: " + message.getId());
System.out.println("Stream: " + message.getStream());
System.out.println("Body: " + message.getValue());
}
}
StreamListener
表示函数接口,因此可以使用其 Lambda 形式重写实现:
message -> {
System.out.println("MessageId: " + message.getId());
System.out.println("Stream: " + message.getStream());
System.out.println("Body: " + message.getValue());
};
实现 后,是时候创建消息侦听器容器并注册订阅了:StreamListener
RedisConnectionFactory connectionFactory = …
StreamListener<String, MapRecord<String, String, String>> streamListener = …
StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainerOptions
.builder().pollTimeout(Duration.ofMillis(100)).build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(connectionFactory,
containerOptions);
Subscription subscription = container.receive(StreamOffset.fromStart("my-stream"), streamListener);
请参阅各种消息侦听器容器的 Javadoc,了解每个实现支持的功能的完整描述。
反应性的StreamReceiver
流数据源的响应式消费通常通过事件或消息发生。反应式接收器实现随其重载消息一起提供。与利用驱动程序提供的线程资源相比,响应式方法需要更少的基础结构资源(如线程)。接收流是需求驱动的发布者:Flux
StreamReceiver
receive(…)
StreamMessageListenerContainer
StreamMessage
Flux<MapRecord<String, String, String>> messages = …
return messages.doOnNext(it -> {
System.out.println("MessageId: " + message.getId());
System.out.println("Stream: " + message.getStream());
System.out.println("Body: " + message.getValue());
});
现在,我们需要创建并注册订阅以使用流消息:StreamReceiver
ReactiveRedisConnectionFactory connectionFactory = …
StreamReceiverOptions<String, MapRecord<String, String, String>> options = StreamReceiverOptions.builder().pollTimeout(Duration.ofMillis(100))
.build();
StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(connectionFactory, options);
Flux<MapRecord<String, String, String>> messages = receiver.receive(StreamOffset.fromStart("my-stream"));
请参阅各种消息侦听器容器的 Javadoc,了解每个实现支持的功能的完整描述。
需求驱动的消费使用背压信号来激活和停用轮询。 如果需求得到满足,订阅将暂停轮询,直到订阅者发出进一步需求的信号。根据策略的不同,这可能会导致消息被跳过。StreamReceiver ReadOffset |
Acknowledge
策略
当您通过 读取消息时,服务器将记住给定消息已传递,并将其添加到待处理条目列表 (PEL) 中。已传递但尚未确认的邮件列表。
消息必须通过以下方式确认才能从待处理条目列表中删除,如下面的代码片段所示。Consumer Group
StreamOperations.acknowledge
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = ...
container.receive(Consumer.from("my-group", "my-consumer"), (1)
StreamOffset.create("my-stream", ReadOffset.lastConsumed()),
msg -> {
// ...
redisTemplate.opsForStream().acknowledge("my-group", msg); (2)
});
1 | 从组 my-group 中以 my-consumer 身份读取。不确认收到的邮件。 |
2 | 处理后确认消息。 |
若要在接收时自动确认消息,请使用而不是 。receiveAutoAck receive |
ReadOffset
策略
流读取操作接受读取偏移量规范,以使用来自给定偏移量的消息。 表示读取偏移规范。Redis 支持 3 种偏移量变体,具体取决于您是独立使用流还是在使用者组内使用流:ReadOffset
-
ReadOffset.latest()
– 阅读最新消息。 -
ReadOffset.from(…)
– 在特定消息 ID 后阅读。 -
ReadOffset.lastConsumed()
– 在上次使用的消息 ID 之后读取(仅限消费者组)。
在基于消息容器的使用上下文中,我们需要在使用消息时提前(或增加)读取偏移量。前进取决于请求和消费模式(有/没有消费者组)。以下矩阵解释了容器如何推进:ReadOffset
ReadOffset
读取偏移量 | 独立 | 消费者组 |
---|---|---|
最近的 |
阅读最新消息 |
阅读最新消息 |
特定消息 ID |
使用上次看到的消息作为下一个 MessageId |
使用上次看到的消息作为下一个 MessageId |
上次食用时间 |
使用上次看到的消息作为下一个 MessageId |
按使用者组显示的上次使用的消息 |
从特定消息 ID 和上次使用的消息读取可以被视为安全操作,可确保使用追加到流的所有消息。
使用最新消息进行读取可以跳过在轮询操作处于死区状态时添加到流的消息。轮询引入了一个死区时间,在这个时间里,消息可以在各个轮询命令之间到达。流消耗不是线性连续读取,而是拆分为重复调用。XREAD
Redis 中的订阅命令可能会被阻止。也就是说,调用连接会导致当前线程在开始等待消息时阻塞。仅当 read 命令超时或收到消息时,才会释放线程。xRead |
需求驱动的消费使用背压信号来激活和停用轮询。 如果需求得到满足,订阅将暂停轮询,直到订阅者发出进一步需求的信号。根据策略的不同,这可能会导致消息被跳过。StreamReceiver ReadOffset |
1 | 从组 my-group 中以 my-consumer 身份读取。不确认收到的邮件。 |
2 | 处理后确认消息。 |
若要在接收时自动确认消息,请使用而不是 。receiveAutoAck receive |
读取偏移量 | 独立 | 消费者组 |
---|---|---|
最近的 |
阅读最新消息 |
阅读最新消息 |
特定消息 ID |
使用上次看到的消息作为下一个 MessageId |
使用上次看到的消息作为下一个 MessageId |
上次食用时间 |
使用上次看到的消息作为下一个 MessageId |
按使用者组显示的上次使用的消息 |
序列化
发送到流的任何记录都需要序列化为其二进制格式。由于流与哈希数据结构的接近性,因此流键、字段名称和值使用在 .RedisTemplate
Stream 属性 | 序列化程序 | 描述 |
---|---|---|
钥匙 |
键序列化程序 |
用于 |
田 |
hashKeySerializer |
用于有效负载中的每个映射键 |
价值 |
hashValueSerializer |
用于有效负载中的每个映射值 |
请务必查看正在使用的 s,并注意,如果您决定不使用任何序列化程序,则需要确保这些值已经是二进制的。RedisSerializer
Stream 属性 | 序列化程序 | 描述 |
---|---|---|
钥匙 |
键序列化程序 |
用于 |
田 |
hashKeySerializer |
用于有效负载中的每个映射键 |
价值 |
hashValueSerializer |
用于有效负载中的每个映射值 |
对象映射
简单值
StreamOperations
允许将简单值 via 直接追加到流中,而无需将这些值放入结构中。
然后,该值将被分配给有效负载字段,并且可以在读回该值时提取该值。ObjectRecord
Map
ObjectRecord<String, String> record = StreamRecords.newRecord()
.in("my-stream")
.ofObject("my-value");
redisTemplate()
.opsForStream()
.add(record); (1)
List<ObjectRecord<String, String>> records = redisTemplate()
.opsForStream()
.read(String.class, StreamOffset.fromStart("my-stream"));
1 | XADD my-stream * “_class” “java.lang.String” “_raw” “my-value” |
ObjectRecord
s 通过与所有其他记录完全相同的序列化过程,因此也可以使用返回 .MapRecord
复杂值
可以通过 3 种方式向流添加复杂值:
-
使用字符串 JSON 表示形式转换为简单值。
-
使用合适的 . 序列化值。
RedisSerializer
-
使用
HashMapper
将值转换为适合序列化的值。Map
第一个变体是最直接的变体,但忽略了流结构提供的字段值功能,但流中的值对于其他使用者仍然是可读的。
第二个选项具有与第一个选项相同的优点,但可能会导致非常具体的使用者限制,因为所有使用者都必须实现完全相同的序列化机制。
这种方法有点复杂,它利用了 steam 哈希结构,但扁平化了源。还有一些使用者仍然能够读取记录,只要选择了合适的序列化器组合。HashMapper
HashMappers 将有效负载转换为具有特定类型的负载。请确保使用能够(反)序列化哈希的哈希键和哈希值序列化程序。Map |
ObjectRecord<String, User> record = StreamRecords.newRecord()
.in("user-logon")
.ofObject(new User("night", "angel"));
redisTemplate()
.opsForStream()
.add(record); (1)
List<ObjectRecord<String, User>> records = redisTemplate()
.opsForStream()
.read(User.class, StreamOffset.fromStart("user-logon"));
1 | XADD user-logon * “_class” “com.example.User” “firstname” “night” “lastname” “angel” |
StreamOperations
默认使用 ObjectHashMapper。
您可以在获得时提供适合您要求的。HashMapper
StreamOperations
redisTemplate()
.opsForStream(new Jackson2HashMapper(true))
.add(record); (1)
1 | XADD user-logon * “firstname” “night” “@class” “com.example.User” “lastname” “angel” |
A 可能不知道域类型上使用的任何内容,因为这些类型需要通过 .
确保使用 .
|
1 | XADD my-stream * “_class” “java.lang.String” “_raw” “my-value” |
HashMappers 将有效负载转换为具有特定类型的负载。请确保使用能够(反)序列化哈希的哈希键和哈希值序列化程序。Map |
1 | XADD user-logon * “_class” “com.example.User” “firstname” “night” “lastname” “angel” |
1 | XADD user-logon * “firstname” “night” “@class” “com.example.User” “lastname” “angel” |
A 可能不知道域类型上使用的任何内容,因为这些类型需要通过 .
确保使用 .
|