了解消息流中的事务

Spring 集成公开了几个钩子来解决消息流的事务需求。 为了更好地理解这些钩子以及如何从中受益,我们必须首先重新审视可用于启动消息流的 6 种机制,并了解如何在每种机制中满足这些流的事务需求。spring-doc.cn

以下 6 种机制启动消息流(本手册中提供了每种机制的详细信息):spring-doc.cn

  • Gateway proxy(网关代理):基本的消息传递网关。spring-doc.cn

  • 消息通道:与方法的直接交互(例如 )。MessageChannelchannel.send(message)spring-doc.cn

  • 消息发布者:在 Spring Bean 上启动消息流作为方法调用的副产品的方法。spring-doc.cn

  • 入站通道适配器和网关:基于将第三方系统与 Spring Integration 消息传递系统(例如)连接起来来启动消息流的方法。[JmsMessage] → Jms Inbound Adapter[SI Message] → SI Channelspring-doc.cn

  • Scheduler:根据预配置的调度程序分发的事件来启动消息流的方法。spring-doc.cn

  • Poller:与调度器类似,这是根据预配置的 Poller 分发的调度或基于间隔的事件来启动消息流的方法。spring-doc.cn

我们可以将这六种机制分为两大类:spring-doc.cn

  • 由用户进程启动的消息流:此类别中的示例场景是调用网关方法或显式向 . 换句话说,这些消息流依赖于要启动的第三方进程(例如,您编写的某些代码)。MessageMessageChannelspring-doc.cn

  • 守护进程启动的消息流:此类别中的示例场景包括轮询器轮询消息队列以使用轮询的消息启动新的消息流,或者调度程序通过创建新消息并在预定义的时间启动消息流来调度进程。spring-doc.cn

显然,网关代理属于第一类,入站适配器和网关、调度程序和 Poller 属于第二类。MessageChannel.send(…​)MessagePublisherspring-doc.cn

那么,如何满足每个类别中各种场景中的事务需求,以及 Spring Integration 是否需要为特定场景的事务提供明确的东西呢? 或者,您可以使用 Spring 的事务支持吗?spring-doc.cn

Spring 本身为事务 Management 提供了一流的支持。 因此,我们的目标不是提供新的东西,而是使用 Spring 从其现有的事务支持中受益。 换句话说,作为一个框架,我们必须将钩子公开给 Spring 的事务管理功能。 但是,由于 Spring Integration 配置基于 Spring 配置,因此我们不需要总是公开这些钩子,因为 Spring 已经公开了它们。 毕竟,每个 Spring 集成组件都是一个 Spring Bean。spring-doc.cn

考虑到这个目标,我们可以再次考虑两种情况:由用户进程发起的消息流和由守护进程发起的消息流。spring-doc.cn

由用户进程启动并在 Spring 应用程序上下文中配置的消息流受此类进程的通常事务配置的约束。 因此,Spring 集成不需要显式配置它们来支持事务。 事务可以而且应该通过 Spring 的标准事务支持来启动。 Spring 集成消息流自然地遵循组件的事务语义,因为它本身是由 Spring 配置的。 例如,网关或服务激活器方法可以用 进行注释,或者可以在 XML 配置中使用指向应为事务性的特定方法的切入点表达式来定义 a。 最重要的是,在这些情况下,您可以完全控制事务配置和边界。@TransactionalTransactionInterceptorspring-doc.cn

但是,当涉及到由 daemon 进程启动的消息流时,情况略有不同。 尽管这些流程由开发人员配置,但并不直接涉及要启动的人工或其他流程。 这些是基于触发器的流,由触发器进程(守护进程)根据进程的配置启动。 例如,我们可以让调度程序在每个星期五晚上启动一个消息流。 我们还可以配置一个触发器,每秒启动一次消息流,依此类推。 因此,我们需要一种方法让这些基于触发器的流程知道我们打算使生成的消息流成为事务性的,以便在启动新的消息流时可以创建一个 Transaction 上下文。 换句话说,我们需要公开一些事务配置,但仅限于委托给 Spring 已经提供的事务支持(就像我们在其他场景中所做的那样)。spring-doc.cn

