线程屏障

有时,我们需要暂停消息流线程,直到发生其他异步事件。 例如,考虑一个向 RabbitMQ 发布消息的 HTTP 请求。 在 RabbitMQ 代理发出收到消息的确认之前,我们可能希望不回复用户。spring-doc.cn

在版本 4.2 中, Spring 集成为此目的引入了该组件。 底层是 . 此类还实现 ,其中传递给方法的消息释放方法中的相应线程(如果存在)。<barrier/>MessageHandlerBarrierMessageHandlerMessageTriggerActiontrigger()handleRequestMessage()spring-doc.cn

挂起的线程和触发器线程通过对消息调用 a 进行关联。 当消息发送到 时,线程将暂停长达几毫秒,等待相应的触发器消息。 默认关联策略使用 header。 当触发器消息以相同的关联到达时,线程将被释放。 发送到 after release 的消息是使用 . 默认情况下,消息是两个负载中的一个,并且标头是使用 .CorrelationStrategyinput-channelrequestTimeoutIntegrationMessageHeaderAccessor.CORRELATION_IDoutput-channelMessageGroupProcessorCollection<?>DefaultAggregatingMessageGroupProcessorspring-doc.cn

如果首先调用该方法(或在主线程超时后调用),则该方法将暂停,直到等待挂起消息到达为止。 如果您不想暂停触发器线程,请考虑将 transfer transfer 交给 a,以便其线程被暂停。trigger()triggerTimeoutTaskExecutor
在 5.4 版本之前,请求和触发消息只有一个选项,但在某些用例中,最好为这些操作设置不同的超时。 因此,已经引入了选项。timeoutrequestTimeouttriggerTimeout

该属性确定如果挂起的线程在触发器消息到达之前超时时要采取的操作。 默认情况下,它是 ,这意味着端点返回 ,流结束,线程返回给调用方。 当 , a 被抛出时。requires-replyfalsenulltrueReplyRequiredExceptionspring-doc.cn

你可以以编程方式调用该方法(使用名称获取 bean 引用,— 其中 是屏障端点的 bean 名称)。 或者,您可以配置 an 以触发发布。trigger()barrier.handlerbarrier<outbound-channel-adapter/>spring-doc.cn

只能暂停一个具有相同关联的线程。 相同的关联可以多次使用,但只能同时使用一次。 如果第二个线程以相同的关联到达,则会引发异常。

以下示例演示如何使用自定义标头进行关联:spring-doc.cn

@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
    BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
    barrier.setOutputChannel(out());
    barrier.setDiscardChannel(lateTriggerChannel);
    return barrier;
}

@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
    return barrier::trigger;
}
<int:barrier id="barrier1" input-channel="in" output-channel="out"
        correlation-strategy-expression="headers['myHeader']"
        output-processor="myOutputProcessor"
        discard-channel="lateTriggerChannel"
        timeout="10000">
</int:barrier>

<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />

根据消息先到达的线程,发送消息的线程或发送消息的线程最多等待 10 秒,直到另一条消息到达。 当消息被释放时,通道将收到一条消息,该消息结合了调用自定义 Bean 的结果,名为 . 如果主线程超时并且触发器稍后到达,则可以配置将延迟触发器发送到的 discard 通道。 如果请求消息未及时到达,触发器消息也会被丢弃。inreleaseoutMessageGroupProcessormyOutputProcessorspring-doc.cn

有关此组件的示例,请参阅 barrier 示例应用程序spring-doc.cn