核心消息

消息传送通道

消息通道

虽然 在封装数据方面发挥着关键作用,但它将消息生成者与消息使用者分离。MessageMessageChannelspring-doc.cn

MessageChannel 接口

Spring 集成的顶级接口定义如下:MessageChannelspring-doc.cn

public interface MessageChannel {

    boolean send(Message message);

    boolean send(Message message, long timeout);
}

发送消息时,返回值为消息是否发送成功。 如果 send 调用超时或中断,则返回 。truefalsespring-doc.cn

PollableChannel

由于消息通道可以也可能不缓冲消息(如 Spring 集成概述中所述),因此两个子接口定义了缓冲(可轮询)和非缓冲(可订阅)通道行为。 下面的清单显示了接口的定义:PollableChannelspring-doc.cn

public interface PollableChannel extends MessageChannel {

    Message<?> receive();

    Message<?> receive(long timeout);

}

与 send 方法一样,当接收消息时,如果出现超时或中断,则返回值为 null。spring-doc.cn

SubscribableChannel

基本接口由直接向其订阅的实例发送消息的通道实现。 因此,它们不提供用于轮询的 receive 方法。 相反,它们定义了管理这些订阅者的方法。 下面的清单显示了接口的定义:SubscribableChannelMessageHandlerSubscribableChannelspring-doc.cn

public interface SubscribableChannel extends MessageChannel {

    boolean subscribe(MessageHandler handler);

    boolean unsubscribe(MessageHandler handler);

}

消息通道实现

Spring 集成提供了不同的消息通道实现。 以下各节将简要介绍每个选项。spring-doc.cn

PublishSubscribeChannel

该 implementation 将发送给它的任何 BROADCAST 到其所有订阅的处理程序。 这最常用于发送事件消息,其主要角色是通知(与文档消息相反,文档消息通常由单个处理程序处理)。 请注意,这仅用于发送。 由于它在调用其方法时直接向订阅者广播,因此使用者无法轮询消息(它没有实现,因此没有方法)。 相反,任何订阅者本身都必须是 ,并且依次调用订阅者的方法。PublishSubscribeChannelMessagePublishSubscribeChannelsend(Message)PollableChannelreceive()MessageHandlerhandleMessage(Message)spring-doc.cn

在版本 3.0 之前,在没有订阅者的 上调用该方法会返回 。 当与 a 一起使用时,会引发 a。 从版本 3.0 开始,行为已更改,因此,如果至少存在最小订阅者(并成功处理消息),则始终认为 a 成功。 可以通过设置属性来修改此行为,该属性默认为 .sendPublishSubscribeChannelfalseMessagingTemplateMessageDeliveryExceptionsendminSubscribers0spring-doc.cn

如果使用 ,则仅使用正确数量的订阅者进行此确定,因为消息的实际处理是异步执行的。TaskExecutor
QueueChannel

该实现包装了一个队列。 与 不同,具有点对点语义。 换句话说,即使通道有多个消费者,也只有一个消费者应该接收发送到该通道的任何消费者。 它提供了一个默认的无参数构造函数(提供基本上无限的容量)以及一个接受队列容量的构造函数,如下面的清单所示:QueueChannelPublishSubscribeChannelQueueChannelMessageInteger.MAX_VALUEspring-doc.cn

public QueueChannel(int capacity)

未达到其容量限制的通道将消息存储在其内部队列中,并且该方法会立即返回,即使没有接收方准备好处理该消息也是如此。 如果队列已达到容量上限,则发送方将阻止,直到队列中有 room 可用。 或者,如果您使用具有附加 timeout 参数的 send 方法,则队列将阻止,直到任一房间可用或超时期限已过(以先发生者为准)。 同样,如果队列中有消息可用,则调用会立即返回,但是,如果队列为空,则 receive 调用可能会阻止,直到消息可用或超时(如果提供)过去。 在任何一种情况下,都可以通过传递超时值 0 来强制立即返回,而不管队列的状态如何。 但是请注意,对 和 的 versions 的调用会无限期地阻止。send(Message<?>)receive()send()receive()timeoutspring-doc.cn

PriorityChannel

虽然 强制执行先进先出 (FIFO) 排序,但 是一种替代实现,允许根据优先级在通道内对消息进行排序。 默认情况下,优先级由每条消息中的标头决定。 但是,对于自定义优先级确定逻辑,可以向构造函数提供 type 的 comparator。QueueChannelPriorityChannelpriorityComparator<Message<?>>PriorityChannelspring-doc.cn

RendezvousChannel

这将启用“直接切换”场景,其中发送方会阻塞,直到另一方调用通道的方法。 另一方会阻止,直到发送方发送消息。 在内部,此实现与 非常相似,不同之处在于它使用 (零容量实现)。 这在发送方和接收方在不同的线程中操作的情况下效果很好,但异步将消息放入队列中是不合适的。 换句话说,使用 ,发送方知道某个接收方已经接受了该消息,而使用 ,该消息将存储到内部队列中,并且可能永远不会收到。RendezvousChannelreceive()QueueChannelSynchronousQueueBlockingQueueRendezvousChannelQueueChannelspring-doc.cn

请记住,默认情况下,所有这些基于队列的通道都仅将消息存储在内存中。 当需要持久性时,可以在 'queue' 元素中提供 'message-store' 属性来引用持久性实现,也可以将本地通道替换为由持久性代理(如 JMS 支持的通道或通道适配器)支持的通道。 后一个选项允许您利用任何 JMS 提供程序的实现来实现消息持久性,如 JMS 支持中所述。 但是,当不需要在队列中缓冲时,最简单的方法是依赖 ,将在下一节中讨论。MessageStoreDirectChannel

这对于实现 request-reply 操作也很有用。 发送者可以创建一个临时的匿名实例,然后在构建 . 发送后,发送者可以立即调用(可选地提供超时值)以便在等待回复时阻止。 这与 Spring 集成的许多请求-回复组件内部使用的实现非常相似。RendezvousChannelRendezvousChannelMessageMessagereceiveMessagespring-doc.cn

DirectChannel

具有点对点语义,但其他方面比前面描述的任何基于队列的通道实现更类似于 。 它实现接口而不是接口,因此它将消息直接分派给订阅者。 但是,作为点对点通道,它与 的不同之处在于,它将每个通道发送到单个订阅的 。DirectChannelPublishSubscribeChannelSubscribableChannelPollableChannelPublishSubscribeChannelMessageMessageHandlerspring-doc.cn

除了是最简单的点对点通道选项之外,它最重要的功能之一是它使单个线程能够在通道的 “两侧” 执行操作。 例如,如果处理程序订阅 ,则向该通道发送 将在方法调用返回之前直接在发送方的线程中触发该处理程序的方法的调用。DirectChannelMessagehandleMessage(Message)send()spring-doc.cn

提供具有此行为的通道实现的主要动机是支持必须跨越通道的事务,同时仍然受益于通道提供的抽象和松散耦合。 如果在事务范围内调用调用,则处理程序调用的结果(例如,更新数据库记录)在确定该事务的最终结果(提交或回滚)方面发挥作用。send()spring-doc.cn

由于这是最简单的选项,并且不会增加调度和管理 Poller 线程所需的任何额外开销,因此它是 Spring Integration 中的默认通道类型。 一般的思路是定义应用程序的通道,考虑哪些通道需要提供缓冲或限制输入,并将这些通道修改为 queue-based 。 同样,如果通道需要广播消息,则它不应是 a,而应是 . 稍后,我们将展示如何配置这些通道中的每一个。DirectChannelPollableChannelsDirectChannelPublishSubscribeChannel

内部委托给消息调度程序以调用其订阅的消息处理程序,并且该调度程序可以具有由 or 属性公开的负载平衡策略(互斥)。 消息调度器使用负载平衡策略来帮助确定当多个消息处理程序订阅同一通道时,如何在消息处理程序之间分发消息。 为方便起见,该属性公开了一个值枚举,这些值指向预先存在的 . A (轮换处理程序之间的负载均衡) 和 (对于想要显式禁用负载均衡的情况) 是唯一可用的值。 将来的版本中可能会添加其他策略实现。 但是,从版本 3.0 开始,您可以提供自己的实现,并使用该属性注入它,该属性应指向实现的 bean,如下例所示:DirectChannelload-balancerload-balancer-refload-balancerLoadBalancingStrategyround-robinnoneLoadBalancingStrategyload-balancer-refLoadBalancingStrategyspring-doc.cn

A 是仅支持无法取消订阅的单个订阅者的 a。 这对于不涉及其他订阅者且不需要通道拦截器的高吞吐量性能使用案例非常有用。FixedSubscriberChannelSubscribableChannelMessageHandlerspring-doc.cn

<int:channel id="lbRefChannel">
  <int:dispatcher load-balancer-ref="lb"/>
