消息通道实现

Spring 集成提供了不同的消息通道实现。 以下各节将简要介绍每个选项。spring-doc.cadn.net.cn

PublishSubscribeChannel

PublishSubscribeChannelimplementation 广播任何Message发送到它的所有订阅处理程序。 这最常用于发送事件消息,其主要角色是通知(与文档消息相反,文档消息通常由单个处理程序处理)。 请注意,PublishSubscribeChannel仅用于发送。 由于它会直接向订阅者广播,因此当send(Message)方法,使用者无法轮询消息(它不会实现PollableChannel,因此没有receive()方法)。 相反,任何订阅者本身都必须是MessageHandler和订阅者的handleMessage(Message)方法。spring-doc.cadn.net.cn

在 3.0 版本之前,调用send方法在PublishSubscribeChannel没有返回订阅者false. 当与MessagingTemplate一个MessageDeliveryException被抛出。 从版本 3.0 开始,行为发生了变化,使得send如果至少存在最小订阅者(并成功处理消息),则始终被视为成功。 可以通过设置minSubscribers属性,默认为0.spring-doc.cadn.net.cn

如果您使用TaskExecutor,则仅使用正确数量的订户进行此确定,因为消息的实际处理是异步执行的。

QueueChannel

QueueChannelimplementation 包装一个队列。 与PublishSubscribeChannelQueueChannel具有点对点语义。 换句话说,即使通道有多个消费者,也只有一个消费者应该接收任何Message发送到该通道。 它提供了一个默认的无参数构造函数(提供基本上不受限制的容量Integer.MAX_VALUE)以及接受队列容量的构造函数,如下面的清单所示:spring-doc.cadn.net.cn

public QueueChannel(int capacity)

未达到其容量限制的通道将消息存储在其内部队列中,而send(Message<?>)method 会立即返回,即使没有接收方准备好处理该消息。 如果队列已达到容量上限,则发送方将阻止,直到队列中有 room 可用。 或者,如果您使用具有附加 timeout 参数的 send 方法,则队列将阻止,直到任一房间可用或超时期限已过(以先发生者为准)。 同样,receive()如果队列中有消息可用,则 call 会立即返回,但是,如果队列为空,则 receive 调用可能会阻止,直到消息可用或超时(如果提供)过去。 在任何一种情况下,都可以通过传递超时值 0 来强制立即返回,而不管队列的状态如何。 但请注意,对send()receive()没有timeoutparameter 块。spring-doc.cadn.net.cn

PriorityChannel

QueueChannel强制执行先进先出 (FIFO) 排序,则PriorityChannel是一种替代实现,它允许根据优先级在通道内对消息进行排序。 默认情况下,优先级由priority标头。 但是,对于自定义优先级确定逻辑,类型为Comparator<Message<?>>可以提供给PriorityChannel构造 函数。spring-doc.cadn.net.cn

RendezvousChannel

RendezvousChannel启用“直接切换”场景,其中发送者阻塞,直到另一方调用通道的receive()方法。 另一方会阻止,直到发送方发送消息。 在内部,这个实现与QueueChannel,不同之处在于它使用SynchronousQueue(零容量实现BlockingQueue). 这在发送方和接收方在不同的线程中作的情况下效果很好,但异步将消息放入队列中是不合适的。 换句话说,使用RendezvousChannel,发送方知道某个接收方已经接受了该消息,而使用QueueChannel,则消息将存储到内部队列中,并且可能永远不会收到。spring-doc.cadn.net.cn

请记住,默认情况下,所有这些基于队列的通道都仅将消息存储在内存中。 当需要持久性时,您可以在 'queue' 元素中提供 'message-store' 属性来引用持久性MessageStore实现,或者你可以将本地通道替换为由持久代理支持的通道,例如 JMS 支持的通道或通道适配器。 后一个选项允许您利用任何 JMS 提供程序的实现来实现消息持久性,如 JMS 支持中所述。 但是,当不需要在队列中缓冲时,最简单的方法是依赖DirectChannel,将在下一节中讨论。

RendezvousChannel对于实现请求-答复作也很有用。 发送者可以创建一个临时的匿名实例RendezvousChannel,然后在构建Message. 发送后Message,发送者可以立即调用receive(可选提供超时值)以便在等待回复时阻止Message. 这与 Spring 集成的许多请求-回复组件内部使用的实现非常相似。spring-doc.cadn.net.cn

