核心消息
消息通道
消息通道
虽然Message在封装数据方面发挥着关键作用,但真正将消息生产者与消息消费者解耦的是MessageChannel。
The MessageChannel 接口
Spring Integration 的顶级 MessageChannel 接口定义如下:
public interface MessageChannel {
boolean send(Message message);
boolean send(Message message, long timeout);
}
当发送消息时,如果消息发送成功,则返回值为true。
如果发送调用超时或被中断,则返回值为false。
PollableChannel
由于消息通道可能缓冲也可能不缓冲消息(如Spring Integration 概述中所讨论),因此有两个子接口分别定义了可轮询的缓冲通道行为和不可缓冲的可订阅通道行为。
以下代码片段展示了PollableChannel接口的定义:
public interface PollableChannel extends MessageChannel {
Message<?> receive();
Message<?> receive(long timeout);
}
如同发送消息的方法一样,在接收消息时,如果发生超时或中断,返回值为null。
SubscribableChannel
The SubscribableChannel 基础接口由直接将消息发送到其订阅的 MessageHandler 实例的通道实现。 因此,它们不提供用于轮询的接收方法。 相反,它们定义了用于管理这些订阅者的方法。 以下清单显示了 SubscribableChannel 接口的定义:
public interface SubscribableChannel extends MessageChannel {
boolean subscribe(MessageHandler handler);
boolean unsubscribe(MessageHandler handler);
}
消息通道实现
Spring Integration 提供了不同的消息通道实现。 以下部分简要描述每一种。
PublishSubscribeChannel
The PublishSubscribeChannel 实现会将任何发送到它的 Message 广播到其所有订阅处理程序。
这通常用于发送事件消息,其主要作用是通知(与文档消息相反,文档消息通常旨在由单个处理程序处理)。
请注意,PublishSubscribeChannel 仅用于发送。
由于当其 send(Message) 方法被调用时,它会直接将其消息广播给其订阅者,消费者无法轮询消息(它不实现 PollableChannel,因此没有 receive() 方法)。
相反,任何订阅者本身必须是 MessageHandler,并且会调用订阅者的 handleMessage(Message) 方法。
在3.0版本之前,对没有订阅者的PublishSubscribeChannel调用send方法返回false。
当与MessagingTemplate一起使用时,会抛出一个MessageDeliveryException异常。
从3.0版本开始,行为已更改,只要至少存在最小数量的订阅者(并且成功处理消息),调用send方法总是被视为成功的。
可以通过设置minSubscribers属性来修改此行为,默认值为0。
如果使用了TaskExecutor,则仅根据正确的订阅者数量来确定这一点,因为消息的实际处理是异步进行的。 |
QueueChannel
The QueueChannel 实现包装了一个队列。
与 PublishSubscribeChannel 不同,QueueChannel 具有点对点语义。
换句话说,即使通道有多名消费者,也只能有一个消费者接收到发送到该通道的任何 Message 。
它提供了一个无参数构造函数(提供了一种本质上未定义容量的 Integer.MAX_VALUE ),以及一个接受队列容量作为参数的构造函数,如下所示:
public QueueChannel(int capacity)
一个尚未达到容量限制的通道会将其消息存储在其内部队列中,并且 send(Message<?>) 方法会立即返回,即使没有接收者准备好处理该消息。
如果队列已达到容量限制,发送方将阻塞,直到队列中有可用空间。
或者,如果您使用带有额外超时参数的 send 方法,则队列将阻塞,直到有可用空间或超时时间过去,以先发生者为准。
同样地,如果队列上有可用消息,receive() 调用会立即返回;但如果队列为空,则 receive 调用可能会阻塞,直到有可用消息或提供的超时时间过去(如果提供了的话)。
在任何一种情况下,都可以通过传递值为 0 的超时参数来强制立即返回,无论队列状态如何。
然而需要注意的是,对不带 timeout 参数的 send() 和 receive() 版本的调用将无限期阻塞。
PriorityChannel
whereas the QueueChannel 强制遵循先进先出(FIFO)顺序,PriorityChannel 是另一种实现方式,允许根据优先级在通道内对消息进行排序。
默认情况下,优先级由每个消息内的 priority 头部确定。然而,为了自定义优先级确定逻辑,可以向 PriorityChannel 构造函数提供类型为 Comparator<Message<?>> 的比较器。
RendezvousChannel
The RendezvousChannel 启用了“直接传递”场景,其中发送者会阻塞直到另一方调用通道的 receive() 方法。
另一方则会在发送者发送消息之前阻塞。
内部实现上,这种做法与 QueueChannel 非常相似,只是它使用了一个 SynchronousQueue(即零容量的 BlockingQueue 实现)。
在发送者和接收者运行在不同线程中但不适合异步地将消息存入队列的情况下,这种方法表现良好。
换句话说,在使用 RendezvousChannel 的情况下,发送者知道某个接收方已经接受了该消息,而使用 QueueChannel 时,则可能只是将消息存储到了内部队列并且永远无法被接收。
请记住,所有这些基于队列的通道默认都仅在内存中存储消息。
如果需要持久化,您可以在'queue'元素内提供一个'message-store'属性来引用一个持久的MessageStore实现,或者用一个由持久消息代理支持的通道替换本地通道,例如基于JMS的通道或通道适配器。
后一种选项允许您利用任何JMS提供者的消息持久化实现,如JMS支持中所讨论的那样。
然而,当不需要在队列中进行缓冲时,最简单的方法是依赖下一节中讨论的DirectChannel。 |
The RendezvousChannel 也可以用于实现请求-响应操作。
发送方可以创建一个临时且匿名的 RendezvousChannel 实例,然后在构建一个 Message 时将其设置为 'replyChannel' 头部。
发送该 Message 后,发送方可以立即调用 receive(可选地提供超时值)来阻塞并等待响应 Message。
这与 Spring Integration 的许多请求-响应组件内部实现非常相似。
DirectChannel
该 DirectChannel 具有点对点的语义,但在其他方面更类似于 PublishSubscribeChannel 而不是之前描述的任何基于队列的通道实现。
它实现了 SubscribableChannel 接口而不是 PollableChannel 接口,因此直接将消息分发给订阅者。
然而,作为一个点对点通道,它与 PublishSubscribeChannel 不同之处在于,它会将每个 Message 发送到一个单一的已订阅的 MessageHandler。
除了是最简单的点对点通道选项之外,另一个最重要的特性是它可以使得单一线程在同一时间执行“通道两端”的操作。
例如,如果一个处理器订阅了 DirectChannel,那么向该通道发送 Message 会直接触发处理器的 handleMessage(Message) 方法,在发送者线程的方法返回之前。
提供具有此行为的通道实现的主要动机是支持必须跨越通道的事务,同时仍能受益于通道提供的抽象和松耦合。
如果 send() 调用在事务范围内被调用,则处理程序的调用结果(例如更新数据库记录)将在决定该事务的最终结果(提交或回滚)中发挥作用。
由于 DirectChannel 是最简单的选项,且不会为轮询器的线程调度和管理带来任何额外的开销,因此它是 Spring Integration 中的默认通道类型。
一般思路是定义应用程序的通道,考虑哪些通道需要提供缓冲或限制输入,然后将这些通道修改为基于队列的 PollableChannels。
同样地,如果一个通道需要广播消息,它就不应该是 DirectChannel,而应该是 PublishSubscribeChannel。
稍后,我们将展示如何配置每种类型的通道。 |
The DirectChannel 内部委托给一个消息调度器来调用其订阅的消息处理器,而该调度器可以通过 load-balancer 或 load-balancer-ref 个属性(互斥)来暴露负载均衡策略。负载均衡策略用于帮助消息分发器在多个消息处理器订阅同一个通道时,决定如何分配消息。作为方便,load-balancer 属性暴露了一个枚举值的集合,指向已存在的 LoadBalancingStrategy 实现。A round-robin(负载均衡在轮询中跨多个处理器分配)和 none(对于希望明确禁用负载均衡的情况)是唯一可用的值。其他策略实现可能在未来的版本中添加。然而,自从版本 3.0,您可以提供自己的 LoadBalancingStrategy 实现,并使用 load-balancer-ref 属性将其注入,该属性应指向一个实现了 LoadBalancingStrategy 的 Bean,如下示例所示:
一个FixedSubscriberChannel是一个仅支持单一SubscribableChannel订阅者且该订阅者无法取消订阅的MessageHandler。
这在没有其他订阅者参与且不需要通道拦截器的情况下,对于高吞吐量性能用例非常有用。
<int:channel id="lbRefChannel">
<int:dispatcher load-balancer-ref="lb"/>
</int:channel>
<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>
注意,load-balancer 和 load-balancer-ref 属性是互斥的。
负载均衡也与布尔类型的 failover 属性一起工作。
如果 failover 值为 true(默认值),则在前置处理器抛出异常时,分发器会回退到任何后续处理器(必要时)。
处理器的顺序由其自身定义的可选顺序值决定,或者如果没有此类值,则按照处理器订阅的顺序。
如果某些情况要求调度器每次都尝试调用第一个处理器,并且在每次发生错误时都按照相同的固定顺序进行回退,则不应提供负载均衡策略。
换句话说,即使未启用负载均衡,调度器仍然支持failover布尔属性。
然而,在没有负载均衡的情况下,处理器的调用始终从第一个开始,根据它们的顺序。
例如,当有一个明确的一级、二级、三级等定义时,这种方法效果很好。
当使用命名空间支持时,任何端点上的order属性决定了其顺序。
请注意,负载均衡和failover仅在通道有多个订阅的消息处理器时适用。
当使用命名空间支持时,这意味着多个端点共享同一个在input-channel属性中定义的通道引用。 |
自 5.2 版本起,当failover为真时,当前处理器的失败及其失败消息会被记录在debug或根据配置记录在info。
ExecutorChannel
ExecutorChannel 是一个点对点通道,支持与 DirectChannel 相同的调度配置(负载均衡策略和 failover 布尔属性)。
这两种调度通道类型的关键区别在于,ExecutorChannel 会委托给一个 TaskExecutor 的实例来执行调度。
这意味着 send 方法通常不会阻塞,但这同时也意味着处理器的调用可能不在发送者的线程中发生。
因此,它不支持跨越发送者和接收处理器的事务。
发送者有时会被阻塞。
例如,当使用 TaskExecutor 与限制策略(如 ThreadPoolExecutor.CallerRunsPolicy)进行客户端速率限制时,发送者的线程可以在线程池达到最大容量且执行者的任务队列已满的情况下随时执行方法。
由于这种情况只会在不可预测的方式下发生,你不应依赖它来进行事务处理。 |
PartitionedChannel
自 Spring 框架的 6.1 版本起,提供了一个 PartitionedChannel 实现。
这是对 AbstractExecutorChannel 的扩展,并代表了一种点到点分发逻辑,在这种逻辑中,实际消费会在一个特定的线程上进行处理,这个线程是由发送给此通道的消息中的分区键评估确定的。
该通道类似于上述提及的 ExecutorChannel,但不同之处在于具有相同分区键的消息总是由同一个线程处理,从而保留了消息的顺序。
它不需要外部的 TaskExecutor,但是可以配置自定义的 ThreadFactory(例如 Thread.ofVirtual().name("partition-", 0).factory())。
此工厂用于根据每个分区填充单线程执行器到一个 MessageDispatcher 代理中。
默认情况下,IntegrationMessageHeaderAccessor.CORRELATION_ID 消息头被用作分区键。
该通道可以配置为简单的 Bean:
@Bean
PartitionedChannel somePartitionedChannel() {
return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}
该通道将有3分区 - 专用线程;并将使用partitionKey头来确定消息将在哪个分区处理。
有关更多信息,请参见PartitionedChannel类的Javadoc。
FluxMessageChannel
FluxMessageChannel 是一个针对 "sinking" 的已发送消息的 org.reactivestreams.Publisher 实现,它将消息发送至内部 reactor.core.publisher.Flux,供下游响应式订阅者按需消费。
该通道实现既不是 SubscribableChannel,也不是 PollableChannel,因此只能使用 org.reactivestreams.Subscriber 实例从该通道进行消费,以遵循响应式流的背压特性。
另一方面,FluxMessageChannel 实现了 ReactiveStreamsSubscribableChannel,其 subscribeTo(Publisher<Message<?>>) 契约允许从响应式源发布者接收事件,从而将响应式流桥接到集成流程中。
为了实现整个集成流程的完全响应式行为,此类通道必须放置在流程中的所有端点之间。
有关与响应式流的交互的更多信息,请参阅 响应式流支持。
作用域通道
Spring Integration 1.0 提供了一个 ThreadLocalChannel 实现,但该实现已于 2.0 版本中移除。
现在处理相同需求的更通用方法是向一个 channel 添加一个 scope 属性。属性值可以是一个在上下文中可用的作用域名称。
例如,在 Web 环境中,某些作用域是可用的,并且任何自定义的作用域实现都可以注册到上下文中。下面的例子展示了如何将线程局部作用域应用到一个 channel 中,包括作用域本身的注册:
<int:channel id="threadScopedChannel" scope="thread">
<int:queue />
</int:channel>
<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
<property name="scopes">
<map>
<entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
</map>
</property>
</bean>
在前面示例中定义的通道内部也会委托给一个队列,但该通道绑定到了当前线程上,因此队列的内容也同样被绑定。
这样,在发送消息到通道的线程之后可以从终端通道接收这些相同的消息,但其他线程无法访问它们。
虽然按线程范围定义的通道很少需要使用,但在仅使用DirectChannel实例来确保单一操作线程的情况下,并且任何回复消息都应该发往一个“终端”通道时,这样的通道可以非常有用。如果这个终端通道是按线程范围定义的,那么原始发送线程可以从终端通道收集其回复。
现在,由于任何频道都可以进行范围限定,您除了可以使用线程局部变量之外还可以定义自己的范围。
通道拦截器
消息架构的一个优势是能够以非侵入的方式提供通用行为,并捕获系统中传递的消息的有意义信息。
由于Message实例发送到和从MessageChannel实例接收,因此这些通道提供了拦截发送和接收操作的机会。
如以下列表所示,ChannelInterceptor策略接口提供每个操作的方法:
public interface ChannelInterceptor {
Message<?> preSend(Message<?> message, MessageChannel channel);
void postSend(Message<?> message, MessageChannel channel, boolean sent);
void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex);
boolean preReceive(MessageChannel channel);
Message<?> postReceive(Message<?> message, MessageChannel channel);
void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex);
}
在实现接口后,将拦截器注册到通道上只需进行以下调用:
channel.addInterceptor(someChannelInterceptor);
返回 Message 实例的方法可用于转换 Message,或者返回 'null' 以阻止进一步处理(当然,任何方法都可能抛出 RuntimeException)。
此外,preReceive 方法也可以返回 false 以阻止接收操作继续进行。
请记住,receive() 次调用仅与 PollableChannels 相关。
实际上,SubscribableChannel 接口甚至没有定义 receive() 方法。
其原因在于,当向 SubscribableChannel 发送 Message 时,它会根据通道类型(例如,PublishSubscribeChannel 会发送给其所有订阅者)直接发送给零个或多个订阅者。
因此,preReceive(…)、postReceive(…) 和 afterReceiveCompletion(…) 拦截器方法仅在拦截器应用于 PollableChannel 时才会被调用。 |
Spring Integration 还提供了 旁路监听 模式的实现。
它是一个简单的拦截器,将 Message 发送到另一个通道,而不会以其他方式改变现有流程。
它在调试和监控中非常有用。
示例见 旁路监听。
因为很少需要实现拦截器方法的所有功能,该接口提供了无操作的方法(返回void的方法没有代码,返回Message的方法直接返回Message,而返回boolean的方法返回true)。
拦截器方法的调用顺序取决于通道类型。如前面所述,基于队列的通道是唯一一个在最初就被拦截了receive()方法的地方。另外,发送和接收拦截之间的关系取决于单独的发送线程和接收线程的时间安排。例如,如果接收者已经在等待消息时被阻塞,则顺序可能是:preSend,preReceive,postReceive,postSend。然而,如果接收方在发送方将消息放入通道并返回后进行轮询,顺序将如下所示:preSend、postSend(经过一段时间)、preReceive、postReceive。在这种情况下所经过的时间取决于多个因素,因此通常是不可预测的(事实上,接收操作可能永远不会发生)。队列的类型也起着作用(例如,rendezvous 与优先级)。简而言之,除了 preSend 在 postSend 之前、preReceive 在 postReceive 之前这一事实外,你不能依赖其他顺序。
|
从 Spring Framework 4.1 和 Spring Integration 4.1 开始,ChannelInterceptor 提供了新的方法:afterSendCompletion() 和 afterReceiveCompletion()。 它们在 send()' and 'receive() 调用之后被调用,无论是否抛出任何异常,这允许进行资源清理。 请注意,通道会按照与初始 preSend() 和 preReceive() 调用相反的顺序,在 ChannelInterceptor 列表上调用这些方法。
自 Spring Framework 5.1 版本起,全局频道拦截器现在也适用于动态注册的频道——例如通过使用 beanFactory.initializeBean() 或 IntegrationFlowContext 初始化 Java DSL 中定义的 beans。
此前,当应用上下文刷新后创建的 beans 不会应用拦截器。
从5.1版本开始,如果没有收到消息不再调用ChannelInterceptor.postReceive();因此,再不需要检查null或Message<?>。
以前的方法会调用。如果您有一个依赖于之前行为的拦截器,请实现afterReceiveCompleted(),因为无论是否接收到消息,该方法都会被调用。
从 5.2 版本开始,ChannelInterceptorAware 已弃用,现在出于向后兼容性考虑,它扩展了来自 Spring Messaging 模块的 InterceptableChannel。 |
MessagingTemplate
当介绍了端点及其各种配置选项后,Spring Integration 为消息组件提供了一个基础,使您的应用代码能够非侵入式地从消息系统中被调用。
然而,在某些情况下也需要从您的应用代码中调用消息系统。
为了在实现此类用例时提供便利,Spring Integration 提供了一个MessagingTemplate,它支持各种跨消息通道的操作,包括请求和回复场景等。
例如,您可以发送一个请求并等待回复,如下所示:
MessagingTemplate template = new MessagingTemplate();
Message reply = template.sendAndReceive(someChannel, new GenericMessage("test"));
在前面的示例中,模板内部会创建一个临时匿名通道。 '发送超时'和'接收超时'属性也可以设置在模板上,并且还支持其他交换类型。 以下列表展示了此类方法的签名:
public boolean send(final MessageChannel channel, final Message<?> message) { ...
}
public Message<?> sendAndReceive(final MessageChannel channel, final Message<?> request) { ...
}
public Message<?> receive(final PollableChannel<?> channel) { ...
}
一种侵入性更小的方法,允许您通过负载或头值调用简单接口,而不是使用 Message 实例,在 进入 GatewayProxyFactoryBean 中有描述。 |
配置消息通道
要创建一个消息通道实例,您可以通过以下方式之一使用<channel/>元素进行XML配置或使用DirectChannel实例进行Java配置:
@Bean
public MessageChannel exampleChannel() {
return new DirectChannel();
}
<int:channel id="exampleChannel"/>
当您使用<channel/>元素而没有任何子元素时,会创建一个DirectChannel实例(一个SubscribableChannel)。
要创建发布/订阅频道,请使用<publish-subscribe-channel/>元素(在Java中为PublishSubscribeChannel),如下所示:
@Bean
public MessageChannel exampleChannel() {
return new PublishSubscribeChannel();
}
<int:publish-subscribe-channel id="exampleChannel"/>
您还可以提供各种 <queue/> 子元素,以创建任意可轮询通道类型(如消息通道实现中所描述)。
以下各节展示了每种通道类型的示例。
DirectChannel配置
如先前所述,DirectChannel 是默认类型。
以下列出了如何定义一个:
@Bean
public MessageChannel directChannel() {
return new DirectChannel();
}
<int:channel id="directChannel"/>
默认通道具有轮询负载均衡器,并且已启用故障转移(有关更多详细信息,请参阅DirectChannel)。
要禁用其中一项或两项功能,请添加一个<dispatcher/>子元素(即DirectChannel的LoadBalancingStrategy构造函数),并按以下方式配置属性:
@Bean
public MessageChannel failFastChannel() {
DirectChannel channel = new DirectChannel();
channel.setFailover(false);
return channel;
}
@Bean
public MessageChannel failFastChannel() {
return new DirectChannel(null);
}
<int:channel id="failFastChannel">
<int:dispatcher failover="false"/>
</channel>
<int:channel id="channelWithFixedOrderSequenceFailover">
<int:dispatcher load-balancer="none"/>
</int:channel>
数据类型通道配置
有时候,消费者只能处理特定类型的消息负载,这就要求你确保输入消息的类型。 最初的想法可能是使用消息过滤器。但是,消息过滤器所能做的只是过滤掉不符合消费者需求的消息。 另一种方法是使用内容路由,并将不符合数据类型的message导向特定的transformer进行转换和转换为所需的数据类型。 这可以实现目标,但更简单的方法是应用数据类型通道模式。你可以为每种具体的payload数据类型使用单独的数据类型通道。
若要创建一个仅接受包含特定有效负载类型消息的数据类型通道,请在通道的 datatype 属性中提供数据类型的全限定类名,如下例所示:
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(Number.class);
return channel;
}
<int:channel id="numberChannel" datatype="java.lang.Number"/>
注意,类型检查会通过任何可以分配给通道数据类型的类型。
换句话说,在前面的示例中,numberChannel 将接受载荷为 java.lang.Integer 或 java.lang.Double 的消息。
多个类型可以通过逗号分隔列表提供,如下一个示例所示:
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(String.class, Number.class);
return channel;
}
<int:channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>
那么,在前面的示例中,'numberChannel' 只接受数据类型为 java.lang.Number 的消息。
但是,如果消息的有效载荷不是所需的数据类型会发生什么情况呢?
这取决于你是否定义了一个名为 integrationConversionService 的 bean,并且该 bean 是 Spring 转换服务的一个实例。
如果没有,则会立即抛出一个 Exception 异常。
但是,如果你已经定义了 integrationConversionService bean,则会尝试将消息的有效载荷转换为可接受的数据类型。
您可以注册自定义转换器。
例如,假设您向我们上面配置的'numberChannel'发送了一个String负载的消息。
您可以按照以下方式处理该消息:
MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));
通常,这将是一个完全合法的操作。 然而,由于我们使用了 Datatype Channel,此类操作的结果将生成类似以下的异常:
Exception in thread "main" org.springframework.integration.MessageDeliveryException:
Channel 'numberChannel'
expected one of the following datataypes [class java.lang.Number],
but received [class java.lang.String]
…
由于我们要求负载类型为Number,但发送的是String,因此发生了异常。
所以我们需要一些方法将String转换为Number。
为此,我们可以实现类似于以下示例的转换器:
public static class StringToIntegerConverter implements Converter<String, Integer> {
public Integer convert(String source) {
return Integer.parseInt(source);
}
}
然后我们可以将其注册为转换器,使用集成转换服务,如下例所示:
@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt {
return new StringToIntegerConverter();
}
<int:converter ref="strToInt"/>
<bean id="strToInt" class="org.springframework.integration.util.Demo.StringToIntegerConverter"/>
在StringToIntegerConverter类中,当该类被标记有@Component注解进行自动扫描时。
当解析 'converter' 元素时,如果尚未定义该元素,则会创建 integrationConversionService bean。
有了这个转换器后,send 操作现在将成功执行,因为数据类型通道使用该转换器将 String 载荷转换为 Integer。
有关负载类型转换的更多信息,请参阅负载类型转换。
从版本4.0开始,integrationConversionService通过DefaultDatatypeChannelMessageConverter调用,后者会在应用上下文中查找转换服务。
要使用不同的转换技术,您可以在频道上指定message-converter属性。此属性必须引用一个MessageConverter实现。
仅使用fromMessage方法。
该方法为转换器提供了访问消息头的权限(如果转换需要来自头部的信息,例如content-type)。
此方法只能返回已转换的有效负载或完整的Message对象。如果是后者,则转换器必须小心地从入站消息中复制所有头部信息。
您可以声明一个类型为MessageConverter、ID为datatypeChannelMessageConverter的<bean/>,并且该转换器将被所有具有datatype类型的通道使用。
QueueChannel配置
要创建一个QueueChannel,使用<queue/>子元素。
您可以按照以下方式指定通道的容量:
@Bean
public PollableChannel queueChannel() {
return new QueueChannel(25);
}
<int:channel id="queueChannel">
<queue capacity="25"/>
</int:channel>
如果没有为这个 <queue/> 子元素的 'capacity' 属性提供值,生成的队列将是无界的。
为了避免内存耗尽等问题,我们强烈建议您为有界队列设置一个显式的值。 |
持久的QueueChannel配置
由于 QueueChannel 提供了消息缓冲能力,但默认情况下仅在内存中缓冲,因此也存在系统故障时消息可能丢失的风险。
为缓解此风险,QueueChannel 可以基于 MessageGroupStore 策略接口的持久化实现来构建。
有关 MessageGroupStore 和 MessageStore 的更多详情,请参见 消息存储。
capacity 属性在使用了 message-store 属性时不允许。 |
当一个QueueChannel接收到一个Message时,它会将消息添加到消息存储中。
当从一个QueueChannel中轮询出一个Message时,该消息将被移除出消息存储。
默认情况下,一个QueueChannel将其消息存储在一个内存队列中,这可能会导致之前提到的消息丢失的情况。
然而,Spring Integration 提供了持久化存储方式,例如JdbcChannelMessageStore。
您可以为任何QueueChannel配置消息存储,通过添加message-store属性,如下例所示:
<int:channel id="dbBackedChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
(见下方的Java/Kotlin 配置选项示例。)
Spring Integration JDBC 模块还提供了针对多种流行数据库的模式数据定义语言(DDL)。
这些模式位于该模块的 org.springframework.integration.jdbc.store.channel 包中 (spring-integration-jdbc)。
一个重要的功能是,对于任何事务性持久存储(例如JdbcChannelMessageStore),只要轮询器配置了事务,在事务成功完成之前,从存储中移除的消息不会被永久删除。否则,如果事务回滚,则消息Message不会丢失。 |
随着越来越多的与"NoSQL"数据存储相关的 Spring 项目提供对这些存储的底层支持,许多其他消息存储实现也已可用。
如果您找不到符合您特定需求的实现,也可以自行提供对 MessageGroupStore 接口的实现。
自 4.0 版本起,我们建议尽可能配置 QueueChannel 个实例以使用 ChannelMessageStore。
与通用消息存储相比,这些通常针对此用途进行了优化。
如果 ChannelMessageStore 是 ChannelPriorityMessageStore,则消息将按优先级顺序以先进先出(FIFO)方式接收。
优先级的概念由消息存储实现决定。
例如,以下示例展示了 MongoDB 通道消息存储 的 Java 配置:
@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
store.setPriorityEnabled(true);
return store;
}
@Bean
public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) {
return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue"));
}
@Bean
public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) {
return IntegrationFlow.from((Channels c) ->
c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup"))
....
.get();
}
@Bean
fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) =
integrationFlow {
channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") }
}
注意MessageGroupQueue类。
那是使用BlockingQueue操作的MessageGroupStore实现。 |
另一种自定义 QueueChannel 环境的方式是通过 <int:queue> 子元素或其特定构造函数中的 ref 属性提供。
该属性提供了对任何 java.util.Queue 实现的引用。
例如,可以按如下方式配置 Hazelcast 分布式 IQueue:
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(new Config()
.setProperty("hazelcast.logging.type", "log4j"));
}
@Bean
public PollableChannel distributedQueue() {
return new QueueChannel(hazelcastInstance()
.getQueue("springIntegrationQueue"));
}
PublishSubscribeChannel配置
要创建一个PublishSubscribeChannel,可以使用<publish-subscribe-channel/> 元素。
在使用此元素时,还可以指定用于发布消息的task-executor(如果没有指定,则在发送者的线程中进行发布),如下所示:
@Bean
public MessageChannel pubsubChannel() {
return new PublishSubscribeChannel(someExecutor());
}
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>
如果您在PublishSubscribeChannel下游提供重组器或聚合器,则可以将通道的'apply-sequence'属性设置为true。
这样做表示通道应在传递消息之前设置sequence-size和sequence-number消息头以及关联ID。
例如,如果有五个订阅者,则sequence-size将设置为5,并且消息的sequence-number头值范围将从1到5。
除了 Executor,您还可以配置一个 ErrorHandler。
默认情况下,PublishSubscribeChannel 使用 MessagePublishingErrorHandler 实现将错误从 errorChannel 头发送到的 MessageChannel,或注入到全局 errorChannel 实例中。
如果未配置 Executor,则忽略 ErrorHandler,异常将直接抛给调用者的线程。
如果在下游提供一个Resequencer或Aggregator,紧跟着上游的PublishSubscribeChannel,您可以将通道上的'apply-sequence'属性设置为true。
这样表示该通道应当在传递消息之前设置序列大小和序列号消息标头以及关联ID。
例如,如果有五个订阅者,序列大小会被设置为5,而消息的序列号标头值将从1到5不等。
以下示例展示了如何将 apply-sequence 头设置为 true:
@Bean
public MessageChannel pubsubChannel() {
PublishSubscribeChannel channel = new PublishSubscribeChannel();
channel.setApplySequence(true);
return channel;
}
<int:publish-subscribe-channel id="pubsubChannel" apply-sequence="true"/>
The apply-sequence 值默认为 false,这样发布-订阅通道就可以将完全相同的消息实例发送到多个传出通道。
由于 Spring Integration 强制实现负载和头引用的不可变性,当该标志设置为 true 时,通道会创建新的 Message 实例,这些实例具有相同的负载引用但有不同的头值。 |
自 5.4.3 版本起,PublishSubscribeChannel 可以通过其 BroadcastingDispatcher 的 requireSubscribers 选项进行配置,以指示此通道在没有订阅者时不会默默地忽略消息。
当没有订阅者且此选项设置为 true 时,会抛出一个带有 Dispatcher has no subscribers 标签的 MessageDispatchingException 异常。
ExecutorChannel
要创建一个ExecutorChannel,添加一个<dispatcher>子元素并带有task-executor属性。
该属性的值可以引用上下文中任何TaskExecutor。
例如,这样做可以在向订阅处理程序发送消息时配置线程池进行调度。
正如前面提到的,这样做会打破发送方和接收方之间的单线程执行上下文,使得任何活跃的事务上下文不会被处理程序调用所共享(也就是说,处理程序可能会抛出Exception异常,但send的调用已经成功返回)。
以下示例展示了如何使用dispatcher元素并在task-executor属性中指定执行器:
@Bean
public MessageChannel executorChannel() {
return new ExecutorChannel(someExecutor());
}
<int:channel id="executorChannel">
<int:dispatcher task-executor="someExecutor"/>
</int:channel>
|
|
PriorityChannel配置
要创建一个PriorityChannel,可以使用<priority-queue/>子元素,如下例所示:
@Bean
public PollableChannel priorityChannel() {
return new PriorityChannel(20);
}
<int:channel id="priorityChannel">
<int:priority-queue capacity="20"/>
</int:channel>
默认情况下,通道会咨询消息的priority标头。
但是,你可以提供一个自定义的Comparator引用。另外,请注意,PriorityChannel(和其他类型一样)也支持datatype属性。
就像QueueChannel一样,它还支持一个capacity属性。
以下示例演示了所有这些内容:
@Bean
public PollableChannel priorityChannel() {
PriorityChannel channel = new PriorityChannel(20, widgetComparator());
channel.setDatatypes(example.Widget.class);
return channel;
}
<int:channel id="priorityChannel" datatype="example.Widget">
<int:priority-queue comparator="widgetComparator"
capacity="10"/>
</int:channel>
自 4.0 版本起,priority-channel 子元素支持 message-store 选项(此时不允许使用 comparator 和 capacity)。
消息存储必须是 PriorityCapableChannelMessageStore。
目前为 Redis、JDBC 和 MongoDB 提供了 PriorityCapableChannelMessageStore 的实现。
有关更多信息,请参阅 QueueChannel 配置 和 消息存储。
您可以在 后备消息通道 中找到示例配置。
RendezvousChannel配置
当队列子元素为<rendezvous-queue>时,会创建一个RendezvousChannel。
它不提供任何额外的配置选项给前面描述的选项,且其队列不接受任何容量值,因为它是一个零容量的直接传递队列。
以下示例展示了如何声明一个RendezvousChannel:
@Bean
public PollableChannel rendezvousChannel() {
return new RendezvousChannel();
}
<int:channel id="rendezvousChannel"/>
<int:rendezvous-queue/>
</int:channel>
通道拦截器配置
消息通道也可以拥有拦截器,如通道拦截器中所述。
<interceptors/>子元素可以添加到<channel/>(或更具体的元素类型)中。
您可以提供ref属性来引用任何实现了ChannelInterceptor接口的Spring托管对象,如下例所示:
<int:channel id="exampleChannel">
<int:interceptors>
<ref bean="trafficMonitoringInterceptor"/>
</int:interceptors>
</int:channel>
一般情况下,我们建议将拦截器实现定义在单独的位置,因为它们通常提供可以跨多个渠道复用的常用行为。
全局通道拦截器配置
通道拦截器提供了一种简洁且清晰的方式来为每个单独的通道应用横切行为。 如果相同的横切行为需要在多个通道上应用,为每个通道配置相同的一组拦截器将不是最有效的方式。 为了避免重复配置同时也能让拦截器应用于多个通道,Spring Integration 提供了全局拦截器。 考虑以下示例对:
<int:channel-interceptor pattern="input*, thing2*, thing1, !cat*" order="3">
<bean class="thing1.thing2SampleInterceptor"/>
</int:channel-interceptor>
<int:channel-interceptor ref="myInterceptor" pattern="input*, thing2*, thing1, !cat*" order="3"/>
<bean id="myInterceptor" class="thing1.thing2SampleInterceptor"/>
每个<channel-interceptor/>元素允许您定义一个全局拦截器,该拦截器应用于匹配pattern属性定义的所有通道。
在前面的情况下,全局拦截器将应用到'thing1'通道以及所有以'thing2'或'input'开头的其他通道(从版本5.0开始),但不包括以'thing3'开头的通道。
在模式中添加此语法可能会引起一个(尽管可能不太可能发生的问题)。
如果你有一个名为!thing1的bean,并且你在通道拦截器的pattern模式中包含了!thing1这个模式,那么它不再匹配。
现在的模式匹配所有不名为thing1的bean。
在这种情况下,你可以通过使用\来转义模式中的!。
模式\!thing1可以匹配一个名为!thing1的bean。 |
order 属性允许您在给定通道上存在多个拦截器时,管理此拦截器的注入位置。 例如,'inputChannel' 通道可以配置本地独立的拦截器(见下文),如下例所示:
<int:channel id="inputChannel">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
一个合理的问题是:"全局拦截器相对于其他在本地或通过其他全局拦截器定义配置的拦截器是如何注入的?"
当前实现提供了一种简单的机制来定义拦截器的执行顺序。
order属性中的正数确保拦截器在任何现有拦截器之后注入,而负数确保拦截器在现有拦截器之前注入。
这意味着,在前面的示例中,全局拦截器是在'wire-tap'拦截器(在本地配置)之后注入的(因为其order大于0)。
如果存在另一个具有匹配pattern的全局拦截器,其顺序将通过比较两个拦截器的order属性值来确定。
若要在现有拦截器之前注入全局拦截器,请将order属性设置为负值。
注意,order 和 pattern 属性都是可选的。
order 的默认值为 0,而 pattern 的默认值是 '*'(表示匹配所有频道)。 |
消息拦截
正如前面提到的,Spring 集成提供了一个简单的 wiretap 监听器拦截器。
您可以在<interceptors/>元素内的任何通道上配置一个 wiretap。
这样做特别适用于调试,可以与 Spring 集成的日志通道适配器结合使用,如下所示:
<int:channel id="in">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
<int:logging-channel-adapter id="logger" level="DEBUG"/>
'logging-channel-adapter' 也接受 'expression' 属性,以便您可以针对 'payload' 和 'headers' 变量评估 SpEL 表达式。
或者,若要记录完整的消息 toString() 结果,请将 'log-full-message' 属性的值提供为 true。
默认情况下,其值为 false,因此仅记录负载(payload)。
将其设置为 true 可在记录负载的同时启用所有头的记录。
'expression' 选项提供了最大的灵活性(例如 expression="payload.user.name")。 |
关于 wire tap 和其他类似组件(消息发布配置)的一个常见误解是,它们本质上是自动异步的。 默认情况下,wire tap 作为组件并非以异步方式调用。 相反,Spring Integration 专注于一种统一的异步行为配置方法:消息通道。 消息流中某些部分是同步还是异步,取决于在该流中配置的 Message Channel 类型。 这正是消息通道抽象的主要优势之一。 从框架诞生之初,我们就一直强调消息通道作为框架一等公民的必要性和价值。 它不仅仅是 EIP 模式的内部隐式实现。 它对最终用户完全暴露为一个可配置的组件。 因此,wire tap 组件仅负责执行以下任务:
-
通过监听通道(例如
channelA)拦截消息流 -
抓取每个消息
-
将消息发送到另一个通道(例如,
channelB)
这实际上是一种桥接模式的变体,但它是封装在通道定义之内的(因此,在不中断流的情况下更容易启用和禁用)。也不同步桥接器,它基本上会 fork 另一个消息流。是同步还是异步流程?答案取决于'message channel' 'channelB'的类型。我们有以下选项:直接通道、可轮询通道和执行器通道。最后一段打破了线程边界,使得通过这样的通道进行通信是异步的,因为从该通道发送消息到其订阅处理程序的消息分派发生在与发送消息到该通道使用的线程不同的线程上。这就是会让您的线程流同步或异步的原因。它与框架内的其他组件(如消息发布器)保持一致,并通过避免您在编写线程安全代码之外还要预先考虑某个特定代码片段应实现为同步还是异步的方式,为您增加了统一性和简化性。两个代码片段(例如组件 A 和组件 B)通过消息通道进行实际连接,正是这种连接决定了它们的协作是同步的还是异步的。您甚至可能希望将来从同步模式切换到异步模式,而消息通道(message channel)让您能够迅速完成这一切换,且无需触碰任何代码。
关于 wire tap 的最后一个要点是,尽管在上面提供了不默认异步的理由,但仍需记住尽早将消息转交给接收方通常是很有益的。 因此,在 wire tap 的传出通道中使用异步通道选项是很常见的做法。 然而,默认情况下并不会强制执行这种异步行为。 如果我们这样做,会破坏许多用例,包括你可能不想打破事务边界。 或许你使用 wire tap 模式进行审计,而希望审计消息在原始事务中发送。 例如,你可以将 wire tap 连接到一个 JMS 出站通道适配器。 这样,你就可以同时获得以下好处:1) 可以在一个事务中发送一个 JMS 消息,2) 仍然可以实现“即发即弃”的行为,从而避免对主要消息流产生明显延迟。
从版本 4.0 开始,当拦截器(例如 WireTap 类)引用通道时,避免循环引用非常重要。
您需要将此类通道排除在当前拦截器所拦截的通道之外。
这可以通过适当的模式或编程方式实现。
如果您有一个自定义的 ChannelInterceptor 引用了 channel,请考虑实现 VetoCapableInterceptor。
这样,框架会根据提供的模式询问拦截器是否可以拦截每个候选通道。
您还可以在拦截器方法中添加运行时保护,以确保该通道不是由拦截器引用的通道。
WireTap 同时使用了这两种技术。 |
从版本 4.3 开始,WireTap具有额外的构造函数,这些构造函数接受一个channelName而不是一个
MessageChannel实例。这在 Java 配置和使用通道自动创建逻辑时非常方便。
目标MessageChannel bean 是在提供的channelName稍后解析的,在首次与拦截器交互时完成。
通道解析需要一个BeanFactory,因此wire tap实例必须是Spring管理的bean。 |
此晚期绑定的方法还允许通过使用Java DSL配置简化典型的探针模式,如下例所示:
@Bean
public PollableChannel myChannel() {
return MessageChannels.queue()
.wireTap("loggingFlow.input")
.get();
}
@Bean
public IntegrationFlow loggingFlow() {
return f -> f.log();
}
条件性线路监听
通过使用selector或selector-expression属性,可以对截获进行条件设置。
selector引用了一个MessageSelector bean,该bean可以在运行时决定消息是否应该发送到截获通道。
同样地,selector-expression是一个布尔SpEL表达式,其作用相同:如果该表达式的值为true(即为true),则消息会被发送到截获通道。
全局 Wire Tap 配置
可以将全局线程捕获配置为全局通道拦截器配置的一种特殊情形。
为此,配置一个顶层的wire-tap元素。
现在,除了正常的wire-tap命名空间支持外,还支持pattern和order属性,并且它们的工作方式与channel-interceptor中的相同。
以下示例展示了如何配置全局线程捕获:
@Bean
@GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3)
public WireTap wireTap(MessageChannel wiretapChannel) {
return new WireTap(wiretapChannel);
}
<int:wire-tap pattern="input*, thing2*, thing1" order="3" channel="wiretapChannel"/>
全局线程截获提供了一种方便的方法,在不修改现有通道配置的情况下,对外部配置单通道线程截获。
为此,请将pattern属性设置为目标通道名称。
例如,您可以使用此技术来配置一个测试用例以验证通道上的消息。 |
特殊频道
两个默认定义在应用上下文中的特殊通道是:errorChannel 和 nullChannel。'nullChannel'(一个NullChannel的实例)类似于/dev/null,会将任何发送给它的消息记录在DEBUG级别,并立即返回。对于传输消息中org.reactivestreams.Publisher载荷的特殊处理是:立即订阅此通道,以启动响应式流处理,尽管数据会被丢弃。一个来自响应式流处理的错误(参见Subscriber.onError(Throwable))会被记录在warn级别日志中,以便进行可能的调查。如果需要对此类错误执行任何操作,可以将带有 Mono.doOnError() 自定义的 ReactiveRequestHandlerAdvice 应用于消息处理器,从而将 Mono 回复发送到此 nullChannel。任何时间您遇到回复的通道解析错误,而这些回复您又不关心时,可以将受影响组件的output-channel属性设置为'nullChannel'(在应用上下文中,'nullChannel'是保留名称)。
'errorChannel' 用于内部发送错误消息,可使用自定义配置进行覆盖。 有关此内容的更详细说明,请参见 错误处理。
有关消息通道和拦截器的更多信息,请参见 Java DSL 章节中的 消息通道。
轮询器
此部分描述了在Spring Integration中轮询的工作原理。
轮询消费者
当消息端点(通道适配器)连接到通道并实例化时,它们会生成以下之一的实例:
实际实现取决于这些端点连接到的通道类型。
连接到实现 org.springframework.messaging.SubscribableChannel 接口的通道的通道适配器会生成 EventDrivenConsumer 的实例。
另一方面,连接到实现 org.springframework.messaging.PollableChannel 接口(例如 QueueChannel)的通道的通道适配器会生成 PollingConsumer 的实例。
轮询消费者让Spring Integration组件主动轮询消息,而不是以事件驱动的方式处理消息。
它们在许多消息场景中代表一个关键的横切关注点。 在 Spring Integration 中,轮询消费者基于同名模式,该模式由 Gregor Hohpe 和 Bobby Woolf 在其著作《企业集成模式》中进行描述。 您可以在本书的网站上找到该模式的描述。
可轮询消息源
Spring Integration 提供了轮询消费者模式的第二种变体。
当使用入站通道适配器时,这些适配器通常被 SourcePollingChannelAdapter 包装。
例如,在从远程 FTP 服务器位置检索消息时,FTP 入站通道适配器 中描述的适配器会配置一个轮询器以定期检索消息。
因此,当组件配置了轮询器时,生成的实例属于以下类型之一:
这表示轮询器在入站和出站消息场景中都有使用。 以下是一些使用轮询器的用例:
-
轮询某些外部系统,例如FTP服务器、数据库和Web服务
-
轮询内部(可轮询)消息通道
-
轮询内部服务(例如,反复执行Java类中的方法)
AOP advice 类可以应用于轮询器,例如在 advice-chain 中,比如用于启动事务的事务 advice。
从 4.1 版本开始,提供了一个 PollSkipAdvice。
轮询器使用触发器来确定下一次轮询的时间。
PollSkipAdvice 可用于抑制(跳过)某次轮询,也许是因为存在某些下游条件会阻止消息被处理。
要使用此 advice,您必须为其提供一个 PollSkipStrategy 的实现。
从 4.2.5 版本开始,提供了一个 SimplePollSkipStrategy。
要使用它,您可以将其实例作为 bean 添加到应用程序上下文中,将其注入到 PollSkipAdvice 中,然后将其添加到轮询器的 advice 链中。
要跳过轮询,请调用 skipPolls()。
要恢复轮询,请调用 reset()。
4.2 版本在此方面增加了更多灵活性。
请参阅 消息源的有条件轮询器。 |
延迟确认可轮询消息源
自5.0.1版本起,某些模块提供了MessageSource实现,支持在下游流完成(或把消息交给另一个线程)时延迟确认。
目前这仅限于AmqpMessageSource和KafkaMessageSource模块。
使用这些消息源时,会将IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK标头(参见MessageHeaderAccessor API)添加到消息中。
当与可轮询的消息源一起使用时,该标头的值是AcknowledgmentCallback的实例,如下例所示:
@FunctionalInterface
public interface AcknowledgmentCallback {
void acknowledge(Status status);
boolean isAcknowledged();
void noAutoAck();
default boolean isAutoAck();
enum Status {
/**
* Mark the message as accepted.
*/
ACCEPT,
/**
* Mark the message as rejected.
*/
REJECT,
/**
* Reject the message and requeue so that it will be redelivered.
*/
REQUEUE
}
}
并非所有消息源(例如,KafkaMessageSource)都支持 REJECT 状态。
它被视为与 ACCEPT 相同。
应用程序可以在任何时间确认一条消息,如下例所示:
Message<?> received = source.receive();
...
StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
.acknowledge(Status.ACCEPT);
如果 MessageSource 连接到 SourcePollingChannelAdapter,当下游流程完成后轮询线程返回适配器时,适配器会检查确认是否已经完成;若未完成,则将其状态设置为 ACCEPT(如果流程抛出异常,则设置为 REJECT)。
状态值定义在 AcknowledgmentCallback.Status 枚举 中。
Spring Integration 提供了MessageSourcePollingTemplate来执行对MessageSource的即时轮询。
这也会负责在MessageHandler回调返回(或抛出异常)时为AcknowledgmentCallback设置ACCEPT或REJECT。
以下示例展示了如何使用MessageSourcePollingTemplate进行轮询:
MessageSourcePollingTemplate template =
new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
...
});
在这两种情况下(SourcePollingChannelAdapter 和 MessageSourcePollingTemplate),你都可以通过在回调中调用 noAutoAck() 来禁用自动确认/否定确认。
如果你将消息传递给另一个线程并希望稍后再进行确认,可能会这样做。
并非所有实现都支持此功能(例如,Apache Kafka 就不支持,因为偏移量提交必须在同一线程上执行)。
消息源的有条件轮询器
此部分介绍了如何使用条件性检查器。
背景
Advice 个对象,在一个 advice-chain 上进行轮询,建议整个轮询任务(包括消息检索和处理)。
这些“环绕通知”方法无法访问任何轮询的上下文——只能访问轮询本身。
这适用于如使任务事务性或因某些外部条件跳过轮询等要求,正如之前所述。
如果我们希望根据 receive 部分的结果采取一些行动,或者希望根据条件调整轮询器呢?对于这些情况,Spring Integration 提供了“智能”轮询。
“智能”轮询
Version 5.3 引入了 ReceiveMessageAdvice 接口。
任何在 Advice 中实现此接口的 advice-chain 对象仅应用于 receive() 操作 - MessageSource.receive() 和 PollableChannel.receive(timeout)。
因此,它们只能用于 SourcePollingChannelAdapter 或 PollingConsumer。
此类实现以下方法:
-
beforeReceive(Object source)This method is called before theObject.receive()method. It lets you examine and reconfigure the source. Returningfalsecancels this poll (similar to thePollSkipAdvicementioned earlier). -
Message<?> afterReceive(Message<?> result, Object source)This method is called after thereceive()method. Again, you can reconfigure the source or take any action (perhaps depending on the result, which can benullif there was no message created by the source). You can even return a different message
|
线程安全
如果 |
|
建议链排序
您应该了解在初始化过程中如何执行建议链。
|
SimpleActiveIdleReceiveMessageAdvice
此建议是ReceiveMessageAdvice的一个简单实现。
当与一个DynamicPeriodicTrigger一起使用时,它会根据上次轮询是否产生了消息来调整轮询频率。
轮询器还必须引用同一个DynamicPeriodicTrigger。
重要提示:异步交接
SimpleActiveIdleReceiveMessageAdvice 根据 receive() 的结果来修改触发条件。
这仅在建议被调用时处于轮询线程的情况下生效。
如果轮询线程有 task-executor,则不会生效。
要在轮询结果之后使用异步操作,请稍后执行异步交接,或许可以通过使用 ExecutorChannel 来实现。 |
CompoundTriggerAdvice
此建议允许根据轮询是否返回消息来选择两种触发器之一。
考虑一个使用 CronTrigger 的轮询器。CronTrigger 实例是不可变的,因此构造后不能被修改。
考虑这样一个用例:我们希望每小时使用 cron 表达式触发一次轮询,但如果未收到消息,则每分钟进行一次轮询,并在检索到消息时重新使用 cron 表达式。
The advice (and poller) 使用一个 CompoundTrigger 来达到这个目的。
The trigger 的 primary 触发器可以是一个 CronTrigger。
当建议(advice)检测到没有接收到消息时,它会将次要触发器添加到 CompoundTrigger 中。
当 CompoundTrigger 实例的 nextExecutionTime 方法被调用时,如果存在次要触发器,则委托给次要触发器;否则,委托给主要触发器。
必须将.poller也引用同一个CompoundTrigger。
以下示例展示了使用每分钟作为cron表达式备用的每小时配置:
<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
<bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
<int:poller trigger="compoundTrigger">
<int:advice-chain>
<bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
<constructor-arg ref="compoundTrigger"/>
<constructor-arg ref="secondary"/>
</bean>
</int:advice-chain>
</int:poller>
</int:inbound-channel-adapter>
<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
<constructor-arg ref="primary" />
</bean>
<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
<constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>
<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
<constructor-arg value="60000" />
</bean>
重要提示:异步交接
CompoundTriggerAdvice 根据 receive() 的结果来修改触发条件。
这仅在建议被调用时处于轮询线程的情况下生效。
如果轮询线程有 task-executor,则不会生效。
要在轮询结果之后使用异步操作,请稍后执行异步交接,或许可以通过使用 ExecutorChannel 来实现。 |
仅 MessageSource 的提示
一些建议可能仅适用于MessageSource.receive(),而对PollableChannel并无意义。
为此,MessageSourceMutator接口(作为ReceiveMessageAdvice的扩展)仍然保留。
有关更多信息,请参阅入站通道适配器:轮询多个服务器和目录。
通道适配器
频道适配器是一个消息端点,它允许将单一发送者或接收者连接到一个消息通道。 Spring Integration 提供了多种适配器以支持各种传输方式,例如 JMS、文件、HTTP、Web 服务、邮件等。 本参考指南后续章节将讨论每种适配器。 然而,本章专注于简单但灵活的方法调用频道适配器支持。 既有入站适配器也有出站适配器,并且每个适配器都可以通过核心命名空间提供的 XML 元素进行配置。 这些提供了方便的扩展 Spring Integration 的方式,只要您有一个可以作为源或目的地调用的方法即可。
配置入站通道适配器
一个inbound-channel-adapter元素(在Java配置中为SourcePollingChannelAdapter)可以调用任何Spring管理的对象的方法,并将非空返回值发送到MessageChannel,同时将方法的输出转换为Message。
当适配器的订阅激活时,轮询器会尝试从源接收消息。
轮询器根据提供的配置使用TaskScheduler进行调度。
要为单独的通道适配器配置轮询间隔或cron表达式,可以提供一个带有调度属性(如'fixed-rate'或'cron')的'poller'元素。
以下示例定义了两个inbound-channel-adapter实例:
@Bean
public IntegrationFlow source1() {
return IntegrationFlow.from(() -> new GenericMessage<>(...),
e -> e.poller(p -> p.fixedRate(5000)))
...
.get();
}
@Bean
public IntegrationFlow source2() {
return IntegrationFlow.from(() -> new GenericMessage<>(...),
e -> e.poller(p -> p.cron("30 * 9-17 * * MON-FRI")))
...
.get();
}
public class SourceService {
@InboundChannelAdapter(channel = "channel1", poller = @Poller(fixedRate = "5000"))
Object method1() {
...
}
@InboundChannelAdapter(channel = "channel2", poller = @Poller(cron = "30 * 9-17 * * MON-FRI"))
Object method2() {
...
}
}
@Bean
fun messageSourceFlow() =
integrationFlow( { GenericMessage<>(...) },
{ poller { it.fixedRate(5000) } }) {
...
}
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
<int:poller fixed-rate="5000"/>
</int:inbound-channel-adapter>
<int:inbound-channel-adapter ref="source2" method="method2" channel="channel2">
<int:poller cron="30 * 9-17 * * MON-FRI"/>
</int:channel-adapter>
也请参见通道适配器表达式和脚本。
| 如果没有提供轮询器,则必须在上下文中注册单个默认轮询器。 有关更多详细信息,请参阅 端点命名空间支持。 |
|
重要提示:轮询器配置
所有
在第一种配置中,轮询任务每轮调用一次,并且,在每次任务(轮询)期间,根据
注意,这里没有指定 然而,在 然而,如果您确定您的方法可以返回 null,并且需要在每次轮询中尽可能多地获取所有可用源,则应显式地将
自 5.5 版本起, 另请参阅 全局默认轮询器 以获取更多信息。 |
配置出站通道适配器
一个outbound-channel-adapter元素(对于Java配置来说是一个@ServiceActivator)可以连接一个MessageChannel,使其能够将接收到的消息通道中的消息内容作为参数调用任何POJO消费者方法。
下面的示例展示了如何定义出站频道适配器:
@Bean
public IntegrationFlow outboundChannelAdapterFlow(MyPojo myPojo) {
return f -> f
.handle(myPojo, "handle");
}
public class MyPojo {
@ServiceActivator(channel = "channel1")
void handle(Object payload) {
...
}
}
@Bean
fun outboundChannelAdapterFlow(myPojo: MyPojo) =
integrationFlow {
handle(myPojo, "handle")
}
<int:outbound-channel-adapter channel="channel1" ref="target" method="handle"/>
<beans:bean id="target" class="org.MyPojo"/>
如果正在适配的通道是 PollableChannel,则必须提供一个 poller 子元素(即 @ServiceActivator 上的 @Poller 子注解),如下例所示:
public class MyPojo {
@ServiceActivator(channel = "channel1", poller = @Poller(fixedRate = "3000"))
void handle(Object payload) {
...
}
}
<int:outbound-channel-adapter channel="channel2" ref="target" method="handle">
<int:poller fixed-rate="3000" />
</int:outbound-channel-adapter>
<beans:bean id="target" class="org.MyPojo"/>
您应该使用一个ref属性,如果POJO消费者实现可以在其他<outbound-channel-adapter>定义中重用。
然而,如果消费者实现仅被单个定义的<outbound-channel-adapter>引用,则可以将其定义为内部bean,如下例所示:
<int:outbound-channel-adapter channel="channel" method="handle">
<beans:bean class="org.Foo"/>
</int:outbound-channel-adapter>
在同一<outbound-channel-adapter>配置中同时使用ref属性和内部处理器定义是不允许的,因为这会导致一个模糊的状态。
这样的配置会抛出一个异常。 |
任何频道适配器都可以不依赖于channel引用创建,在这种情况下,它会隐式地创建一个DirectChannel的实例。
创建的频道名称与id属性相匹配,该属性位于<inbound-channel-adapter>或<outbound-channel-adapter>元素中。
因此,如果未提供channel,则必须提供id。
通道适配器表达式与脚本
类似于其他Spring Integration组件,<inbound-channel-adapter>和<outbound-channel-adapter>也提供了SpEL表达式评估的支持。
要使用SpEL,请在'expression'属性中提供表达式字符串,而不是使用用于对bean进行方法调用的'ref'和'method'属性。
当表达式被评估时,它遵循与方法调用相同的契约:对于<inbound-channel-adapter>中的表达式,只要评估结果不是空值,就会生成一条消息;而对于<outbound-channel-adapter>中的表达式,必须等同于一个返回void的方法调用。
Spring Integration 3.0 及以上版本中,一个 <int:inbound-channel-adapter/> 也可以配置为 SpEL <expression/>(甚至可以是 <script/>)的子元素,以满足比简单 'expression' 属性所能实现的更为复杂的场景。
如果通过使用 location 属性提供脚本作为 Resource,还可以设置 refresh-check-delay,这允许资源定期重新加载。
如果您希望在每次轮询时都检查脚本,就需要与轮询器的触发器协调设置,如下例所示:
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
<int:poller max-messages-per-poll="1" fixed-delay="5000"/>
<script:script lang="ruby" location="Foo.rb" refresh-check-delay="5000"/>
</int:inbound-channel-adapter>
使用 <expression/> 子元素时,请同时参考 ReloadableResourceBundleExpressionSource 上的 cacheSeconds 属性。
有关表达式的更多信息,请参阅 Spring 表达式语言 (SpEL)。
有关脚本的内容,请参阅 Groovy 支持 和 脚本支持。
The <int:inbound-channel-adapter/> (SourcePollingChannelAdapter) 是一个端点,通过周期性触发来定期调用以从某些底层的 MessageSource 中拉取消息。
在拉取时,由于没有消息对象,表达式和脚本无法访问根 脚本可以生成并返回一个完整的 |
消息桥接
消息桥接是一个相对简单的端点,用于连接两个消息通道或适配器。
例如,您可能希望将一个PollableChannel连接到一个SubscribableChannel,以便订阅的端点不需要担心任何轮询配置。
相反,消息桥接提供这些轮询配置。
通过在两个通道之间提供一个中介探测器,您可以通过消息桥来限制流入的消息速率。
探测器的触发器决定了到达第二个通道的消息率,并且探测器的maxMessagesPerPoll属性强制执行吞吐量上限。
消息桥接的另一个有效用途是连接两个不同的系统。 在这种场景下,Spring Integration 的作用仅限于在这些系统之间建立连接并管理轮询器(如果必要的话)。 更常见的情况是在这两个系统之间至少配置一个转换器,以在它们的格式之间进行转换。 在这种情况下,通道可以作为转换器端点的 'input-channel'(输入通道)和 'output-channel'(输出通道)提供。 如果不需要数据格式转换,那么消息桥接本身可能就足够了。
使用 XML 配置桥接
可以使用<bridge>元素来创建两个消息通道或适配器之间的消息桥接。
要实现这一点,请提供input-channel和output-channel属性,如下例所示:
<int:bridge input-channel="input" output-channel="output"/>
正如前面所述,消息桥接器的一个常见用例是将一个PollableChannel连接到一个SubscribableChannel。
在此角色中执行时,消息桥接器也可能充当流量控制器:
<int:bridge input-channel="pollable" output-channel="subscribable">
<int:poller max-messages-per-poll="10" fixed-rate="5000"/>
</int:bridge>
您可以使用类似的机制来连接通道适配器。
以下示例展示了 Spring Integration 的 stream 命名空间中的 stdin 和 stdout 适配器之间的简单“回显”:
<int-stream:stdin-channel-adapter id="stdin"/>
<int-stream:stdout-channel-adapter id="stdout"/>
<int:bridge id="echo" input-channel="stdin" output-channel="stdout"/>
类似的配置同样适用于其他(可能更实用的)Channel Adapter桥梁,例如文件到JMS或邮件到文件。 后续章节将涵盖各种Channel Adapter。
| 如果桥接(bridge)中没有定义 'output-channel',则会使用入站消息提供的回复通道(reply channel),如果可用的话。 如果没有提供输出通道(output channel)或回复通道,则会抛出异常。 |
使用 Java 配置桥接
以下示例展示了如何使用@0注解在Java中配置一个桥接器:
@Bean
public PollableChannel polled() {
return new QueueChannel();
}
@Bean
@BridgeFrom(value = "polled", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "10"))
public SubscribableChannel direct() {
return new DirectChannel();
}
以下示例展示了如何使用@0注解在Java中配置一个桥接器:
@Bean
@BridgeTo(value = "direct", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "10"))
public PollableChannel polled() {
return new QueueChannel();
}
@Bean
public SubscribableChannel direct() {
return new DirectChannel();
}
可以使用BridgeHandler,如以下示例所示:
@Bean
@ServiceActivator(inputChannel = "polled",
poller = @Poller(fixedRate = "5000", maxMessagesPerPoll = "10"))
public BridgeHandler bridge() {
BridgeHandler bridge = new BridgeHandler();
bridge.setOutputChannelName("direct");
return bridge;
}