</int:channel>

<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>

请注意,和 属性是互斥的。load-balancerload-balancer-refspring-doc.cn

负载均衡还与 boolean 属性结合使用。 如果值为 true(默认值),则当前面的处理程序引发异常时,调度程序将回退到任何后续处理程序(根据需要)。 订单由处理程序本身定义的可选 order 值确定,如果不存在此类值,则由处理程序订阅的顺序确定。failoverfailoverspring-doc.cn

如果某种情况要求 Dispatcher 始终尝试调用第一个处理程序,然后在每次发生错误时以相同的固定顺序回退,则不应提供负载平衡策略。 换句话说,即使未启用负载均衡,调度程序仍支持 boolean 属性。 但是,如果没有负载平衡,处理程序的调用总是根据它们的顺序从第一个开始。 例如,当有 primary、secondary、tritiary 等的明确定义时,此方法效果很好。 使用命名空间支持时,任何端点上的属性都会确定顺序。failoverorderspring-doc.cn

请记住,负载平衡仅在通道具有多个订阅消息处理程序时应用。 当使用 namespace 支持时,这意味着多个 endpoint 共享 attribute 中定义的相同通道引用。failoverinput-channel

从版本 5.2 开始,当为 true 时,当前处理程序的失败以及失败的消息将分别记录在 or if configured 下。failoverdebuginfospring-doc.cn

ExecutorChannel

这是一个点对点通道,支持与(负载平衡策略和 boolean 属性)相同的调度程序配置。 这两种调度通道类型之间的主要区别在于,委托给 的实例来执行调度。 这意味着 send 方法通常不会阻塞,但这也意味着处理程序调用可能不会发生在发送方的线程中。 因此,它不支持跨 sender 和 receiving handler 的事务。ExecutorChannelDirectChannelfailoverExecutorChannelTaskExecutorspring-doc.cn

发件人有时可能会阻止。 例如,当将 a 与限制客户端的拒绝策略(如 )一起使用时,发送方的线程可以在线程池达到其最大容量且执行程序的工作队列已满时执行该方法。 由于这种情况只会以不可预测的方式发生,因此您不应依赖它进行交易。TaskExecutorThreadPoolExecutor.CallerRunsPolicy
PartitionedChannel

从版本 6.1 开始,提供了一个实现。 这是点对点调度逻辑的扩展,并表示点对点调度逻辑,其中实际消耗在特定线程上处理,由从发送到此通道的消息评估的分区键确定。 此通道类似于上面提到的通道,但不同之处在于具有相同分区键的消息始终在同一线程中处理,同时保持 Sequences。 它不需要 external ,但可以使用自定义(例如 )进行配置 。 此工厂用于将单线程执行程序填充到每个分区的委托中。 默认情况下,消息标头用作分区键。 此通道可以配置为简单的 bean:PartitionedChannelAbstractExecutorChannelExecutorChannelTaskExecutorThreadFactoryThread.ofVirtual().name("partition-", 0).factory()MessageDispatcherIntegrationMessageHeaderAccessor.CORRELATION_IDspring-doc.cn

@Bean
PartitionedChannel somePartitionedChannel() {
    return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}

通道将具有分区 - 专用线程;将使用标头来确定将在哪个分区中处理消息。 有关更多信息,请参见 class Javadocs。3partitionKeyPartitionedChannelspring-doc.cn

FluxMessageChannel

这是一种将消息发送到内部供下游反应式订阅者按需使用的实现。 此 channel implementation 既不是 , 也不是 a ,因此只能使用实例从此 channel 中消费,以遵守反应流的背压性质。 另一方面,它通过其 Contract 实现 a,允许从反应式源发布者接收事件,将反应式流桥接到集成流中。 为了实现整个集成流的完全反应行为,必须在流中的所有端点之间放置这样的通道。FluxMessageChannelorg.reactivestreams.Publisher"sinking"reactor.core.publisher.FluxSubscribableChannelPollableChannelorg.reactivestreams.SubscriberFluxMessageChannelReactiveStreamsSubscribableChannelsubscribeTo(Publisher<Message<?>>)spring-doc.cn

有关与 Reactive Streams 交互的更多信息,请参阅 Reactive Streams Supportspring-doc.cn

作用域通道

Spring Integration 1.0 提供了一个实现,但从 2.0 开始已被删除。 现在,处理相同需求的更通用方法是向 channel 添加一个 attribute。 该属性的值可以是上下文中可用的范围的名称。 例如,在 Web 环境中,某些范围可用,并且任何自定义范围实现都可以注册到上下文中。 下面的示例展示了一个应用于通道的线程本地作用域,包括作用域本身的注册:ThreadLocalChannelscopespring-doc.cn

<int:channel id="threadScopedChannel" scope="thread">
     <int:queue />
</int:channel>

<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
    <property name="scopes">
        <map>
            <entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
        </map>
    </property>
</bean>

上一个示例中定义的通道也在内部委托给队列,但该通道绑定到当前线程,因此队列的内容也类似地绑定。 这样,发送到通道的线程稍后可以接收这些相同的消息,但其他线程将无法访问它们。 虽然很少需要线程范围的通道,但在实例用于强制执行单个操作线程但任何回复消息都应发送到“终端”通道的情况下,它们可能很有用。 如果该终端通道是线程范围的,则原始发送线程可以从终端通道收集其回复。DirectChannelspring-doc.cn

现在,由于任何通道都可以被限定范围,因此除了 thread-Local 之外,您还可以定义自己的范围。spring-doc.cn

通道拦截器

消息传递体系结构的一个优点是能够提供常见行为,并以非侵入性方式捕获有关通过系统传递的消息的有意义信息。 由于实例是发送到实例和从实例接收的,因此这些通道提供了拦截发送和接收操作的机会。 策略接口(如下面的清单所示)为这些操作中的每一个提供了方法:MessageMessageChannelChannelInterceptorspring-doc.cn

public interface ChannelInterceptor {

    Message<?> preSend(Message<?> message, MessageChannel channel);

    void postSend(Message<?> message, MessageChannel channel, boolean sent);

    void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex);

    boolean preReceive(MessageChannel channel);

    Message<?> postReceive(Message<?> message, MessageChannel channel);

    void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex);
}

实现接口后,向 channel 注册拦截器只需进行以下调用即可:spring-doc.cn

channel.addInterceptor(someChannelInterceptor);

返回实例的方法可用于转换 或 可以返回 'null' 以防止进一步处理(当然,任何方法都可以抛出 a )。 此外,该方法还可以返回以防止接收操作继续进行。MessageMessageRuntimeExceptionpreReceivefalsespring-doc.cn

请记住,调用仅与 相关。 事实上,该接口甚至没有定义方法。 这样做的原因是,当 a 被发送到 时,它会直接发送给零个或多个订阅者,具体取决于通道的类型(例如, A 发送给其所有订阅者)。 因此,仅当将侦听器应用于 .receive()PollableChannelsSubscribableChannelreceive()MessageSubscribableChannelPublishSubscribeChannelpreReceive(…​)postReceive(…​)afterReceiveCompletion(…​)PollableChannel

Spring 集成还提供了 Wire Tap 模式的实现。 它是一个简单的拦截器,可以将 发送到另一个通道,而不会改变现有流。 它对于调试和监控非常有用。 Wire Tap 中显示了一个示例。Messagespring-doc.cn

由于很少需要实现所有拦截器方法,因此该接口提供无操作方法(返回方法没有代码,-returning 方法按原样返回,方法返回)。voidMessageMessagebooleantruespring-doc.cn

拦截器方法的调用 Sequences 取决于通道的类型。 如前所述,基于队列的通道是唯一首先拦截该方法的通道。 此外,发送和接收拦截之间的关系取决于单独的发送方和接收方线程的计时。 例如,如果接收方在等待消息时已被阻止,则顺序可能如下所示:、、、、。 但是,如果接收方在发送方在通道上放置消息并已返回后进行轮询,则顺序将如下所示:、(经过一段时间)、、。 在这种情况下,经过的时间取决于许多因素,因此通常是不可预测的(事实上,接收可能永远不会发生)。 队列的类型也起着一定的作用(例如,rendezvous 与 priority)。 简而言之,您不能依赖超出 precedes 和 precedes 的事实之外的顺序。receive()preSendpreReceivepostReceivepostSendpreSendpostSendpreReceivepostReceivepreSendpostSendpreReceivepostReceive

从 Spring Framework 4.1 和 Spring Integration 4.1 开始,提供了新的方法:和. 它们在调用后调用,而不管引发的任何异常如何,这允许资源清理。 请注意,通道以与 initial 和 calls 相反的顺序调用列表中的这些方法。ChannelInterceptorafterSendCompletion()afterReceiveCompletion()send()' and 'receive()ChannelInterceptorpreSend()preReceive()spring-doc.cn