Poller 事务支持

Spring 集成为 Poller 提供事务支持。 Poller 是一种特殊类型的组件,因为在 Poller 任务中,我们可以对本身是事务性的资源进行调用,从而将调用包含在事务的边界中,从而可以在任务失败时回滚它。 如果我们要添加对 channels 的相同支持,则添加的事务将影响从 call 开始的所有下游组件。 这为事务划分提供了相当广泛的范围,而没有任何充分的理由,特别是当 Spring 已经提供了几种方法来满足任何下游组件的事务需求时。 但是,包含在事务边界中的方法是 Poller 的“强烈理由”。receive()receive()send()receive()spring-doc.cn

任何时候配置 Poller 时,都可以使用子元素及其属性提供事务配置,如下例所示:transactionalspring-doc.cn

<int:poller max-messages-per-poll="1" fixed-rate="1000">
    <transactional transaction-manager="txManager"
                   isolation="DEFAULT"
                   propagation="REQUIRED"
                   read-only="true"
                   timeout="1000"/>
</poller>

前面的配置看起来类似于本机 Spring 事务配置。 您仍然必须提供对事务管理器的引用,并指定事务属性或依赖默认值(例如,如果未指定 'transaction-manager' 属性,则默认为名为 'transactionManager' 的 Bean)。 在内部,该进程包装在 Spring 的原生事务中,其中负责处理事务。 有关如何配置事务管理器、事务管理器的类型(例如 JTA、Datasource 等)以及与事务配置相关的其他详细信息的更多信息,请参见 Spring 框架参考指南TransactionInterceptorspring-doc.cn

在前面的配置中,此 Poller 启动的所有消息流都是事务性的。 有关 Poller 的事务配置的更多信息和详细信息,请参见Polling and Transactionsspring-doc.cn

