Spring Rabbit 框架支持同步和异步用例中的自动事务管理,具有许多不同的语义,可以通过声明方式选择,正如 Spring 事务的现有用户所熟悉的那样。 这使得许多(如果不是最常见的)消息传递模式易于实现。Spring中文文档

有两种方法可以向框架发出所需的事务语义信号。 在 和 中,都有一个标志,如果 ,则告诉框架使用事务通道,并以提交或回滚(取决于结果)结束所有操作(发送或接收),但有一个异常表示回滚。 另一个信号是提供一个带有 Spring 实现的外部事务,作为正在进行的操作的上下文。 如果在框架发送或接收消息时已经有一个事务正在进行中,并且标志为 ,则消息传递事务的提交或回滚将推迟到当前事务结束。 如果标志为 ,则不会将事务语义应用于消息传递操作(它是自动确认的)。RabbitTemplateSimpleMessageListenerContainerchannelTransactedtruePlatformTransactionManagerchannelTransactedtruechannelTransactedfalseSpring中文文档

该标志是配置时间设置。 在创建 AMQP 组件时(通常在应用程序启动时)声明和处理一次。 外部事务原则上更具动态性,因为系统在运行时响应当前线程状态。 但是,在实践中,当事务以声明方式分层到应用程序上时,它通常也是一种配置设置。channelTransactedSpring中文文档

对于 的同步用例,外部事务由调用方根据口味以声明方式或命令式方式提供(通常的 Spring 事务模型)。 以下示例显示了一种声明性方法(通常是首选,因为它是非侵入性的),其中模板已配置为:RabbitTemplatechannelTransacted=trueSpring中文文档

@Transactional
public void doSomething() {
    String incoming = rabbitTemplate.receiveAndConvert();
    // do some more database processing...
    String outgoing = processInDatabaseAndExtractReply(incoming);
    rabbitTemplate.convertAndSend(outgoing);
}

在前面的示例中,有效负载在标记为 的方法中作为消息正文接收、转换和发送。 如果数据库处理失败并出现异常,那么传入消息将返回给代理,并且不会发送传出消息。 这适用于具有事务方法链内部的任何操作(例如,除非直接操纵以提前提交事务)。String@TransactionalRabbitTemplateChannelSpring中文文档

对于具有 的异步用例,如果需要外部事务,则容器在设置侦听器时必须请求它。 为了指示需要外部事务,用户在配置容器时向容器提供实现。 以下示例演示如何执行此操作:SimpleMessageListenerContainerPlatformTransactionManagerSpring中文文档

@Configuration
public class ExampleExternalTransactionAmqpConfiguration {

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setTransactionManager(transactionManager());
        container.setChannelTransacted(true);
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }

}

在前面的示例中,事务管理器被添加为从另一个 Bean 定义(未显示)注入的依赖项,并且该标志也设置为 。 其效果是,如果侦听器因异常而失败,则事务将回滚,并且消息也将返回给代理。 值得注意的是,如果事务无法提交(例如,由于 数据库约束错误或连接问题),AMQP 事务也会回滚,并将消息返回给代理。 这有时被称为“尽最大努力 1 阶段提交”,是一种非常强大的可靠消息传递模式。 如果在前面的示例中将标志设置为(默认值),则仍将为侦听器提供外部事务,但所有消息传递操作都将自动确认,因此效果是即使在业务操作回滚时也要提交消息传递操作。channelTransactedtruechannelTransactedfalseSpring中文文档

有条件回滚

在版本 1.6.6 之前,在使用外部事务管理器(如 JDBC)时向容器添加回滚规则不起作用。 异常总是回滚事务。transactionAttributeSpring中文文档

此外,在容器的建议链中使用事务建议时,条件回滚不是很有用,因为所有侦听器异常都包装在 .ListenerExecutionFailedExceptionSpring中文文档

第一个问题已得到纠正,规则现已正确应用。 此外,现在提供了。 它是 的子类,唯一的区别是它知道 并将此类异常的原因用于规则。 此交易属性可以直接在容器中使用,也可以通过交易通知使用。ListenerFailedRuleBasedTransactionAttributeRuleBasedTransactionAttributeListenerExecutionFailedExceptionSpring中文文档

下面的示例使用此规则:Spring中文文档

@Bean
public AbstractMessageListenerContainer container() {
    ...
    container.setTransactionManager(transactionManager);
    RuleBasedTransactionAttribute transactionAttribute =
        new ListenerFailedRuleBasedTransactionAttribute();
    transactionAttribute.setRollbackRules(Collections.singletonList(
        new NoRollbackRuleAttribute(DontRollBackException.class)));
    container.setTransactionAttribute(transactionAttribute);
    ...
}

