消息通道实现
Spring 集成提供了不同的消息通道实现。 以下各节将简要介绍每个选项。
PublishSubscribeChannel
该 implementation 将发送给它的任何 BROADCAST 到其所有订阅的处理程序。
这最常用于发送事件消息,其主要角色是通知(与文档消息相反,文档消息通常由单个处理程序处理)。
请注意,这仅用于发送。
由于它在调用其方法时直接向订阅者广播,因此使用者无法轮询消息(它没有实现,因此没有方法)。
相反,任何订阅者本身都必须是 ,并且依次调用订阅者的方法。PublishSubscribeChannel
Message
PublishSubscribeChannel
send(Message)
PollableChannel
receive()
MessageHandler
handleMessage(Message)
在版本 3.0 之前,在没有订阅者的 上调用该方法会返回 。
当与 a 一起使用时,会引发 a。
从版本 3.0 开始,行为已更改,因此,如果至少存在最小订阅者(并成功处理消息),则始终认为 a 成功。
可以通过设置属性来修改此行为,该属性默认为 .send
PublishSubscribeChannel
false
MessagingTemplate
MessageDeliveryException
send
minSubscribers
0
如果使用 ,则仅使用正确数量的订阅者进行此确定,因为消息的实际处理是异步执行的。TaskExecutor |
QueueChannel
该实现包装了一个队列。
与 不同,具有点对点语义。
换句话说,即使通道有多个消费者,也只有一个消费者应该接收发送到该通道的任何消费者。
它提供了一个默认的无参数构造函数(提供基本上无限的容量)以及一个接受队列容量的构造函数,如下面的清单所示:QueueChannel
PublishSubscribeChannel
QueueChannel
Message
Integer.MAX_VALUE
public QueueChannel(int capacity)
未达到其容量限制的通道将消息存储在其内部队列中,并且该方法会立即返回,即使没有接收方准备好处理该消息也是如此。
如果队列已达到容量上限,则发送方将阻止,直到队列中有 room 可用。
或者,如果您使用具有附加 timeout 参数的 send 方法,则队列将阻止,直到任一房间可用或超时期限已过(以先发生者为准)。
同样,如果队列中有消息可用,则调用会立即返回,但是,如果队列为空,则 receive 调用可能会阻止,直到消息可用或超时(如果提供)过去。
在任何一种情况下,都可以通过传递超时值 0 来强制立即返回,而不管队列的状态如何。
但是请注意,对 和 的 versions 的调用会无限期地阻止。send(Message<?>)
receive()
send()
receive()
timeout
PriorityChannel
虽然 强制执行先进先出 (FIFO) 排序,但 是一种替代实现,允许根据优先级在通道内对消息进行排序。
默认情况下,优先级由每条消息中的标头决定。
但是,对于自定义优先级确定逻辑,可以向构造函数提供 type 的 comparator。QueueChannel
PriorityChannel
priority
Comparator<Message<?>>
PriorityChannel
RendezvousChannel
这将启用“直接切换”场景,其中发送方会阻塞,直到另一方调用通道的方法。
另一方会阻止,直到发送方发送消息。
在内部,此实现与 非常相似,不同之处在于它使用 (零容量实现)。
这在发送方和接收方在不同的线程中操作的情况下效果很好,但异步将消息放入队列中是不合适的。
换句话说,使用 ,发送方知道某个接收方已经接受了该消息,而使用 ,该消息将存储到内部队列中,并且可能永远不会收到。RendezvousChannel
receive()
QueueChannel
SynchronousQueue
BlockingQueue
RendezvousChannel
QueueChannel
请记住,默认情况下,所有这些基于队列的通道都仅将消息存储在内存中。
当需要持久性时,可以在 'queue' 元素中提供 'message-store' 属性来引用持久性实现,也可以将本地通道替换为由持久性代理(如 JMS 支持的通道或通道适配器)支持的通道。
后一个选项允许您利用任何 JMS 提供程序的实现来实现消息持久性,如 JMS 支持中所述。
但是,当不需要在队列中缓冲时,最简单的方法是依赖 ,将在下一节中讨论。MessageStore DirectChannel |
这对于实现 request-reply 操作也很有用。
发送者可以创建一个临时的匿名实例,然后在构建 .
发送后,发送者可以立即调用(可选地提供超时值)以便在等待回复时阻止。
这与 Spring 集成的许多请求-回复组件内部使用的实现非常相似。RendezvousChannel
RendezvousChannel
Message
Message
receive
Message
DirectChannel
具有点对点语义,但其他方面比前面描述的任何基于队列的通道实现更类似于 。
它实现接口而不是接口,因此它将消息直接分派给订阅者。
但是,作为点对点通道,它与 的不同之处在于,它将每个通道发送到单个订阅的 。DirectChannel
PublishSubscribeChannel
SubscribableChannel
PollableChannel
PublishSubscribeChannel
Message
MessageHandler
除了是最简单的点对点通道选项之外,它最重要的功能之一是它使单个线程能够在通道的 “两侧” 执行操作。
例如,如果处理程序订阅 ,则向该通道发送 将在方法调用返回之前直接在发送方的线程中触发该处理程序的方法的调用。DirectChannel
Message
handleMessage(Message)
send()
提供具有此行为的通道实现的主要动机是支持必须跨越通道的事务,同时仍然受益于通道提供的抽象和松散耦合。
如果在事务范围内调用调用,则处理程序调用的结果(例如,更新数据库记录)在确定该事务的最终结果(提交或回滚)方面发挥作用。send()
由于这是最简单的选项,并且不会增加调度和管理 Poller 线程所需的任何额外开销,因此它是 Spring Integration 中的默认通道类型。
一般的思路是定义应用程序的通道,考虑哪些通道需要提供缓冲或限制输入,并将这些通道修改为 queue-based 。
同样,如果通道需要广播消息,则它不应是 a,而应是 .
稍后,我们将展示如何配置这些通道中的每一个。DirectChannel PollableChannels DirectChannel PublishSubscribeChannel |
内部委托给消息调度程序以调用其订阅的消息处理程序,并且该调度程序可以具有由 or 属性公开的负载平衡策略(互斥)。
消息调度器使用负载平衡策略来帮助确定当多个消息处理程序订阅同一通道时,如何在消息处理程序之间分发消息。
为方便起见,该属性公开了一个值枚举,这些值指向预先存在的 .
A (轮换处理程序之间的负载均衡) 和 (对于想要显式禁用负载均衡的情况) 是唯一可用的值。
将来的版本中可能会添加其他策略实现。
但是,从版本 3.0 开始,您可以提供自己的实现,并使用该属性注入它,该属性应指向实现的 bean,如下例所示:DirectChannel
load-balancer
load-balancer-ref
load-balancer
LoadBalancingStrategy
round-robin
none
LoadBalancingStrategy
load-balancer-ref
LoadBalancingStrategy
A 是仅支持无法取消订阅的单个订阅者的 a。
这对于不涉及其他订阅者且不需要通道拦截器的高吞吐量性能使用案例非常有用。FixedSubscriberChannel
SubscribableChannel
MessageHandler
<int:channel id="lbRefChannel">
<int:dispatcher load-balancer-ref="lb"/>
</int:channel>
<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>
请注意,和 属性是互斥的。load-balancer
load-balancer-ref
负载均衡还与 boolean 属性结合使用。
如果值为 true(默认值),则当前面的处理程序引发异常时,调度程序将回退到任何后续处理程序(根据需要)。
订单由处理程序本身定义的可选 order 值确定,如果不存在此类值,则由处理程序订阅的顺序确定。failover
failover
如果某种情况要求 Dispatcher 始终尝试调用第一个处理程序,然后在每次发生错误时以相同的固定顺序回退,则不应提供负载平衡策略。
换句话说,即使未启用负载均衡,调度程序仍支持 boolean 属性。
但是,如果没有负载平衡,处理程序的调用总是根据它们的顺序从第一个开始。
例如,当有 primary、secondary、tritiary 等的明确定义时,此方法效果很好。
使用命名空间支持时,任何端点上的属性都会确定顺序。failover
order
请记住,负载平衡仅在通道具有多个订阅消息处理程序时应用。
当使用 namespace 支持时,这意味着多个 endpoint 共享 attribute 中定义的相同通道引用。failover input-channel |
从版本 5.2 开始,当为 true 时,当前处理程序的失败以及失败的消息将分别记录在 or if configured 下。failover
debug
info
ExecutorChannel
这是一个点对点通道,支持与(负载平衡策略和 boolean 属性)相同的调度程序配置。
这两种调度通道类型之间的主要区别在于,委托给 的实例来执行调度。
这意味着 send 方法通常不会阻塞,但这也意味着处理程序调用可能不会发生在发送方的线程中。
因此,它不支持跨 sender 和 receiving handler 的事务。ExecutorChannel
DirectChannel
failover
ExecutorChannel
TaskExecutor
发件人有时可能会阻止。
例如,当将 a 与限制客户端的拒绝策略(如 )一起使用时,发送方的线程可以在线程池达到其最大容量且执行程序的工作队列已满时执行该方法。
由于这种情况只会以不可预测的方式发生,因此您不应依赖它进行交易。TaskExecutor ThreadPoolExecutor.CallerRunsPolicy |
PartitionedChannel
从版本 6.1 开始,提供了一个实现。
这是点对点调度逻辑的扩展,并表示点对点调度逻辑,其中实际消耗在特定线程上处理,由从发送到此通道的消息评估的分区键确定。
此通道类似于上面提到的通道,但不同之处在于具有相同分区键的消息始终在同一线程中处理,同时保持 Sequences。
它不需要 external ,但可以使用自定义(例如 )进行配置 。
此工厂用于将单线程执行程序填充到每个分区的委托中。
默认情况下,消息标头用作分区键。
此通道可以配置为简单的 bean:PartitionedChannel
AbstractExecutorChannel
ExecutorChannel
TaskExecutor
ThreadFactory
Thread.ofVirtual().name("partition-", 0).factory()
MessageDispatcher
IntegrationMessageHeaderAccessor.CORRELATION_ID
@Bean
PartitionedChannel somePartitionedChannel() {
return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}
通道将具有分区 - 专用线程;将使用标头来确定将在哪个分区中处理消息。
有关更多信息,请参见 class Javadocs。3
partitionKey
PartitionedChannel
FluxMessageChannel
这是一种将消息发送到内部供下游反应式订阅者按需使用的实现。
此 channel implementation 既不是 , 也不是 a ,因此只能使用实例从此 channel 中消费,以遵守反应流的背压性质。
另一方面,它通过其 Contract 实现 a,允许从反应式源发布者接收事件,将反应式流桥接到集成流中。
为了实现整个集成流的完全反应行为,必须在流中的所有端点之间放置这样的通道。FluxMessageChannel
org.reactivestreams.Publisher
"sinking"
reactor.core.publisher.Flux
SubscribableChannel
PollableChannel
org.reactivestreams.Subscriber
FluxMessageChannel
ReactiveStreamsSubscribableChannel
subscribeTo(Publisher<Message<?>>)
有关与 Reactive Streams 交互的更多信息,请参阅 Reactive Streams Support 。
作用域通道
Spring Integration 1.0 提供了一个实现,但从 2.0 开始已被删除。
现在,处理相同需求的更通用方法是向 channel 添加一个 attribute。
该属性的值可以是上下文中可用的范围的名称。
例如,在 Web 环境中,某些范围可用,并且任何自定义范围实现都可以注册到上下文中。
下面的示例展示了一个应用于通道的线程本地作用域,包括作用域本身的注册:ThreadLocalChannel
scope
<int:channel id="threadScopedChannel" scope="thread">
<int:queue />
</int:channel>
<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
<property name="scopes">
<map>
<entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
</map>
</property>
</bean>
上一个示例中定义的通道也在内部委托给队列,但该通道绑定到当前线程,因此队列的内容也类似地绑定。
这样,发送到通道的线程稍后可以接收这些相同的消息,但其他线程将无法访问它们。
虽然很少需要线程范围的通道,但在实例用于强制执行单个操作线程但任何回复消息都应发送到“终端”通道的情况下,它们可能很有用。
如果该终端通道是线程范围的,则原始发送线程可以从终端通道收集其回复。DirectChannel
现在,由于任何通道都可以被限定范围,因此除了 thread-Local 之外,您还可以定义自己的范围。