从版本 4.1 开始,Spring Integration 提供了幂等接收器企业集成模式的实现。 它是一种功能模式,整个幂等逻辑应该在应用程序中实现。 但是,为了简化决策,提供了该组件。 这是应用于方法的 AOP,可以根据其配置请求消息或将其标记为 。IdempotentReceiverInterceptorAdviceMessageHandler.handleMessage()filterduplicateSpring中文文档

例如,以前,您可以通过在 中使用自定义来实现此模式(请参阅筛选器)。 但是,由于此模式实际上定义了端点的行为,而不是端点本身,因此幂等接收器实现不提供端点组件。 相反,它应用于应用程序中声明的端点。MessageSelector<filter/>Spring中文文档

的逻辑基于提供的,如果该选择器不接受消息,则使用设置为 的标头来扩充它。 目标(或下游流)可以参考此标头来实现正确的幂等逻辑。 如果配置了 或 ,则不会将重复的消息发送到目标。 相反,它被丢弃了。 如果要丢弃(不执行任何操作)重复的消息,则应使用 配置 ,例如默认 bean。IdempotentReceiverInterceptorMessageSelectorduplicateMessagetrueMessageHandlerIdempotentReceiverInterceptordiscardChannelthrowExceptionOnRejection = trueMessageHandler.handleMessage()discardChannelNullChannelnullChannelSpring中文文档

为了维护消息之间的状态并提供比较幂等性消息的能力,我们提供了 . 它接受一个实现(它基于 创建查找键)和一个可选的(元数据存储)。 有关更多信息,请参见 MetadataStoreSelector Javadoc。 您还可以使用附加的 . 默认情况下,使用消息标头。MetadataStoreSelectorMessageProcessorMessageConcurrentMetadataStorevalueConcurrentMetadataStoreMessageProcessorMetadataStoreSelectortimestampSpring中文文档

通常,如果密钥没有现成值,则选择器会选择要接受的消息。 在某些情况下,比较键的当前值和新值以确定是否应接受消息很有用。 从版本 5.3 开始,提供了引用 ;第一个参数是旧值;返回以接受消息,并将旧值替换为 . 这对于减少密钥数量很有用;例如,在处理文件中的行时,可以将文件名存储在键中,将当前行号存储在值中。 然后,重新启动后,您可以跳过已处理的行。 有关示例,请参阅幂等下游处理拆分文件compareValuesBiPredicate<String, String>trueMetadataStoreSpring中文文档

为方便起见,这些选项可直接在组件上进行配置。 以下列表显示了所有可能的属性:MetadataStoreSelector<idempotent-receiver>Spring中文文档

<idempotent-receiver
        id=""  (1)
        endpoint=""  (2)
        selector=""  (3)
        discard-channel=""  (4)
        metadata-store=""  (5)
        key-strategy=""  (6)
        key-expression=""  (7)
        value-strategy=""  (8)
        value-expression=""  (9)
        compare-values="" (10)
        throw-exception-on-rejection="" />  (11)