从版本 5.1 开始,全局通道拦截器现在适用于动态注册的通道 - 例如通过使用 Java DSL 初始化的 bean 或在使用 Java DSL 时初始化的 bean。 以前,在刷新应用程序上下文后创建 bean 时,不会应用拦截器。beanFactory.initializeBean()IntegrationFlowContextspring-doc.cn

此外,从版本 5.1 开始,当未收到消息时,不再调用;不再需要检查 . 以前,该方法被调用。 如果你有一个依赖于先前行为的拦截器,请改为实现,因为无论是否收到消息,都会调用该方法。ChannelInterceptor.postReceive()nullMessage<?>afterReceiveCompleted()spring-doc.cn

从版本 5.2 开始,Spring Messaging 模块已被弃用,现在它扩展了该模块以实现向后兼容性。ChannelInterceptorAwareInterceptableChannel

MessagingTemplate

当引入端点及其各种配置选项时, Spring 集成为消息传递组件提供了一个基础,该组件支持从消息传递系统非侵入性地调用应用程序代码。 但是,有时需要从应用程序代码中调用消息传送系统。 为了在实现此类用例时方便,Spring 集成提供了一个支持跨消息通道的各种操作,包括请求和回复场景。 例如,可以发送请求并等待回复,如下所示:MessagingTemplatespring-doc.cn

MessagingTemplate template = new MessagingTemplate();

Message reply = template.sendAndReceive(someChannel, new GenericMessage("test"));

在前面的示例中,模板将在内部创建一个临时匿名通道。 还可以在模板上设置 'sendTimeout' 和 'receiveTimeout' 属性,并且还支持其他交换类型。 下面的清单显示了此类方法的签名:spring-doc.cn

public boolean send(final MessageChannel channel, final Message<?> message) { ...
}

public Message<?> sendAndReceive(final MessageChannel channel, final Message<?> request) { ...
}

public Message<?> receive(final PollableChannel<?> channel) { ...
}
Enter the GatewayProxyFactoryBean中描述了一种侵入性较小的方法,该方法允许你使用payload或header值而不是实例来调用简单的接口。Message

配置消息通道

要创建消息通道实例,可以使用 xml 元素或 Java 配置实例,如下所示:<channel/>DirectChannelspring-doc.cn

Java
@Bean
public MessageChannel exampleChannel() {
    return new DirectChannel();
}
XML 格式
<int:channel id="exampleChannel"/>

当您使用不带任何子元素的元素时,它会创建一个实例 (a )。<channel/>DirectChannelSubscribableChannelspring-doc.cn

要创建发布-订阅通道,请使用元素(在 Java 中为 the),如下所示:<publish-subscribe-channel/>PublishSubscribeChannelspring-doc.cn

Java
@Bean
public MessageChannel exampleChannel() {
    return new PublishSubscribeChannel();
}
XML 格式
<int:publish-subscribe-channel id="exampleChannel"/>

您也可以提供各种子元素来创建任何可轮询的通道类型(如 消息通道实现中所述)。 以下部分显示了每种通道类型的示例。<queue/>spring-doc.cn

DirectChannel配置

如前所述, 是默认类型。 下面的清单显示了定义谁:DirectChannelspring-doc.cn

Java
@Bean
public MessageChannel directChannel() {
    return new DirectChannel();
}
XML 格式
<int:channel id="directChannel"/>

默认通道具有循环负载均衡器,并且还启用了故障转移(有关更多详细信息,请参阅 DirectChannel)。 要禁用其中一项或两项,请添加子元素( 的构造函数 )并按如下方式配置属性:<dispatcher/>LoadBalancingStrategyDirectChannelspring-doc.cn

Java
@Bean
public MessageChannel failFastChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setFailover(false);
    return channel;
}

@Bean
public MessageChannel failFastChannel() {
    return new DirectChannel(null);
}
XML 格式
<int:channel id="failFastChannel">
    <int:dispatcher failover="false"/>
</channel>

<int:channel id="channelWithFixedOrderSequenceFailover">
    <int:dispatcher load-balancer="none"/>
</int:channel>
数据类型 Channel 配置

有时,使用者只能处理特定类型的有效负载,这迫使您确保输入消息的有效负载类型。 首先想到的可能是使用消息过滤器。 但是,消息筛选器所能做的只是筛选出不符合使用者要求的消息。 另一种方法是使用基于内容的路由器,并将具有不合规数据类型的消息路由到特定的转换器,以强制转换和转换为所需的数据类型。 这将有效,但完成相同任务的更简单方法是应用 Datatype Channel 模式。 您可以为每个特定的负载数据类型使用单独的数据类型通道。spring-doc.cn

要创建仅接受包含特定有效负载类型的消息的数据类型通道,请在 channel 元素的属性中提供数据类型的完全限定类名,如下例所示:datatypespring-doc.cn

Java
@Bean
public MessageChannel numberChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setDatatypes(Number.class);
    return channel;
}
XML 格式
<int:channel id="numberChannel" datatype="java.lang.Number"/>

请注意,对于可分配给通道数据类型的任何类型的类型,类型检查都会通过。 换句话说,前面示例中的 the 将接受有效负载为 或 的消息。 可以将多个类型作为逗号分隔的列表提供,如下例所示:numberChanneljava.lang.Integerjava.lang.Doublespring-doc.cn

Java
@Bean
public MessageChannel numberChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setDatatypes(String.class, Number.class);
    return channel;
}
XML 格式
<int:channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>

因此,前面示例中的 'numberChannel' 只接受数据类型为 . 但是,如果消息的有效负载不是 required 类型,会发生什么情况呢? 这取决于你是否定义了一个名为 Spring 的 Conversion Service 实例的 bean。 如果没有,那么将立即抛出 an。 但是,如果已定义 Bean,则尝试将 Bean 用于将消息的有效负载转换为可接受的类型。java.lang.NumberintegrationConversionServiceExceptionintegrationConversionServicespring-doc.cn

您甚至可以注册自定义转换器。 例如,假设您将一条带有有效负载的消息发送到我们上面配置的 'numberChannel'。 您可以按如下方式处理该消息:Stringspring-doc.cn

MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));

通常,这将是一个完全合法的操作。 但是,由于我们使用 Datatype Channel,因此此类操作的结果将生成类似于以下内容的异常:spring-doc.cn

Exception in thread "main" org.springframework.integration.MessageDeliveryException:
Channel 'numberChannel'
expected one of the following datataypes [class java.lang.Number],
but received [class java.lang.String]
…

发生异常的原因是我们要求有效负载类型为 ,但我们发送了 . 所以我们需要一些东西来将 a 转换为 . 为此,我们可以实现类似于以下示例的转换器:NumberStringStringNumberspring-doc.cn

public static class StringToIntegerConverter implements Converter<String, Integer> {
    public Integer convert(String source) {
        return Integer.parseInt(source);
    }
}

然后,我们可以将其注册为 Integration Conversion Service 的转换器,如下例所示:spring-doc.cn

Java
@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt {
    return new StringToIntegerConverter();
}
XML 格式
<int:converter ref="strToInt"/>

<bean id="strToInt" class="org.springframework.integration.util.Demo.StringToIntegerConverter"/>

或者在类上标记有用于自动扫描的注释时。StringToIntegerConverter@Componentspring-doc.cn

当 'converter' 元素被解析时,如果尚未定义 bean,它会创建 bean。 有了该转换器,该操作现在将成功,因为数据类型通道使用该转换器将有效负载转换为 .integrationConversionServicesendStringIntegerspring-doc.cn

有关负载类型转换的更多信息,请参阅负载类型转换spring-doc.cn

从版本 4.0 开始,由 调用,它在应用程序上下文中查找转换服务。 要使用其他转换技术,您可以在通道上指定属性。 这必须是对 implementation 的引用。 仅使用方法。 它为转换器提供对消息标头的访问(如果转换可能需要标头中的信息,例如 )。 该方法只能返回已转换的有效负载或完整对象。 如果是后者,则转换器必须小心地从入站消息中复制所有 Headers。integrationConversionServiceDefaultDatatypeChannelMessageConvertermessage-converterMessageConverterfromMessagecontent-typeMessagespring-doc.cn

或者,您可以声明 ID 为 的 type ,并且该转换器由具有 .<bean/>MessageConverterdatatypeChannelMessageConverterdatatypespring-doc.cn

QueueChannel配置

要创建 ,请使用 sub-element. 您可以按如下方式指定通道的容量:QueueChannel<queue/>spring-doc.cn

Java
@Bean
public PollableChannel queueChannel() {
    return new QueueChannel(25);
}
XML 格式
<int:channel id="queueChannel">
    <queue capacity="25"/>
