幂等接收器企业集成模式
从版本 4.1 开始, Spring 集成提供了幂等接收器企业集成模式的实现。
它是一个函数模式,整个幂等逻辑应该在应用程序中实现。
但是,为了简化决策,IdempotentReceiverInterceptor
组件。
这是一个 AOPAdvice
应用于MessageHandler.handleMessage()
方法,并且可以filter
请求消息或将其标记为duplicate
,根据其配置。
以前,您可以通过使用自定义MessageSelector
在<filter/>
(请参阅 Filter)。
但是,由于此模式实际上定义了终端节点的行为,而不是终端节点本身,因此幂等接收器实现不提供终端节点组件。
相反,它应用于应用程序中声明的端点。
的逻辑IdempotentReceiverInterceptor
基于提供的MessageSelector
并且,如果该选择器不接受邮件,则会使用duplicateMessage
header 设置为true
.
目标MessageHandler
(或下游流)可以查阅此标头以实现正确的幂等逻辑。
如果IdempotentReceiverInterceptor
配置了discardChannel
或throwExceptionOnRejection = true
,则不会将重复的消息发送到目标MessageHandler.handleMessage()
.
相反,它被丢弃了。
如果要丢弃(不处理)重复的消息,则discardChannel
应配置NullChannel
,例如默认的nullChannel
豆。
为了维护消息之间的状态并提供比较消息以实现幂等性的功能,我们提供了MetadataStoreSelector
.
它接受一个MessageProcessor
实现(它根据Message
) 和可选的ConcurrentMetadataStore
(元数据存储)。
请参阅MetadataStoreSelector
Javadoc了解更多信息。
您还可以自定义value
为ConcurrentMetadataStore
通过使用额外的MessageProcessor
.
默认情况下,MetadataStoreSelector
使用timestamp
消息标头。
通常,如果键没有现有值,选择器会选择一条消息进行接受。
在某些情况下,比较键的当前值和新值以确定是否应接受该消息非常有用。
从版本 5.3 开始,compareValues
属性,该属性引用BiPredicate<String, String>
;第一个参数是旧值;返回true
接受消息并将旧值替换为MetadataStore
.
这对于减少键的数量很有用;例如,在处理文件中的行时,可以将文件名存储在 Key 中,将当前行号存储在 Value 中。
然后,在重新启动后,您可以跳过已处理的行。
有关示例,请参阅Idempotent Downstream Processing a Split File。
为方便起见,MetadataStoreSelector
选项可以直接在<idempotent-receiver>
元件。
下面的清单显示了所有可能的属性:
<idempotent-receiver
id="" (1)
endpoint="" (2)
selector="" (3)
discard-channel="" (4)
metadata-store="" (5)
key-strategy="" (6)
key-expression="" (7)
value-strategy="" (8)
value-expression="" (9)
compare-values="" (10)
throw-exception-on-rejection="" /> (11)
1 | 的 IDIdempotentReceiverInterceptor 豆。
自选。 |
2 | 应用此侦听器的使用者终结点名称或模式。
用逗号 (, ),例如endpoint="aaa, bbb*, ccc, *ddd, eee*fff" .
然后,使用与这些模式匹配的端点 Bean 名称来检索目标端点的MessageHandler bean(使用其.handler suffix) 和IdempotentReceiverInterceptor 应用于这些 bean。
必填。 |
3 | 一个MessageSelector bean 引用。
互斥与metadata-store 和key-strategy (key-expression) .
什么时候selector 未提供,则为key-strategy 或key-strategy-expression 是必需的。 |
4 | 标识在使用IdempotentReceiverInterceptor 不接受它。
省略时,重复消息将转发给带有duplicateMessage 页眉。
自选。 |
5 | 一个ConcurrentMetadataStore 参考。
由底层MetadataStoreSelector .
互斥与selector .
自选。
默认的MetadataStoreSelector 使用内部SimpleMetadataStore 这不会在应用程序执行之间保持状态。 |
6 | 一个MessageProcessor 参考。
由底层MetadataStoreSelector .
计算idempotentKey 从请求消息中。
互斥与selector 和key-expression .
当selector 未提供,则为key-strategy 或key-strategy-expression 是必需的。 |
7 | 用于填充ExpressionEvaluatingMessageProcessor .
由底层MetadataStoreSelector .
计算idempotentKey 通过使用请求消息作为评估上下文根对象。
互斥与selector 和key-strategy .
当selector 未提供,则为key-strategy 或key-strategy-expression 是必需的。 |
8 | 一个MessageProcessor 参考。
由底层MetadataStoreSelector .
计算value 对于idempotentKey 从请求消息中。
互斥与selector 和value-expression .
默认情况下,'MetadataStoreSelector' 使用 'timestamp' 消息标头作为元数据 'value'。 |
9 | 用于填充ExpressionEvaluatingMessageProcessor .
由底层MetadataStoreSelector .
计算value 对于idempotentKey 通过使用请求消息作为评估上下文根对象。
互斥与selector 和value-strategy .
默认情况下,'MetadataStoreSelector' 使用 'timestamp' 消息标头作为元数据 'value'。 |
10 | 对BiPredicate<String, String> bean,它允许您通过比较键的旧值和新值来选择性地选择消息;null 默认情况下。 |
11 | 如果IdempotentReceiverInterceptor 拒绝该消息。
默认为false .
无论discard-channel 。 |
对于 Java 配置, Spring 集成提供了方法级的@IdempotentReceiver
注解。
它用于标记method
具有消息传递注释 (@ServiceActivator
,@Router, and others) to specify which `IdempotentReceiverInterceptor
对象将应用于此终端节点。
以下示例演示如何使用@IdempotentReceiver
注解:
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
m.getHeaders().get(INVOICE_NBR_HEADER)));
}
@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
....
}
当你使用 Java DSL 时,你可以将拦截器添加到端点的建议链中,如下例所示:
@Bean
public IntegrationFlow flow() {
...
.handle("someBean", "someMethod",
e -> e.advice(idempotentReceiverInterceptor()))
...
}
这IdempotentReceiverInterceptor 专为MessageHandler.handleMessage(Message<?>) 方法。
从版本 4.3.1 开始,它实现了HandleMessageAdvice ,使用AbstractHandleMessageAdvice 作为基类,以便更好地分离。
有关更多信息,请参阅 Handling Message Advice。 |