此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.4.3! |
消息存储
Enterprise Integration Patterns (EIP) 一书确定了几种能够缓冲消息的模式。
例如,聚合器会缓冲消息,直到它们可以被释放,而QueueChannel
buffers 消息,直到使用者显式地从该通道接收这些消息。
由于消息流中的任何点都可能发生故障,因此缓冲消息的 EIP 组件也会引入消息可能丢失的点。
为了降低丢失消息的风险,EIP 定义了消息存储模式,该模式允许 EIP 组件存储消息,通常存储在某种类型的持久性存储(例如 RDBMS)中。
Spring 集成通过以下方式为消息存储模式提供支持:
-
定义
org.springframework.integration.store.MessageStore
策略界面 -
提供此接口的多种实现
-
公开
message-store
属性,以便您可以注入任何实现MessageStore
接口。
有关如何配置特定消息存储实现以及如何注入MessageStore
实现到特定的缓冲组件中都有介绍(请参阅特定组件,例如 QueueChannel、Aggregator、Delayer 等)。
以下一对示例显示了如何为QueueChannel
对于聚合商:
<int:channel id="myQueueChannel">
<int:queue message-store="refToMessageStore"/>
<int:channel>
<int:aggregator message-store="refToMessageStore"/>
默认情况下,消息使用o.s.i.store.SimpleMessageStore
,则MessageStore
.
这对于开发或简单的低容量环境来说可能很好,在这些环境中,非持久性消息的潜在丢失并不是一个问题。
但是,典型的生产应用程序需要一个更健壮的选项,这不仅是为了降低消息丢失的风险,也是为了避免潜在的内存不足错误。
因此,我们还提供MessageStore
各种数据存储的实现。
以下是支持的实施的完整列表:
-
Hazelcast 消息存储:使用 Hazelcast 分布式缓存来存储消息
-
JDBC 消息存储:使用 RDBMS 存储消息
-
Redis 消息存储:使用 Redis 键/值数据存储来存储消息
-
MongoDB 消息存储:使用 MongoDB 文档存储来存储消息
但是,在使用 Message 数据(payload 和 headers)使用不同的序列化策略进行序列化和反序列化,具体取决于 请特别注意表示某些类型数据的标头。
例如,如果其中一个 Headers 包含某个 Spring Bean 的实例,则在反序列化时,你最终可能会得到该 Bean 的不同实例,这会直接影响框架创建的一些隐式 Headers(例如 从 Spring 集成版本 3.0 开始,你可以通过配置一个 Headers Enricher 来解决这个问题,以便在向 此外,请考虑按如下方式配置 message-flow 时会发生什么:gateway → queue-channel(由持久 Message Store 支持)→ service-activator。
该网关会创建一个临时回复通道,当服务激活器的 Poller 从队列中读取时,该通道将丢失。
同样,您可以使用标头扩充器将标头替换为 有关更多信息,请参阅 Header Enricher。 |
Spring Integration 4.0 引入了两个新接口:
-
ChannelMessageStore
:要实现特定于QueueChannel
实例 -
PriorityCapableChannelMessageStore
:标记MessageStore
要用于的 implementations to use forPriorityChannel
实例,并为持久消息提供优先级顺序。
实际行为取决于实现。
该框架提供了以下实现,这些实现可以用作持久MessageStore
为QueueChannel
和PriorityChannel
:
注意事项
SimpleMessageStore 从版本 4.1 开始, 现在,在组件(如聚合器)之外访问组存储的用户可以直接引用聚合器正在使用的组,而不是副本。 在聚合器外部纵组可能会导致不可预知的结果。 因此,您不应该执行此类作,或者将 |
用MessageGroupFactory
从版本 4.3 开始,一些MessageGroupStore
实现可以注入自定义的MessageGroupFactory
策略来创建和自定义MessageGroup
实例MessageGroupStore
.
这默认为SimpleMessageGroupFactory
,它会产生SimpleMessageGroup
基于GroupType.HASH_SET
(LinkedHashSet
) internal 集合。
其他可能的选项包括SYNCHRONISED_SET
和BLOCKING_QUEUE
,其中最后一个可用于恢复之前的SimpleMessageGroup
行为。
此外,PERSISTENT
选项可用。
有关更多信息,请参阅下一节。
从版本 5.0.1 开始,LIST
当组中消息的顺序和唯一性无关紧要时,选项也可用。
持续MessageGroupStore
和 Lazy-load
从版本 4.3 开始,所有 persistentMessageGroupStore
实例检索MessageGroup
实例及其messages
以 lazy-load 方式从 store 中。
在大多数情况下,它对关联MessageHandler
实例(请参阅 Aggregator 和 Resequencer),这会增加加载整个MessageGroup
从存储中执行每个关联作。
您可以使用AbstractMessageGroupStore.setLazyLoadMessageGroups(false)
选项从配置中关闭 Lazy-load 行为。
我们对 MongoDB 上的延迟加载进行的性能测试MessageStore
(MongoDB 消息存储)和<aggregator>
(聚合器)使用自定义release-strategy
类似于以下内容:
<int:aggregator input-channel="inputChannel"
output-channel="outputChannel"
message-store="mongoStore"
release-strategy-expression="size() == 1000"/>
对于 1000 条简单消息,它会生成类似于以下内容的结果:
...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms % Task name
-----------------------------------------
02652 007% Lazy-Load
36266 093% Eager
...
但是,从版本 5.5 开始,所有持久性MessageGroupStore
implementations 提供了一个streamMessagesForGroup(Object groupId)
Contract 的 Contract。
当 store 中的组非常大时,这可以提高资源利用率。
在框架内部,当 Delayer 在启动时重新安排持久消息时,将使用此新 API(例如)。
返回的Stream<Message<?>>
必须在处理结束时关闭,例如通过try-with-resources
.
每当PersistentMessageGroup
,则其streamMessages()
delegates 到MessageGroupStore.streamMessagesForGroup()
.
消息组条件
从版本 5.5 开始,MessageGroup
abstraction 提供了一个condition
string 选项。
此选项的值可以是以后可以出于任何原因解析以为组做出决策的任何内容。
例如,一个ReleaseStrategy
从关联消息处理程序可以从组中查询此属性,而不是迭代组中的所有消息。
这MessageGroupStore
暴露一个setGroupCondition(Object groupId, String condition)
应用程序接口。
为此,一个setGroupConditionSupplier(BiFunction<Message<?>, String, String>)
选项已添加到AbstractCorrelatingMessageHandler
.
在将每条消息添加到组后,将根据每条消息以及组的现有条件评估此函数。
实现可以决定返回新值、现有值或将目标条件重置为null
.
的值condition
可以是 JSON、SpEL 表达式、数字或任何可以序列化为字符串并在之后解析的内容。
例如,FileMarkerReleaseStrategy
从 File Aggregator 组件中,将条件填充到一个组中FileHeaders.LINE_COUNT
标头的FileSplitter.FileMarker.Mark.END
消息并从其canRelease()
将组大小与此条件中的值进行比较。
这样,它不会迭代组中的所有消息来查找FileSplitter.FileMarker.Mark.END
message 替换为FileHeaders.LINE_COUNT
页眉。
它还允许结束标记在所有其他记录之前到达聚合器;例如,在多线程环境中处理文件时。
此外,为了便于配置,一个GroupConditionProvider
合同。
这AbstractCorrelatingMessageHandler
检查提供的ReleaseStrategy
实现此接口并提取conditionSupplier
用于组条件评估逻辑。