</int:channel>
如果您没有为此子元素的 'capacity' 属性提供值,则生成的队列是无限的。 为避免内存不足等问题,我们强烈建议您为有界队列设置显式值。<queue/>
持久配置QueueChannel

由于 a 提供了缓冲消息的功能,但默认情况下仅在内存中缓冲,因此它还引入了在系统故障时消息可能会丢失的可能性。 为了降低这种风险,a 可能由策略接口的持续实现提供支持。 有关 和 的更多详细信息,请参阅 Message StoreQueueChannelQueueChannelMessageGroupStoreMessageGroupStoreMessageStorespring-doc.cn

使用该属性时不允许使用该属性。capacitymessage-store

当 收到 时,它会将消息添加到消息存储中。 从 轮询 a 时,会将其从邮件存储中删除。QueueChannelMessageMessageQueueChannelspring-doc.cn

默认情况下,a 将其消息存储在内存中队列中,这可能会导致前面提到的消息丢失情况。 但是, Spring Integration 提供了持久存储,例如 .QueueChannelJdbcChannelMessageStorespring-doc.cn

您可以通过添加属性来为 any 配置消息存储,如下例所示:QueueChannelmessage-storespring-doc.cn

<int:channel id="dbBackedChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

(有关Java/Kotlin配置选项,请参阅以下示例。spring-doc.cn

Spring 集成 JDBC 模块还为许多流行的数据库提供了模式数据定义语言(DDL)。 这些模式位于该模块的org.springframework.integration.jdbc.store.channel包中()。spring-integration-jdbcspring-doc.cn

一个重要的功能是,对于任何事务性持久存储(例如 ),只要 Poller 配置了事务,只有在事务成功完成时,才能永久删除从存储中删除的消息。 否则,事务将回滚,并且不会丢失。JdbcChannelMessageStoreMessage

随着越来越多的与 “NoSQL” 数据存储相关的 Spring 项目开始为这些存储提供底层支持,可以使用消息存储的许多其他实现。 如果找不到满足您特定需求的接口,您也可以提供自己的接口实现。MessageGroupStorespring-doc.cn

从版本 4.0 开始,我们建议将实例配置为尽可能使用 。 与一般邮件存储相比,这些存储通常针对此用途进行了优化。 如果 是 a ,则按优先级顺序在 FIFO 中接收消息。 优先级的概念由 message store 实现确定。 例如,以下示例显示了 MongoDB 通道消息存储的 Java 配置:QueueChannelChannelMessageStoreChannelMessageStoreChannelPriorityMessageStorespring-doc.cn

Java
@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
    MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
    store.setPriorityEnabled(true);
    return store;
}

@Bean
public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) {
    return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue"));
}
Java DSL
@Bean
public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) {
    return IntegrationFlow.from((Channels c) ->
            c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup"))
            ....
            .get();
}
Kotlin DSL
@Bean
fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) =
    integrationFlow {
        channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") }
    }
注意类。 这是使用 operations 的实现。MessageGroupQueueBlockingQueueMessageGroupStore

自定义环境的另一个选项由 sub-element 的属性或其特定构造函数提供。 此属性提供对任何 implementation 的引用。 例如,Hazelcast 分布式 IQueue 可以按如下方式进行配置:QueueChannelref<int:queue>java.util.Queuespring-doc.cn

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance(new Config()
                                           .setProperty("hazelcast.logging.type", "log4j"));
}

@Bean
public PollableChannel distributedQueue() {
    return new QueueChannel(hazelcastInstance()
                              .getQueue("springIntegrationQueue"));
}
PublishSubscribeChannel配置

要创建 ,请使用 <publish-subscribe-channel/> 元素。 使用此元素时,您还可以指定用于发布消息的 (如果未指定,它将在发件人的线程中发布),如下所示:PublishSubscribeChanneltask-executorspring-doc.cn

Java
@Bean
public MessageChannel pubsubChannel() {
    return new PublishSubscribeChannel(someExecutor());
}
XML 格式
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>

如果提供 下游的 重排序器或聚合器 ,则可以将通道上的 “apply-sequence” 属性设置为 。 这样做表示通道应在传递消息之前设置 and 消息标头以及相关 ID。 例如,如果有五个订阅者,则 将设置为 ,并且消息的标头值范围为 到 。PublishSubscribeChanneltruesequence-sizesequence-numbersequence-size5sequence-number15spring-doc.cn

除了 之外,您还可以配置 . 默认情况下,它使用 implementation 将错误从 header 发送到 或 global 实例。 如果未配置 an,则忽略 ,并将异常直接抛出到调用方的线程中。ExecutorErrorHandlerPublishSubscribeChannelMessagePublishingErrorHandlerMessageChannelerrorChannelerrorChannelExecutorErrorHandlerspring-doc.cn

如果提供 或 downstream ,则可以将渠道上的“apply-sequence”属性设置为 。 这样做表示通道应在传递消息之前设置 sequence-size 和 sequence-number 消息头以及相关 ID。 例如,如果有五个订阅者,则 sequence-size 将设置为 ,并且消息将具有 sequence-number 标头值,范围从 到 。ResequencerAggregatorPublishSubscribeChanneltrue515spring-doc.cn

以下示例显示如何将标头设置为 :apply-sequencetruespring-doc.cn

Java
@Bean
public MessageChannel pubsubChannel() {
    PublishSubscribeChannel channel = new PublishSubscribeChannel();
    channel.setApplySequence(true);
    return channel;
}
XML 格式
<int:publish-subscribe-channel id="pubsubChannel" apply-sequence="true"/>
该值是默认的,以便发布-订阅通道可以将完全相同的消息实例发送到多个出站通道。 由于 Spring Integration 强制执行有效负载和 Headers 引用的不变性,因此当标志设置为 时,通道会创建具有相同有效负载引用但不同 Headers 值的新实例。apply-sequencefalsetrueMessage

从版本 5.4.3 开始,还可以配置 its 选项,以指示此通道在没有订阅者时不会静默忽略消息。 当没有订阅者时,将引发带有消息的 A,并且此选项设置为 。PublishSubscribeChannelrequireSubscribersBroadcastingDispatcherMessageDispatchingExceptionDispatcher has no subscriberstruespring-doc.cn

ExecutorChannel

要创建 ,请添加具有属性的子元素。 该属性的值可以引用上下文中的 any。 例如,这样做可以启用线程池的配置,以便将消息分派给订阅的处理程序。 如前所述,这样做会打破发送方和接收方之间的单线程执行上下文,以便处理程序的调用不会共享任何活动的事务上下文(即,处理程序可能会抛出,但调用已经成功返回)。 下面的示例演示如何使用元素并在属性中指定执行程序:ExecutorChannel<dispatcher>task-executorTaskExecutorExceptionsenddispatchertask-executorspring-doc.cn

Java
@Bean
public MessageChannel executorChannel() {
    return new ExecutorChannel(someExecutor());
}
XML 格式
<int:channel id="executorChannel">
    <int:dispatcher task-executor="someExecutor"/>
</int:channel>

和 选项在 <dispatcher/> 子元素上也可用,如前面的 DirectChannel 配置中所述。 相同的默认值适用。 因此,通道具有启用故障转移的循环负载平衡策略,除非为其中一个或两个属性提供了显式配置,如下例所示:load-balancerfailoverspring-doc.cn

<int:channel id="executorChannelWithoutFailover">
    <int:dispatcher task-executor="someExecutor" failover="false"/>
</int:channel>
PriorityChannel配置

要创建 ,请使用 sub-element,如下例所示:PriorityChannel<priority-queue/>spring-doc.cn

Java
@Bean
public PollableChannel priorityChannel() {
    return new PriorityChannel(20);
}
XML 格式
<int:channel id="priorityChannel">
    <int:priority-queue capacity="20"/>
</int:channel>

默认情况下,通道会查询消息的 Headers。 但是,您可以改为提供自定义引用。 另外,请注意 (像其他类型一样) 确实支持 attribute 。 与 一样,它也支持属性。 以下示例演示了所有这些:priorityComparatorPriorityChanneldatatypeQueueChannelcapacityspring-doc.cn

Java
@Bean
public PollableChannel priorityChannel() {
    PriorityChannel channel = new PriorityChannel(20, widgetComparator());
    channel.setDatatypes(example.Widget.class);
    return channel;
}
XML 格式
<int:channel id="priorityChannel" datatype="example.Widget">
    <int:priority-queue comparator="widgetComparator"
                    capacity="10"/>
</int:channel>

从版本 4.0 开始,子元素支持选项 ( ,在这种情况下不允许使用)。 邮件存储必须是 . 目前为 、 和 提供了 的实现。 有关更多信息,请参阅 QueueChannel ConfigurationMessage Store 。 您可以在 Backing Message Channels 中找到示例配置。priority-channelmessage-storecomparatorcapacityPriorityCapableChannelMessageStorePriorityCapableChannelMessageStoreRedisJDBCMongoDBspring-doc.cn

