聚合
基本上,聚合器是拆分器的镜像,是一种消息处理程序,它接收多条消息并将它们组合成一条消息。 事实上,聚合器通常是包含拆分器的管道中的下游使用者。
从技术上讲,聚合器比 splitter 更复杂,因为它是有状态的。
它必须保存要聚合的消息,并确定何时准备好聚合整个消息组。
为此,它需要一个 .MessageStore
功能性
Aggregator 通过关联和存储一组相关消息来组合这些消息,直到该组被视为完整为止。 此时,聚合器通过处理整个组来创建单个消息,并将聚合消息作为输出发送。
实现聚合器需要提供执行聚合(即,从多个中创建单个消息)的逻辑。 两个相关概念是 correlation 和 release。
Correlation 确定如何对消息进行分组以进行聚合。
在 Spring 集成中,默认情况下,关联是根据消息头完成的。
具有相同消息的消息将归为一组。
但是,您可以自定义关联策略,以允许以其他方式指定应如何将消息分组在一起。
为此,您可以实现 a (本章稍后将介绍)。IntegrationMessageHeaderAccessor.CORRELATION_ID
IntegrationMessageHeaderAccessor.CORRELATION_ID
CorrelationStrategy
要确定一组消息的准备处理点,请查阅 a。
当序列中包含的所有消息都存在时,聚合器的默认发布策略会根据标头释放一个组。
您可以通过提供对自定义实施的引用来覆盖此默认策略。ReleaseStrategy
IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
ReleaseStrategy
编程模型
聚合 API 由许多类组成:
-
interface 及其子类: 和
MessageGroupProcessor
MethodInvokingAggregatingMessageGroupProcessor
ExpressionEvaluatingMessageGroupProcessor
-
接口及其默认实现:
ReleaseStrategy
SimpleSequenceSizeReleaseStrategy
-
接口及其默认实现:
CorrelationStrategy
HeaderAttributeCorrelationStrategy
AggregatingMessageHandler
( 的子类 )是一种实现,封装了聚合器(和其他相关用例)的常见功能,如下所示:AggregatingMessageHandler
AbstractCorrelatingMessageHandler
MessageHandler
-
将消息关联到要聚合的组中
-
将这些消息保留在 中,直到可以释放组
MessageStore
-
决定何时可以发布组
-
将已发布的组聚合到一条消息中
-
识别和响应过期的组
决定如何将消息组合在一起的责任委派给实例。
决定是否可以释放消息组的责任委托给实例。CorrelationStrategy
ReleaseStrategy
下面的清单显示了 base 的简要亮点(实现该方法的责任留给了开发人员):AbstractAggregatingMessageGroupProcessor
aggregatePayloads
public abstract class AbstractAggregatingMessageGroupProcessor
implements MessageGroupProcessor {
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
// default implementation exists
}
protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);
}
请参阅 和 作为 .DefaultAggregatingMessageGroupProcessor
ExpressionEvaluatingMessageGroupProcessor
MethodInvokingMessageGroupProcessor
AbstractAggregatingMessageGroupProcessor
从版本 5.2 开始,可以使用一种策略来合并和计算(聚合)输出消息的标头。
该实现可用于返回组之间没有冲突的所有 Headers 的逻辑;组中的一封或多封邮件的报头缺失不被视为冲突。
冲突的标头将被省略。
与新引入的 一起,此函数用于任何任意(非)实现。
从本质上讲,框架将提供的函数注入到实例中,并将所有其他实现包装到 .
和 the 之间的逻辑差异是,后者在调用委托策略之前不会提前计算标头,如果委托返回 或 ,则不会调用函数。
在这种情况下,框架假定目标实现已负责生成一组填充到返回结果中的适当标头。
该策略可用作 XML 配置的 reference 属性、Java DSL 的选项以及普通 Java 配置的选项。Function<MessageGroup, Map<String, Object>>
AbstractAggregatingMessageGroupProcessor
DefaultAggregateHeadersFunction
DelegatingMessageGroupProcessor
AbstractAggregatingMessageGroupProcessor
MessageGroupProcessor
AbstractAggregatingMessageGroupProcessor
DelegatingMessageGroupProcessor
AbstractAggregatingMessageGroupProcessor
DelegatingMessageGroupProcessor
Message
AbstractIntegrationMessageBuilder
Function<MessageGroup, Map<String, Object>>
headers-function
AggregatorSpec.headersFunction()
AggregatorFactoryBean.setHeadersFunction()
该 由 拥有,并且具有基于消息标头的默认值,如下例所示:CorrelationStrategy
AbstractCorrelatingMessageHandler
IntegrationMessageHeaderAccessor.CORRELATION_ID
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
...
this.correlationStrategy = correlationStrategy == null ?
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
...
}
至于消息组的实际处理,默认实现是 .
它会创建一个 single,其 payload 是给定组收到的 payload 之一。
这适用于使用拆分器、发布-订阅通道或上游收件人列表路由器的简单分散-收集实现。DefaultAggregatingMessageGroupProcessor
Message
List
在此类场景中使用 publish-subscribe 通道或收件人列表路由器时,请务必启用该标志。
这样做会添加必要的标头:、 、 和 .
默认情况下,对于 Spring 集成中的拆分器,该行为是启用的,但不会为发布-订阅通道或收件人列表路由器启用该行为,因为这些组件可能用于不需要这些 Headers 的各种上下文中。apply-sequence CORRELATION_ID SEQUENCE_NUMBER SEQUENCE_SIZE |
在为应用程序实施特定的聚合器策略时,您可以扩展和实施该方法。
但是,有更好的解决方案,与 API 的耦合较少,用于实现聚合逻辑,可以通过 XML 或 Comments 进行配置。AbstractAggregatingMessageGroupProcessor
aggregatePayloads
通常,任何 POJO 都可以实现聚合算法,前提是它提供了一个接受 single 作为参数的方法(也支持参数化列表)。
调用该方法用于聚合消息,如下所示:java.util.List
-
如果参数是 a 且参数类型 T is assignable to ,则为聚合而累积的消息的整个列表将发送到聚合器。
java.util.Collection<T>
Message
-
如果参数是非参数化的或参数类型不可分配给 ,则该方法将接收累积消息的有效负载。
java.util.Collection
Message
-
如果返回类型不可分配给 ,则会将其视为框架自动创建的 a 的有效负载。
Message
Message
为了实现代码简单性并促进最佳实践(如低耦合、可测试性等),实现聚合逻辑的首选方法是通过 POJO 并使用 XML 或注释支持在应用程序中对其进行配置。 |
从版本 5.3 开始,在处理消息组后,an 会为具有多个嵌套级别的适当 splitter-aggregator 场景执行消息头修改。
仅当消息组发布结果不是消息集合时,才会执行该操作。
在这种情况下,目标负责在构建这些消息时进行调用。AbstractCorrelatingMessageHandler
MessageBuilder.popSequenceDetails()
MessageGroupProcessor
MessageBuilder.popSequenceDetails()
如果 返回 a ,则仅当输出消息与组中的第一条消息匹配时,才会对输出消息执行 a 。
(以前,仅当 plain payload 或 an 已从 返回时,才会执行此操作。MessageGroupProcessor
Message
MessageBuilder.popSequenceDetails()
sequenceDetails
AbstractIntegrationMessageBuilder
MessageGroupProcessor
此功能可由新属性控制,因此,当标准拆分器尚未填充关联详细信息时,可以禁用 。
此属性实质上撤消了 .
有关更多信息,请参阅 Splitter 。popSequence
boolean
MessageBuilder.popSequenceDetails()
applySequence = true
AbstractMessageSplitter
该方法返回一个 .
因此,如果聚合 POJO 方法具有参数,则传入的参数正是该实例,并且当您对聚合器使用 a 时,将在释放组后清除该原始对象。
因此,如果 POJO 中的变量从聚合器中传出,它也会被清除。
如果您只想按原样发布该集合以进行进一步处理,则必须构建一个新的(例如, )。
从版本 4.3 开始,框架不再将消息复制到新集合,以避免创建不需要的额外对象。SimpleMessageGroup.getMessages() unmodifiableCollection Collection<Message> Collection SimpleMessageStore Collection<Message> Collection<Message> Collection new ArrayList<Message>(messages) |
在版本 4.2 之前,无法使用 XML 配置提供 。
只有 POJO 方法可用于聚合。
现在,如果框架检测到引用的(或内部的)bean implements ,则将其用作聚合器的输出处理器。MessageGroupProcessor
MessageProcessor
如果您希望从自定义中释放对象集合作为消息的有效负载,则您的类应扩展并实现 .MessageGroupProcessor
AbstractAggregatingMessageGroupProcessor
aggregatePayloads()
此外,从版本 4.2 开始,提供了 a。
它返回来自组的消息集合,如前所述,这会导致单独发送已发布的消息。SimpleMessageGroupProcessor
这使得聚合器可以充当消息屏障,其中到达的消息将被保留,直到发布策略触发并且该组作为单个消息序列发布。
从版本 6.0 开始,上述拆分行为仅在组处理器为 .
否则,对于返回 , 则仅发出一条回复消息,并将整个消息集合作为其有效负载。
这种逻辑由聚合器的规范目的决定 - 通过某个键收集请求消息并生成单个分组消息。SimpleMessageGroupProcessor
MessageGroupProcessor
Collection<Message>
ReleaseStrategy
接口定义如下:ReleaseStrategy
public interface ReleaseStrategy {
boolean canRelease(MessageGroup group);
}
通常,如果任何 POJO 提供接受 single 作为参数的方法(也支持参数化列表)并返回布尔值,则任何 POJO 都可以实现完成决策逻辑。
在每条新消息到达后调用此方法,以确定组是否完成,如下所示:java.util.List
-
如果参数是 a 且参数类型是 assignable to ,则组中累积的整个消息列表将发送到该方法。
java.util.List<T>
T
Message
-
如果参数是非参数化的或参数类型不是 assign to ,则该方法接收累积消息的有效负载。
java.util.List
Message
-
如果消息组已准备好进行聚合,则必须返回该方法,否则返回 false。
true
以下示例显示如何对 type 为 a 的注释 :@ReleaseStrategy
List
Message
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<Message<?>>) {...}
}
以下示例显示如何对 type 为 a 的注释 :@ReleaseStrategy
List
String
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<String>) {...}
}
根据前两个示例中的签名,基于 POJO 的发布策略将传递尚未发布的消息(如果需要访问整个)或 payload 对象(如果 type 参数不是 )。
这满足了大多数使用案例。
但是,如果出于某种原因,您需要访问 full ,则应提供接口的实现。Collection
Message
Collection
Message
MessageGroup
ReleaseStrategy
在处理可能较大的组时,您应该了解如何调用这些方法,因为在释放组之前可能会多次调用发布策略。
最有效的是 的实现,因为聚合器可以直接调用它。
第二高效的是具有参数类型的 POJO 方法。
效率最低的是具有类型的 POJO 方法。
每次调用发布策略时,框架都必须将有效负载从组中的消息复制到新集合中(并可能尝试将有效负载转换为 )。
使用可避免转换,但仍需要创建新的 . 出于这些原因,对于大型组,我们建议您实施 . |
当释放组进行聚合时,将处理其所有尚未发布的消息并将其从组中删除。
如果组也已完成(即,如果序列中的所有消息都已到达或未定义序列),则该组将标记为完成。
此组的任何新消息都将发送到 discard 通道(如果已定义)。
设置为 (默认为 ) 将删除整个组,并且任何新消息(与已删除的组具有相同的关联 ID)将形成一个新组。
您可以通过使用 a together 并将其设置为 来释放部分序列。expire-groups-upon-completion
true
false
MessageGroupStoreReaper
send-partial-result-on-expiry
true
为了便于丢弃延迟到达的消息,聚合器必须在组发布后维护有关该组的状态。
这最终可能导致内存不足情况。
为避免此类情况,应考虑配置 a 以删除组元数据。
应将过期参数设置为在到达某个点后使组过期,之后预计延迟消息不会到达。
有关配置收割者的信息,请参阅管理聚合器中的状态:MessageGroupStore 。MessageGroupStoreReaper |
Spring 集成提供了 : 的实现。
此实现会查阅每条到达消息的 and 标头,以确定消息组何时完成并准备好进行聚合。
如前所述,它也是默认策略。ReleaseStrategy
SimpleSequenceSizeReleaseStrategy
SEQUENCE_NUMBER
SEQUENCE_SIZE
在版本 5.0 之前,默认发布策略为 ,该策略在大型组中表现不佳。
使用该策略,可以检测并拒绝重复的序列号。
此操作可能很昂贵。SequenceSizeReleaseStrategy |
如果要聚合大型组,则无需释放部分组,也不需要检测/拒绝重复序列,请考虑改用 - 对于这些用例,它的效率要高得多,并且自版本 5.0 以来,当未指定部分组释放时,它是默认设置。SimpleSequenceSizeReleaseStrategy
聚合大型组
4.3 版本将 a 中消息的默认值更改为(以前是 )。
当从大型组中删除单个消息时,这很昂贵(需要 O(n) 线性扫描)。
尽管删除哈希集的速度通常要快得多,但对于大型消息来说,它的成本可能很高,因为必须在 insert 和 remove 上计算哈希值。
如果您的消息哈希成本很高,请考虑使用其他一些集合类型。
如使用MessageGroupFactory
中所述,提供了a,以便您可以选择最适合您需求的。
您还可以提供自己的工厂实现来创建其他一些 .Collection
SimpleMessageGroup
HashSet
BlockingQueue
SimpleMessageGroupFactory
Collection
Collection<Message<?>>
以下示例显示如何使用前面的实现和 :SimpleSequenceSizeReleaseStrategy
<int:aggregator input-channel="aggregate"
output-channel="out" message-store="store" release-strategy="releaser" />
<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
<property name="messageGroupFactory">
<bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
<constructor-arg value="BLOCKING_QUEUE"/>
</bean>
</property>
</bean>
<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />
如果 filter 端点涉及聚合器上游的流,则 sequence size release strategy(固定的或基于标头的)将无法实现其目的,因为 sequence 中的某些消息可能会被 filter 丢弃。
在这种情况下,建议选择另一个 ,或使用从丢弃子流发送的补偿消息,该子流的内容中携带一些信息,以便在自定义完整组函数中跳过。
有关更多信息,请参阅过滤器。sequenceSize ReleaseStrategy |
关联策略
接口定义如下:CorrelationStrategy
public interface CorrelationStrategy {
Object getCorrelationKey(Message<?> message);
}
该方法返回一个,该键表示用于将消息与消息组关联的相关键。
该键必须满足 中用于键的条件,以便实现 和 。Object
Map
equals()
hashCode()
通常,任何 POJO 都可以实现关联逻辑,并且将消息映射到方法的参数(或多个参数)的规则与 a 相同(包括对 Comments 的支持)。
该方法必须返回一个值,并且该值不能为 。ServiceActivator
@Header
null
Spring 集成提供了 : 的实现。
此实现返回其中一个消息标头(其名称由 constructor 参数指定)的值作为相关键。
默认情况下,关联策略是返回 header 属性值的 a。
如果您有要用于关联的自定义标头名称,则可以在 的实例上配置该名称,并将其作为聚合器关联策略的参考提供。CorrelationStrategy
HeaderAttributeCorrelationStrategy
HeaderAttributeCorrelationStrategy
CORRELATION_ID
HeaderAttributeCorrelationStrategy
锁定注册表
对组的更改是线程安全的。
因此,当您同时发送同一相关 ID 的消息时,聚合器中将只处理其中一个消息,从而使其实际上每个消息组都是单线程的。
A 用于获取已解析相关 ID 的锁。
默认情况下使用 A (in-memory)。
要在使用共享的服务器之间同步更新,您必须配置共享锁注册表。LockRegistry
DefaultLockRegistry
MessageGroupStore
避免死锁
如上所述,当消息组发生更改(添加或释放消息)时,将持有一个锁。
请考虑以程:
...->aggregator1-> ... ->aggregator2-> ...
如果有多个线程,并且聚合器共享一个公共锁注册表,则可能会出现死锁。
这将导致线程挂起,并可能显示如下结果:jstack <pid>
Found one Java-level deadlock:
=============================
"t2":
waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t1"
"t1":
waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t2"
有几种方法可以避免此问题:
-
确保每个聚合器都有自己的 Lock Registry(这可以是跨应用程序实例的共享注册表,但流中的两个或多个聚合器必须各自具有不同的注册表)
-
使用 OR 作为聚合器的输出通道,以便下游流在新线程上运行
ExecutorChannel
QueueChannel
-
从版本 5.1.1 开始,将 Aggregator 属性设置为
releaseLockBeforeSend
true
如果由于某种原因,单个聚合器的输出最终路由回同一聚合器,也可能导致此问题。 当然,上述第一种解决方案不适用于这种情况。 |
在 Java DSL 中配置聚合器
有关如何在 Java DSL 中配置聚合器的信息,请参见 Aggregators 和 Resequencers。
使用 XML 配置聚合器
Spring 集成支持通过元素使用 XML 配置聚合器。
以下示例显示了聚合器的示例:<aggregator/>
<channel id="inputChannel"/>
<int:aggregator id="myAggregator" (1)
auto-startup="true" (2)
input-channel="inputChannel" (3)
output-channel="outputChannel" (4)
discard-channel="throwAwayChannel" (5)
message-store="persistentMessageStore" (6)
order="1" (7)
send-partial-result-on-expiry="false" (8)
send-timeout="1000" (9)
correlation-strategy="correlationStrategyBean" (10)
correlation-strategy-method="correlate" (11)
correlation-strategy-expression="headers['foo']" (12)
ref="aggregatorBean" (13)
method="aggregate" (14)
release-strategy="releaseStrategyBean" (15)
release-strategy-method="release" (16)
release-strategy-expression="size() == 5" (17)
expire-groups-upon-completion="false" (18)
empty-group-min-timeout="60000" (19)
lock-registry="lockRegistry" (20)
group-timeout="60000" (21)
group-timeout-expression="size() ge 2 ? 100 : -1" (22)
expire-groups-upon-timeout="true" (23)
scheduler="taskScheduler" > (24)
<expire-transactional/> (25)
<expire-advice-chain/> (26)
</aggregator>
<int:channel id="outputChannel"/>
<int:channel id="throwAwayChannel"/>
<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
<constructor-arg ref="dataSource"/>
</bean>
<bean id="aggregatorBean" class="sample.PojoAggregator"/>
<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>
<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
1 | 聚合商的 ID 是可选的。 |
2 | 生命周期属性指示是否应在应用程序上下文启动期间启动聚合器。 可选(默认值为 'true')。 |
3 | 聚合器从中接收消息的通道。 必填。 |
4 | 聚合器将聚合结果发送到的通道。 可选(因为传入消息本身可以在 'replyChannel' 消息头中指定回复通道)。 |
5 | 聚合器将超时消息发送到的通道 (if is )。
自选。send-partial-result-on-expiry false |
6 | 对 a 的引用,用于将消息组存储在其关联键下,直到它们完成。
自选。
默认情况下,它是一个易失性内存存储。
有关更多信息,请参阅 Message Store 。MessageGroupStore |
7 | 当多个句柄订阅到同一 (用于负载平衡目的) 时,此聚合器的顺序。
自选。DirectChannel |
8 | 表示过期的消息应该被聚合,并在其包含过期后发送到 'output-channel' 或 'replyChannel' (参见 MessageGroupStore.expireMessageGroups(long) )。
使 a 过期的一种方法是配置 .
但是,您也可以通过调用 来过期。
您可以通过 Control Bus 操作来实现此目的,或者,如果您有对实例的引用,则通过调用 .
否则,此属性本身不会执行任何操作。
它仅用作指示是否丢弃或将仍在 中即将过期的任何消息发送到 output 或 reply 通道。
可选(默认值为 )。
注意:此属性可能更合适地称为 ,因为如果设置为 ,则组实际上可能不会过期。MessageGroup MessageGroup MessageGroupStoreReaper MessageGroup MessageGroupStore.expireMessageGroups(timeout) MessageGroupStore expireMessageGroups(timeout) MessageGroup false send-partial-result-on-timeout expire-groups-upon-timeout false |
9 | 向 或 发送回复时要等待的超时间隔。
默认为秒。
仅当 output 通道具有一些 'sending' 限制(例如具有固定 'capacity' 时),它才适用。
在这种情况下,会引发 a。
对于实现, 则 被忽略 。
对于 ,从计划过期任务开始将导致重新计划此任务。
自选。Message output-channel discard-channel 30 QueueChannel MessageDeliveryException AbstractSubscribableChannel send-timeout group-timeout(-expression) MessageDeliveryException |
10 | 对实现消息关联(分组)算法的 Bean 的引用。
bean 可以是接口的实现或 POJO。
在后一种情况下,还必须定义 attribute。
可选(默认情况下,聚合器使用 header)。CorrelationStrategy correlation-strategy-method IntegrationMessageHeaderAccessor.CORRELATION_ID |
11 | 在 Bean 上定义的方法,由 引用。
它实现了关联决策算法。
可选,有限制 ( must be present)。correlation-strategy correlation-strategy |
12 | 表示关联策略的 SPEL 表达式。
例:。
只允许使用其中之一 or。"headers['something']" correlation-strategy correlation-strategy-expression |
13 | 对在应用程序上下文中定义的 Bean 的引用。 如前所述,Bean 必须实现聚合逻辑。 可选(默认情况下,聚合消息列表将成为输出消息的有效负载)。 |
14 | 在 Bean 上定义的方法,该属性引用该属性。
它实现了消息聚合算法。
可选 (取决于定义的属性)。ref ref |
15 | 对实现发布策略的 bean 的引用。
bean 可以是接口的实现或 POJO。
在后一种情况下,还必须定义 attribute。
可选(默认情况下,聚合商使用 header 属性)。ReleaseStrategy release-strategy-method IntegrationMessageHeaderAccessor.SEQUENCE_SIZE |
16 | 在 Bean 上定义的方法,该属性引用该属性。
它实现了完成决策算法。
可选,有限制 ( must be present)。release-strategy release-strategy |
17 | 表示发布策略的 SPEL 表达式。
表达式的根对象是 .
例:。
只允许使用其中之一 or。MessageGroup "size() == 5" release-strategy release-strategy-expression |
18 | 当设置为 (default is )时,已完成的组将从邮件存储中删除,从而让具有相同关联的后续邮件形成一个新组。
默认行为是将具有与已完成组相同的关联的消息发送到 .true false discard-channel |
19 | 仅当为 的 配置了 时适用。
默认情况下,当 配置为使部分组过期时,也会删除空组。
在正常释放组后,存在空组。
空组允许检测和丢弃延迟到达的消息。
如果您希望使空组过期的时间比使部分组过期的时间更长,请设置此属性。
然后,空组不会从 中删除,直到它们至少在此毫秒数内未被修改。
请注意,空 group 过期的实际时间也受 reaper 属性的影响,它可能与此值加上 timeout 一样多。MessageGroupStoreReaper MessageStore <aggregator> MessageGroupStoreReaper MessageStore timeout |
20 | 对 Bean 的引用。
它过去用于获取基于 的 for 并发操作。
默认情况下,使用 internal 。
使用 distributed ,例如 ,可确保聚合器只有一个实例可以同时对组进行操作。
有关更多信息,请参阅 Redis Lock Registry 或 Zookeeper Lock Registry。org.springframework.integration.util.LockRegistry Lock groupId MessageGroup DefaultLockRegistry LockRegistry ZookeeperLockRegistry |
21 | 超时(以毫秒为单位),用于在当前消息到达时不释放组时强制完成。
当需要发出部分结果(或丢弃组)时,如果新消息在超时(从最后一条消息到达的时间开始计算)内未到达,则此属性为聚合器提供内置的基于时间的发布策略。
要设置从创建时间开始计算的超时,请参阅 information。
当新消息到达聚合器时,将取消其任何现有消息。
如果返回 (表示不发布) 和 ,则计划新任务使组过期。
我们不建议将此属性设置为零(或负值)。
这样做可以有效地禁用聚合器,因为每个消息组都会立即完成。
但是,您可以使用表达式有条件地将其设置为零(或负值)。
请参阅 以获取信息。
完成期间执行的操作取决于 和 属性。
有关更多信息,请参阅 Aggregator 和 Group Timeout 。
它与 'group-timeout-expression' 属性互斥。MessageGroup ReleaseStrategy MessageGroup MessageGroup group-timeout-expression ScheduledFuture<?> MessageGroup ReleaseStrategy false groupTimeout > 0 group-timeout-expression ReleaseStrategy send-partial-group-on-expiry |
22 | 计算结果为 a 的 SPEL 表达式,其中 作为计算上下文对象。
用于安排 强制完成。
如果表达式的计算结果为 ,则不会计划完成。
如果计算结果为零,则立即在当前线程上完成该组。
实际上,这提供了一个动态属性。
例如,如果您希望在自创建组以来经过 10 秒后强制完成 a,则可以考虑使用以下 SpEL 表达式:其中由 as the here 是评估上下文对象。
但请记住,组创建时间可能与第一条消息的时间不同,具体取决于其他组过期属性的配置。
有关更多信息,请参阅。
与 'group-timeout' 属性互斥。groupTimeout MessageGroup #root MessageGroup null group-timeout MessageGroup timestamp + 10000 - T(System).currentTimeMillis() timestamp MessageGroup.getTimestamp() MessageGroup #root group-timeout |
23 | 当组由于超时(或 a )而完成时,默认情况下,该组将过期(完全删除)。
迟到的消息将启动一个新组。
您可以将此项设置为完成组,但保留其元数据,以便丢弃延迟到达的消息。
空组可以稍后使用 a 和 属性过期。
它默认为 'true'。MessageGroupStoreReaper false MessageGroupStoreReaper empty-group-min-timeout |
24 | 一个 bean 引用,用于安排 在 中没有新消息到达时强制完成 。
如果未提供,则使用在 () 中注册的默认调度程序 () 。
如果指定了 或 未指定,则此属性不适用。TaskScheduler MessageGroup MessageGroup groupTimeout taskScheduler ApplicationContext ThreadPoolTaskScheduler group-timeout group-timeout-expression |
25 | 从 4.1 版本开始。
它允许为操作启动事务。
它由 a 或 发起,不适用于正常的 、 和 操作。
仅允许此子元素 or。forceComplete group-timeout(-expression) MessageGroupStoreReaper add release discard <expire-advice-chain/> |
26 | 从 4.1 版本开始。
它允许为操作配置 any。
它由 a 或 发起,不适用于正常的 、 和 操作。
仅允许此子元素 or。
还可以使用 Spring 命名空间在此处配置事务。Advice forceComplete group-timeout(-expression) MessageGroupStoreReaper add release discard <expire-transactional/> Advice tx |
过期组
有两个属性与过期(完全删除)组相关。
当组过期时,没有该组的记录,如果到达具有相同关联的新消息,则会启动一个新组。
当组完成(无过期)时,空组将保留,并丢弃延迟到达的消息。
稍后可以通过将 a 与属性结合使用来删除空组。
如果组未正常完成,但因超时而被释放或丢弃,则组通常已过期。
从版本 4.1 开始,您可以使用 .
它默认为向后兼容。
从版本 5.0 开始,空组也计划在 之后删除。
如果为 和 ,则在正常或部分序列发布发生时计划删除组的任务。 从版本 5.4 开始,可以将聚合器(和重新排序器)配置为使孤立组(持久性消息存储中的组)过期,否则可能不会发布)。
(如果大于 )表示应清除存储中早于此值的组。
该方法在启动时调用,并与提供的 一起在计划任务中定期调用。
此方法也可以随时从外部调用。
根据上述提供的过期选项,过期逻辑完全委托给功能。
当需要从那些不会再使用常规消息到达逻辑释放的旧组中清除消息存储时,这种定期清除功能非常有用。
在大多数情况下,这发生在应用程序重新启动后,当使用持久性消息组存储时。
该功能类似于 Scheduled Task,但在使用 group timeout 而不是 reaper 时,提供了一种处理特定组件中的旧组的便捷方法。
必须专门为当前关联终端节点提供。
否则,一个聚合商可能会从另一个聚合商中清除组。
使用聚合器时,使用此技术过期的组将被丢弃或作为部分组发布,具体取决于属性。 |
如果自定义聚合器处理程序实现可能在其他定义中引用,我们通常建议使用属性。
但是,如果自定义聚合器实现仅由单个定义使用,则可以使用内部 Bean 定义(从版本 1.0.3 开始)在元素中配置聚合 POJO,如下例所示:ref
<aggregator>
<aggregator>
<aggregator>
<aggregator input-channel="input" method="sum" output-channel="output">
<beans:bean class="org.foo.PojoAggregator"/>
</aggregator>
不允许在同一配置中同时使用属性和内部 Bean 定义,因为它会产生不明确的条件。
在这种情况下,将引发 Exception。ref <aggregator> |
以下示例显示了聚合器 Bean 的实现:
public class PojoAggregator {
public Long add(List<Long> results) {
long total = 0l;
for (long partialResult: results) {
total += partialResult;
}
return total;
}
}
前面示例的完成策略 Bean 的实现可能如下所示:
public class PojoReleaseStrategy {
...
public boolean canRelease(List<Long> numbers) {
int sum = 0;
for (long number: numbers) {
sum += number;
}
return sum >= maxValue;
}
}
只要有必要这样做,就可以将发布策略方法和 aggregator 方法组合成一个 bean。 |
上面示例的相关策略 bean 的实现可能如下所示:
public class PojoCorrelationStrategy {
...
public Long groupNumbersByLastDigit(Long number) {
return number % 10;
}
}
前面示例中的聚合器将按某个标准对数字进行分组(在本例中为除以 10 后的余数)并保留该组,直到有效负载提供的数字之和超过特定值。
只要有必要,就可以将发布策略方法、相关策略方法和 聚合器方法组合到单个 bean 中。 (实际上,它们全部或其中任何两个都可以组合。 |
聚合器和 Spring 表达式语言 (SpEL)
从 Spring Integration 2.0 开始,你可以使用 SPEL 处理各种策略(关联、发布和聚合),如果这种发布策略背后的逻辑相对简单,我们建议这样做。
假设您有一个 legacy 组件,该组件旨在接收对象数组。
我们知道,默认发布策略会将所有聚合消息组合到 .
现在我们有两个问题。
首先,我们需要从列表中提取单个消息。
其次,我们需要提取每条消息的有效负载并组装对象数组。
以下示例解决了这两个问题:List
public String[] processRelease(List<Message<String>> messages){
List<String> stringList = new ArrayList<String>();
for (Message<String> message : messages) {
stringList.add(message.getPayload());
}
return stringList.toArray(new String[]{});
}
但是,使用 SPEL,实际上可以通过单行表达式相对容易地处理此类需求,从而避免编写自定义类并将其配置为 bean。 以下示例显示了如何执行此操作:
<int:aggregator input-channel="aggChannel"
output-channel="replyChannel"
expression="#this.![payload].toArray()"/>
在前面的配置中,我们使用集合投影表达式从列表中所有消息的有效负载中组装一个新的集合,然后将其转换为数组,从而获得与早期 Java 代码相同的结果。
在处理自定义发布和关联策略时,您可以应用相同的基于表达式的方法。
您可以将简单的关联逻辑实现为 SPEL 表达式并在属性中对其进行配置,而不是在属性中为自定义定义 Bean,如下例所示:CorrelationStrategy
correlation-strategy
correlation-strategy-expression
correlation-strategy-expression="payload.person.id"
在前面的示例中,我们假设有效负载具有一个带有 的属性,该属性将用于关联消息。person
id
同样,对于 ,您可以将发布逻辑实现为 SpEL 表达式,并在属性中对其进行配置。
evaluation context 的根对象是 itself.
可以通过在表达式中使用 group 的属性来引用 of messages。ReleaseStrategy
release-strategy-expression
MessageGroup
List
message
在版本 5.0 之前的版本中,根对象是 的集合,如前面的示例所示:Message<?> |
release-strategy-expression="!messages.?[payload==5].empty"
在前面的示例中,SPEL 评估上下文的根对象是自身,并且您表示,只要此组中有有效负载为 的消息,就应该释放该组。MessageGroup
5
聚合器和组超时
从版本 4.0 开始,引入了两个新的互斥属性:和 .
请参阅使用 XML 配置聚合器。
在某些情况下,如果当前消息到达时没有释放,则可能需要在超时后发出聚合器结果(或丢弃该组)。
为此,该选项允许安排 be forced to complete,如下例所示:group-timeout
group-timeout-expression
ReleaseStrategy
groupTimeout
MessageGroup
<aggregator input-channel="input" output-channel="output"
send-partial-result-on-expiry="true"
group-timeout-expression="size() ge 2 ? 10000 : -1"
release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>
在此示例中,如果聚合器按顺序接收最后一条消息,则可以正常发布。
如果该特定消息未到达,则强制该组在 10 秒后完成,只要该组至少包含两个 Message。release-strategy-expression
groupTimeout
强制组完成的结果取决于 和 。
首先,再次咨询发布策略,看看是否要进行正常的发布。
虽然组尚未更改,但此时可以决定释放组。
如果发布策略仍未释放该组,则表示该组已过期。
如果为 ,则(部分)中的现有消息将作为普通聚合器回复消息发布到 。
否则,它将被丢弃。ReleaseStrategy
send-partial-result-on-expiry
ReleaseStrategy
send-partial-result-on-expiry
true
MessageGroup
output-channel
行为 和 之间存在差异(请参阅 使用 XML 配置 Aggregator)。
reaper 定期启动对所有 s 的强制完成。
如果在 期间没有新消息到达,则 会单独对每个消息执行此操作。
此外,收割者可用于删除空组(如果为 false,则保留空组以丢弃延迟的消息)。groupTimeout
MessageGroupStoreReaper
MessageGroup
MessageGroupStore
groupTimeout
MessageGroup
groupTimeout
expire-groups-upon-completion
从版本 5.5 开始,可以评估为实例。
这在以下情况下非常有用,例如根据组创建时间 () 而不是当前消息到达来确定计划任务时刻,因为它是在计算结果时计算的:groupTimeoutExpression
java.util.Date
MessageGroup.getTimestamp()
groupTimeoutExpression
long
group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"
使用注释配置聚合器
以下示例显示了配置了注释的聚合器:
public class Waiter {
...
@Aggregator (1)
public Delivery aggregatingMethod(List<OrderItem> items) {
...
}
@ReleaseStrategy (2)
public boolean releaseChecker(List<Message<?>> messages) {
...
}
@CorrelationStrategy (3)
public String correlateBy(OrderItem item) {
...
}
}
1 | 一个注释,指示此方法应用作聚合器。 如果将此类用作聚合器,则必须指定它。 |
2 | 一个注释,指示此方法用作聚合器的发布策略。
如果任何方法中都不存在,则聚合器将使用 .SimpleSequenceSizeReleaseStrategy |
3 | 一个注释,指示此方法应用作聚合器的关联策略。
如果未指示关联策略,则聚合器将使用基于 .HeaderAttributeCorrelationStrategy CORRELATION_ID |
XML 元素提供的所有配置选项也可用于注释。@Aggregator
聚合器可以从 XML 中显式引用,或者如果在类上定义了 ,则通过 Classpath 扫描自动检测。@MessageEndpoint
Aggregator 组件的 Comments 配置(和其他配置)仅涵盖简单的用例,其中大多数默认选项就足够了。
如果在使用 Comments 配置时需要对这些选项进行更多控制,请考虑使用 the 的定义,并用 标记其方法,如下例所示:@Aggregator
@Bean
AggregatingMessageHandler
@Bean
@ServiceActivator
@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
AggregatingMessageHandler aggregator =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
jdbcMessageGroupStore);
aggregator.setOutputChannel(resultsChannel());
aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
aggregator.setTaskScheduler(this.taskScheduler);
return aggregator;
}
有关更多信息,请参见Programming Model and Annotations on @Bean
Methods。
从版本 4.2 开始,可以使用来简化 .AggregatorFactoryBean AggregatingMessageHandler |
在 Aggregator 中管理状态:MessageGroupStore
Aggregator(以及 Spring Integration 中的其他一些模式)是一种有状态模式,它要求根据一段时间内到达的一组消息做出决策,所有这些消息都具有相同的关联键。
有状态模式(例如 )中接口的设计是由以下原则驱动的:组件(无论是由框架定义还是由用户定义)应能够保持无状态。
所有状态都由 承载,其管理委托给 。
接口定义如下:ReleaseStrategy
MessageGroup
MessageGroupStore
MessageGroupStore
public interface MessageGroupStore {
int getMessageCountForAllMessageGroups();
int getMarkedMessageCountForAllMessageGroups();
int getMessageGroupCount();
MessageGroup getMessageGroup(Object groupId);
MessageGroup addMessageToGroup(Object groupId, Message<?> message);
MessageGroup markMessageGroup(MessageGroup group);
MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);
MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);
void removeMessageGroup(Object groupId);
void registerMessageGroupExpiryCallback(MessageGroupCallback callback);
int expireMessageGroups(long timeout);
}
有关更多信息,请参阅 Javadoc。
在等待发布策略触发时累积状态信息,并且该事件可能永远不会发生。
因此,为了防止过时的消息挥之不去,并让易失性存储提供一个钩子,以便在应用程序关闭时进行清理,允许您注册回调,以便在它们过期时应用于它。
该界面非常简单,如下面的清单所示:MessageGroupStore
MessageGroups
MessageGroupStore
MessageGroups
public interface MessageGroupCallback {
void execute(MessageGroupStore messageGroupStore, MessageGroup group);
}
回调可以直接访问存储和消息组,以便它可以管理持久状态(例如,通过从存储中完全删除组)。
维护这些回调的列表,它根据需要将其应用于时间戳早于作为参数提供的时间的所有消息(请参阅前面描述的 和 方法)。MessageGroupStore
registerMessageGroupExpiryCallback(..)
expireMessageGroups(..)
当您打算依赖该功能时,不要在不同的聚合器组件中使用相同的实例,这一点很重要。
每个 Symbol 都会根据回调注册自己的 Alpha S Alpha S Role。
这样,每个过期的组都可能被错误的聚合器完成或丢弃。
从版本 5.0.10 开始,a 用于 .
反过来,该 会检查是否存在此类的实例,如果回调集中已存在该实例,则记录错误并显示相应的消息。
这样,框架不允许在不同的聚合器/重排序器中使用实例,以避免上述未由特定关联处理程序创建的组过期的副作用。MessageGroupStore expireMessageGroups AbstractCorrelatingMessageHandler MessageGroupCallback forceComplete() UniqueExpiryCallback AbstractCorrelatingMessageHandler MessageGroupStore MessageGroupStore MessageGroupStore |
您可以使用超时值调用该方法。
任何早于当前时间减去此值的消息都已过期,并应用了回调。
因此,是 store 的用户定义了消息组 “expiry” 的含义。expireMessageGroups
为了方便用户, Spring 集成以 a 的形式为消息过期提供了一个包装器,如下例所示:MessageGroupStoreReaper
<bean id="reaper" class="org...MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore"/>
<property name="timeout" value="30000"/>
</bean>
<task:scheduled-tasks scheduler="scheduler">
<task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>
收割者是一个 .
在前面的示例中,消息组存储的 expire 方法每 10 秒调用一次。
超时本身为 30 秒。Runnable
请务必了解,的 'timeout' 属性是一个近似值,受任务计划程序速率的影响,因为此属性仅在任务的下一次计划执行时检查。
例如,如果超时设置为 10 分钟,但任务计划每小时运行一次,并且任务的最后一次执行发生在超时前一分钟,则在接下来的 59 分钟内不会过期。
因此,我们建议将速率设置为至少等于或更短的超时值。MessageGroupStoreReaper MessageGroupStoreReaper MessageGroupStoreReaper MessageGroupStoreReaper MessageGroup |
除了 reaper 之外,当应用程序关闭时,还会通过 .AbstractCorrelatingMessageHandler
它会注册自己的过期回调,这是聚合器的 XML 配置中带有 boolean 标志的链接。
如果该标志设置为 ,则在调用到期回调时,可将尚未发布的组中的任何未标记消息发送到输出通道。AbstractCorrelatingMessageHandler
send-partial-result-on-expiry
true
由于 the 是从计划任务中调用的,并且可能导致向下游集成流生成消息(取决于选项),因此建议通过 提供具有 to 处理程序异常的自定义,因为常规聚合器发布功能可能会预期如此。
相同的逻辑也适用于组超时功能,该功能也依赖于 .
有关更多信息,请参阅错误处理。MessageGroupStoreReaper sendPartialResultOnExpiry TaskScheduler MessagePublishingErrorHandler errorChannel TaskScheduler |
当 shared 用于不同的关联终端节点时,您必须配置适当的 ID 以确保组 ID 的唯一性。
否则,当一个关联终端节点释放来自其他终端节点的消息或使其他关联终端节点的消息过期时,可能会发生意外行为。
具有相同关联键的消息存储在同一个消息组中。 某些实现允许通过对数据进行分区来使用相同的物理资源。
例如,the has a property, and the has a property. 有关接口及其实现的更多信息,请参阅 Message Store. |
Flux 聚合器
在 5.2 版中,引入了该组件。
它基于 Project Reactor 和运算符。
传入的消息将发出到由此组件的构造函数 in 发起的 中。
如果未提供 ,或者它不是 instance ,则从 implementation 完成对 main 的订阅。
否则,它将推迟到 implementation 完成的 subscription 。
消息按 using a for the group 键进行分组。
默认情况下,将查询消息的标头。FluxAggregatorMessageHandler
Flux.groupBy()
Flux.window()
FluxSink
Flux.create()
outputChannel
ReactiveStreamsSubscribableChannel
Flux
Lifecycle.start()
ReactiveStreamsSubscribableChannel
Flux.groupBy()
CorrelationStrategy
IntegrationMessageHeaderAccessor.CORRELATION_ID
默认情况下,每个关闭的窗口都作为要生成的消息的 in payload 发布。
此消息包含窗口中第一条消息的所有标头。
输出消息负载中的此内容必须在下游订阅和处理。
这样的逻辑可以被 的 配置选项 自定义 (或取代)。
例如,如果我们想在最终消息中包含 of payloads,我们可以配置如下:Flux
Flux
setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>)
FluxAggregatorMessageHandler
List
Flux.collectList()
fluxAggregatorMessageHandler.setCombineFunction(
(messageFlux) ->
messageFlux
.map(Message::getPayload)
.collectList()
.map(GenericMessage::new));
to select an appropriate window 策略中有几个选项:FluxAggregatorMessageHandler
-
setBoundaryTrigger(Predicate<Message<?>>)
- 传播到运算符。 有关更多信息,请参阅其 JavaDocs。 优先于所有其他窗口选项。Flux.windowUntil()
-
setWindowSize(int)
和 - 传播到 或 。 默认情况下,窗口大小是根据组中的第一条消息及其标头计算的。setWindowSizeFunction(Function<Message<?>, Integer>)
Flux.window(int)
windowTimeout(int, Duration)
IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
-
setWindowTimespan(Duration)
- 传播到 OR,具体取决于窗口大小配置。Flux.window(Duration)
windowTimeout(int, Duration)
-
setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>)
- 一个函数,用于将变换应用于 Grouped Fluxes 中,用于 Exposed 选项未涵盖的任何自定义窗口操作。
由于此组件是一个实现,因此它可以简单地用作定义和消息传递注释。
使用 Java DSL 可以从 EIP 方法使用它。
下面的示例演示了我们如何在运行时注册 a 以及如何将 a 与上游的拆分器相关联:MessageHandler
@Bean
@ServiceActivator
.handle()
IntegrationFlow
FluxAggregatorMessageHandler
IntegrationFlow fluxFlow =
(flow) -> flow
.split()
.channel(MessageChannels.flux())
.handle(new FluxAggregatorMessageHandler());
IntegrationFlowContext.IntegrationFlowRegistration registration =
this.integrationFlowContext.registration(fluxFlow)
.register();
Flux<Message<?>> window =
registration.getMessagingTemplate()
.convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);
消息组的条件
从版本 5.5 开始,一个(包括其 Java & XML DSLs)公开了一个实现选项。
此函数用于添加到组的每条消息,并将结果条件句子存储到组中以供将来考虑。
可以查询此条件,而不是迭代组中的所有消息。
有关更多信息,请参阅 JavaDocs 和 Message Group Condition 。AbstractCorrelatingMessageHandler
groupConditionSupplier
BiFunction<Message<?>, String, String>
ReleaseStrategy
GroupConditionProvider
另请参阅 File Aggregator。