除了事务之外,在运行 Poller 时,你可能需要解决更多的横切关注点。 为了帮助解决这个问题,poller 元素接受一个子元素,它允许你定义一个自定义的通知实例链,以应用于 Poller。 (有关更多详细信息,请参阅 Pollable Message Source。 在 Spring Integration 2.0 中,Poller 经历了重构工作,现在使用代理机制来解决事务关注点以及其他横切关注点。 这项工作的重大变化之一是,我们使 和 元素互斥。 这背后的基本原理是,如果您需要多个建议,其中一个是 Transaction advice,您可以像以前一样方便地将其包含在 中,但具有更多的控制权,因为您现在可以选择按所需的顺序放置建议。 以下示例显示了如何执行此操作:<advice-chain><transactional><advice-chain><advice-chain>spring-doc.cn

<int:poller max-messages-per-poll="1" fixed-rate="10000">
  <advice-chain>
    <ref bean="txAdvice"/>
    <ref bean="someOtherAdviceBean" />
    <beans:bean class="foo.bar.SampleAdvice"/>
  </advice-chain>
</poller>

<tx:advice id="txAdvice" transaction-manager="txManager">
  <tx:attributes>
    <tx:method name="get*" read-only="true"/>
    <tx:method name="*"/>
  </tx:attributes>
</tx:advice>

前面的示例显示了 Spring Transaction advice () 的基于 XML 的基本配置,并将其包含在 Poller 定义的配置中。 如果你只需要解决 Poller 的事务性问题,你仍然可以使用该元素来方便。txAdvice<advice-chain><transactional>spring-doc.cn

事务边界

另一个重要因素是 Message 流中 Transactions 的边界。 当事务启动时,事务上下文将绑定到当前线程。 因此,无论您的 Message 流中有多少个端点和通道,只要您确保流在同一线程上继续,您的事务上下文就会被保留。 一旦你通过引入 Pollable ChannelExecutor Channel 来打破它,或者在某些服务中手动启动一个新线程,事务边界也会被打破。 从本质上讲,Transaction 将就在那里结束,如果在线程之间成功进行了切换,则该流将被视为成功,并且将发送 COMMIT 信号,即使该流将继续并且仍可能导致下游某个地方出现 Exception。 如果这样的流是同步的,那么该 Exception 可能会被抛回到 Message 流的发起者,该发起者也是事务上下文的发起者,并且该事务将导致 ROLLBACK。 中间立场是在打破线程边界的任何点使用事务通道。 例如,您可以使用委托给事务性 MessageStore 策略的 Queue-backed Channel,也可以使用 JMS 支持的通道。spring-doc.cn

事务同步

在某些环境中,它有助于将操作与包含整个流的事务同步。 例如,考虑在执行大量数据库更新的流开始时。 如果事务提交,我们可能希望将文件移动到某个目录,而如果事务回滚,我们可能希望将其移动到某个目录。<file:inbound-channel-adapter/>successfailurespring-doc.cn

Spring Integration 2.2 引入了将这些操作与事务同步的功能。 此外,如果您没有 “真实” 事务,但仍希望在成功或失败时执行不同的操作,则可以配置 a。 有关更多信息,请参阅伪交易PseudoTransactionManagerspring-doc.cn

下面的清单显示了此功能的关键 strategy 接口:spring-doc.cn

public interface TransactionSynchronizationFactory {

    TransactionSynchronization create(Object key);
}

public interface TransactionSynchronizationProcessor {

    void processBeforeCommit(IntegrationResourceHolder holder);

    void processAfterCommit(IntegrationResourceHolder holder);

    void processAfterRollback(IntegrationResourceHolder holder);

}

工厂负责创建 TransactionSynchronization 对象。 您可以实现自己的 Framework,也可以使用框架提供的 Framework:。 此实现返回一个 ,该实现委托给 : 的默认实现。 此处理器支持三个 SPEL 表达式:、 和 。DefaultTransactionSynchronizationFactoryTransactionSynchronizationTransactionSynchronizationProcessorExpressionEvaluatingTransactionSynchronizationProcessorbeforeCommitExpressionafterCommitExpressionafterRollbackExpressionspring-doc.cn

对于熟悉交易的人来说,这些操作应该是不言自明的。 在每种情况下,变量都是原始 。 在某些情况下,其他 SPEL 变量是可用的,具体取决于 Poller 轮询的内容。 例如,the 提供变量,该变量引用消息源的 . 同样,the 提供变量,该变量引用 poll 创建的 。#rootMessageMessageSourceMongoDbMessageSource#mongoTemplateMongoTemplateRedisStoreMessageSource#storeRedisStorespring-doc.cn

要为特定的 Poller 启用该功能,你可以使用属性提供对 Poller 元素的引用。TransactionSynchronizationFactory<transactional/>synchronization-factoryspring-doc.cn

从版本 5.0 开始, Spring Integration 提供了 ,当没有配置但建议链中存在 advice 类型的 advice 时,默认情况下将其应用于轮询端点。 当使用任何开箱即用的实现时,轮询端点会将轮询的消息绑定到当前的事务上下文,并在事务通知后抛出异常时将其作为 in 中提供。 当使用未 implement 的自定义事务建议时,您可以显式配置 a 以实现此行为。 在任何一种情况下,它都会成为发送到 的 的有效负载,原因是通知引发的原始异常。 以前,它的有效负载是通知引发的原始异常,并且没有提供对信息的引用,因此很难确定事务提交问题的原因。PassThroughTransactionSynchronizationFactoryTransactionSynchronizationFactoryTransactionInterceptorTransactionSynchronizationFactoryfailedMessageMessagingExceptionTransactionInterceptorPassThroughTransactionSynchronizationFactoryMessagingExceptionErrorMessageerrorChannelErrorMessagefailedMessagespring-doc.cn

为了简化这些组件的配置, Spring 集成为默认工厂提供了名称空间支持。 下面的示例展示了如何使用名称空间来配置文件入站通道适配器:spring-doc.cn

<int-file:inbound-channel-adapter id="inputDirPoller"
    channel="someChannel"
    directory="/foo/bar"
    filter="filter"
    comparator="testComparator">
    <int:poller fixed-rate="5000">
        <int:transactional transaction-manager="transactionManager" synchronization-factory="syncFactory" />
    </int:poller>
</int-file:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="payload.renameTo(new java.io.File('/success/' + payload.name))"
        channel="committedChannel" />
    <int:after-rollback expression="payload.renameTo(new java.io.File('/failed/' + payload.name))"
        channel="rolledBackChannel" />
</int:transaction-synchronization-factory>

SPEL 评估的结果作为有效负载发送到 or(在本例中,这将是 or — 方法调用的结果)。committedChannelrolledBackChannelBoolean.TRUEBoolean.FALSEjava.io.File.renameTo()spring-doc.cn

如果你希望发送整个有效负载以进行进一步的 Spring 集成处理,请使用'payload'表达式。spring-doc.cn

请务必了解这会将操作与事务同步。 它不会使本质上不是事务性的资源实际上是事务性的。 相反,事务(无论是 JDBC 还是其他事务)在轮询之前启动,并在流完成时提交或回滚,然后是同步操作。spring-doc.cn

如果您提供 custom ,则它负责创建资源同步,以便在事务完成时自动取消绑定资源。 默认值通过返回 的子类来实现此目的,默认值返回 。TransactionSynchronizationFactoryTransactionSynchronizationFactoryResourceHolderSynchronizationshouldUnbindAtCompletion()truespring-doc.cn

除了 和 表达式之外,还支持 。 在这种情况下,如果评估(或下游处理)引发异常,则事务将回滚而不是提交。after-commitafter-rollbackbefore-commitspring-doc.cn

请务必了解这会将操作与事务同步。 它不会使本质上不是事务性的资源实际上是事务性的。 相反,事务(无论是 JDBC 还是其他事务)在轮询之前启动,并在流完成时提交或回滚,然后是同步操作。spring-doc.cn

如果您提供 custom ,则它负责创建资源同步,以便在事务完成时自动取消绑定资源。 默认值通过返回 的子类来实现此目的,默认值返回 。TransactionSynchronizationFactoryTransactionSynchronizationFactoryResourceHolderSynchronizationshouldUnbindAtCompletion()truespring-doc.cn

伪交易

阅读 Transaction Synchronization 部分后,您可能会认为在流完成时执行这些 “成功” 或 “失败” 操作会很有用,即使 Poller 下游没有 “真正的” 事务资源(例如 JDBC)。 例如,考虑一个 “<file:inbound-channel-adapter/>” 后跟一个 “<ftp:outbout-channel-adapter/>”。 这两个组件都不是事务性的,但我们可能希望根据 FTP 传输的成功或失败将输入文件移动到不同的目录。spring-doc.cn

为了提供此功能,框架提供了一个 ,即使不涉及实际的事务资源,也可以启用上述配置。 如果流正常完成,则调用 和 synchronizations。 失败时,将调用同步。 因为它不是真正的事务,所以不会发生实际的提交或回滚。 伪交易是用于启用同步功能的工具。PseudoTransactionManagerbeforeCommitafterCommitafterRollbackspring-doc.cn

要使用 ,您可以将其定义为 <bean/>,就像配置真正的事务管理器一样。 以下示例显示了如何执行此操作:PseudoTransactionManagerspring-doc.cn

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager" />

反应式事务

从版本 5.3 开始,a 也可以与返回反应式类型的端点的通知一起使用。 这包括 和 implementations (例如 ),它们生成带有 或 payload 的消息。 当它们的回复有效负载也是某种反应类型时,所有其他生成回复的消息处理程序实现都可以依赖 a。ReactiveTransactionManagerTransactionInterceptorMessageSourceReactiveMessageHandlerReactiveMongoDbMessageSourceFluxMonoReactiveTransactionManagerspring-doc.cn