RendezvousChannel配置

当队列子元素为 时创建 A 。 它不提供前面描述的配置选项的任何其他配置选项,并且其队列不接受任何容量值,因为它是零容量直接切换队列。 以下示例演示如何声明 :RendezvousChannel<rendezvous-queue>RendezvousChannelspring-doc.cn

Java
@Bean
public PollableChannel rendezvousChannel() {
    return new RendezvousChannel();
}
XML 格式
<int:channel id="rendezvousChannel"/>
    <int:rendezvous-queue/>
</int:channel>
作用域通道配置

任何通道都可以配置属性,如下例所示:scopespring-doc.cn

<int:channel id="threadLocalChannel" scope="thread"/>
通道拦截器配置

消息通道也可能具有拦截器,如 通道拦截器中所述。 子元素可以添加到 (或更具体的元素类型) 。 你可以提供该属性来引用实现该接口的任何 Spring 托管对象,如下例所示:<interceptors/><channel/>refChannelInterceptorspring-doc.cn

<int:channel id="exampleChannel">
    <int:interceptors>
        <ref bean="trafficMonitoringInterceptor"/>
    </int:interceptors>
</int:channel>

通常,我们建议在单独的位置定义拦截器实现,因为它们通常提供可在多个通道之间重用的常见行为。spring-doc.cn

全局通道拦截器配置

Channel interceptors 提供了一种简洁明了的方式来为每个单独的 Channel 应用横切行为。 如果应该在多个 channel 上应用相同的行为,则为每个 channel 配置相同的拦截器集将不是最有效的方法。 为了避免重复配置,同时使拦截器能够应用于多个通道, Spring 集成提供了全局拦截器。 请考虑以下一对示例:spring-doc.cn

<int:channel-interceptor pattern="input*, thing2*, thing1, !cat*" order="3">
    <bean class="thing1.thing2SampleInterceptor"/>
</int:channel-interceptor>
<int:channel-interceptor ref="myInterceptor" pattern="input*, thing2*, thing1, !cat*" order="3"/>

<bean id="myInterceptor" class="thing1.thing2SampleInterceptor"/>

每个元素都允许您定义一个全局侦听器,该侦听器应用于与属性定义的任何模式匹配的所有通道。 在前面的情况下,全局拦截器应用于 'thing1' 通道和所有其他以 'thing2' 或 'input' 开头的通道,但不应用于以 'thing3' 开头的通道(从 5.0 版本开始)。<channel-interceptor/>patternspring-doc.cn

将此语法添加到模式中会导致一个可能的(尽管可能不太可能)问题。 如果你有一个名为 bean 的 bean,并且你在通道拦截器的模式中包含了 的模式,那么它不再匹配。 该模式现在匹配所有未命名的 bean。 在这种情况下,您可以使用 . 该模式与名为 .!thing1!thing1patternthing1!\\!thing1!thing1

order 属性允许您管理当给定通道上有多个拦截器时,此拦截器的注入位置。 例如,通道 'inputChannel' 可以在本地配置单独的拦截器(见下文),如下例所示:spring-doc.cn

<int:channel id="inputChannel">
  <int:interceptors>
    <int:wire-tap channel="logger"/>
  </int:interceptors>
</int:channel>

一个合理的问题是“相对于本地配置的其他拦截器或通过其他全局拦截器定义,全局拦截器是如何注入的? 当前的实现提供了一种简单的机制来定义拦截器执行的顺序。 属性中的正数确保在任何现有拦截器之后注入拦截器,而负数确保拦截器在现有拦截器之前注入。 这意味着,在前面的示例中,全局拦截器被注入在本地配置的 'wire-tap' 拦截器之后(因为它大于)。 如果有另一个全局拦截器与 matching ,则其顺序将通过比较两个拦截器的属性值来确定。 要在现有拦截器之前注入全局拦截器,请对属性使用负值。orderorder0patternorderorderspring-doc.cn

请注意,和 属性都是可选的。 的默认值为 0,的默认值为 '*'(以匹配所有通道)。orderpatternorderpattern
丝锥

如前所述, Spring 集成提供了一个简单的 wire tap 拦截器。 您可以在元素中的任何通道上配置接线。 这样做对于调试特别有用,并且可以与 Spring 集成的日志记录通道适配器结合使用,如下所示:<interceptors/>spring-doc.cn

<int:channel id="in">
    <int:interceptors>
        <int:wire-tap channel="logger"/>
    </int:interceptors>
</int:channel>

<int:logging-channel-adapter id="logger" level="DEBUG"/>
'logging-channel-adapter'还接受'expression'属性,以便您可以根据'payload'和'headers'变量评估 SPEL 表达式。 或者,要记录完整的消息结果,请为 'log-full-message' 属性提供值 。 默认情况下,仅记录有效负载。 将其设置为 enable logging all headers s go to payload. 'expression' 选项提供了最大的灵活性(例如, )。toString()truefalsetrueexpression="payload.user.name"

关于 wire tap 和其他类似组件(消息发布配置)的一个常见误解是,它们在本质上是自动异步的。 默认情况下,作为组件的 wire tap 不会异步调用。 相反, Spring 集成专注于配置异步行为的单一统一方法:消息通道。 使消息流的某些部分同步或异步的是在该流中配置的 Message Channel 的类型。 这是消息通道抽象的主要好处之一。 从框架成立之初,我们就一直强调消息通道作为框架的一等公民的需求和价值。 它不仅仅是 EIP 模式的内部隐式实现。 它作为可配置组件完全公开给最终用户。 因此,Wire Tap 组件仅负责执行以下任务:spring-doc.cn

它本质上是桥接模式的变体,但它封装在通道定义中(因此更容易在不中断流的情况下启用和禁用)。 此外,与桥接不同,它基本上是分叉另一个消息流。 该流是同步的还是异步的?答案取决于 'channelB' 的消息通道类型。 我们有以下选项:direct channel、pollable channel 和 executor channel。 最后两个打破了线程边界,使通过此类通道的通信异步,因为将消息从该通道分派到其订阅的处理程序发生在与用于将消息发送到该通道的线程不同的线程上。 这就是使您的 wire-tap 流同步或异步的原因。 它与框架中的其他组件(比如消息发布者)一致,并且通过让您无需提前担心(除了编写线程安全代码)特定代码段应该作为同步还是异步实现,从而增加了一定程度的一致性和简单性。 两个代码段(比如组件 A 和组件 B)在消息通道上的实际连接使它们的协作同步或异步。 你甚至可能希望将来从 synchronous 更改为 asynchronous ,而 message channel 让你无需接触代码即可快速完成。spring-doc.cn

关于窃听的最后一点是,尽管上面提供了默认情况下不异步的基本原理,但您应该记住,通常希望尽快传递消息。 因此,使用 asynchronous channel 选项作为 wire tap 的出站通道是很常见的。 但是,默认情况下不强制实施异步行为。 如果我们这样做,有许多用例会中断,包括您可能不想打破事务边界。 也许您使用 wire tap 模式进行审计,并且您确实希望在原始事务中发送审计消息。 例如,您可以将 wire tap 连接到 JMS 出站通道适配器。 这样,您可以获得两全其美的效果:1) JMS 消息的发送可以在事务中进行,而 2) 它仍然是一个 “即发即弃” 操作,从而防止主消息流中出现任何明显的延迟。spring-doc.cn

从版本 4.0 开始,当侦听器(例如 WireTap)引用通道时避免循环引用非常重要。 您需要将此类 channel 从当前拦截器拦截的 channels 中排除。 这可以通过适当的模式或编程方式完成。 如果您有引用 的自定义,请考虑实施 . 这样,框架会询问拦截器是否可以根据提供的模式拦截每个候选通道。 您还可以在拦截器方法中添加运行时保护,以确保通道不是拦截器引用的通道。 这同时使用了这两种技术。ChannelInterceptorchannelVetoCapableInterceptorWireTap

从版本 4.3 开始,具有采用 a 而不是 instance 的其他构造函数。 这对于 Java 配置以及使用通道自动创建逻辑时非常方便。 目标 bean 在稍后提供的 中解析,在与 拦截 器。WireTapchannelNameMessageChannelMessageChannelchannelNamespring-doc.cn

通道解析需要一个 ,因此 wire tap 实例必须是 Spring 管理的 bean。BeanFactory

这种后期绑定方法还允许使用 Java DSL 配置简化典型的窃听模式,如下例所示:spring-doc.cn

@Bean
public PollableChannel myChannel() {
    return MessageChannels.queue()
            .wireTap("loggingFlow.input")
            .get();
}

@Bean
public IntegrationFlow loggingFlow() {
    return f -> f.log();
}
条件接线器