DirectChannel

DirectChannel具有点对点语义,但在其他方面更类似于PublishSubscribeChannel比前面描述的任何基于队列的 Channel 实现都要多。 它实现了SubscribableChannel接口而不是PollableChannel接口,因此它将消息直接分派给订阅者。 但是,作为点对点通道,它与PublishSubscribeChannel因为它将每个Message到单个订阅MessageHandler.spring-doc.cadn.net.cn

除了是最简单的点对点通道选项之外,它最重要的功能之一是它使单个线程能够在通道的 “两侧” 执行作。 例如,如果处理程序订阅了DirectChannel,然后发送Message触发对该处理程序的handleMessage(Message)方法,在send()method invocation 可以返回。spring-doc.cadn.net.cn

提供具有此行为的通道实现的主要动机是支持必须跨越通道的事务,同时仍然受益于通道提供的抽象和松散耦合。 如果send()调用时,处理程序调用的结果(例如,更新数据库记录)在确定该事务的最终结果(提交或回滚)方面发挥作用。spring-doc.cadn.net.cn

由于DirectChannel是最简单的选项,并且不会增加调度和管理 Poller 线程所需的任何额外开销,它是 Spring Integration 中的默认通道类型。 一般的思路是定义应用程序的通道,考虑哪些通道需要提供缓冲或限制输入,然后将这些通道修改为基于队列的PollableChannels. 同样,如果频道需要广播消息,则它不应该是DirectChannel而是一个PublishSubscribeChannel. 稍后,我们将展示如何配置这些通道中的每一个。

DirectChannel在内部委托给消息调度程序来调用其订阅的消息处理程序,并且该调度程序可以具有由load-balancerload-balancer-refattributes (互斥)。 消息调度器使用负载平衡策略来帮助确定当多个消息处理程序订阅同一通道时,如何在消息处理程序之间分发消息。 为方便起见,load-balancerattribute 公开一个值枚举,这些值指向预先存在的LoadBalancingStrategy. 一个round-robin(在轮换的处理程序之间进行负载均衡)和none(对于想要显式禁用负载均衡的情况)是唯一可用的值。 将来的版本中可能会添加其他策略实现。 但是,从版本 3.0 开始,您可以提供自己的LoadBalancingStrategy并使用load-balancer-ref属性,该属性应指向实现LoadBalancingStrategy,如下例所示:spring-doc.cadn.net.cn

一个FixedSubscriberChannel是一个SubscribableChannel仅支持单个MessageHandler无法取消订阅的订阅者。 这对于不涉及其他订阅者且不需要通道拦截器的高吞吐量性能使用案例非常有用。spring-doc.cadn.net.cn

<int:channel id="lbRefChannel">
  <int:dispatcher load-balancer-ref="lb"/>
</int:channel>

<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>

请注意,load-balancerload-balancer-ref属性是互斥的。spring-doc.cadn.net.cn

负载均衡还与布尔值failover财产。 如果failovervalue 为 true(默认值),则当前面的处理程序引发异常时,调度程序将回退到任何后续处理程序(根据需要)。 订单由处理程序本身定义的可选 order 值确定,如果不存在此类值,则由处理程序订阅的顺序确定。spring-doc.cadn.net.cn

如果某种情况要求 Dispatcher 始终尝试调用第一个处理程序,然后在每次发生错误时以相同的固定顺序回退,则不应提供负载平衡策略。 换句话说,调度程序仍然支持failoverboolean 属性,即使未启用负载均衡也是如此。 但是,如果没有负载平衡,处理程序的调用总是根据它们的顺序从第一个开始。 例如,当有 primary、secondary、tritiary 等的明确定义时,此方法效果很好。 使用命名空间支持时,order属性确定顺序。spring-doc.cadn.net.cn

请记住,负载均衡和failover仅当通道具有多个 subscribed message 处理程序时应用。 当使用命名空间支持时,这意味着多个端点共享input-channel属性。

从版本 5.2 开始,当failover为 true,则当前处理程序的失败以及失败的消息将记录在debuginfo如果分别配置。spring-doc.cadn.net.cn

