此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.4.0! |
幂等接收器企业集成模式
从版本 4.1 开始, Spring 集成提供了幂等接收器企业集成模式的实现。
它是一个函数模式,整个幂等逻辑应该在应用程序中实现。
但是,为了简化决策,提供了该组件。
这是应用于方法的 AOP,可以根据其配置请求消息或将其标记为 。IdempotentReceiverInterceptor
Advice
MessageHandler.handleMessage()
filter
duplicate
以前,您可以通过在 (请参阅 Filter) 中使用 custom 来实现此模式。
但是,由于此模式实际上定义了终端节点的行为,而不是终端节点本身,因此幂等接收器实现不提供终端节点组件。
相反,它应用于应用程序中声明的端点。MessageSelector
<filter/>
的逻辑基于提供的,如果该选择器不接受消息,则使用设置为 的标头来扩充消息。
目标(或下游流)可以查阅此标头以实现正确的幂等逻辑。
如果配置了 或 ,则不会将重复消息发送到目标 。
相反,它被丢弃了。
如果要丢弃(不处理)重复的消息,则应使用 ,例如默认 bean。IdempotentReceiverInterceptor
MessageSelector
duplicateMessage
true
MessageHandler
IdempotentReceiverInterceptor
discardChannel
throwExceptionOnRejection = true
MessageHandler.handleMessage()
discardChannel
NullChannel
nullChannel
为了维护消息之间的状态并提供比较消息以实现幂等性的功能,我们提供了 .
它接受一个实现 (根据 创建查找键) 和一个可选的 (Metadata Store)。
有关更多信息,请参阅 MetadataStoreSelector
Javadoc。
您还可以使用额外的 .
默认情况下,使用 message 报头。MetadataStoreSelector
MessageProcessor
Message
ConcurrentMetadataStore
value
ConcurrentMetadataStore
MessageProcessor
MetadataStoreSelector
timestamp
通常,如果键没有现有值,选择器会选择一条消息进行接受。
在某些情况下,比较键的当前值和新值以确定是否应接受该消息非常有用。
从版本 5.3 开始,提供了引用 ;第一个参数是旧值;return 接受消息,并将旧值替换为 .
这对于减少键的数量很有用;例如,在处理文件中的行时,可以将文件名存储在 Key 中,将当前行号存储在 Value 中。
然后,在重新启动后,您可以跳过已处理的行。
有关示例,请参阅Idempotent Downstream Processing a Split File。compareValues
BiPredicate<String, String>
true
MetadataStore
为方便起见,可以直接在组件上配置这些选项。
下面的清单显示了所有可能的属性:MetadataStoreSelector
<idempotent-receiver>
<idempotent-receiver
id="" (1)
endpoint="" (2)
selector="" (3)
discard-channel="" (4)
metadata-store="" (5)
key-strategy="" (6)
key-expression="" (7)
value-strategy="" (8)
value-expression="" (9)
compare-values="" (10)
throw-exception-on-rejection="" /> (11)
1 | Bean 的 ID。
自选。IdempotentReceiverInterceptor |
2 | 应用此侦听器的使用者终结点名称或模式。
用逗号 () 分隔名称(模式),例如 .
然后,使用与这些模式匹配的端点 Bean 名称来检索目标端点的 Bean(使用其后缀),并将 应用于这些 Bean。
必填。, endpoint="aaa, bbb*, ccc, *ddd, eee*fff" MessageHandler .handler IdempotentReceiverInterceptor |
3 | 一个 Bean 引用。
与 和 互斥。
如果未提供,则 或 之一是必需的。MessageSelector metadata-store key-strategy (key-expression) selector key-strategy key-strategy-expression |
4 | 标识 不接受消息时要向其发送消息的通道。
省略时,重复的消息将转发到带有标头的处理程序。
自选。IdempotentReceiverInterceptor duplicateMessage |
5 | 一个参考。
由底层 .
与 互斥。
自选。
默认使用不在应用程序执行之间保持状态的 internal。ConcurrentMetadataStore MetadataStoreSelector selector MetadataStoreSelector SimpleMetadataStore |
6 | 一个参考。
由底层 .
评估 from the request message.
与 和 互斥。
如果未提供 a,则 one of 或 是必需的。MessageProcessor MetadataStoreSelector idempotentKey selector key-expression selector key-strategy key-strategy-expression |
7 | 用于填充 .
由底层 .
使用请求消息作为评估上下文根对象来评估 an。
与 和 互斥。
如果未提供 a,则 one of 或 是必需的。ExpressionEvaluatingMessageProcessor MetadataStoreSelector idempotentKey selector key-strategy selector key-strategy key-strategy-expression |
8 | 一个参考。
由底层 .
评估 from the request 消息。
与 和 互斥。
默认情况下,'MetadataStoreSelector' 使用 'timestamp' 消息标头作为元数据 'value'。MessageProcessor MetadataStoreSelector value idempotentKey selector value-expression |
9 | 用于填充 .
由底层 .
通过使用请求消息作为评估上下文根对象来评估 。
与 和 互斥。
默认情况下,'MetadataStoreSelector' 使用 'timestamp' 消息标头作为元数据 'value'。ExpressionEvaluatingMessageProcessor MetadataStoreSelector value idempotentKey selector value-strategy |
10 | 对 bean 的引用,允许您通过比较键的旧值和新值来选择性地选择消息; 默认情况下。BiPredicate<String, String> null |
11 | 如果 拒绝消息,是否引发异常。
默认为 。
无论是否提供 a,都会应用它。IdempotentReceiverInterceptor false discard-channel |
对于 Java 配置, Spring 集成提供了方法级 Comments。
它用于标记具有消息注释 (,对象将应用于此终端节点。
以下示例演示如何使用注释:@IdempotentReceiver
method
@ServiceActivator
@Router, and others) to specify which `IdempotentReceiverInterceptor
@IdempotentReceiver
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
m.getHeaders().get(INVOICE_NBR_HEADER)));
}
@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
....
}
当你使用 Java DSL 时,你可以将拦截器添加到端点的建议链中,如下例所示:
@Bean
public IntegrationFlow flow() {
...
.handle("someBean", "someMethod",
e -> e.advice(idempotentReceiverInterceptor()))
...
}
这仅适用于该方法。
从版本 4.3.1 开始,它实现了 ,并将 作为基类,以便更好地分离。
有关更多信息,请参阅 Handling Message Advice。IdempotentReceiverInterceptor MessageHandler.handleMessage(Message<?>) HandleMessageAdvice AbstractHandleMessageAdvice |