可以使用 or 属性将 Wire Tap 设为有条件。 该 Bean 引用一个 Bean,该 Bean 可以在运行时确定消息是否应转到 tap 通道。 同样, is a boolean SpEL 表达式,执行相同的目的:如果表达式的计算结果为 ,则消息将发送到 tap 通道。selectorselector-expressionselectorMessageSelectorselector-expressiontruespring-doc.cn

全局 Wire Tap 配置

可以将全局 wire tap 配置为 Global Channel Interceptor Configuration 的特殊情况。 为此,请配置一个 top level 元素。 现在,除了正常的命名空间支持之外,还支持 and 属性,并且其工作方式与它们对 . 以下示例说明如何配置全局 Wire Tap:wire-tapwire-tappatternorderchannel-interceptorspring-doc.cn

Java
@Bean
@GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3)
public WireTap wireTap(MessageChannel wiretapChannel) {
    return new WireTap(wiretapChannel);
}
XML 格式
<int:wire-tap pattern="input*, thing2*, thing1" order="3" channel="wiretapChannel"/>
全局 Wire Tap 提供了一种在外部配置单通道 Wire Tap 而无需修改现有通道配置的便捷方法。 为此,请将 该属性设置为 target channel name。 例如,您可以使用此技术配置测试用例以验证通道上的消息。pattern

特殊频道

默认情况下,在应用程序上下文中定义了两个特殊通道:和 。 'nullChannel' ( 的实例 ) 的作用类似于 ,记录在该级别发送给它的任何消息并立即返回。 特殊处理适用于传输消息的有效负载:它立即在此通道中订阅,以启动反应流处理,尽管数据被丢弃。 从反应流处理(请参阅 )引发的错误记录在该级别下,以便进行可能的调查。 如果需要对此类错误执行任何操作,则可以将具有自定义项的 应用于生成回复 this 的消息处理程序。 任何时候,当你遇到一个你不关心的回复的通道解析错误时,你可以将受影响的组件的属性设置为 'nullChannel' (名称 'nullChannel' 在应用程序上下文中保留)。errorChannelnullChannelNullChannel/dev/nullDEBUGorg.reactivestreams.PublisherSubscriber.onError(Throwable)warnReactiveRequestHandlerAdviceMono.doOnError()MononullChanneloutput-channelspring-doc.cn

'errorChannel' 在内部用于发送错误消息,并且可以被自定义配置覆盖。 错误处理中对此进行了更详细的讨论。spring-doc.cn

有关消息通道和拦截器的更多信息,另请参阅 Java DSL 一章中的消息通道spring-doc.cn

轮询器

本节描述了 Spring Integration 中 polling 的工作原理。spring-doc.cn

轮询消费者

当消息端点(通道适配器)连接到通道并实例化时,它们会生成以下实例之一:spring-doc.cn

实际实现取决于这些终端节点连接到的通道类型。 连接到实现 org.springframework.messaging.SubscribableChannel 接口的通道适配器会生成一个 . 另一方面,连接到实现 org.springframework.messaging.PollableChannel 接口(例如 a )的通道适配器会生成一个 .EventDrivenConsumerQueueChannelPollingConsumerspring-doc.cn

轮询使用者允许 Spring 集成组件主动轮询消息,而不是以事件驱动的方式处理消息。spring-doc.cn

在许多消息传递方案中,它们代表一个关键的横切关注点。 在 Spring Integration 中,轮询使用者基于同名模式,Gregor Hohpe 和 Bobby Woolf 在 Enterprise Integration Patterns 一书中对此进行了描述。 您可以在该书的网站上找到该模式的描述。spring-doc.cn

Pollable 消息源

Spring 集成提供了轮询消费者模式的第二种变体。 使用入站通道适配器时,这些适配器通常由 . 例如,当从远程 FTP 服务器位置检索消息时,FTP 入站通道适配器中描述的适配器配置了 Poller 以定期检索消息。 因此,当组件使用 Poller 进行配置时,生成的实例是以下类型之一:SourcePollingChannelAdapterspring-doc.cn

这意味着轮询器用于入站和出站消息传递方案。 以下是使用 Poller 的一些用例:spring-doc.cn

  • 轮询某些外部系统,例如 FTP 服务器、数据库和 Web 服务spring-doc.cn

  • 轮询内部(可轮询)消息通道spring-doc.cn

  • 轮询内部服务(例如在 Java 类上重复执行方法)spring-doc.cn

AOP 建议类可以应用于 Poller ,例如用于启动事务的事务通知。 从版本 4.1 开始,提供了 a。 轮询器使用触发器来确定下一次轮询的时间。 这可用于抑制 (跳过) 轮询,可能是因为存在一些下游条件会阻止消息被处理。 要使用此建议,您必须为其提供 . 从版本 4.2.5 开始,提供了 a。 要使用它,你可以将实例作为 bean 添加到应用程序上下文中,将其注入到 ,并将其添加到 Poller 的建议链中。 要跳过轮询,请调用 。 要恢复轮询,请调用 。 版本 4.2 在此领域增加了更多灵活性。 参见 Message Sources 的条件轮询器advice-chainPollSkipAdvicePollSkipAdvicePollSkipStrategySimplePollSkipStrategyPollSkipAdviceskipPolls()reset()

本章仅对轮询使用者以及它们如何适应消息通道(请参见消息通道)和通道适配器(请参见通道适配器)的概念进行简要概述。 有关消息收发终端节点的一般信息,特别是轮询使用者的更多信息,请参阅消息终端节点spring-doc.cn

延迟确认轮询消息源

从版本 5.0.1 开始,某些模块提供的实现支持将确认推迟到下游流完成(或将消息移交给另一个线程)。 目前仅限于 和 .MessageSourceAmqpMessageSourceKafkaMessageSourcespring-doc.cn

使用这些消息源,标头(请参阅 MessageHeaderAccessor API)将添加到消息中。 当与可轮询消息源一起使用时,标头的值是 的实例,如下例所示:IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACKAcknowledgmentCallbackspring-doc.cn

@FunctionalInterface
public interface AcknowledgmentCallback {

    void acknowledge(Status status);

    boolean isAcknowledged();

    void noAutoAck();

    default boolean isAutoAck();

    enum Status {

        /**
         * Mark the message as accepted.
         */
        ACCEPT,

        /**
         * Mark the message as rejected.
         */
        REJECT,

        /**
         * Reject the message and requeue so that it will be redelivered.
         */
        REQUEUE

    }

}

并非所有消息源(例如 a )都支持该状态。 它的处理方式与 相同。KafkaMessageSourceREJECTACCEPTspring-doc.cn

应用程序可以随时确认消息,如下例所示:spring-doc.cn

Message<?> received = source.receive();

...

StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
        .acknowledge(Status.ACCEPT);

如果 the 被连接到 a ,当 poller 线程在下游流完成后返回到适配器时,适配器会检查确认是否已被确认,如果没有,则将其状态设置为它(或者 flow 是否引发异常)。 状态值在 AcknowledgmentCallback.Status 枚举中定义。MessageSourceSourcePollingChannelAdapterACCEPTREJECTspring-doc.cn

Spring Integration 提供了对 . 这也负责设置或回调何时返回(或引发异常)。 以下示例演示如何使用 :MessageSourcePollingTemplateMessageSourceACCEPTREJECTAcknowledgmentCallbackMessageHandlerMessageSourcePollingTemplatespring-doc.cn

MessageSourcePollingTemplate template =
    new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
    ...
});

在这两种情况下 ( 和 ),您都可以通过调用回调来禁用 auto ack/nack。 如果您将消息交给另一个线程并希望稍后确认,则可以执行此操作。 并非所有实现都支持此功能(例如,Apache Kafka 不支持,因为偏移量提交必须在同一线程上执行)。SourcePollingChannelAdapterMessageSourcePollingTemplatenoAutoAck()spring-doc.cn

消息源的条件轮询器

本节介绍如何使用条件 Poller。spring-doc.cn

背景

Advice对象,在 ON POLLER 中,通知整个轮询任务(消息检索和处理)。 这些 “around advice” 方法无法访问 poll 的任何上下文 — 只能访问 poll 本身。 如前所述,这对于诸如使任务事务性或由于某些外部条件而跳过轮询等要求来说很好。 如果我们希望根据 poll 部分的结果采取一些行动,或者我们想根据条件调整 Poller 怎么办?对于这些实例, Spring 集成提供了“智能”轮询。advice-chainreceivespring-doc.cn

“智能”轮询