关于回滚已接收消息的说明

AMQP 事务仅适用于发送到代理的消息和确认器。 因此,当 Spring 事务发生回滚并收到消息时,Spring AMQP 不仅要回滚事务,还要手动拒绝消息(有点讨厌,但这不是规范所说的)。 对邮件拒绝执行的操作与事务无关,并且取决于属性(默认值:)。 有关拒绝失败消息的详细信息,请参阅消息侦听器和异步案例defaultRequeueRejectedtrueSpring中文文档

有关 RabbitMQ 事务及其限制的更多信息,请参阅 RabbitMQ 代理语义Spring中文文档

在 RabbitMQ 2.7.0 之前,此类消息(以及通道关闭或中止时未确认的任何消息)都位于 Rabbit 代理的队列后面。 从 2.7.0 开始,被拒绝的消息会转到队列的前面,其方式类似于 JMS 回滚消息。
以前,在事务回滚时,本地事务与提供 a 时的消息重新排队不一致。 在前一种情况下,应用正常的重新排队逻辑( 或 )(请参阅消息侦听器和异步情况)。 使用事务管理器时,消息在回滚时无条件地重新排队。 从版本 2.0 开始,行为是一致的,并且在这两种情况下都应用正常的重新排队逻辑。 若要恢复到以前的行为,可以将容器的属性设置为 。 请参阅消息侦听器容器配置TransactionManagerAmqpRejectAndDontRequeueExceptiondefaultRequeueRejected=falsealwaysRequeueWithTxManagerRollbacktrue
在 RabbitMQ 2.7.0 之前,此类消息(以及通道关闭或中止时未确认的任何消息)都位于 Rabbit 代理的队列后面。 从 2.7.0 开始,被拒绝的消息会转到队列的前面,其方式类似于 JMS 回滚消息。
以前,在事务回滚时,本地事务与提供 a 时的消息重新排队不一致。 在前一种情况下,应用正常的重新排队逻辑( 或 )(请参阅消息侦听器和异步情况)。 使用事务管理器时,消息在回滚时无条件地重新排队。 从版本 2.0 开始,行为是一致的,并且在这两种情况下都应用正常的重新排队逻辑。 若要恢复到以前的行为,可以将容器的属性设置为 。 请参阅消息侦听器容器配置TransactionManagerAmqpRejectAndDontRequeueExceptiondefaultRequeueRejected=falsealwaysRequeueWithTxManagerRollbacktrue

RabbitTransactionManager

RabbitTransactionManager 是在外部事务中执行 Rabbit 操作并与外部事务同步的替代方法。 此事务管理器是 PlatformTransactionManager 接口的实现,应与单个 Rabbit 一起使用。ConnectionFactorySpring中文文档

此策略无法提供 XA 事务 — 例如,为了在消息传递和数据库访问之间共享事务。

需要应用程序代码才能通过后续通道创建的标准调用来检索事务性 Rabbit 资源。 当使用 Spring AMQP 的 RabbitTemplate 时,它会自动检测一个线程绑定的通道并自动参与其事务。ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean)Connection.createChannel()Spring中文文档

使用 Java 配置,可以使用以下 Bean 设置新的 RabbitTransactionManager:Spring中文文档

@Bean
public RabbitTransactionManager rabbitTransactionManager() {
    return new RabbitTransactionManager(connectionFactory);
}

如果首选 XML 配置,则可以在 XML 应用程序上下文文件中声明以下 Bean:Spring中文文档

<bean id="rabbitTxManager"
      class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>
此策略无法提供 XA 事务 — 例如,为了在消息传递和数据库访问之间共享事务。

事务同步

将 RabbitMQ 事务与其他事务(例如 DBMS)同步可提供“尽力而为一阶段提交”语义。 在事务同步的完成后阶段,RabbitMQ 事务可能无法提交。 基础结构将此记录为错误,但不会对调用代码引发异常。 从版本 2.3.10 开始,您可以在事务提交到处理事务的同一线程上后调用。 如果没有发生异常,它只会返回;否则,它将抛出一个属性,该属性表示完成的同步状态。spring-txConnectionUtils.checkAfterCompletion()AfterCompletionFailedExceptionSpring中文文档

通过调用 ;这是一个全局标志,适用于所有线程。ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true)Spring中文文档