1 Bean 的 ID。 自选。IdempotentReceiverInterceptor
2 应用此侦听器的使用方终结点名称或模式。 用逗号 () 分隔名称(模式),例如 . 然后,使用匹配这些模式的终端节点 Bean 名称来检索目标终端节点的 Bean(使用其后缀),并将 应用于这些 bean。 必填。,endpoint="aaa, bbb*, ccc, *ddd, eee*fff"MessageHandler.handlerIdempotentReceiverInterceptor
3 Bean 引用。 与 和 互斥。 如果未提供,则为“或”之一为必需项。MessageSelectormetadata-storekey-strategy (key-expression)selectorkey-strategykey-strategy-expression
4 标识不接受消息时要向其发送消息的通道。 如果省略,则重复的消息将转发到带有标头的处理程序。 自选。IdempotentReceiverInterceptorduplicateMessage
5 参考。 由底层 . 与 互斥。 自选。 默认使用不跨应用程序执行维护状态的内部。ConcurrentMetadataStoreMetadataStoreSelectorselectorMetadataStoreSelectorSimpleMetadataStore
6 参考。 由底层 . 从请求消息中计算。 与 和 互斥。 如果未提供 a,则需要 or 之一。MessageProcessorMetadataStoreSelectoridempotentKeyselectorkey-expressionselectorkey-strategykey-strategy-expression
7 用于填充 . 由底层 . 使用请求消息作为评估上下文根对象来评估。 与 和 互斥。 如果未提供 a,则需要 or 之一。ExpressionEvaluatingMessageProcessorMetadataStoreSelectoridempotentKeyselectorkey-strategyselectorkey-strategykey-strategy-expression
8 参考。 由底层 . 计算 from 请求消息的 a。 与 和 互斥。 默认情况下,“MetadataStoreSelector”使用“timestamp”消息标头作为元数据“值”。MessageProcessorMetadataStoreSelectorvalueidempotentKeyselectorvalue-expression
9 用于填充 . 由底层 . 通过使用请求消息作为评估上下文根对象来评估 a。 与 和 互斥。 默认情况下,“MetadataStoreSelector”使用“timestamp”消息标头作为元数据“值”。ExpressionEvaluatingMessageProcessorMetadataStoreSelectorvalueidempotentKeyselectorvalue-strategy
10 对 Bean 的引用,允许您通过比较键的旧值和新值来选择消息; 默认情况下。BiPredicate<String, String>null
11 是否在拒绝消息时引发异常。 默认值为 。 无论是否提供 a,它都会被应用。IdempotentReceiverInterceptorfalsediscard-channel
1 Bean 的 ID。 自选。IdempotentReceiverInterceptor
2 应用此侦听器的使用方终结点名称或模式。 用逗号 () 分隔名称(模式),例如 . 然后,使用匹配这些模式的终端节点 Bean 名称来检索目标终端节点的 Bean(使用其后缀),并将 应用于这些 bean。 必填。,endpoint="aaa, bbb*, ccc, *ddd, eee*fff"MessageHandler.handlerIdempotentReceiverInterceptor
3 Bean 引用。 与 和 互斥。 如果未提供,则为“或”之一为必需项。MessageSelectormetadata-storekey-strategy (key-expression)selectorkey-strategykey-strategy-expression
4 标识不接受消息时要向其发送消息的通道。 如果省略,则重复的消息将转发到带有标头的处理程序。 自选。IdempotentReceiverInterceptorduplicateMessage
5 参考。 由底层 . 与 互斥。 自选。 默认使用不跨应用程序执行维护状态的内部。ConcurrentMetadataStoreMetadataStoreSelectorselectorMetadataStoreSelectorSimpleMetadataStore
6 参考。 由底层 . 从请求消息中计算。 与 和 互斥。 如果未提供 a,则需要 or 之一。MessageProcessorMetadataStoreSelectoridempotentKeyselectorkey-expressionselectorkey-strategykey-strategy-expression
7 用于填充 . 由底层 . 使用请求消息作为评估上下文根对象来评估。 与 和 互斥。 如果未提供 a,则需要 or 之一。ExpressionEvaluatingMessageProcessorMetadataStoreSelectoridempotentKeyselectorkey-strategyselectorkey-strategykey-strategy-expression
8 参考。 由底层 . 计算 from 请求消息的 a。 与 和 互斥。 默认情况下,“MetadataStoreSelector”使用“timestamp”消息标头作为元数据“值”。MessageProcessorMetadataStoreSelectorvalueidempotentKeyselectorvalue-expression
9 用于填充 . 由底层 . 通过使用请求消息作为评估上下文根对象来评估 a。 与 和 互斥。 默认情况下,“MetadataStoreSelector”使用“timestamp”消息标头作为元数据“值”。ExpressionEvaluatingMessageProcessorMetadataStoreSelectorvalueidempotentKeyselectorvalue-strategy
10 对 Bean 的引用,允许您通过比较键的旧值和新值来选择消息; 默认情况下。BiPredicate<String, String>null
11 是否在拒绝消息时引发异常。 默认值为 。 无论是否提供 a,它都会被应用。IdempotentReceiverInterceptorfalsediscard-channel

对于 Java 配置,Spring Integration 提供了方法级注解。 它用于标记具有消息传递注释 (,对象应用于此端点。 以下示例演示如何使用批注:@IdempotentReceivermethod@ServiceActivator@Router, and others) to specify which `IdempotentReceiverInterceptor@IdempotentReceiverSpring中文文档

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
   return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
                                                    m.getHeaders().get(INVOICE_NBR_HEADER)));
}

@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
    ....
}

使用 Java DSL 时,可以将侦听器添加到端点的建议链中,如以下示例所示:Spring中文文档

@Bean
public IntegrationFlow flow() {
    ...
        .handle("someBean", "someMethod",
            e -> e.advice(idempotentReceiverInterceptor()))
    ...
}
仅针对该方法而设计。 从版本 4.3.1 开始,它实现了 ,作为基类,以便更好地解离。 有关详细信息,请参阅处理消息建议IdempotentReceiverInterceptorMessageHandler.handleMessage(Message<?>)HandleMessageAdviceAbstractHandleMessageAdvice
仅针对该方法而设计。 从版本 4.3.1 开始,它实现了 ,作为基类,以便更好地解离。 有关详细信息,请参阅处理消息建议IdempotentReceiverInterceptorMessageHandler.handleMessage(Message<?>)HandleMessageAdviceAbstractHandleMessageAdvice