版本 5.3 引入了该界面。 中实现此接口的任何对象仅应用于操作 - 和 。 因此,它们只能应用于 或 。 此类实现以下方法:ReceiveMessageAdviceAdviceadvice-chainreceive()MessageSource.receive()PollableChannel.receive(timeout)SourcePollingChannelAdapterPollingConsumerspring-doc.cn

  • beforeReceive(Object source)该方法在该方法之前调用。 它允许您检查和重新配置源。 返回将取消此轮询(类似于前面提到的)。Object.receive()falsePollSkipAdvicespring-doc.cn

  • Message<?> afterReceive(Message<?> result, Object source)该方法在该方法之后调用。 同样,您可以重新配置源或执行任何操作(可能取决于结果,如果源没有创建消息,则可能会有所不同)。 您甚至可以返回不同的消息receive()nullspring-doc.cn

线程安全

如果 an 更改了源,则不应使用 . 如果 an 更改了源,则此类更改不是线程安全的,并且可能会导致意外结果,尤其是对于高频轮询器。 如果需要并发处理 poll 结果,请考虑使用下游,而不是向 Poller 添加执行程序。AdviceTaskExecutorAdviceExecutorChannelspring-doc.cn

Advice Chain 订购

您应该了解在初始化期间如何处理通知链。 未实现的对象将应用于整个轮询过程,并且所有对象都首先按顺序调用,然后再调用 any 。 然后,围绕 source 方法按顺序调用对象。 例如,如果您有 objects 、 where 和 are ,则对象将按以下顺序应用: 。 此外,如果源已经是 ,则会在任何现有对象之后调用 。 如果您想更改订单,您必须自己连接代理。AdviceReceiveMessageAdviceReceiveMessageAdviceReceiveMessageAdvicereceive()Advicea, b, c, dbdReceiveMessageAdvicea, c, b, dProxyReceiveMessageAdviceAdvicespring-doc.cn

SimpleActiveIdleReceiveMessageAdvice

此建议是 . 当与 一起使用时,它会根据上一次轮询是否生成消息来调整轮询频率。 poller 还必须具有对 same 的引用。ReceiveMessageAdviceDynamicPeriodicTriggerDynamicPeriodicTriggerspring-doc.cn

重要: 异步切换
SimpleActiveIdleReceiveMessageAdvice根据结果修改触发器。 这仅在 poller 线程上调用通知时有效。 如果 Poller 具有 . 要在轮询结果后使用异步操作时使用此建议,请稍后执行异步切换,可能使用 .receive()task-executorExecutorChannel
CompoundTriggerAdvice

此建议允许根据 poll 是否返回消息来选择两个触发器之一。 考虑一个使用 . 实例是不可变的,因此一旦构建就无法更改。 考虑这样一个使用案例:我们希望使用 cron 表达式每小时触发一次轮询,但如果未收到任何消息,则每分钟轮询一次,并在检索到消息时恢复为使用 cron 表达式。CronTriggerCronTriggerspring-doc.cn

为此,建议(和 poller)使用 a。 触发器的触发器可以是 . 当通知检测到未收到任何消息时,它会将辅助触发器添加到 . 调用实例的方法时,它会委托给辅助触发器(如果存在)。 否则,它将委托给主触发器。CompoundTriggerprimaryCronTriggerCompoundTriggerCompoundTriggernextExecutionTimespring-doc.cn

poller 还必须具有对 same 的引用。CompoundTriggerspring-doc.cn

以下示例显示了回退到每分钟的每小时 cron 表达式的配置:spring-doc.cn

<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
    <bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
    <int:poller trigger="compoundTrigger">
        <int:advice-chain>
            <bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
                <constructor-arg ref="compoundTrigger"/>
                <constructor-arg ref="secondary"/>
            </bean>
        </int:advice-chain>
    </int:poller>
</int:inbound-channel-adapter>

<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
    <constructor-arg ref="primary" />
</bean>

<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
    <constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>

<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
    <constructor-arg value="60000" />
</bean>
重要: 异步切换
CompoundTriggerAdvice根据结果修改触发器。 这仅在 poller 线程上调用通知时有效。 如果 Poller 具有 . 要在轮询结果后使用异步操作时使用此建议,请稍后执行异步切换,可能使用 .receive()task-executorExecutorChannel
仅限 MessageSource 的建议

有些建议可能仅适用于 ,而对 则没有意义。 为此,仍然存在一个接口( 的扩展 )。 有关更多信息,请参见入站通道适配器:轮询多个服务器和目录MessageSource.receive()PollableChannelMessageSourceMutatorReceiveMessageAdvicespring-doc.cn

Channel Adapter

通道适配器是一个消息端点,它允许将单个发送方或接收方连接到消息通道。 Spring 集成提供了许多适配器来支持各种传输,例如 JMS、文件、HTTP、Web 服务、邮件等。 本参考指南的后续章节将讨论每个适配器。 但是,本章重点介绍简单但灵活的方法调用通道适配器支持。 有入站和出站适配器,每个适配器都可以使用 core 命名空间中提供的 XML 元素进行配置。 这些提供了一种扩展 Spring Integration 的简单方法,只要你有一个可以作为源或目标调用的方法。spring-doc.cn

配置入站通道适配器

元素(在 Java 配置中为 a)可以调用 Spring 管理对象上的任何方法,并在将方法的输出转换为 a 后将非 null 返回值发送到 。 激活适配器的订阅后,轮询器会尝试从源接收消息。 根据提供的配置,使用 调度 Poller。 要为单个通道适配器配置轮询间隔或 cron 表达式,你可以提供一个 'poller' 元素,其中包含一个调度属性,例如 'fixed-rate' 或 'cron'。 以下示例定义了两个实例:inbound-channel-adapterSourcePollingChannelAdapterMessageChannelMessageTaskSchedulerinbound-channel-adapterspring-doc.cn

Java DSL
@Bean
public IntegrationFlow source1() {
    return IntegrationFlow.from(() -> new GenericMessage<>(...),
                             e -> e.poller(p -> p.fixedRate(5000)))
                ...
                .get();
}

@Bean
public IntegrationFlow source2() {
    return IntegrationFlow.from(() -> new GenericMessage<>(...),
                             e -> e.poller(p -> p.cron("30 * 9-17 * * MON-FRI")))
                ...
                .get();
}
Java
public class SourceService {

    @InboundChannelAdapter(channel = "channel1", poller = @Poller(fixedRate = "5000"))
    Object method1() {
        ...
    }

    @InboundChannelAdapter(channel = "channel2", poller = @Poller(cron = "30 * 9-17 * * MON-FRI"))
    Object method2() {
        ...
    }
}
Kotlin DSL
@Bean
fun messageSourceFlow() =
    integrationFlow( { GenericMessage<>(...) },
                    { poller { it.fixedRate(5000) } }) {
        ...
    }
XML 格式
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
    <int:poller fixed-rate="5000"/>
</int:inbound-channel-adapter>

<int:inbound-channel-adapter ref="source2" method="method2" channel="channel2">
    <int:poller cron="30 * 9-17 * * MON-FRI"/>
</int:channel-adapter>
如果未提供 Poller,则必须在上下文中注册单个默认 Poller。 有关更多详细信息,请参阅 Endpoint Namespace Support
重要:轮询器配置

所有类型都由 a 提供支持,这意味着它们包含一个 poller 配置,该配置根据 Poller 中指定的配置轮询(以调用生成成为有效负载的值的自定义方法)。 以下示例显示了两个 Poller 的配置:inbound-channel-adapterSourcePollingChannelAdapterMessageSourceMessagespring-doc.cn

<int:poller max-messages-per-poll="1" fixed-rate="1000"/>

<int:poller max-messages-per-poll="10" fixed-rate="1000"/>

在第一个配置中,轮询任务每次轮询调用一次,并且在每个任务 (poll) 期间,根据属性值调用方法(导致消息生成)一次。 在第二种配置中,轮询任务每次轮询调用 10 次,或者直到它返回 'null',因此每次轮询可能会生成 10 条消息,而每次轮询的间隔为 1 秒。 但是,如果配置类似于以下示例,会发生什么情况:max-messages-per-pollspring-doc.cn

<int:poller fixed-rate="1000"/>

请注意,没有指定。 正如我们稍后介绍的那样,(例如, , , , 和其他)中相同的 poller 配置将具有 for for 的默认值,这意味着“除非轮询方法返回null(可能是因为)中没有更多消息,否则不间断地执行轮询任务”,然后休眠一秒钟。max-messages-per-pollPollingConsumerservice-activatorfilterrouter-1max-messages-per-pollQueueChannelspring-doc.cn

但是,在 中,它有点不同。 的默认值为 ,除非您将其显式设置为负值(如 )。 这确保了 Poller 可以对生命周期事件(例如启动和停止)做出反应,并防止它在自定义方法的实现可能永远不会返回 null 并且恰好是不可中断的情况下可能在无限循环中旋转。SourcePollingChannelAdaptermax-messages-per-poll1-1MessageSourcespring-doc.cn

但是,如果您确定您的方法可以返回 null,并且需要在每次轮询中轮询尽可能多的可用源,则应显式设置为负值,如下例所示:max-messages-per-pollspring-doc.cn