ExecutorChannel

ExecutorChannel是点对点渠道,支持与DirectChannel(负载均衡策略和failoverboolean 属性)。 这两种调度通道类型之间的主要区别在于ExecutorChanneldelegates 传递给TaskExecutor执行调度。 这意味着 send 方法通常不会阻塞,但这也意味着处理程序调用可能不会发生在发送方的线程中。 因此,它不支持跨 sender 和 receiving handler 的事务。spring-doc.cadn.net.cn

发件人有时可能会阻止。 例如,当使用TaskExecutor使用限制客户端的拒绝策略(例如ThreadPoolExecutor.CallerRunsPolicy),则发送方的线程可以在线程池达到其最大容量且执行程序的工作队列已满时执行该方法。 由于这种情况只会以不可预测的方式发生,因此您不应依赖它进行交易。

PartitionedChannel

从版本 6.1 开始,PartitionedChannelimplementation 的 implementation 的 S 这是AbstractExecutorChannel和 表示点对点调度逻辑,其中实际消耗在特定线程上处理,由从发送到此通道的消息评估的分区键确定。 此通道类似于ExecutorChannel,但不同的是,具有相同分区键的消息始终在同一线程中处理,同时保持 Sequences。 它不需要外部TaskExecutor,但可以使用自定义ThreadFactory(例如Thread.ofVirtual().name("partition-", 0).factory()). 此工厂用于将单线程 executor 填充到MessageDispatcherdelegate,每个分区。 默认情况下,IntegrationMessageHeaderAccessor.CORRELATION_ID消息标头用作分区键。 此通道可以配置为简单的 bean:spring-doc.cadn.net.cn

@Bean
PartitionedChannel somePartitionedChannel() {
    return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}

通道将具有3partitions - 专用线程;将使用partitionKey标头来确定将在哪个分区中处理消息。 看PartitionedChannelclass Javadocs 了解更多信息。spring-doc.cadn.net.cn

FluxMessageChannel

FluxMessageChannel是一个org.reactivestreams.Publisherimplementation for"sinking"将消息发送到内部reactor.core.publisher.Flux供下游反应式订阅者按需使用。 此 channel 实现既不是SubscribableChannel,也不是PollableChannel,所以只有org.reactivestreams.Subscriber实例可用于从此通道使用,以遵守反应式流的背压性质。 另一方面,FluxMessageChannel实现一个ReactiveStreamsSubscribableChannel及其subscribeTo(Publisher<Message<?>>)Contract 允许从反应式源发布者接收事件,将反应式流桥接到集成流中。 为了实现整个集成流的完全反应行为,必须在流中的所有端点之间放置这样的通道。spring-doc.cadn.net.cn

有关与 Reactive Streams 交互的更多信息,请参阅 Reactive Streams Supportspring-doc.cadn.net.cn

作用域通道

Spring 集成 1.0 提供了一个ThreadLocalChannel实现,但从 2.0 开始已被删除。 现在,处理相同需求的更通用方法是添加scope属性分配给频道。 该属性的值可以是上下文中可用的范围的名称。 例如,在 Web 环境中,某些范围可用,并且任何自定义范围实现都可以注册到上下文中。 下面的示例展示了一个应用于通道的线程本地作用域,包括作用域本身的注册:spring-doc.cadn.net.cn

<int:channel id="threadScopedChannel" scope="thread">
     <int:queue />
</int:channel>

<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
    <property name="scopes">
        <map>
            <entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
        </map>
    </property>
</bean>

上一个示例中定义的通道也在内部委托给队列,但该通道绑定到当前线程,因此队列的内容也类似地绑定。 这样,发送到通道的线程稍后可以接收这些相同的消息,但其他线程将无法访问它们。 虽然很少需要线程范围的通道,但它们在以下情况下可能很有用DirectChannel实例被用来强制执行单个线程作,但任何回复消息都应该发送到 “terminal” 通道。 如果该终端通道是线程范围的,则原始发送线程可以从终端通道收集其回复。spring-doc.cadn.net.cn

现在,由于任何通道都可以被限定范围,因此除了 thread-Local 之外,您还可以定义自己的范围。spring-doc.cadn.net.cn