重定序器
resequencer 与 聚合器相关,但用途不同。 当聚合器合并消息时,resequencer 传递消息而不更改它们。
功能性
resequencer 的工作方式与 aggregator 类似,因为它使用 the 将消息存储在组中。
区别在于 Resequencer 不以任何方式处理消息。
相反,它会按照其 Headers 值的顺序释放它们。CORRELATION_ID
SEQUENCE_NUMBER
对此,您可以选择一次释放所有消息(在整个序列之后,根据 和其他可能性)或在有效序列可用时立即释放。
(我们将在本章后面介绍我们所说的 “有效序列” 的含义。SEQUENCE_SIZE
resequencer 旨在对具有较小间隙的相对较短的消息序列进行重新排序。 如果您有大量具有许多间隙的不相交序列,则可能会遇到性能问题。 |
配置 Resequencer
有关在 Java DSL 中配置重新排序器的信息,请参阅聚合器和重新排序器。
配置 resequencer 只需要在 XML 中包含适当的元素。
以下示例显示了 resequencer 配置:
<int:channel id="inputChannel"/>
<int:channel id="outputChannel"/>
<int:resequencer id="completelyDefinedResequencer" (1)
input-channel="inputChannel" (2)
output-channel="outputChannel" (3)
discard-channel="discardChannel" (4)
release-partial-sequences="true" (5)
message-store="messageStore" (6)
send-partial-result-on-expiry="true" (7)
send-timeout="86420000" (8)
correlation-strategy="correlationStrategyBean" (9)
correlation-strategy-method="correlate" (10)
correlation-strategy-expression="headers['something']" (11)
release-strategy="releaseStrategyBean" (12)
release-strategy-method="release" (13)
release-strategy-expression="size() == 10" (14)
empty-group-min-timeout="60000" (15)
lock-registry="lockRegistry" (16)
group-timeout="60000" (17)
group-timeout-expression="size() ge 2 ? 100 : -1" (18)
scheduler="taskScheduler" /> (19)
expire-group-upon-timeout="false" /> (20)
1 | resequencer 的 id 是可选的。 |
2 | resequencer 的 input channel。 必填。 |
3 | resequencer 将重新排序的消息发送到的通道。 自选。 |
4 | resequencer 将超时消息发送到的通道(如果设置为 )。
自选。send-partial-result-on-timeout false |
5 | 是立即发送有序序列,还是在整个消息组到达后才发送。
自选。
(默认值为 .)false |
6 | 对 a 的引用,可用于将消息组存储在其关联键下,直到它们完成。
自选。
(默认值为 volatile 内存存储。MessageGroupStore |
7 | 在组过期时,是否应发送有序的组(即使缺少某些消息)。
自选。
(默认值为 false。
请参阅管理聚合器中的状态:MessageGroupStore 。 |
8 | 向 或 发送回复时要等待的超时间隔。
仅当 output 通道具有一些 'sending' 限制(例如具有固定 'capacity' 时),它才适用。
在这种情况下,会引发 a。
对于实现,将忽略 。
对于 ,从计划过期任务开始,将重新计划此任务。
自选。Message output-channel discard-channel QueueChannel MessageDeliveryException send-timeout AbstractSubscribableChannel group-timeout(-expression) MessageDeliveryException |
9 | 对实现消息关联(分组)算法的 Bean 的引用。
bean 可以是接口的实现或 POJO。
在后一种情况下,还必须定义 attribute。
自选。
(默认情况下,聚合器使用 header.)CorrelationStrategy correlation-strategy-method IntegrationMessageHeaderAccessor.CORRELATION_ID |
10 | 在 Bean 上定义的方法,该方法由 Bean 引用并实现相关决策算法。
可选,有限制(需要存在)。correlation-strategy correlation-strategy |
11 | 表示关联策略的 SPEL 表达式。
例:。
只允许使用其中之一 or。"headers['something']" correlation-strategy correlation-strategy-expression |
12 | 对实现发布策略的 bean 的引用。
bean 可以是接口的实现或 POJO。
在后一种情况下,还必须定义 attribute。
可选(默认情况下,聚合器将使用 header 属性)。ReleaseStrategy release-strategy-method IntegrationMessageHeaderAccessor.SEQUENCE_SIZE |
13 | 在 Bean 上定义的方法,该方法引用并实现完成决策算法。
可选,有限制(需要存在)。release-strategy release-strategy |
14 | 表示发布策略的 SPEL 表达式。
表达式的根对象是 .
例:。
只允许使用其中之一 or。MessageGroup "size() == 5" release-strategy release-strategy-expression |
15 | 仅当为 配置了 时 才适用。
默认情况下,当 配置为使部分组过期时,也会删除空组。
正常释放组后,存在空组。
这是为了能够检测和丢弃延迟到达的消息。
如果您希望使空组过期的时间比使部分组过期的时间更长,请设置此属性。
然后,空组不会从 中删除,直到它们至少在此毫秒数内未被修改。
请注意,使空 group 过期的实际时间也受收割者的 timeout 属性的影响,它可能等于此值加上 timeout。MessageGroupStoreReaper <resequencer> MessageStore MessageGroupStoreReaper MessageStore |
16 | 请参阅使用 XML 配置聚合器。 |
17 | 请参阅使用 XML 配置聚合器。 |
18 | 请参阅使用 XML 配置聚合器。 |
19 | 请参阅使用 XML 配置聚合器。 |
20 | 默认情况下,当组由于超时(或 a )而完成时,将保留空组的元数据。
延迟到达的消息将被立即丢弃。
将此项设置为 可完全删除组。
然后,延迟到达的消息将启动一个新组,并且在该组再次超时之前不会被丢弃。
由于导致超时的序列范围中存在 “hole” ,因此新组永远不会正常释放。
空组可以稍后通过使用 a 和属性来过期(完全删除)。
从版本 5.0 开始,空组也会计划在过期后删除。
默认值为 'false'。MessageGroupStoreReaper true MessageGroupStoreReaper empty-group-min-timeout empty-group-min-timeout |
另请参阅 Aggregator Expiring Groups 了解更多信息。
由于在 Java 类中没有要为 resequencers 实现的自定义行为,因此没有对它的 Comments 支持。 |