<int:poller max-messages-per-poll="-1" fixed-rate="1000"/>

从版本 5.5 开始,值 for 具有特殊含义 - 完全跳过调用,这可能被视为暂停此入站通道适配器,直到稍后将 for 更改为非零值,例如通过 Control Bus。0max-messages-per-pollMessageSource.receive()maxMessagesPerPollspring-doc.cn

另请参阅 Global Default Poller 了解更多信息。spring-doc.cn

配置出站通道适配器

元素(用于 Java 配置)还可以将 a 连接到任何 POJO 消费者方法,该方法应该使用发送到该通道的消息的有效负载来调用。 以下示例说明如何定义出站通道适配器:outbound-channel-adapter@ServiceActivatorMessageChannelspring-doc.cn

Java DSL
@Bean
public IntegrationFlow outboundChannelAdapterFlow(MyPojo myPojo) {
    return f -> f
             .handle(myPojo, "handle");
}
Java
public class MyPojo {

    @ServiceActivator(channel = "channel1")
    void handle(Object payload) {
        ...
    }

}
Kotlin DSL
@Bean
fun outboundChannelAdapterFlow(myPojo: MyPojo) =
    integrationFlow {
        handle(myPojo, "handle")
    }
XML 格式
<int:outbound-channel-adapter channel="channel1" ref="target" method="handle"/>

<beans:bean id="target" class="org.MyPojo"/>

如果要适配的通道是 ,则必须提供 poller 子元素( 上的 子注释 ),如下例所示:PollableChannel@Poller@ServiceActivatorspring-doc.cn

Java
public class MyPojo {

    @ServiceActivator(channel = "channel1", poller = @Poller(fixedRate = "3000"))
    void handle(Object payload) {
        ...
    }

}
XML 格式
<int:outbound-channel-adapter channel="channel2" ref="target" method="handle">
    <int:poller fixed-rate="3000" />
</int:outbound-channel-adapter>

<beans:bean id="target" class="org.MyPojo"/>

如果 POJO 使用者实现可以在其他定义中重用,则应使用属性。 但是,如果使用者实现仅由的单个定义引用,则可以将其定义为内部 bean,如下例所示:ref<outbound-channel-adapter><outbound-channel-adapter>spring-doc.cn

<int:outbound-channel-adapter channel="channel" method="handle">
    <beans:bean class="org.Foo"/>
</int:outbound-channel-adapter>
不允许在同一配置中同时使用 attribute 和 inner handler definition,因为它会产生不明确的条件。 此类配置会导致引发异常。ref<outbound-channel-adapter>

可以在没有引用的情况下创建任何通道适配器,在这种情况下,它会隐式创建 . 创建的频道名称与 or 元素的属性匹配。 因此,如果未提供,则需要。channelDirectChannelid<inbound-channel-adapter><outbound-channel-adapter>channelidspring-doc.cn

通道适配器表达式和脚本

与许多其他 Spring 集成组件一样,和 也提供了对 SPEL 表达式求值的支持。 要使用 SPEL,请在'expression'属性中提供表达式字符串,而不是提供用于在 Bean 上调用方法的'ref'和'method'属性。 计算表达式时,它遵循与 method-invocation 相同的协定,其中:每当评估结果为非 null 值时,an 的表达式都会生成一条消息,而 an 的表达式必须等效于返回 void 的方法调用。<inbound-channel-adapter><outbound-channel-adapter><inbound-channel-adapter><outbound-channel-adapter>spring-doc.cn

从 Spring Integration 3.0 开始,还可以使用 SPEL (甚至 a )子元素进行配置,因为需要比使用简单的 'expression' 属性所能实现的更复杂的事情。 如果使用 属性以脚本形式提供脚本,则还可以设置 ,从而允许定期刷新资源。 如果希望在每次轮询时检查脚本,则需要将此设置与 Poller 的触发器协调,如下例所示:<int:inbound-channel-adapter/><expression/><script/>Resourcelocationrefresh-check-delayspring-doc.cn

<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
    <int:poller max-messages-per-poll="1" fixed-delay="5000"/>
    <script:script lang="ruby" location="Foo.rb" refresh-check-delay="5000"/>
</int:inbound-channel-adapter>

另请参阅 when using the sub-element 上的属性。 有关表达式的更多信息,请参阅 Spring 表达式语言 (SpEL)。 有关脚本,请参阅 Groovy 支持脚本支持cacheSecondsReloadableResourceBundleExpressionSource<expression/>spring-doc.cn

() 是一个端点,它通过定期触发以轮询某些底层 . 由于在轮询时没有消息对象,因此表达式和脚本无权访问 root ,因此大多数其他消息传递 SpEL 表达式中没有可用的有效负载或标头属性。 该脚本可以生成并返回带有 Headers 和 payload 的完整对象,也可以只生成并返回一个 payload,该 payload 由框架添加到具有基本 Headers 的消息中。<int:inbound-channel-adapter/>SourcePollingChannelAdapterMessageSourceMessageMessage

消息桥

消息桥是连接两个消息通道或通道适配器的相对简单的端点。 例如,您可能希望将 a 连接到 a,以便订阅终端节点不必担心任何轮询配置。 相反,消息桥提供轮询配置。PollableChannelSubscribableChannelspring-doc.cn

通过在两个通道之间提供中间轮询器,您可以使用消息传递桥来限制入站消息。 Poller 的触发器确定消息到达第二个通道的速率,而 Poller 的属性对吞吐量实施限制。maxMessagesPerPollspring-doc.cn

消息桥的另一个有效用途是连接两个不同的系统。 在这种情况下, Spring 集成的作用仅限于在这些系统之间建立连接并在必要时管理 Poller。 在两个系统之间至少有一个转换器,以便在它们的格式之间进行转换,这可能更常见。 在这种情况下,通道可以作为转换器端点的 'input-channel' 和 'output-channel' 提供。 如果不需要数据格式转换,则消息桥可能确实足够了。spring-doc.cn

使用 XML 配置 Bridge

您可以使用 element is 在两个消息通道或通道适配器之间创建消息桥。 为此,请提供 and 属性,如下例所示:<bridge>input-channeloutput-channelspring-doc.cn

<int:bridge input-channel="input" output-channel="output"/>

如上所述,消息桥的一个常见用例是将 连接到 . 在执行此角色时,消息传送桥还可以用作限制器:PollableChannelSubscribableChannelspring-doc.cn

<int:bridge input-channel="pollable" output-channel="subscribable">
     <int:poller max-messages-per-poll="10" fixed-rate="5000"/>
 </int:bridge>

您可以使用类似的机制来连接通道适配器。 下面的示例显示了 Spring 集成命名空间中 和 adapters 之间的简单“echo”:stdinstdoutstreamspring-doc.cn

<int-stream:stdin-channel-adapter id="stdin"/>

 <int-stream:stdout-channel-adapter id="stdout"/>

 <int:bridge id="echo" input-channel="stdin" output-channel="stdout"/>

类似的配置适用于其他(可能更有用的)Channel Adapter 桥接,例如文件到 JMS 或邮件到文件。 接下来的章节将介绍各种通道适配器。spring-doc.cn

如果未在桥接上定义 'output-channel',则使用入站消息提供的回复通道(如果可用)。 如果 output 和 reply channel 都不可用,则会引发异常。

使用 Java 配置配置 Bridge

以下示例演示如何使用注释在 Java 中配置网桥:@BridgeFromspring-doc.cn

@Bean
public PollableChannel polled() {
    return new QueueChannel();
}

@Bean
@BridgeFrom(value = "polled", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "10"))
public SubscribableChannel direct() {
    return new DirectChannel();
}

以下示例演示如何使用注释在 Java 中配置网桥:@BridgeTospring-doc.cn

@Bean
@BridgeTo(value = "direct", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "10"))
public PollableChannel polled() {
    return new QueueChannel();
}

@Bean
public SubscribableChannel direct() {
    return new DirectChannel();
}

或者,您可以使用 ,如下例所示:BridgeHandlerspring-doc.cn

@Bean
@ServiceActivator(inputChannel = "polled",
        poller = @Poller(fixedRate = "5000", maxMessagesPerPoll = "10"))
public BridgeHandler bridge() {
    BridgeHandler bridge = new BridgeHandler();
    bridge.setOutputChannelName("direct");
    return bridge;
}

使用 Java DSL 配置 Bridge

您可以使用 Java 域特定语言 (DSL) 来配置网桥,如下例所示:spring-doc.cn

@Bean
public IntegrationFlow bridgeFlow() {
    return IntegrationFlow.from("polled")
            .bridge(e -> e.poller(Pollers.fixedDelay(5000).maxMessagesPerPoll(10)))
            .channel("direct")
            .get();
}