核心消息
消息传送通道
消息通道
虽然 在封装数据方面发挥着关键作用,但它将消息生成者与消息使用者分离。Message
MessageChannel
MessageChannel 接口
Spring 集成的顶级接口定义如下:MessageChannel
public interface MessageChannel {
boolean send(Message message);
boolean send(Message message, long timeout);
}
发送消息时,返回值为消息是否发送成功。
如果 send 调用超时或中断,则返回 。true
false
PollableChannel
由于消息通道可以也可能不缓冲消息(如 Spring 集成概述中所述),因此两个子接口定义了缓冲(可轮询)和非缓冲(可订阅)通道行为。
下面的清单显示了接口的定义:PollableChannel
public interface PollableChannel extends MessageChannel {
Message<?> receive();
Message<?> receive(long timeout);
}
与 send 方法一样,当接收消息时,如果出现超时或中断,则返回值为 null。
SubscribableChannel
基本接口由直接向其订阅的实例发送消息的通道实现。
因此,它们不提供用于轮询的 receive 方法。
相反,它们定义了管理这些订阅者的方法。
下面的清单显示了接口的定义:SubscribableChannel
MessageHandler
SubscribableChannel
public interface SubscribableChannel extends MessageChannel {
boolean subscribe(MessageHandler handler);
boolean unsubscribe(MessageHandler handler);
}
消息通道实现
Spring 集成提供了不同的消息通道实现。 以下各节将简要介绍每个选项。
PublishSubscribeChannel
该 implementation 将发送给它的任何 BROADCAST 到其所有订阅的处理程序。
这最常用于发送事件消息,其主要角色是通知(与文档消息相反,文档消息通常由单个处理程序处理)。
请注意,这仅用于发送。
由于它在调用其方法时直接向订阅者广播,因此使用者无法轮询消息(它没有实现,因此没有方法)。
相反,任何订阅者本身都必须是 ,并且依次调用订阅者的方法。PublishSubscribeChannel
Message
PublishSubscribeChannel
send(Message)
PollableChannel
receive()
MessageHandler
handleMessage(Message)
在版本 3.0 之前,在没有订阅者的 上调用该方法会返回 。
当与 a 一起使用时,会引发 a。
从版本 3.0 开始,行为已更改,因此,如果至少存在最小订阅者(并成功处理消息),则始终认为 a 成功。
可以通过设置属性来修改此行为,该属性默认为 .send
PublishSubscribeChannel
false
MessagingTemplate
MessageDeliveryException
send
minSubscribers
0
如果使用 ,则仅使用正确数量的订阅者进行此确定,因为消息的实际处理是异步执行的。TaskExecutor |
QueueChannel
该实现包装了一个队列。
与 不同,具有点对点语义。
换句话说,即使通道有多个消费者,也只有一个消费者应该接收发送到该通道的任何消费者。
它提供了一个默认的无参数构造函数(提供基本上无限的容量)以及一个接受队列容量的构造函数,如下面的清单所示:QueueChannel
PublishSubscribeChannel
QueueChannel
Message
Integer.MAX_VALUE
public QueueChannel(int capacity)
未达到其容量限制的通道将消息存储在其内部队列中,并且该方法会立即返回,即使没有接收方准备好处理该消息也是如此。
如果队列已达到容量上限,则发送方将阻止,直到队列中有 room 可用。
或者,如果您使用具有附加 timeout 参数的 send 方法,则队列将阻止,直到任一房间可用或超时期限已过(以先发生者为准)。
同样,如果队列中有消息可用,则调用会立即返回,但是,如果队列为空,则 receive 调用可能会阻止,直到消息可用或超时(如果提供)过去。
在任何一种情况下,都可以通过传递超时值 0 来强制立即返回,而不管队列的状态如何。
但是请注意,对 和 的 versions 的调用会无限期地阻止。send(Message<?>)
receive()
send()
receive()
timeout
PriorityChannel
虽然 强制执行先进先出 (FIFO) 排序,但 是一种替代实现,允许根据优先级在通道内对消息进行排序。
默认情况下,优先级由每条消息中的标头决定。
但是,对于自定义优先级确定逻辑,可以向构造函数提供 type 的 comparator。QueueChannel
PriorityChannel
priority
Comparator<Message<?>>
PriorityChannel
RendezvousChannel
这将启用“直接切换”场景,其中发送方会阻塞,直到另一方调用通道的方法。
另一方会阻止,直到发送方发送消息。
在内部,此实现与 非常相似,不同之处在于它使用 (零容量实现)。
这在发送方和接收方在不同的线程中操作的情况下效果很好,但异步将消息放入队列中是不合适的。
换句话说,使用 ,发送方知道某个接收方已经接受了该消息,而使用 ,该消息将存储到内部队列中,并且可能永远不会收到。RendezvousChannel
receive()
QueueChannel
SynchronousQueue
BlockingQueue
RendezvousChannel
QueueChannel
请记住,默认情况下,所有这些基于队列的通道都仅将消息存储在内存中。
当需要持久性时,可以在 'queue' 元素中提供 'message-store' 属性来引用持久性实现,也可以将本地通道替换为由持久性代理(如 JMS 支持的通道或通道适配器)支持的通道。
后一个选项允许您利用任何 JMS 提供程序的实现来实现消息持久性,如 JMS 支持中所述。
但是,当不需要在队列中缓冲时,最简单的方法是依赖 ,将在下一节中讨论。MessageStore DirectChannel |
这对于实现 request-reply 操作也很有用。
发送者可以创建一个临时的匿名实例,然后在构建 .
发送后,发送者可以立即调用(可选地提供超时值)以便在等待回复时阻止。
这与 Spring 集成的许多请求-回复组件内部使用的实现非常相似。RendezvousChannel
RendezvousChannel
Message
Message
receive
Message
DirectChannel
具有点对点语义,但其他方面比前面描述的任何基于队列的通道实现更类似于 。
它实现接口而不是接口,因此它将消息直接分派给订阅者。
但是,作为点对点通道,它与 的不同之处在于,它将每个通道发送到单个订阅的 。DirectChannel
PublishSubscribeChannel
SubscribableChannel
PollableChannel
PublishSubscribeChannel
Message
MessageHandler
除了是最简单的点对点通道选项之外,它最重要的功能之一是它使单个线程能够在通道的 “两侧” 执行操作。
例如,如果处理程序订阅 ,则向该通道发送 将在方法调用返回之前直接在发送方的线程中触发该处理程序的方法的调用。DirectChannel
Message
handleMessage(Message)
send()
提供具有此行为的通道实现的主要动机是支持必须跨越通道的事务,同时仍然受益于通道提供的抽象和松散耦合。
如果在事务范围内调用调用,则处理程序调用的结果(例如,更新数据库记录)在确定该事务的最终结果(提交或回滚)方面发挥作用。send()
由于这是最简单的选项,并且不会增加调度和管理 Poller 线程所需的任何额外开销,因此它是 Spring Integration 中的默认通道类型。
一般的思路是定义应用程序的通道,考虑哪些通道需要提供缓冲或限制输入,并将这些通道修改为 queue-based 。
同样,如果通道需要广播消息,则它不应是 a,而应是 .
稍后,我们将展示如何配置这些通道中的每一个。DirectChannel PollableChannels DirectChannel PublishSubscribeChannel |
内部委托给消息调度程序以调用其订阅的消息处理程序,并且该调度程序可以具有由 or 属性公开的负载平衡策略(互斥)。
消息调度器使用负载平衡策略来帮助确定当多个消息处理程序订阅同一通道时,如何在消息处理程序之间分发消息。
为方便起见,该属性公开了一个值枚举,这些值指向预先存在的 .
A (轮换处理程序之间的负载均衡) 和 (对于想要显式禁用负载均衡的情况) 是唯一可用的值。
将来的版本中可能会添加其他策略实现。
但是,从版本 3.0 开始,您可以提供自己的实现,并使用该属性注入它,该属性应指向实现的 bean,如下例所示:DirectChannel
load-balancer
load-balancer-ref
load-balancer
LoadBalancingStrategy
round-robin
none
LoadBalancingStrategy
load-balancer-ref
LoadBalancingStrategy
A 是仅支持无法取消订阅的单个订阅者的 a。
这对于不涉及其他订阅者且不需要通道拦截器的高吞吐量性能使用案例非常有用。FixedSubscriberChannel
SubscribableChannel
MessageHandler
<int:channel id="lbRefChannel">
<int:dispatcher load-balancer-ref="lb"/>
</int:channel>
<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>
请注意,和 属性是互斥的。load-balancer
load-balancer-ref
负载均衡还与 boolean 属性结合使用。
如果值为 true(默认值),则当前面的处理程序引发异常时,调度程序将回退到任何后续处理程序(根据需要)。
订单由处理程序本身定义的可选 order 值确定,如果不存在此类值,则由处理程序订阅的顺序确定。failover
failover
如果某种情况要求 Dispatcher 始终尝试调用第一个处理程序,然后在每次发生错误时以相同的固定顺序回退,则不应提供负载平衡策略。
换句话说,即使未启用负载均衡,调度程序仍支持 boolean 属性。
但是,如果没有负载平衡,处理程序的调用总是根据它们的顺序从第一个开始。
例如,当有 primary、secondary、tritiary 等的明确定义时,此方法效果很好。
使用命名空间支持时,任何端点上的属性都会确定顺序。failover
order
请记住,负载平衡仅在通道具有多个订阅消息处理程序时应用。
当使用 namespace 支持时,这意味着多个 endpoint 共享 attribute 中定义的相同通道引用。failover input-channel |
从版本 5.2 开始,当为 true 时,当前处理程序的失败以及失败的消息将分别记录在 or if configured 下。failover
debug
info
ExecutorChannel
这是一个点对点通道,支持与(负载平衡策略和 boolean 属性)相同的调度程序配置。
这两种调度通道类型之间的主要区别在于,委托给 的实例来执行调度。
这意味着 send 方法通常不会阻塞,但这也意味着处理程序调用可能不会发生在发送方的线程中。
因此,它不支持跨 sender 和 receiving handler 的事务。ExecutorChannel
DirectChannel
failover
ExecutorChannel
TaskExecutor
发件人有时可能会阻止。
例如,当将 a 与限制客户端的拒绝策略(如 )一起使用时,发送方的线程可以在线程池达到其最大容量且执行程序的工作队列已满时执行该方法。
由于这种情况只会以不可预测的方式发生,因此您不应依赖它进行交易。TaskExecutor ThreadPoolExecutor.CallerRunsPolicy |
PartitionedChannel
从版本 6.1 开始,提供了一个实现。
这是点对点调度逻辑的扩展,并表示点对点调度逻辑,其中实际消耗在特定线程上处理,由从发送到此通道的消息评估的分区键确定。
此通道类似于上面提到的通道,但不同之处在于具有相同分区键的消息始终在同一线程中处理,同时保持 Sequences。
它不需要 external ,但可以使用自定义(例如 )进行配置 。
此工厂用于将单线程执行程序填充到每个分区的委托中。
默认情况下,消息标头用作分区键。
此通道可以配置为简单的 bean:PartitionedChannel
AbstractExecutorChannel
ExecutorChannel
TaskExecutor
ThreadFactory
Thread.ofVirtual().name("partition-", 0).factory()
MessageDispatcher
IntegrationMessageHeaderAccessor.CORRELATION_ID
@Bean
PartitionedChannel somePartitionedChannel() {
return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}
通道将具有分区 - 专用线程;将使用标头来确定将在哪个分区中处理消息。
有关更多信息,请参见 class Javadocs。3
partitionKey
PartitionedChannel
FluxMessageChannel
这是一种将消息发送到内部供下游反应式订阅者按需使用的实现。
此 channel implementation 既不是 , 也不是 a ,因此只能使用实例从此 channel 中消费,以遵守反应流的背压性质。
另一方面,它通过其 Contract 实现 a,允许从反应式源发布者接收事件,将反应式流桥接到集成流中。
为了实现整个集成流的完全反应行为,必须在流中的所有端点之间放置这样的通道。FluxMessageChannel
org.reactivestreams.Publisher
"sinking"
reactor.core.publisher.Flux
SubscribableChannel
PollableChannel
org.reactivestreams.Subscriber
FluxMessageChannel
ReactiveStreamsSubscribableChannel
subscribeTo(Publisher<Message<?>>)
有关与 Reactive Streams 交互的更多信息,请参阅 Reactive Streams Support 。
作用域通道
Spring Integration 1.0 提供了一个实现,但从 2.0 开始已被删除。
现在,处理相同需求的更通用方法是向 channel 添加一个 attribute。
该属性的值可以是上下文中可用的范围的名称。
例如,在 Web 环境中,某些范围可用,并且任何自定义范围实现都可以注册到上下文中。
下面的示例展示了一个应用于通道的线程本地作用域,包括作用域本身的注册:ThreadLocalChannel
scope
<int:channel id="threadScopedChannel" scope="thread">
<int:queue />
</int:channel>
<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
<property name="scopes">
<map>
<entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
</map>
</property>
</bean>
上一个示例中定义的通道也在内部委托给队列,但该通道绑定到当前线程,因此队列的内容也类似地绑定。
这样,发送到通道的线程稍后可以接收这些相同的消息,但其他线程将无法访问它们。
虽然很少需要线程范围的通道,但在实例用于强制执行单个操作线程但任何回复消息都应发送到“终端”通道的情况下,它们可能很有用。
如果该终端通道是线程范围的,则原始发送线程可以从终端通道收集其回复。DirectChannel
现在,由于任何通道都可以被限定范围,因此除了 thread-Local 之外,您还可以定义自己的范围。
通道拦截器
消息传递体系结构的一个优点是能够提供常见行为,并以非侵入性方式捕获有关通过系统传递的消息的有意义信息。
由于实例是发送到实例和从实例接收的,因此这些通道提供了拦截发送和接收操作的机会。
策略接口(如下面的清单所示)为这些操作中的每一个提供了方法:Message
MessageChannel
ChannelInterceptor
public interface ChannelInterceptor {
Message<?> preSend(Message<?> message, MessageChannel channel);
void postSend(Message<?> message, MessageChannel channel, boolean sent);
void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex);
boolean preReceive(MessageChannel channel);
Message<?> postReceive(Message<?> message, MessageChannel channel);
void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex);
}
实现接口后,向 channel 注册拦截器只需进行以下调用即可:
channel.addInterceptor(someChannelInterceptor);
返回实例的方法可用于转换 或 可以返回 'null' 以防止进一步处理(当然,任何方法都可以抛出 a )。
此外,该方法还可以返回以防止接收操作继续进行。Message
Message
RuntimeException
preReceive
false
请记住,调用仅与 相关。
事实上,该接口甚至没有定义方法。
这样做的原因是,当 a 被发送到 时,它会直接发送给零个或多个订阅者,具体取决于通道的类型(例如,
A 发送给其所有订阅者)。
因此,仅当将侦听器应用于 .receive() PollableChannels SubscribableChannel receive() Message SubscribableChannel PublishSubscribeChannel preReceive(…) postReceive(…) afterReceiveCompletion(…) PollableChannel |
Spring 集成还提供了 Wire Tap 模式的实现。
它是一个简单的拦截器,可以将 发送到另一个通道,而不会改变现有流。
它对于调试和监控非常有用。
Wire Tap 中显示了一个示例。Message
由于很少需要实现所有拦截器方法,因此该接口提供无操作方法(返回方法没有代码,-returning 方法按原样返回,方法返回)。void
Message
Message
boolean
true
拦截器方法的调用 Sequences 取决于通道的类型。
如前所述,基于队列的通道是唯一首先拦截该方法的通道。
此外,发送和接收拦截之间的关系取决于单独的发送方和接收方线程的计时。
例如,如果接收方在等待消息时已被阻止,则顺序可能如下所示:、、、、。
但是,如果接收方在发送方在通道上放置消息并已返回后进行轮询,则顺序将如下所示:、(经过一段时间)、、。
在这种情况下,经过的时间取决于许多因素,因此通常是不可预测的(事实上,接收可能永远不会发生)。
队列的类型也起着一定的作用(例如,rendezvous 与 priority)。
简而言之,您不能依赖超出 precedes 和 precedes 的事实之外的顺序。receive() preSend preReceive postReceive postSend preSend postSend preReceive postReceive preSend postSend preReceive postReceive |
从 Spring Framework 4.1 和 Spring Integration 4.1 开始,提供了新的方法:和.
它们在调用后调用,而不管引发的任何异常如何,这允许资源清理。
请注意,通道以与 initial 和 calls 相反的顺序调用列表中的这些方法。ChannelInterceptor
afterSendCompletion()
afterReceiveCompletion()
send()' and 'receive()
ChannelInterceptor
preSend()
preReceive()
从版本 5.1 开始,全局通道拦截器现在适用于动态注册的通道 - 例如通过使用 Java DSL 初始化的 bean 或在使用 Java DSL 时初始化的 bean。
以前,在刷新应用程序上下文后创建 bean 时,不会应用拦截器。beanFactory.initializeBean()
IntegrationFlowContext
此外,从版本 5.1 开始,当未收到消息时,不再调用;不再需要检查 .
以前,该方法被调用。
如果你有一个依赖于先前行为的拦截器,请改为实现,因为无论是否收到消息,都会调用该方法。ChannelInterceptor.postReceive()
null
Message<?>
afterReceiveCompleted()
从版本 5.2 开始,Spring Messaging 模块已被弃用,现在它扩展了该模块以实现向后兼容性。ChannelInterceptorAware InterceptableChannel |
MessagingTemplate
当引入端点及其各种配置选项时, Spring 集成为消息传递组件提供了一个基础,该组件支持从消息传递系统非侵入性地调用应用程序代码。
但是,有时需要从应用程序代码中调用消息传送系统。
为了在实现此类用例时方便,Spring 集成提供了一个支持跨消息通道的各种操作,包括请求和回复场景。
例如,可以发送请求并等待回复,如下所示:MessagingTemplate
MessagingTemplate template = new MessagingTemplate();
Message reply = template.sendAndReceive(someChannel, new GenericMessage("test"));
在前面的示例中,模板将在内部创建一个临时匿名通道。 还可以在模板上设置 'sendTimeout' 和 'receiveTimeout' 属性,并且还支持其他交换类型。 下面的清单显示了此类方法的签名:
public boolean send(final MessageChannel channel, final Message<?> message) { ...
}
public Message<?> sendAndReceive(final MessageChannel channel, final Message<?> request) { ...
}
public Message<?> receive(final PollableChannel<?> channel) { ...
}
Enter the GatewayProxyFactoryBean 中描述了一种侵入性较小的方法,该方法允许你使用payload或header值而不是实例来调用简单的接口。Message |
配置消息通道
要创建消息通道实例,可以使用 xml 元素或 Java 配置实例,如下所示:<channel/>
DirectChannel
@Bean
public MessageChannel exampleChannel() {
return new DirectChannel();
}
<int:channel id="exampleChannel"/>
当您使用不带任何子元素的元素时,它会创建一个实例 (a )。<channel/>
DirectChannel
SubscribableChannel
要创建发布-订阅通道,请使用元素(在 Java 中为 the),如下所示:<publish-subscribe-channel/>
PublishSubscribeChannel
@Bean
public MessageChannel exampleChannel() {
return new PublishSubscribeChannel();
}
<int:publish-subscribe-channel id="exampleChannel"/>
您也可以提供各种子元素来创建任何可轮询的通道类型(如 消息通道实现中所述)。
以下部分显示了每种通道类型的示例。<queue/>
DirectChannel
配置
如前所述, 是默认类型。
下面的清单显示了定义谁:DirectChannel
@Bean
public MessageChannel directChannel() {
return new DirectChannel();
}
<int:channel id="directChannel"/>
默认通道具有循环负载均衡器,并且还启用了故障转移(有关更多详细信息,请参阅 DirectChannel
)。
要禁用其中一项或两项,请添加子元素( 的构造函数 )并按如下方式配置属性:<dispatcher/>
LoadBalancingStrategy
DirectChannel
@Bean
public MessageChannel failFastChannel() {
DirectChannel channel = new DirectChannel();
channel.setFailover(false);
return channel;
}
@Bean
public MessageChannel failFastChannel() {
return new DirectChannel(null);
}
<int:channel id="failFastChannel">
<int:dispatcher failover="false"/>
</channel>
<int:channel id="channelWithFixedOrderSequenceFailover">
<int:dispatcher load-balancer="none"/>
</int:channel>
数据类型 Channel 配置
有时,使用者只能处理特定类型的有效负载,这迫使您确保输入消息的有效负载类型。 首先想到的可能是使用消息过滤器。 但是,消息筛选器所能做的只是筛选出不符合使用者要求的消息。 另一种方法是使用基于内容的路由器,并将具有不合规数据类型的消息路由到特定的转换器,以强制转换和转换为所需的数据类型。 这将有效,但完成相同任务的更简单方法是应用 Datatype Channel 模式。 您可以为每个特定的负载数据类型使用单独的数据类型通道。
要创建仅接受包含特定有效负载类型的消息的数据类型通道,请在 channel 元素的属性中提供数据类型的完全限定类名,如下例所示:datatype
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(Number.class);
return channel;
}
<int:channel id="numberChannel" datatype="java.lang.Number"/>
请注意,对于可分配给通道数据类型的任何类型的类型,类型检查都会通过。
换句话说,前面示例中的 the 将接受有效负载为 或 的消息。
可以将多个类型作为逗号分隔的列表提供,如下例所示:numberChannel
java.lang.Integer
java.lang.Double
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(String.class, Number.class);
return channel;
}
<int:channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>
因此,前面示例中的 'numberChannel' 只接受数据类型为 .
但是,如果消息的有效负载不是 required 类型,会发生什么情况呢?
这取决于你是否定义了一个名为 Spring 的 Conversion Service 实例的 bean。
如果没有,那么将立即抛出 an。
但是,如果已定义 Bean,则尝试将 Bean 用于将消息的有效负载转换为可接受的类型。java.lang.Number
integrationConversionService
Exception
integrationConversionService
您甚至可以注册自定义转换器。
例如,假设您将一条带有有效负载的消息发送到我们上面配置的 'numberChannel'。
您可以按如下方式处理该消息:String
MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));
通常,这将是一个完全合法的操作。 但是,由于我们使用 Datatype Channel,因此此类操作的结果将生成类似于以下内容的异常:
Exception in thread "main" org.springframework.integration.MessageDeliveryException:
Channel 'numberChannel'
expected one of the following datataypes [class java.lang.Number],
but received [class java.lang.String]
…
发生异常的原因是我们要求有效负载类型为 ,但我们发送了 .
所以我们需要一些东西来将 a 转换为 .
为此,我们可以实现类似于以下示例的转换器:Number
String
String
Number
public static class StringToIntegerConverter implements Converter<String, Integer> {
public Integer convert(String source) {
return Integer.parseInt(source);
}
}
然后,我们可以将其注册为 Integration Conversion Service 的转换器,如下例所示:
@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt {
return new StringToIntegerConverter();
}
<int:converter ref="strToInt"/>
<bean id="strToInt" class="org.springframework.integration.util.Demo.StringToIntegerConverter"/>
或者在类上标记有用于自动扫描的注释时。StringToIntegerConverter
@Component
当 'converter' 元素被解析时,如果尚未定义 bean,它会创建 bean。
有了该转换器,该操作现在将成功,因为数据类型通道使用该转换器将有效负载转换为 .integrationConversionService
send
String
Integer
有关负载类型转换的更多信息,请参阅负载类型转换。
从版本 4.0 开始,由 调用,它在应用程序上下文中查找转换服务。
要使用其他转换技术,您可以在通道上指定属性。
这必须是对 implementation 的引用。
仅使用方法。
它为转换器提供对消息标头的访问(如果转换可能需要标头中的信息,例如 )。
该方法只能返回已转换的有效负载或完整对象。
如果是后者,则转换器必须小心地从入站消息中复制所有 Headers。integrationConversionService
DefaultDatatypeChannelMessageConverter
message-converter
MessageConverter
fromMessage
content-type
Message
或者,您可以声明 ID 为 的 type ,并且该转换器由具有 .<bean/>
MessageConverter
datatypeChannelMessageConverter
datatype
QueueChannel
配置
要创建 ,请使用 sub-element.
您可以按如下方式指定通道的容量:QueueChannel
<queue/>
@Bean
public PollableChannel queueChannel() {
return new QueueChannel(25);
}
<int:channel id="queueChannel">
<queue capacity="25"/>
</int:channel>
如果您没有为此子元素的 'capacity' 属性提供值,则生成的队列是无限的。
为避免内存不足等问题,我们强烈建议您为有界队列设置显式值。<queue/> |
持久配置QueueChannel
由于 a 提供了缓冲消息的功能,但默认情况下仅在内存中缓冲,因此它还引入了在系统故障时消息可能会丢失的可能性。
为了降低这种风险,a 可能由策略接口的持续实现提供支持。
有关 和 的更多详细信息,请参阅 Message Store。QueueChannel
QueueChannel
MessageGroupStore
MessageGroupStore
MessageStore
使用该属性时不允许使用该属性。capacity message-store |
当 收到 时,它会将消息添加到消息存储中。
从 轮询 a 时,会将其从邮件存储中删除。QueueChannel
Message
Message
QueueChannel
默认情况下,a 将其消息存储在内存中队列中,这可能会导致前面提到的消息丢失情况。
但是, Spring Integration 提供了持久存储,例如 .QueueChannel
JdbcChannelMessageStore
您可以通过添加属性来为 any 配置消息存储,如下例所示:QueueChannel
message-store
<int:channel id="dbBackedChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
(有关Java/Kotlin配置选项,请参阅以下示例。
Spring 集成 JDBC 模块还为许多流行的数据库提供了模式数据定义语言(DDL)。
这些模式位于该模块的org.springframework.integration.jdbc.store.channel包中()。spring-integration-jdbc
一个重要的功能是,对于任何事务性持久存储(例如 ),只要 Poller 配置了事务,只有在事务成功完成时,才能永久删除从存储中删除的消息。
否则,事务将回滚,并且不会丢失。JdbcChannelMessageStore Message |
随着越来越多的与 “NoSQL” 数据存储相关的 Spring 项目开始为这些存储提供底层支持,可以使用消息存储的许多其他实现。
如果找不到满足您特定需求的接口,您也可以提供自己的接口实现。MessageGroupStore
从版本 4.0 开始,我们建议将实例配置为尽可能使用 。
与一般邮件存储相比,这些存储通常针对此用途进行了优化。
如果 是 a ,则按优先级顺序在 FIFO 中接收消息。
优先级的概念由 message store 实现确定。
例如,以下示例显示了 MongoDB 通道消息存储的 Java 配置:QueueChannel
ChannelMessageStore
ChannelMessageStore
ChannelPriorityMessageStore
@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
store.setPriorityEnabled(true);
return store;
}
@Bean
public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) {
return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue"));
}
@Bean
public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) {
return IntegrationFlow.from((Channels c) ->
c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup"))
....
.get();
}
@Bean
fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) =
integrationFlow {
channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") }
}
注意类。
这是使用 operations 的实现。MessageGroupQueue BlockingQueue MessageGroupStore |
自定义环境的另一个选项由 sub-element 的属性或其特定构造函数提供。
此属性提供对任何 implementation 的引用。
例如,Hazelcast 分布式 IQueue
可以按如下方式进行配置:QueueChannel
ref
<int:queue>
java.util.Queue
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(new Config()
.setProperty("hazelcast.logging.type", "log4j"));
}
@Bean
public PollableChannel distributedQueue() {
return new QueueChannel(hazelcastInstance()
.getQueue("springIntegrationQueue"));
}
PublishSubscribeChannel
配置
要创建 ,请使用 <publish-subscribe-channel/> 元素。
使用此元素时,您还可以指定用于发布消息的 (如果未指定,它将在发件人的线程中发布),如下所示:PublishSubscribeChannel
task-executor
@Bean
public MessageChannel pubsubChannel() {
return new PublishSubscribeChannel(someExecutor());
}
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>
如果提供 下游的 重排序器或聚合器 ,则可以将通道上的 “apply-sequence” 属性设置为 。
这样做表示通道应在传递消息之前设置 and 消息标头以及相关 ID。
例如,如果有五个订阅者,则 将设置为 ,并且消息的标头值范围为 到 。PublishSubscribeChannel
true
sequence-size
sequence-number
sequence-size
5
sequence-number
1
5
除了 之外,您还可以配置 .
默认情况下,它使用 implementation 将错误从 header 发送到 或 global 实例。
如果未配置 an,则忽略 ,并将异常直接抛出到调用方的线程中。Executor
ErrorHandler
PublishSubscribeChannel
MessagePublishingErrorHandler
MessageChannel
errorChannel
errorChannel
Executor
ErrorHandler
如果提供 或 downstream ,则可以将渠道上的“apply-sequence”属性设置为 。
这样做表示通道应在传递消息之前设置 sequence-size 和 sequence-number 消息头以及相关 ID。
例如,如果有五个订阅者,则 sequence-size 将设置为 ,并且消息将具有 sequence-number 标头值,范围从 到 。Resequencer
Aggregator
PublishSubscribeChannel
true
5
1
5
以下示例显示如何将标头设置为 :apply-sequence
true
@Bean
public MessageChannel pubsubChannel() {
PublishSubscribeChannel channel = new PublishSubscribeChannel();
channel.setApplySequence(true);
return channel;
}
<int:publish-subscribe-channel id="pubsubChannel" apply-sequence="true"/>
该值是默认的,以便发布-订阅通道可以将完全相同的消息实例发送到多个出站通道。
由于 Spring Integration 强制执行有效负载和 Headers 引用的不变性,因此当标志设置为 时,通道会创建具有相同有效负载引用但不同 Headers 值的新实例。apply-sequence false true Message |
从版本 5.4.3 开始,还可以配置 its 选项,以指示此通道在没有订阅者时不会静默忽略消息。
当没有订阅者时,将引发带有消息的 A,并且此选项设置为 。PublishSubscribeChannel
requireSubscribers
BroadcastingDispatcher
MessageDispatchingException
Dispatcher has no subscribers
true
ExecutorChannel
要创建 ,请添加具有属性的子元素。
该属性的值可以引用上下文中的 any。
例如,这样做可以启用线程池的配置,以便将消息分派给订阅的处理程序。
如前所述,这样做会打破发送方和接收方之间的单线程执行上下文,以便处理程序的调用不会共享任何活动的事务上下文(即,处理程序可能会抛出,但调用已经成功返回)。
下面的示例演示如何使用元素并在属性中指定执行程序:ExecutorChannel
<dispatcher>
task-executor
TaskExecutor
Exception
send
dispatcher
task-executor
@Bean
public MessageChannel executorChannel() {
return new ExecutorChannel(someExecutor());
}
<int:channel id="executorChannel">
<int:dispatcher task-executor="someExecutor"/>
</int:channel>
和 选项在 <dispatcher/> 子元素上也可用,如前面的
|
PriorityChannel
配置
要创建 ,请使用 sub-element,如下例所示:PriorityChannel
<priority-queue/>
@Bean
public PollableChannel priorityChannel() {
return new PriorityChannel(20);
}
<int:channel id="priorityChannel">
<int:priority-queue capacity="20"/>
</int:channel>
默认情况下,通道会查询消息的 Headers。
但是,您可以改为提供自定义引用。
另外,请注意 (像其他类型一样) 确实支持 attribute 。
与 一样,它也支持属性。
以下示例演示了所有这些:priority
Comparator
PriorityChannel
datatype
QueueChannel
capacity
@Bean
public PollableChannel priorityChannel() {
PriorityChannel channel = new PriorityChannel(20, widgetComparator());
channel.setDatatypes(example.Widget.class);
return channel;
}
<int:channel id="priorityChannel" datatype="example.Widget">
<int:priority-queue comparator="widgetComparator"
capacity="10"/>
</int:channel>
从版本 4.0 开始,子元素支持选项 ( ,在这种情况下不允许使用)。
邮件存储必须是 .
目前为 、 和 提供了 的实现。
有关更多信息,请参阅 QueueChannel
Configuration 和 Message Store 。
您可以在 Backing Message Channels 中找到示例配置。priority-channel
message-store
comparator
capacity
PriorityCapableChannelMessageStore
PriorityCapableChannelMessageStore
Redis
JDBC
MongoDB
RendezvousChannel
配置
当队列子元素为 时创建 A 。
它不提供前面描述的配置选项的任何其他配置选项,并且其队列不接受任何容量值,因为它是零容量直接切换队列。
以下示例演示如何声明 :RendezvousChannel
<rendezvous-queue>
RendezvousChannel
@Bean
public PollableChannel rendezvousChannel() {
return new RendezvousChannel();
}
<int:channel id="rendezvousChannel"/>
<int:rendezvous-queue/>
</int:channel>
通道拦截器配置
消息通道也可能具有拦截器,如 通道拦截器中所述。
子元素可以添加到 (或更具体的元素类型) 。
你可以提供该属性来引用实现该接口的任何 Spring 托管对象,如下例所示:<interceptors/>
<channel/>
ref
ChannelInterceptor
<int:channel id="exampleChannel">
<int:interceptors>
<ref bean="trafficMonitoringInterceptor"/>
</int:interceptors>
</int:channel>
通常,我们建议在单独的位置定义拦截器实现,因为它们通常提供可在多个通道之间重用的常见行为。
全局通道拦截器配置
Channel interceptors 提供了一种简洁明了的方式来为每个单独的 Channel 应用横切行为。 如果应该在多个 channel 上应用相同的行为,则为每个 channel 配置相同的拦截器集将不是最有效的方法。 为了避免重复配置,同时使拦截器能够应用于多个通道, Spring 集成提供了全局拦截器。 请考虑以下一对示例:
<int:channel-interceptor pattern="input*, thing2*, thing1, !cat*" order="3">
<bean class="thing1.thing2SampleInterceptor"/>
</int:channel-interceptor>
<int:channel-interceptor ref="myInterceptor" pattern="input*, thing2*, thing1, !cat*" order="3"/>
<bean id="myInterceptor" class="thing1.thing2SampleInterceptor"/>
每个元素都允许您定义一个全局侦听器,该侦听器应用于与属性定义的任何模式匹配的所有通道。
在前面的情况下,全局拦截器应用于 'thing1' 通道和所有其他以 'thing2' 或 'input' 开头的通道,但不应用于以 'thing3' 开头的通道(从 5.0 版本开始)。<channel-interceptor/>
pattern
将此语法添加到模式中会导致一个可能的(尽管可能不太可能)问题。
如果你有一个名为 bean 的 bean,并且你在通道拦截器的模式中包含了 的模式,那么它不再匹配。
该模式现在匹配所有未命名的 bean。
在这种情况下,您可以使用 .
该模式与名为 .!thing1 !thing1 pattern thing1 ! \ \!thing1 !thing1 |
order 属性允许您管理当给定通道上有多个拦截器时,此拦截器的注入位置。 例如,通道 'inputChannel' 可以在本地配置单独的拦截器(见下文),如下例所示:
<int:channel id="inputChannel">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
一个合理的问题是“相对于本地配置的其他拦截器或通过其他全局拦截器定义,全局拦截器是如何注入的?
当前的实现提供了一种简单的机制来定义拦截器执行的顺序。
属性中的正数确保在任何现有拦截器之后注入拦截器,而负数确保拦截器在现有拦截器之前注入。
这意味着,在前面的示例中,全局拦截器被注入在本地配置的 'wire-tap' 拦截器之后(因为它大于)。
如果有另一个全局拦截器与 matching ,则其顺序将通过比较两个拦截器的属性值来确定。
要在现有拦截器之前注入全局拦截器,请对属性使用负值。order
order
0
pattern
order
order
请注意,和 属性都是可选的。
的默认值为 0,的默认值为 '*'(以匹配所有通道)。order pattern order pattern |
丝锥
如前所述, Spring 集成提供了一个简单的 wire tap 拦截器。
您可以在元素中的任何通道上配置接线。
这样做对于调试特别有用,并且可以与 Spring 集成的日志记录通道适配器结合使用,如下所示:<interceptors/>
<int:channel id="in">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
<int:logging-channel-adapter id="logger" level="DEBUG"/>
'logging-channel-adapter'还接受'expression'属性,以便您可以根据'payload'和'headers'变量评估 SPEL 表达式。
或者,要记录完整的消息结果,请为 'log-full-message' 属性提供值 。
默认情况下,仅记录有效负载。
将其设置为 enable logging all headers s go to payload.
'expression' 选项提供了最大的灵活性(例如, )。toString() true false true expression="payload.user.name" |
关于 wire tap 和其他类似组件(消息发布配置)的一个常见误解是,它们在本质上是自动异步的。 默认情况下,作为组件的 wire tap 不会异步调用。 相反, Spring 集成专注于配置异步行为的单一统一方法:消息通道。 使消息流的某些部分同步或异步的是在该流中配置的 Message Channel 的类型。 这是消息通道抽象的主要好处之一。 从框架成立之初,我们就一直强调消息通道作为框架的一等公民的需求和价值。 它不仅仅是 EIP 模式的内部隐式实现。 它作为可配置组件完全公开给最终用户。 因此,Wire Tap 组件仅负责执行以下任务:
-
通过点击通道(例如
channelA
) -
抓取每条消息
-
将消息发送到另一个通道(例如
channelB
)
它本质上是桥接模式的变体,但它封装在通道定义中(因此更容易在不中断流的情况下启用和禁用)。 此外,与桥接不同,它基本上是分叉另一个消息流。 该流是同步的还是异步的?答案取决于 'channelB' 的消息通道类型。 我们有以下选项:direct channel、pollable channel 和 executor channel。 最后两个打破了线程边界,使通过此类通道的通信异步,因为将消息从该通道分派到其订阅的处理程序发生在与用于将消息发送到该通道的线程不同的线程上。 这就是使您的 wire-tap 流同步或异步的原因。 它与框架中的其他组件(比如消息发布者)一致,并且通过让您无需提前担心(除了编写线程安全代码)特定代码段应该作为同步还是异步实现,从而增加了一定程度的一致性和简单性。 两个代码段(比如组件 A 和组件 B)在消息通道上的实际连接使它们的协作同步或异步。 你甚至可能希望将来从 synchronous 更改为 asynchronous ,而 message channel 让你无需接触代码即可快速完成。
关于窃听的最后一点是,尽管上面提供了默认情况下不异步的基本原理,但您应该记住,通常希望尽快传递消息。 因此,使用 asynchronous channel 选项作为 wire tap 的出站通道是很常见的。 但是,默认情况下不强制实施异步行为。 如果我们这样做,有许多用例会中断,包括您可能不想打破事务边界。 也许您使用 wire tap 模式进行审计,并且您确实希望在原始事务中发送审计消息。 例如,您可以将 wire tap 连接到 JMS 出站通道适配器。 这样,您可以获得两全其美的效果:1) JMS 消息的发送可以在事务中进行,而 2) 它仍然是一个 “即发即弃” 操作,从而防止主消息流中出现任何明显的延迟。
从版本 4.0 开始,当侦听器(例如 WireTap 类)引用通道时避免循环引用非常重要。
您需要将此类 channel 从当前拦截器拦截的 channels 中排除。
这可以通过适当的模式或编程方式完成。
如果您有引用 的自定义,请考虑实施 .
这样,框架会询问拦截器是否可以根据提供的模式拦截每个候选通道。
您还可以在拦截器方法中添加运行时保护,以确保通道不是拦截器引用的通道。
这同时使用了这两种技术。ChannelInterceptor channel VetoCapableInterceptor WireTap |
从版本 4.3 开始,具有采用 a 而不是 instance 的其他构造函数。
这对于 Java 配置以及使用通道自动创建逻辑时非常方便。
目标 bean 在稍后提供的 中解析,在与
拦截 器。WireTap
channelName
MessageChannel
MessageChannel
channelName
通道解析需要一个 ,因此 wire tap 实例必须是 Spring 管理的 bean。BeanFactory |
这种后期绑定方法还允许使用 Java DSL 配置简化典型的窃听模式,如下例所示:
@Bean
public PollableChannel myChannel() {
return MessageChannels.queue()
.wireTap("loggingFlow.input")
.get();
}
@Bean
public IntegrationFlow loggingFlow() {
return f -> f.log();
}
条件接线器
可以使用 or 属性将 Wire Tap 设为有条件。
该 Bean 引用一个 Bean,该 Bean 可以在运行时确定消息是否应转到 tap 通道。
同样, is a boolean SpEL 表达式,执行相同的目的:如果表达式的计算结果为 ,则消息将发送到 tap 通道。selector
selector-expression
selector
MessageSelector
selector-expression
true
全局 Wire Tap 配置
可以将全局 wire tap 配置为 Global Channel Interceptor Configuration 的特殊情况。
为此,请配置一个 top level 元素。
现在,除了正常的命名空间支持之外,还支持 and 属性,并且其工作方式与它们对 .
以下示例说明如何配置全局 Wire Tap:wire-tap
wire-tap
pattern
order
channel-interceptor
@Bean
@GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3)
public WireTap wireTap(MessageChannel wiretapChannel) {
return new WireTap(wiretapChannel);
}
<int:wire-tap pattern="input*, thing2*, thing1" order="3" channel="wiretapChannel"/>
全局 Wire Tap 提供了一种在外部配置单通道 Wire Tap 而无需修改现有通道配置的便捷方法。
为此,请将 该属性设置为 target channel name。
例如,您可以使用此技术配置测试用例以验证通道上的消息。pattern |
特殊频道
默认情况下,在应用程序上下文中定义了两个特殊通道:和 。
'nullChannel' ( 的实例 ) 的作用类似于 ,记录在该级别发送给它的任何消息并立即返回。
特殊处理适用于传输消息的有效负载:它立即在此通道中订阅,以启动反应流处理,尽管数据被丢弃。
从反应流处理(请参阅 )引发的错误记录在该级别下,以便进行可能的调查。
如果需要对此类错误执行任何操作,则可以将具有自定义项的 应用于生成回复 this 的消息处理程序。
任何时候,当你遇到一个你不关心的回复的通道解析错误时,你可以将受影响的组件的属性设置为 'nullChannel' (名称 'nullChannel' 在应用程序上下文中保留)。errorChannel
nullChannel
NullChannel
/dev/null
DEBUG
org.reactivestreams.Publisher
Subscriber.onError(Throwable)
warn
ReactiveRequestHandlerAdvice
Mono.doOnError()
Mono
nullChannel
output-channel
'errorChannel' 在内部用于发送错误消息,并且可以被自定义配置覆盖。 错误处理中对此进行了更详细的讨论。
有关消息通道和拦截器的更多信息,另请参阅 Java DSL 一章中的消息通道。
轮询器
本节描述了 Spring Integration 中 polling 的工作原理。
轮询消费者
当消息端点(通道适配器)连接到通道并实例化时,它们会生成以下实例之一:
实际实现取决于这些终端节点连接到的通道类型。
连接到实现 org.springframework.messaging.SubscribableChannel
接口的通道适配器会生成一个 .
另一方面,连接到实现 org.springframework.messaging.PollableChannel
接口(例如 a )的通道适配器会生成一个 .EventDrivenConsumer
QueueChannel
PollingConsumer
轮询使用者允许 Spring 集成组件主动轮询消息,而不是以事件驱动的方式处理消息。
在许多消息传递方案中,它们代表一个关键的横切关注点。 在 Spring Integration 中,轮询使用者基于同名模式,Gregor Hohpe 和 Bobby Woolf 在 Enterprise Integration Patterns 一书中对此进行了描述。 您可以在该书的网站上找到该模式的描述。
Pollable 消息源
Spring 集成提供了轮询消费者模式的第二种变体。
使用入站通道适配器时,这些适配器通常由 .
例如,当从远程 FTP 服务器位置检索消息时,FTP 入站通道适配器中描述的适配器配置了 Poller 以定期检索消息。
因此,当组件使用 Poller 进行配置时,生成的实例是以下类型之一:SourcePollingChannelAdapter
这意味着轮询器用于入站和出站消息传递方案。 以下是使用 Poller 的一些用例:
-
轮询某些外部系统,例如 FTP 服务器、数据库和 Web 服务
-
轮询内部(可轮询)消息通道
-
轮询内部服务(例如在 Java 类上重复执行方法)
AOP 建议类可以应用于 Poller ,例如用于启动事务的事务通知。
从版本 4.1 开始,提供了 a。
轮询器使用触发器来确定下一次轮询的时间。
这可用于抑制 (跳过) 轮询,可能是因为存在一些下游条件会阻止消息被处理。
要使用此建议,您必须为其提供 .
从版本 4.2.5 开始,提供了 a。
要使用它,你可以将实例作为 bean 添加到应用程序上下文中,将其注入到 ,并将其添加到 Poller 的建议链中。
要跳过轮询,请调用 。
要恢复轮询,请调用 。
版本 4.2 在此领域增加了更多灵活性。
参见 Message Sources 的条件轮询器。advice-chain PollSkipAdvice PollSkipAdvice PollSkipStrategy SimplePollSkipStrategy PollSkipAdvice skipPolls() reset() |
延迟确认轮询消息源
从版本 5.0.1 开始,某些模块提供的实现支持将确认推迟到下游流完成(或将消息移交给另一个线程)。
目前仅限于 和 .MessageSource
AmqpMessageSource
KafkaMessageSource
使用这些消息源,标头(请参阅 MessageHeaderAccessor
API)将添加到消息中。
当与可轮询消息源一起使用时,标头的值是 的实例,如下例所示:IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
AcknowledgmentCallback
@FunctionalInterface
public interface AcknowledgmentCallback {
void acknowledge(Status status);
boolean isAcknowledged();
void noAutoAck();
default boolean isAutoAck();
enum Status {
/**
* Mark the message as accepted.
*/
ACCEPT,
/**
* Mark the message as rejected.
*/
REJECT,
/**
* Reject the message and requeue so that it will be redelivered.
*/
REQUEUE
}
}
并非所有消息源(例如 a )都支持该状态。
它的处理方式与 相同。KafkaMessageSource
REJECT
ACCEPT
应用程序可以随时确认消息,如下例所示:
Message<?> received = source.receive();
...
StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
.acknowledge(Status.ACCEPT);
如果 the 被连接到 a ,当 poller 线程在下游流完成后返回到适配器时,适配器会检查确认是否已被确认,如果没有,则将其状态设置为它(或者 flow 是否引发异常)。
状态值在 AcknowledgmentCallback.Status
枚举中定义。MessageSource
SourcePollingChannelAdapter
ACCEPT
REJECT
Spring Integration 提供了对 .
这也负责设置或回调何时返回(或引发异常)。
以下示例演示如何使用 :MessageSourcePollingTemplate
MessageSource
ACCEPT
REJECT
AcknowledgmentCallback
MessageHandler
MessageSourcePollingTemplate
MessageSourcePollingTemplate template =
new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
...
});
在这两种情况下 ( 和 ),您都可以通过调用回调来禁用 auto ack/nack。
如果您将消息交给另一个线程并希望稍后确认,则可以执行此操作。
并非所有实现都支持此功能(例如,Apache Kafka 不支持,因为偏移量提交必须在同一线程上执行)。SourcePollingChannelAdapter
MessageSourcePollingTemplate
noAutoAck()
消息源的条件轮询器
本节介绍如何使用条件 Poller。
背景
Advice
对象,在 ON POLLER 中,通知整个轮询任务(消息检索和处理)。
这些 “around advice” 方法无法访问 poll 的任何上下文 — 只能访问 poll 本身。
如前所述,这对于诸如使任务事务性或由于某些外部条件而跳过轮询等要求来说很好。
如果我们希望根据 poll 部分的结果采取一些行动,或者我们想根据条件调整 Poller 怎么办?对于这些实例, Spring 集成提供了“智能”轮询。advice-chain
receive
“智能”轮询
版本 5.3 引入了该界面。
中实现此接口的任何对象仅应用于操作 - 和 。
因此,它们只能应用于 或 。
此类实现以下方法:ReceiveMessageAdvice
Advice
advice-chain
receive()
MessageSource.receive()
PollableChannel.receive(timeout)
SourcePollingChannelAdapter
PollingConsumer
-
beforeReceive(Object source)
该方法在该方法之前调用。 它允许您检查和重新配置源。 返回将取消此轮询(类似于前面提到的)。Object.receive()
false
PollSkipAdvice
-
Message<?> afterReceive(Message<?> result, Object source)
该方法在该方法之后调用。 同样,您可以重新配置源或执行任何操作(可能取决于结果,如果源没有创建消息,则可能会有所不同)。 您甚至可以返回不同的消息receive()
null
线程安全
如果 an 更改了源,则不应使用 .
如果 an 更改了源,则此类更改不是线程安全的,并且可能会导致意外结果,尤其是对于高频轮询器。
如果需要并发处理 poll 结果,请考虑使用下游,而不是向 Poller 添加执行程序。 |
Advice Chain 订购
您应该了解在初始化期间如何处理通知链。 未实现的对象将应用于整个轮询过程,并且所有对象都首先按顺序调用,然后再调用 any 。
然后,围绕 source 方法按顺序调用对象。
例如,如果您有 objects 、 where 和 are ,则对象将按以下顺序应用: 。
此外,如果源已经是 ,则会在任何现有对象之后调用 。
如果您想更改订单,您必须自己连接代理。 |
SimpleActiveIdleReceiveMessageAdvice
此建议是 .
当与 一起使用时,它会根据上一次轮询是否生成消息来调整轮询频率。
poller 还必须具有对 same 的引用。ReceiveMessageAdvice
DynamicPeriodicTrigger
DynamicPeriodicTrigger
重要: 异步切换
SimpleActiveIdleReceiveMessageAdvice 根据结果修改触发器。
这仅在 poller 线程上调用通知时有效。
如果 Poller 具有 .
要在轮询结果后使用异步操作时使用此建议,请稍后执行异步切换,可能使用 .receive() task-executor ExecutorChannel |
CompoundTriggerAdvice
此建议允许根据 poll 是否返回消息来选择两个触发器之一。
考虑一个使用 . 实例是不可变的,因此一旦构建就无法更改。
考虑这样一个使用案例:我们希望使用 cron 表达式每小时触发一次轮询,但如果未收到任何消息,则每分钟轮询一次,并在检索到消息时恢复为使用 cron 表达式。CronTrigger
CronTrigger
为此,建议(和 poller)使用 a。
触发器的触发器可以是 .
当通知检测到未收到任何消息时,它会将辅助触发器添加到 .
调用实例的方法时,它会委托给辅助触发器(如果存在)。
否则,它将委托给主触发器。CompoundTrigger
primary
CronTrigger
CompoundTrigger
CompoundTrigger
nextExecutionTime
poller 还必须具有对 same 的引用。CompoundTrigger
以下示例显示了回退到每分钟的每小时 cron 表达式的配置:
<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
<bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
<int:poller trigger="compoundTrigger">
<int:advice-chain>
<bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
<constructor-arg ref="compoundTrigger"/>
<constructor-arg ref="secondary"/>
</bean>
</int:advice-chain>
</int:poller>
</int:inbound-channel-adapter>
<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
<constructor-arg ref="primary" />
</bean>
<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
<constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>
<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
<constructor-arg value="60000" />
</bean>
重要: 异步切换
CompoundTriggerAdvice 根据结果修改触发器。
这仅在 poller 线程上调用通知时有效。
如果 Poller 具有 .
要在轮询结果后使用异步操作时使用此建议,请稍后执行异步切换,可能使用 .receive() task-executor ExecutorChannel |
仅限 MessageSource 的建议
有些建议可能仅适用于 ,而对 则没有意义。
为此,仍然存在一个接口( 的扩展 )。
有关更多信息,请参见入站通道适配器:轮询多个服务器和目录。MessageSource.receive()
PollableChannel
MessageSourceMutator
ReceiveMessageAdvice
Channel Adapter
通道适配器是一个消息端点,它允许将单个发送方或接收方连接到消息通道。 Spring 集成提供了许多适配器来支持各种传输,例如 JMS、文件、HTTP、Web 服务、邮件等。 本参考指南的后续章节将讨论每个适配器。 但是,本章重点介绍简单但灵活的方法调用通道适配器支持。 有入站和出站适配器,每个适配器都可以使用 core 命名空间中提供的 XML 元素进行配置。 这些提供了一种扩展 Spring Integration 的简单方法,只要你有一个可以作为源或目标调用的方法。
配置入站通道适配器
元素(在 Java 配置中为 a)可以调用 Spring 管理对象上的任何方法,并在将方法的输出转换为 a 后将非 null 返回值发送到 。
激活适配器的订阅后,轮询器会尝试从源接收消息。
根据提供的配置,使用 调度 Poller。
要为单个通道适配器配置轮询间隔或 cron 表达式,你可以提供一个 'poller' 元素,其中包含一个调度属性,例如 'fixed-rate' 或 'cron'。
以下示例定义了两个实例:inbound-channel-adapter
SourcePollingChannelAdapter
MessageChannel
Message
TaskScheduler
inbound-channel-adapter
@Bean
public IntegrationFlow source1() {
return IntegrationFlow.from(() -> new GenericMessage<>(...),
e -> e.poller(p -> p.fixedRate(5000)))
...
.get();
}
@Bean
public IntegrationFlow source2() {
return IntegrationFlow.from(() -> new GenericMessage<>(...),
e -> e.poller(p -> p.cron("30 * 9-17 * * MON-FRI")))
...
.get();
}
public class SourceService {
@InboundChannelAdapter(channel = "channel1", poller = @Poller(fixedRate = "5000"))
Object method1() {
...
}
@InboundChannelAdapter(channel = "channel2", poller = @Poller(cron = "30 * 9-17 * * MON-FRI"))
Object method2() {
...
}
}
@Bean
fun messageSourceFlow() =
integrationFlow( { GenericMessage<>(...) },
{ poller { it.fixedRate(5000) } }) {
...
}
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
<int:poller fixed-rate="5000"/>
</int:inbound-channel-adapter>
<int:inbound-channel-adapter ref="source2" method="method2" channel="channel2">
<int:poller cron="30 * 9-17 * * MON-FRI"/>
</int:channel-adapter>
另请参阅 Channel Adapter 表达式和脚本。
如果未提供 Poller,则必须在上下文中注册单个默认 Poller。 有关更多详细信息,请参阅 Endpoint Namespace Support 。 |
重要:轮询器配置
所有类型都由 a 提供支持,这意味着它们包含一个 poller 配置,该配置根据 Poller 中指定的配置轮询(以调用生成成为有效负载的值的自定义方法)。
以下示例显示了两个 Poller 的配置:
在第一个配置中,轮询任务每次轮询调用一次,并且在每个任务 (poll) 期间,根据属性值调用方法(导致消息生成)一次。
在第二种配置中,轮询任务每次轮询调用 10 次,或者直到它返回 'null',因此每次轮询可能会生成 10 条消息,而每次轮询的间隔为 1 秒。
但是,如果配置类似于以下示例,会发生什么情况:
请注意,没有指定。
正如我们稍后介绍的那样,(例如, , , , 和其他)中相同的 poller 配置将具有 for for 的默认值,这意味着“除非轮询方法返回null(可能是因为)中没有更多消息,否则不间断地执行轮询任务”,然后休眠一秒钟。 但是,在 中,它有点不同。
的默认值为 ,除非您将其显式设置为负值(如 )。
这确保了 Poller 可以对生命周期事件(例如启动和停止)做出反应,并防止它在自定义方法的实现可能永远不会返回 null 并且恰好是不可中断的情况下可能在无限循环中旋转。 但是,如果您确定您的方法可以返回 null,并且需要在每次轮询中轮询尽可能多的可用源,则应显式设置为负值,如下例所示:
从版本 5.5 开始,值 for 具有特殊含义 - 完全跳过调用,这可能被视为暂停此入站通道适配器,直到稍后将 for 更改为非零值,例如通过 Control Bus。 另请参阅 Global Default Poller 了解更多信息。 |
配置出站通道适配器
元素(用于 Java 配置)还可以将 a 连接到任何 POJO 消费者方法,该方法应该使用发送到该通道的消息的有效负载来调用。
以下示例说明如何定义出站通道适配器:outbound-channel-adapter
@ServiceActivator
MessageChannel
@Bean
public IntegrationFlow outboundChannelAdapterFlow(MyPojo myPojo) {
return f -> f
.handle(myPojo, "handle");
}
public class MyPojo {
@ServiceActivator(channel = "channel1")
void handle(Object payload) {
...
}
}
@Bean
fun outboundChannelAdapterFlow(myPojo: MyPojo) =
integrationFlow {
handle(myPojo, "handle")
}
<int:outbound-channel-adapter channel="channel1" ref="target" method="handle"/>
<beans:bean id="target" class="org.MyPojo"/>
如果要适配的通道是 ,则必须提供 poller 子元素( 上的 子注释 ),如下例所示:PollableChannel
@Poller
@ServiceActivator
public class MyPojo {
@ServiceActivator(channel = "channel1", poller = @Poller(fixedRate = "3000"))
void handle(Object payload) {
...
}
}
<int:outbound-channel-adapter channel="channel2" ref="target" method="handle">
<int:poller fixed-rate="3000" />
</int:outbound-channel-adapter>
<beans:bean id="target" class="org.MyPojo"/>
如果 POJO 使用者实现可以在其他定义中重用,则应使用属性。
但是,如果使用者实现仅由的单个定义引用,则可以将其定义为内部 bean,如下例所示:ref
<outbound-channel-adapter>
<outbound-channel-adapter>
<int:outbound-channel-adapter channel="channel" method="handle">
<beans:bean class="org.Foo"/>
</int:outbound-channel-adapter>
不允许在同一配置中同时使用 attribute 和 inner handler definition,因为它会产生不明确的条件。
此类配置会导致引发异常。ref <outbound-channel-adapter> |
可以在没有引用的情况下创建任何通道适配器,在这种情况下,它会隐式创建 .
创建的频道名称与 or 元素的属性匹配。
因此,如果未提供,则需要。channel
DirectChannel
id
<inbound-channel-adapter>
<outbound-channel-adapter>
channel
id
通道适配器表达式和脚本
与许多其他 Spring 集成组件一样,和 也提供了对 SPEL 表达式求值的支持。
要使用 SPEL,请在'expression'属性中提供表达式字符串,而不是提供用于在 Bean 上调用方法的'ref'和'method'属性。
计算表达式时,它遵循与 method-invocation 相同的协定,其中:每当评估结果为非 null 值时,an 的表达式都会生成一条消息,而 an 的表达式必须等效于返回 void 的方法调用。<inbound-channel-adapter>
<outbound-channel-adapter>
<inbound-channel-adapter>
<outbound-channel-adapter>
从 Spring Integration 3.0 开始,还可以使用 SPEL (甚至 a )子元素进行配置,因为需要比使用简单的 'expression' 属性所能实现的更复杂的事情。
如果使用 属性以脚本形式提供脚本,则还可以设置 ,从而允许定期刷新资源。
如果希望在每次轮询时检查脚本,则需要将此设置与 Poller 的触发器协调,如下例所示:<int:inbound-channel-adapter/>
<expression/>
<script/>
Resource
location
refresh-check-delay
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
<int:poller max-messages-per-poll="1" fixed-delay="5000"/>
<script:script lang="ruby" location="Foo.rb" refresh-check-delay="5000"/>
</int:inbound-channel-adapter>
另请参阅 when using the sub-element 上的属性。
有关表达式的更多信息,请参阅 Spring 表达式语言 (SpEL)。
有关脚本,请参阅 Groovy 支持和脚本支持。cacheSeconds
ReloadableResourceBundleExpressionSource
<expression/>
() 是一个端点,它通过定期触发以轮询某些底层 .
由于在轮询时没有消息对象,因此表达式和脚本无权访问 root ,因此大多数其他消息传递 SpEL 表达式中没有可用的有效负载或标头属性。
该脚本可以生成并返回带有 Headers 和 payload 的完整对象,也可以只生成并返回一个 payload,该 payload 由框架添加到具有基本 Headers 的消息中。<int:inbound-channel-adapter/> SourcePollingChannelAdapter MessageSource Message Message |
消息桥
消息桥是连接两个消息通道或通道适配器的相对简单的端点。
例如,您可能希望将 a 连接到 a,以便订阅终端节点不必担心任何轮询配置。
相反,消息桥提供轮询配置。PollableChannel
SubscribableChannel
通过在两个通道之间提供中间轮询器,您可以使用消息传递桥来限制入站消息。
Poller 的触发器确定消息到达第二个通道的速率,而 Poller 的属性对吞吐量实施限制。maxMessagesPerPoll
消息桥的另一个有效用途是连接两个不同的系统。 在这种情况下, Spring 集成的作用仅限于在这些系统之间建立连接并在必要时管理 Poller。 在两个系统之间至少有一个转换器,以便在它们的格式之间进行转换,这可能更常见。 在这种情况下,通道可以作为转换器端点的 'input-channel' 和 'output-channel' 提供。 如果不需要数据格式转换,则消息桥可能确实足够了。
使用 XML 配置 Bridge
您可以使用 element is 在两个消息通道或通道适配器之间创建消息桥。
为此,请提供 and 属性,如下例所示:<bridge>
input-channel
output-channel
<int:bridge input-channel="input" output-channel="output"/>
如上所述,消息桥的一个常见用例是将 连接到 .
在执行此角色时,消息传送桥还可以用作限制器:PollableChannel
SubscribableChannel
<int:bridge input-channel="pollable" output-channel="subscribable">
<int:poller max-messages-per-poll="10" fixed-rate="5000"/>
</int:bridge>
您可以使用类似的机制来连接通道适配器。
下面的示例显示了 Spring 集成命名空间中 和 adapters 之间的简单“echo”:stdin
stdout
stream
<int-stream:stdin-channel-adapter id="stdin"/>
<int-stream:stdout-channel-adapter id="stdout"/>
<int:bridge id="echo" input-channel="stdin" output-channel="stdout"/>
类似的配置适用于其他(可能更有用的)Channel Adapter 桥接,例如文件到 JMS 或邮件到文件。 接下来的章节将介绍各种通道适配器。
如果未在桥接上定义 'output-channel',则使用入站消息提供的回复通道(如果可用)。 如果 output 和 reply channel 都不可用,则会引发异常。 |
使用 Java 配置配置 Bridge
以下示例演示如何使用注释在 Java 中配置网桥:@BridgeFrom
@Bean
public PollableChannel polled() {
return new QueueChannel();
}
@Bean
@BridgeFrom(value = "polled", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "10"))
public SubscribableChannel direct() {
return new DirectChannel();
}
以下示例演示如何使用注释在 Java 中配置网桥:@BridgeTo
@Bean
@BridgeTo(value = "direct", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "10"))
public PollableChannel polled() {
return new QueueChannel();
}
@Bean
public SubscribableChannel direct() {
return new DirectChannel();
}
或者,您可以使用 ,如下例所示:BridgeHandler
@Bean
@ServiceActivator(inputChannel = "polled",
poller = @Poller(fixedRate = "5000", maxMessagesPerPoll = "10"))
public BridgeHandler bridge() {
BridgeHandler bridge = new BridgeHandler();
bridge.setOutputChannelName("direct");
return bridge;
}