核心信息传递
消息频道
消息频道
虽然消息在封装数据方面扮演着关键角色,它是消息频道这使消息生产者与消息消费者脱钩。
MessageChannel 接口
Spring Integration 的顶层消息频道接口定义如下:
public interface MessageChannel {
boolean send(Message message);
boolean send(Message message, long timeout);
}
发送消息时,返回值为true如果消息成功发送。
如果发送调用超时或中断,则返回false.
Pollable频道
由于消息通道可能或不缓冲消息(如Spring集成概述中所述),两个子接口定义了缓冲(可轮询)和非缓冲(可订阅)通道行为。
以下列表展示了Pollable频道接口:
public interface PollableChannel extends MessageChannel {
Message<?> receive();
Message<?> receive(long timeout);
}
与发送方法类似,接收消息时,超时或中断时返回值为空。
消息通道实现
Spring Integration 提供了不同的消息通道实现。 以下章节简要介绍了每一项。
发布订阅频道
这发布订阅频道实现广播 任意消息发送给所有订阅的处理员。
这通常用于发送事件消息,事件消息的主要作用是通知(与文档消息不同,文档消息通常由单个处理器处理)。
注意发布订阅频道仅供发送。
因为它会直接向用户广播,当它发送(消息)调用方法时,消费者无法轮询消息(它不实现Pollable频道因此 没有接收()方法)。
相反,任何订阅者本身都必须是消息处理器,以及订阅者的handleMessage(消息)方法依次被调用。
在3.0版本之前,调用发送方法发布订阅频道但该节目没有回归任何订阅者false. 与消息模板一个MessageDeliveryException被扔了。
从3.0版本开始,行为发生了变化,导致发送只要至少有最低订阅者(并且成功处理消息),则始终被视为成功。
这种行为可以通过设置min订阅者该属性默认为0.
如果你使用任务执行者,仅使用正确数量的订阅者来判定,因为消息的实际处理是异步完成的。 |
队列通道
这队列通道实现会包裹队列。
与发布订阅频道这队列通道具有点对点语义。
换句话说,即使频道有多个用户,也只有其中一个用户应该接收到任何消息被送到那个频道。
它提供了一个默认的无参数构造子(提供本质上无界的容量Integer.MAX价值)以及接受队列容量的构造器,如下列表所示:
public QueueChannel(int capacity)
尚未达到容量上限的信道会将其内部队列中存储消息,且发送(消息<?>)方法会立即返回,即使没有接收端准备好处理消息。
如果队列已满,发送方会阻塞,直到队列中有空位。
或者,如果你使用带有额外超时参数的发送方法,队列会阻塞,直到任一房间可用或超时时间结束,以先发生者为准。
类似地,a接收()如果队列中有消息可用,呼叫会立即返回;但如果队列为空,则接收呼叫可能会阻塞,直到消息可用或超时(如提供)过去。
无论哪种情况,都可以通过传递超时值为0强制立即返回,不论队列状态。
但请注意,这涉及到以下版本发送()和接收()没有超时参数块无限期。
优先频道
而队列通道强制执行先进先出(FIFO)排序,即优先频道是一种允许信道内根据优先级排序消息的替代实现。
默认情况下,优先级由优先权每个消息中的头部。
然而,对于自定义优先级确定逻辑,有一个类型的比较器Comparator<Message<?>>可以提供给优先频道构造 函数。
会合频道
这会合频道实现了“直接切换”场景,发送方会屏蔽,直到另一方调用该通道接收()方法。
对方会封锁,直到发送方发送消息。
内部实现与队列通道,但其使用了一个同步队列(零容量实现阻塞队列).
这在发送端和接收端在不同线程中运行的情况效果良好,但异步地将消息丢弃在队列中则不合适。
换句话说,具有会合频道,发送方知道某个接收者已接受该消息,而当队列通道该消息将被存储在内部队列中,可能永远不会收到。
请记住,所有这些基于队列的通道默认都是在内存中存储消息。
当需要持久化时,你可以在“队列”元素中提供“消息存储”属性来引用持久化消息商店实现过程中,或者你也可以用由持久代理支持的本地通道替代,比如JMS支持的通道或通道适配器。
后者选项允许你利用任何 JMS 提供商在消息持久化上的实现,详见 JMS 支持。
然而,当不需要在队列中缓冲时,最简单的方法是依赖直达频道,将在下一节讨论。 |
这会合频道也适用于实现请求-回复作。
发送方可以创建一个临时且匿名的实例会合频道然后在构建 时将其设置为“replyChannel”头部消息.
发完后消息发送方可以立即调用收到(可选地提供超时值)以便在等待回复时进行屏蔽消息.
这与 Spring Integration 内部许多请求-回复组件的实现非常相似。
直达频道
这直达频道具有点对点语义,但除此之外更接近发布订阅频道比之前描述的任何基于队列的信道实现都要强大。
它实现了订阅频道接口而非Pollable频道接口,因此它直接向订阅者发送消息。
然而,作为点对点信道,它与发布订阅频道它将每个消息到单一订阅者消息处理器.
除了是最简单的点对点通道选项外,其最重要的特性之一是允许单线程在通道的“两侧”执行作。
例如,如果一个处理器订阅了直达频道,然后发送消息该通道触发调用该处理器的handleMessage(消息)在发送方线程中直接进行方法,在发送()方法调用可以返回。
提供这种行为的信道实现的主要动机是支持必须跨信道的事务,同时仍能利用信道提供的抽象性和松耦合。如果发送()调用是在事务范围内调用的,处理程序调用的结果(例如更新数据库记录)在决定该事务的最终结果(提交或回滚)中起作用。
自从......直达频道是最简单的选项,且不会增加调度和管理轮询器线程所需的额外开销,它是 Spring Integration 中的默认通道类型。基本思路是为应用程序定义通道,考虑哪些需要提供缓冲或限速输入,并修改它们为基于队列可投票频道. 同样,如果一个频道需要广播消息,它也不应该是直达频道而是发布订阅频道. 之后,我们会展示这些通道的配置方式。 |
这直达频道内部委派给消息调度器调用其订阅的消息处理程序,该调度器可以通过以下方式暴露负载均衡策略负载均衡器或负载均衡器-参考属性(互斥)。负载均衡策略被消息调度器用来帮助确定当多个消息处理器订阅同一通道时,消息如何分配到消息处理器之间。为了方便起见,负载均衡器属性 暴露了指向 预先实现的值枚举负载均衡策略. 一个循环赛(轮流负载均衡于处理机之间)没有(对于想要明确禁用负载均衡的情况) 是唯一可用的数值。未来版本可能会添加其他策略实现。不过,自3.0版本起,你可以自行实现负载均衡策略并用以下方式注入负载均衡器-参考属性,应指向实现负载均衡策略,如下示例所示:
一个固定订阅频道是订阅频道只支持一个消息处理器无法取消订阅的用户。这在没有其他用户参与且不需要信道拦截器时的高吞吐量性能场景中非常有用。
<int:channel id="lbRefChannel">
<int:dispatcher load-balancer-ref="lb"/>
</int:channel>
<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>
注意负载均衡器和负载均衡器-参考属性是互斥的。
负载均衡还与布尔值结合使用备援切换财产。 如果备援切换值为真(默认值),当前一个处理器抛出异常时,调度器会退回到后续的处理程序(如有需要)。顺序由处理程序本身定义的可选顺序值决定,若无该值,则由处理程序订阅的顺序决定。
如果在某些情况下,调度器每次发生错误时都必须尝试调用第一个处理程序,然后在每次发生错误时都以相同的固定顺序回退,则不应提供负载均衡策略。换句话说,调度器仍然支持备援切换即使未启用负载均衡,也具有布尔属性。然而,在没有负载均衡的情况下,调用处理程序总是从第一个开始,按照它们的顺序。例如,当对主级、次级、三级等有明确定义时,这种方法效果很好。使用命名空间支持时,次序任意端点的属性决定了顺序。
请记住负载均衡和备援切换仅当一个信道拥有多个订阅消息处理程序时才适用。使用命名空间支持时,这意味着多个端点共享定义在输入通道属性。 |
从5.2版本开始,当备援切换成立时,当前处理程序的失败以及失败消息都会被记录在调试或信息如果分别配置。
执行者频道
这执行者频道是一个点对点信道,支持与 相同的调度器配置直达频道(负载均衡策略和备援切换布尔属性)。
这两种分发信道类型的关键区别在于执行者频道代表到一个实例任务执行者执行派遣任务。
这意味着发送方法通常不会阻塞,但也意味着处理器调用可能不会在发送端线程中发生。
因此,它不支持跨发送方和接收端处理器的事务。
发送方有时会阻挡。
例如,当使用 a任务执行者带有拒绝策略,会限制客户端(例如ThreadPoolExecutor.CallerRunsPolicy),发送端线程可以在线程池达到最大容量且执行者工作队列满时执行该方法。
由于这种情况只会以不可预测的方式发生,你不应依赖它来进行交易。 |
分区通道
从6.1版本开始,分区通道提供实现。
这是 的扩展摘要执行者通道代表点对点调度逻辑,实际消耗由发送到该信道的消息计算的分区键决定在特定线程上进行。
该信道类似于执行者频道但不同的是,使用相同分区键的消息总是在同一线程中处理,保持顺序。
它不需要外部接口任务执行者但可以通过自定义配置线程工厂(例如:Thread.ofVirtual().name(“partition-”, 0).factory()).
该工厂用于将单线程执行程序填充到消息调度器每个分区委托。
默认情况下,IntegrationMessageHeaderAccessor.CORRELATION_ID消息头作为分区键使用。
该通道可以配置为简单的豆子:
@Bean
PartitionedChannel somePartitionedChannel() {
return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}
该频道将拥有3分区——专用线程;将使用partitionKey用头来确定消息将在哪个分区处理。
看分区通道欲了解更多信息,请参阅 Javadocs 类。
流信息频道
这流信息频道是org.reactivestreams.出版商实现“沉没”向内部发送消息reactor.core.publisher.Flux供下游反应型用户按需消费。
该信道实现既不是订阅频道,也不是Pollable频道,所以只有org.reactivestreams.Subscriber实例可以用来利用该通道的能量,以尊重反应流的背压特性。
另一方面,流信息频道实现 aReactiveStreamsSubscribeableChannel(可订阅频道)其subscribeTo(Publisher<Message<?>>)契约允许接收来自响应式源发布者的事件,将响应式流桥接进集成流程。
为了实现整个积分流的完全反应行为,必须在流中所有端点之间放置这样的通道。
有关与反应流交互的更多信息,请参见反应流支持。
有范围的通道
Spring Integration 1.0 提供了线程本地频道但自2.0版本起已移除。
现在处理相同要求的更通用方法是添加一个范围归因于频道。
属性的值可以是上下文中可用的作用域名称。
例如,在网页环境中,某些作用域可用,任何自定义作用域实现都可以在上下文中注册。
以下示例展示了对通道应用线程本地作用域,包括对该作用域本身的注册:
<int:channel id="threadScopedChannel" scope="thread">
<int:queue />
</int:channel>
<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
<property name="scopes">
<map>
<entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
</map>
</property>
</bean>
前述示例中定义的通道也委托到内部的队列,但通道绑定到当前线程,因此队列内容也同样绑定。
这样,发送到通道的线程以后可以接收到相同的消息,但其他线程无法访问。
虽然线程作用域通道很少被要求使用,但在某些情况下它们仍然有用直达频道实例被用于强制执行单一线程作,但任何回复消息都应发送到“终端”通道。
如果该终端通道是线程作用波,原始发送线程可以从终端通道收集回复。
现在,由于任何通道都可以有作用域,你可以在线程本地之外,定义自己的作用域。
通道拦截器
消息架构的一个优势是能够以非侵入式方式提供通用行为并捕获通过系统传递的消息的有意义信息。由于消息实例发送和接收消息频道这些信道为拦截发送和接收作提供了机会。 这通道拦截者策略界面(如下列表所示)为这些作提供了方法:
public interface ChannelInterceptor {
Message<?> preSend(Message<?> message, MessageChannel channel);
void postSend(Message<?> message, MessageChannel channel, boolean sent);
void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex);
boolean preReceive(MessageChannel channel);
Message<?> postReceive(Message<?> message, MessageChannel channel);
void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex);
}
实现接口后,注册拦截器与信道的作只需进行以下调用:
channel.addInterceptor(someChannelInterceptor);
返回消息实例可用于变换消息或者可以返回“null”以阻止进一步处理(当然,任何方法都可能抛出运行异常). 另外,还有预接收方法可以返回false以防止接收作继续。
请记住接收()调用仅在可投票频道. 事实上,订阅频道接口甚至没有定义接收()方法。 原因在于当消息被发送到订阅频道,根据频道类型,直接发送给零个或多个用户(例如, 一个发布订阅频道发送给所有订阅者)。因此,preReceive(...),postReceive(...)和afterReceiveCompletion(...)拦截器方法仅在被应用到一个Pollable频道. |
由于很少需要实现所有拦截方法,接口提供无作方法(返回无效方法没有代码,消息- 返回方法 返回消息原样,以及布尔方法返回true).
拦截器方法的调用顺序取决于信道类型。如前所述,基于队列的信道是唯一能够接收()方法首先被拦截。此外,发送和接收拦截的关系取决于发送端和接收端线程的时序。例如,如果接收端在等待消息时已被阻塞,顺序可能如下:preSend,预接收,postReceive,postSend. 然而,如果接收方在发送方在信道上放置消息并已返回后进行轮询,顺序如下:preSend,postSend(时间过时),预接收,postReceive. 在这种情况下,经过的时间取决于多种因素,因此通常不可预测(实际上,接收可能永远不会发生)。队列的类型也起着作用(例如,会合时间与优先级)。简而言之,你不能仅凭顺序来判断preSend之前postSend和预接收之前postReceive. |
从 Spring Framework 4.1 和 Spring Integration 4.1 开始,通道拦截者提供新方法:afterSendCompletion()和afterReceiveCompletion(). 它们在之后被调用send()' 和 'receive()调用,无论是否触发异常,允许资源清理。注意,通道调用这些方法在通道拦截者列表按首字母的倒序排列preSend()和preReceive()调用。
从版本5.1开始,全局信道拦截器适用于动态注册信道——例如通过beanFactory.initializeBean()或集成流上下文当使用 Java DSL 时。此前,在应用上下文刷新后创建 Beans 时,不会应用拦截器。
另外,从5.1版本开始,ChannelInterceptor.postReceive()当未收到消息时不再调用;不再需要检查零 留言<?>. 之前,该方法被调用过。如果你有一个依赖前述行为的拦截器,实现afterReceiveCompleted()而是因为无论消息是否被接收,都会调用该方法。
从5.2版本开始,频道拦截者意识被弃用,取而代之的是拦截频道来自Spring Messaging模块,现已扩展以实现向后兼容。 |
消息模板
当端点及其各种配置选项被引入时,Spring Integration为消息组件提供了基础,使消息系统能够非侵入式调用你的应用代码。
不过,有时需要从你的应用码中调用消息系统。
为了方便实现此类用例,Spring Integration 提供了消息模板支持消息通道内的多种作,包括请求和回复场景。
例如,可以发送请求并等待回复,具体如下:
MessagingTemplate template = new MessagingTemplate();
Message reply = template.sendAndReceive(someChannel, new GenericMessage("test"));
在前面的例子中,模板内部会创建一个临时匿名通道。 模板上也可以设置“sendTimeout”和“receiveTimeout”属性,并且支持其他交换类型。 以下列表展示了此类方法的签名:
public boolean send(final MessageChannel channel, final Message<?> message) { ...
}
public Message<?> sendAndReceive(final MessageChannel channel, final Message<?> request) { ...
}
public Message<?> receive(final PollableChannel<?> channel) { ...
}
一种侵入性更低的方法,允许你调用带有有效载荷或头部值的简单接口,而不是消息实例描述为这时GatewayProxyFactoryBean. |
配置消息通道
要创建消息通道实例,你可以使用<频道/>元素用于XML或直达频道Java配置实例,具体如下:
@Bean
public MessageChannel exampleChannel() {
return new DirectChannel();
}
<int:channel id="exampleChannel"/>
当你使用<频道/>没有任何子元素,它生成一个直达频道实例(a订阅频道).
要创建发布-订阅频道,请使用<发布-订阅-频道/>元素(该发布订阅频道在爪哇语中),具体如下:
@Bean
public MessageChannel exampleChannel() {
return new PublishSubscribeChannel();
}
<int:publish-subscribe-channel id="exampleChannel"/>
你也可以选择多种<队列/>子元素用于创建任意可轮询的信道类型(如消息信道实现中描述)。
以下章节展示了每种信道类型的示例。
直达频道配置
如前所述,直达频道是默认类型。
以下列表展示了谁应该定义该职位:
@Bean
public MessageChannel directChannel() {
return new DirectChannel();
}
<int:channel id="directChannel"/>
默认信道具有轮询负载均衡器,并且启用故障切换(参见直达频道更多细节)。
要禁用其中一个或两个,请添加一个<调度员/>子元素(a负载均衡策略构造者直达频道)并配置属性如下:
@Bean
public MessageChannel failFastChannel() {
DirectChannel channel = new DirectChannel();
channel.setFailover(false);
return channel;
}
@Bean
public MessageChannel failFastChannel() {
return new DirectChannel(null);
}
<int:channel id="failFastChannel">
<int:dispatcher failover="false"/>
</channel>
<int:channel id="channelWithFixedOrderSequenceFailover">
<int:dispatcher load-balancer="none"/>
</int:channel>
数据类型通道配置
有时,消费者只能处理特定类型的负载,迫使你确保输入消息的负载类型。 我首先想到的可能是使用消息过滤器。 然而,该消息过滤器只能过滤掉不符合消费者需求的消息。 另一种方法是使用基于内容的路由器,将不合规数据类型的消息路由到特定的变换器,以强制转换和转换到所需的数据类型。 这确实可行,但更简单的方法是应用数据类型通道模式。 你可以为每个特定的有效载荷数据类型使用不同的数据类型通道。
要创建只接受包含某一有效载荷类型的消息的数据类型通道,请在通道元素中提供该数据类型的完全限定类名称数据类型属性,如下例所示:
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(Number.class);
return channel;
}
<int:channel id="numberChannel" datatype="java.lang.Number"/>
注意,类型检查能通过任何可分配给通道数据类型的类型。
换句话说,数字频道在前述示例中,将接受有效载荷为java.lang.整数或java.lang.Double.
多种类型可以作为逗号分隔列表提供,如下示例所示:
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(String.class, Number.class);
return channel;
}
<int:channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>
因此,前述示例中的“numberChannel”只接受数据类型为java.lang.Number.
但如果消息的有效载荷不是所需类型,会发生什么?
这取决于你是否定义了一个名为integrationConversionService这是Spring的转换服务的一个实例。
如果不行,则例外会立即被抛弃。
然而,如果你定义了integrationConversionServiceBEAN,它用于尝试将消息的有效载荷转换为可接受的类型。
你甚至可以注册自定义转换器。
例如,假设你发送一条包含字符串有效载荷传输到我们上面配置的“numberChannel”。
你可以这样处理这条信息:
MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));
通常,这会是完全合法的作。 然而,由于我们使用Datatype Channel,这种作的结果会产生类似以下异常:
Exception in thread "main" org.springframework.integration.MessageDeliveryException:
Channel 'numberChannel'
expected one of the following datataypes [class java.lang.Number],
but received [class java.lang.String]
…
例外发生是因为我们要求有效载荷类型为数但我们发了字符串.
所以我们需要一个东西来转换字符串转给数.
为此,我们可以实现类似以下示例的转换器:
public static class StringToIntegerConverter implements Converter<String, Integer> {
public Integer convert(String source) {
return Integer.parseInt(source);
}
}
然后我们可以将其注册为积分转换服务的转换器,如下示例所示:
@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt {
return new StringToIntegerConverter();
}
<int:converter ref="strToInt"/>
<bean id="strToInt" class="org.springframework.integration.util.Demo.StringToIntegerConverter"/>
或者在字符串到整数转换器当它被标记为@Component自动扫描注释。
当解析“转换器”元素时,它生成了integrationConversionService如果还没有定义的话,那就是豆子。
有了那个转换器,那个发送作将成功,因为数据类型通道利用该转换器进行转换字符串有效载荷到整数.
有关有效载荷类型转换的更多信息,请参见有效载荷类型转换。
从4.0版本开始,integrationConversionService由默认数据类型通道消息转换器,它在应用上下文中查找转换服务。
要使用不同的转换技术,你可以指定消息转换器在频道上有属性。
这一定是对消息转换器实现。
只有发件消息采用方法。
它为转换器提供了对消息头部的访问(以防转换过程中可能需要从头部获取信息,例如内容类型).
该方法只能返回已转换的有效载荷或完整的有效载荷消息对象。
如果是后者,转换器必须小心复制所有入站消息的头部。
或者,你可以声明<豆/>类型消息转换器其标识为数据类型通道消息转换器该转换器被所有具有数据类型.
队列通道配置
要创建一个队列通道,使用<队列/>子元素。
您可以指定频道容量如下:
@Bean
public PollableChannel queueChannel() {
return new QueueChannel(25);
}
<int:channel id="queueChannel">
<queue capacity="25"/>
</int:channel>
如果你没有为“容量”属性提供值,那就要说明<队列/>子元素,生成的队列是无界的。
为避免内存不足等问题,我们强烈建议你为有界队列设置一个显式值。 |
持续队列通道配置
自队列通道它提供了缓存消息的能力,但默认仅在内存中进行,同时引入了在系统故障时消息可能丢失的可能性。
为了降低这种风险,一个队列通道可能由持久实现支持,MessageGroupStore策略界面。
欲了解更多详情MessageGroupStore和消息商店,请参见消息商店。
这能力当消息存储属性被使用。 |
当队列通道获得消息,它将消息添加到消息存储中。
当消息是从一个队列通道,它会从消息存储中移除。
默认情况下,队列通道将消息存储在内存队列中,这可能导致前述的消息丢失情景。
然而,Spring 集成提供了持久存储,例如Jdbc频道消息存储.
你可以为任何配置消息存储队列通道通过添加消息存储属性,如下例所示:
<int:channel id="dbBackedChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
(见下方示例,了解 Java/Kotlin 配置选项。)
Spring Integration JDBC 模块还为多种流行数据库提供了模式数据定义语言(DDL)。
这些模式位于该模块的 org.springframework.integration.jdbc.store.channel 包中(Spring-integration-JDBC).
一个重要特性是,对于任何事务型持久存储(例如Jdbc频道消息存储),只要轮询器配置好交易,从存储中移除的消息只有在交易成功完成后才能永久移除。
否则,交易会回滚,且消息没有丢失。 |
随着越来越多的 Spring 项目与“NoSQL”数据存储相关的支持,消息存储的许多其他实现也随之出现。
你也可以提供你自己的实现MessageGroupStore如果找不到满足你具体需求的,可以直接做Interface。
自4.0版本起,我们建议队列通道实例配置为使用频道信息存储如果可能的话。这些通常针对这种用途进行了优化,相较于通用消息存储。如果频道信息存储是频道优先信息存储消息以FIFO顺序按优先级顺序接收。优先级的概念由消息存储的实现决定。例如,以下示例展示了MongoDB通道消息存储的Java配置:
@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
store.setPriorityEnabled(true);
return store;
}
@Bean
public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) {
return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue"));
}
@Bean
public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) {
return IntegrationFlow.from((Channels c) ->
c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup"))
....
.get();
}
@Bean
fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) =
integrationFlow {
channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") }
}
注意消息组队列类。 那是阻塞队列实现以使用MessageGroupStore操作。 |
另一个定制选项队列通道环境由裁判属性<智:队列>子元素或其特定构造子。该属性提供对任意java.util.Queue实现。 例如,Hazelcast 分布式IQue可以配置如下:
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(new Config()
.setProperty("hazelcast.logging.type", "log4j"));
}
@Bean
public PollableChannel distributedQueue() {
return new QueueChannel(hazelcastInstance()
.getQueue("springIntegrationQueue"));
}
发布订阅频道配置
要创建一个发布订阅频道,使用<发布-订阅频道/>元素。使用该元素时,您还可以指定任务执行者用于发布消息(若未指定,则发布在发送方线程中),具体如下:
@Bean
public MessageChannel pubsubChannel() {
return new PublishSubscribeChannel(someExecutor());
}
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>
如果你在下游提供重序列器或聚合器发布订阅频道你可以在通道上设置“应用序列”属性为true. 这样做表示通道应设置序列大小和序列号消息头以及传递消息前的相关ID。例如,如果有五个订阅者,则序列大小将被设置为5,消息则会序列号头部值范围为1自5.
同时执行者,你也可以配置错误处理程序. 默认情况下,发布订阅频道使用消息发布错误处理实现时发送错误到消息频道来自errorChannel标题或进入全局errorChannel实例。 如果执行者未配置,错误处理程序被忽略,异常会直接抛入调用者的线程。
如果你提供重序器或聚合下游发布订阅频道你可以在通道上设置“应用序列”属性为true. 这样做意味着信道应在传递消息前设置序列大小和序列号头部以及相关ID。例如,如果有五个订阅者,序列大小将设置为5,消息的序列号头值为1自5.
以下示例展示了如何设置应用序列头部 至true:
@Bean
public MessageChannel pubsubChannel() {
PublishSubscribeChannel channel = new PublishSubscribeChannel();
channel.setApplySequence(true);
return channel;
}
<int:publish-subscribe-channel id="pubsubChannel" apply-sequence="true"/>
这应用序列值为false默认情况下,发布-订阅通道可以向多个出站通道发送完全相同的消息实例。由于 Spring Integration 强制有效载荷和头部引用的不可变性,当标志设置为true,信道生成新的消息具有相同有效载荷引用但头部值不同的实例。 |
从5.4.3版本开始,发布订阅频道也可以配置为要求订阅者其选项广播调度员以表明该频道在没有订阅者时不会无声无视消息。 一个MessageDispatchingException其中调度员没有用户当没有订阅者时,会抛出消息,且该选项设置为true.
执行者频道
要创建一个执行者频道,添加<调度员>具有 的子元素任务执行者属性。 该属性的值可以引用任意任务执行者在上下文中。例如,这样做可以配置线程池,用于向订阅的处理器发送消息。如前所述,这样做会破坏发送端和接收端之间的单线程执行上下文,使得任何活跃的事务上下文不会被处理程序调用共享(即处理程序可以抛出例外,但发送调用已经成功返回)。以下示例展示了如何使用调度元素 并指定执行者任务执行者属性:
@Bean
public MessageChannel executorChannel() {
return new ExecutorChannel(someExecutor());
}
<int:channel id="executorChannel">
<int:dispatcher task-executor="someExecutor"/>
</int:channel>
|
这
|
优先频道配置
要创建一个优先频道,使用<优先队列/>子元素,如下示例所示:
@Bean
public PollableChannel priorityChannel() {
return new PriorityChannel(20);
}
<int:channel id="priorityChannel">
<int:priority-queue capacity="20"/>
</int:channel>
默认情况下,频道会查询优先权消息的头部。
不过,你也可以选择定制比较仪参考。
另外,请注意优先频道(和其他类型一样)确实支持数据类型属性。
与队列通道,它还支持能力属性。
以下示例展示了所有这些:
@Bean
public PollableChannel priorityChannel() {
PriorityChannel channel = new PriorityChannel(20, widgetComparator());
channel.setDatatypes(example.Widget.class);
return channel;
}
<int:channel id="priorityChannel" datatype="example.Widget">
<int:priority-queue comparator="widgetComparator"
capacity="10"/>
</int:channel>
会合频道配置
一个会合频道当队列子元素是 时创建的<集合队列>.
它不提供任何额外的配置选项,且其队列不接受任何容量值,因为它是一个零容量的直接切换队列。
以下示例展示了如何声明会合频道:
@Bean
public PollableChannel rendezvousChannel() {
return new RendezvousChannel();
}
<int:channel id="rendezvousChannel"/>
<int:rendezvous-queue/>
</int:channel>
通道拦截器配置
消息信道也可能包含拦截器,如信道拦截器中所述。
这<拦截机/>子元素可以添加到<频道/>(或者更具体的元素类型)。
你可以提供裁判属性以引用任何实现通道拦截者界面,如下示例所示:
<int:channel id="exampleChannel">
<int:interceptors>
<ref bean="trafficMonitoringInterceptor"/>
</int:interceptors>
</int:channel>
一般来说,我们建议在独立位置定义拦截器实现,因为它们通常提供可在多个信道间重复使用的共同行为。
全局通道拦截器配置
信道拦截器为每个信道应用交叉截切行为提供了一种简洁简洁的方法。 如果在多个信道上也应用相同的行为,为每个信道配置同一套拦截器并不是最高效的方式。 为了避免重复配置,同时允许拦截器应用于多个信道,Spring Integration提供了全局拦截器。 请考虑以下两个例子:
<int:channel-interceptor pattern="input*, thing2*, thing1, !cat*" order="3">
<bean class="thing1.thing2SampleInterceptor"/>
</int:channel-interceptor>
<int:channel-interceptor ref="myInterceptor" pattern="input*, thing2*, thing1, !cat*" order="3"/>
<bean id="myInterceptor" class="thing1.thing2SampleInterceptor"/>
每<通道拦截器/>元素允许你定义一个全局拦截器,该拦截器应用在所有符合由模式属性。
在前述情况下,全局拦截器应用在“thing1”信道以及所有以“thing2”或“input”开头的信道上,但不应用于以“thing3”开头的信道(自版本5.0起)。
将这种语法加入模式会带来一个可能(虽然可能性不大)的问题。
如果你有一颗叫做!thing1你包含了一个模式!thing1在你的频道拦截机模式模式,已经不匹配了。
现在图案与所有未命名的豆子相匹配东西1.
在这种情况下,你可以跳出!在与 的模式中。
图案\\!事情1匹配一个名为!thing1. |
顺序属性允许你管理在同一信道上有多个拦截器时,该拦截器被注入的位置。 例如,信道“inputChannel”可以在本地配置独立拦截器(见下文),如下示例所示:
<int:channel id="inputChannel">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
一个合理的问题是:“全局拦截器是如何与本地配置的其他拦截器相较的,或者通过其他全局拦截器定义进行注入?”
当前实现提供了定义拦截器执行顺序的简单机制。
在次序属性保证拦截器在现有拦截器之后注入,负数则表示拦截器在现有拦截器之前注入。
这意味着在前述例子中,全局拦截器在(因为其次序大于0)本地配置的“窃听”拦截器。
如果存在另一个全局拦截器与匹配的模式其阶数通过比较两个拦截器的数值确定次序属性。
要在现有拦截器之前注入全局拦截器,使用负值次序属性。
注意次序和模式属性是可选的。
默认值次序将为0,且模式,默认为“*”(用于匹配所有通道)。 |
窃听
如前所述,Spring Integration 提供了一个简单的窃听器。
你可以在<拦截机/>元素。
这样做对调试尤其有用,可以与 Spring Integration 的日志通道适配器配合使用,具体如下:
<int:channel id="in">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
<int:logging-channel-adapter id="logger" level="DEBUG"/>
“logging-channel-adapter”还接受“expression”属性,以便你可以根据“payload”和“headers”变量评估SpEL表达式。
或者,记录完整消息toString()结果,给出一个值true用于“log-full-message”属性。
默认情况下,它是false这样只记录有效载荷。
设置为true除了有效载荷外,还支持对所有头部进行日志记录。
“表达式”选项提供了最大的灵活性(例如,expression=“payload.user.name”). |
关于线控及其他类似组件(消息发布配置)的一个常见误解是它们本质上是自动异步的。 默认情况下,线路分流器作为组件不会异步调用。 相反,Spring Integration专注于配置异步行为的统一方法:消息通道。 使消息流某些部分是同步或异步的,取决于该流中配置的消息通道类型。 这是消息通道抽象的主要优势之一。 自框架诞生以来,我们一直强调信息渠道作为框架一等公民的必要性和价值。 它不仅仅是EIP模式的内部隐式实现。 它作为一个可配置组件完全向终端用户开放。 因此,窃听组件仅负责执行以下任务:
-
通过接入通道拦截消息流(例如,
频道A) -
抓取每条消息
-
将消息发送到另一个信道(例如,
频道B)
它本质上是桥式模式的变体,但封装在通道定义中(因此更容易启用和禁用而不破坏流)。 此外,与桥接器不同,它基本上是分叉了另一条消息流。 这是同步流还是异步流?答案取决于“channelB”属于哪种消息通道。 我们有以下选项:直接通道、可轮询通道和执行通道。 后两者突破线程边界,使得此类信道上的通信异步,因为该信道向其订阅处理程序发送消息的分发发生在与发送消息的线程不同的线程上。 这就是让你的线接流成为同步或异步的原因。 它与框架内的其他组件(如消息发布器)保持一致,通过避免你提前担心(除了编写线程安全代码外)某段代码应当实现同步还是异步,从而增加了一致性和简洁性。 两段代码(比如组件A和组件B)在消息通道上的实际连接,决定了它们的协作是同步或异步的。 你未来甚至可能想从同步切换到异步,消息通道让你可以快速切换,而无需作代码。
关于窃听的最后一点是,尽管上述理由说明默认不异步,但通常最好尽快将消息传递。 因此,使用异步信道选项作为窃听器的出站信道是相当常见的做法。 然而,异步行为并非默认强制执行。 如果我们这样做,会有许多用例出现问题,包括你可能不想打破事务边界。 也许你使用窃听模式进行审计,并且希望审计消息能在原始交易中发送。 举个例子,你可以把线路分接器连接到JMS的外呼通道适配器。 这样你就能兼顾两全:1)JMS消息的发送可以在事务中进行,同时2)仍是“发射后遗忘”作,从而避免主消息流中出现明显延迟。
从4.0版本开始,当拦截器(例如,窃听类)引用了一个频道。
你需要排除这些频道,不包括当前拦截器拦截的通道。
这可以通过适当的模式或程序化实现。
如果你有自定义通道拦截者这涉及一个渠道,考虑实现否决能力拦截者.
这样,框架会根据提供的模式询问拦截者是否可以拦截每个候选信道。
你也可以在拦截器方法中添加运行时保护,确保该信道不会被拦截器引用。
这窃听同时使用这两种技巧。 |
从4.3版本开始,窃听有额外的构造子,取频道名称而不是消息频道实例。
这对 Java 配置和使用通道自动创建逻辑时非常方便。
目标消息频道豆子由提供的频道名称后来,在与
拦截 器。
信道分辨率需要豆子工厂,因此线接实例必须是Spring管理的豆子。 |
这种晚绑定方法还允许简化典型的 Java DSL 配置窃听模式,如下示例所示:
@Bean
public PollableChannel myChannel() {
return MessageChannels.queue()
.wireTap("loggingFlow.input")
.get();
}
@Bean
public IntegrationFlow loggingFlow() {
return f -> f.log();
}
条件线路分流器
通过使用选择器或选择表达式属性。
这选择器参考文献消息选择器BEAN,可以在运行时决定消息是否应进入分接通道。
同样,选择表达式是一个布尔SpEL表达式,具有相同目的:如果该表达式值为true,消息被发送到分接信道。
全局线分流配置
可以将全局窃听器配置配置为全局通道拦截器配置的特殊情况。
为此,需要配置一个顶层窃听元素。
现在,除了平常之外窃听命名空间支持,模式和次序属性得到支持,工作方式与通道拦截器.
以下示例展示了如何配置全局窃听器:
@Bean
@GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3)
public WireTap wireTap(MessageChannel wiretapChannel) {
return new WireTap(wiretapChannel);
}
<int:wire-tap pattern="input*, thing2*, thing1" order="3" channel="wiretapChannel"/>
全局线分流器提供了一种方便的方式,可以在外部配置单信道窃听器,而无需修改现有信道配置。
为此,设置模式归属于目标频道名称。
例如,你可以利用该技术配置测试用例以验证信道上的消息。 |
特别频道
在应用上下文中默认定义了两个特殊通道:errorChannel和零通道.
“nullChannel”(一个零通道) 表现为/dev/null,记录发送到它的任何消息调试水平,立即返回。
特殊处理适用于org.reactivestreams.出版商传输消息的有效载荷:该信道立即订阅该信道,以启动响应式流处理,尽管数据会被丢弃。
响应式流处理抛出的错误(参见Subscriber.onError(可投掷))记录在警告为可能的调查准备。
如果需要对此类错误采取任何处理,则ReactiveRequestHandlerAdvice其中Mono.doOnError()可以对生成消息的处理程序进行定制化单回复这条零通道.
当你遇到不关心的回复信道分辨率错误时,可以设置受影响组件的输出通道属性为“nullChannel”(名称“nullChannel”在应用上下文中保留)。
“errorChannel”内部用于发送错误消息,可以通过自定义配置覆盖。 这在错误处理中有更详细的讨论。
另请参阅 Java DSL 章节中的消息通道,了解更多关于消息通道和拦截器的信息。
轮询器
本节介绍了春季整合中的民调工作原理。
民调消费者
当消息端点(通道适配器)连接到通道并实例化时,它们会产生以下实例之一:
实际实现取决于这些端点连接的信道类型。
一个通道适配器连接到实现org.springframework.messaging.SubscribableChannel接口生成一个实例EventDrivenConsumer.
另一方面,连接到实现org.springframework.messaging.PollableChannel接口(例如一个队列通道) 产生一个实例民调消费者.
轮询消费者让 Spring Integration 组件主动轮询消息,而非以事件驱动的方式处理消息。
它们在许多信息传递场景中是一个关键的跨领域关注点。 在春季整合中,轮询消费者基于同名模式,该模式见Gregor Hohpe和Bobby Woolf合著的《企业集成模式》一书。 你可以在书的网站上找到该图案的描述。
可轮询消息源
春季集成提供了第二种民调消费者模式的变体。
当使用入站通道适配器时,这些适配器通常会被SourcePollingChannelAdapter.
例如,当从远程FTP服务器位置获取消息时,FTP入站通道适配器中描述的适配器配置了轮询器,定期检索消息。
因此,当组件配置轮询器时,生成的实例类型如下:
这意味着轮询器既可用于入站消息,也可用于出站消息场景。 以下是轮询器使用的一些用例:
-
轮询某些外部系统,如FTP服务器、数据库和Web服务
-
内部(可轮询)消息通道轮询
-
轮询内部服务(例如反复执行 Java 类的方法)
AOP建议类可以应用于轮询者,在咨询链,例如交易建议以启动交易。
从4.1版本开始,PollSkipAdvice提供。
投票者使用触发器来确定下一次投票的时间。
这PollSkipAdvice可以用来抑制(跳过)轮询,可能是因为存在某些下游条件阻止消息被处理。
要使用这个建议,你必须提供一个实现投票跳过策略.
从版本 4.2.5 开始,简单投票跳过策略提供。
使用时,你可以将一个实例作为 bean 添加到应用上下文中,注入到PollSkipAdvice,并将其加入民调者的建议链中。
要跳过投票,请致电跳过投票().
要继续投票,请致电重置().
4.2版本在这方面增加了更多灵活性。
关于消息源,请参见条件轮询器。 |
延迟确认可投票消息源
从5.0.1版本开始,某些模块提供消息源支持延迟确认直到下游流程完成(或将消息交给其他线程)的实现。
目前这仅限于Amqp消息源以及卡夫卡消息源.
有了这些消息源,IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK标题(参见消息头访问器应用程序接口)被添加到消息中。
当与可轮询消息源一起使用时,头部的值是致谢回应,如下示例所示:
@FunctionalInterface
public interface AcknowledgmentCallback {
void acknowledge(Status status);
boolean isAcknowledged();
void noAutoAck();
default boolean isAutoAck();
enum Status {
/**
* Mark the message as accepted.
*/
ACCEPT,
/**
* Mark the message as rejected.
*/
REJECT,
/**
* Reject the message and requeue so that it will be redelivered.
*/
REQUEUE
}
}
并非所有消息源(例如,卡夫卡消息源)支持该拒绝地位。
它被视为接受.
应用程序可以随时确认消息,如下示例所示:
Message<?> received = source.receive();
...
StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
.acknowledge(Status.ACCEPT);
如果消息源接线到一个SourcePollingChannelAdapter当轮询线程在下游流完成后返回适配器时,适配器会检查确认是否已被确认,如果没有,则将其状态设置为接受它(或拒绝如果流抛出异常)。
状态值定义在致谢回电。状态列举.
Spring Integration 提供消息来源投票模板对 a 进行临时轮询消息源.
这同样负责设置接受或拒绝在致谢回应当消息处理器回调返回(或抛出异常)。
以下示例展示了如何使用消息来源投票模板:
MessageSourcePollingTemplate template =
new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
...
});
无论哪种情况(SourcePollingChannelAdapter和消息来源投票模板),你可以通过调用来禁用自动确认/导航noAutoAck()关于复试。
如果你把消息交给其他帖子,之后想回复,可以这样做。
并非所有实现都支持此功能(例如,Apache Kafka 不支持,因为偏移提交必须在同一线程上执行)。
消息源的条件轮询器
本节介绍如何使用条件轮询器。
背景
建议对象,在一个咨询链在轮询器上,建议整个轮询任务(包括消息检索和处理)。
这些“周围建议”方法无法访问民调的任何上下文——只能访问民调本身。
这对于诸如将任务设置为事务性或因外部条件跳过轮询等需求,如前所述,是合适的。
如果我们希望根据结果采取某些行动,收到是投票的一部分,还是根据条件调整投票器?对于这些情况,Spring Integration提供了“智能”投票。
“聪明”民调
5.3 版本引入了接收信息建议接口。
任何建议在咨询链实现该接口的 仅应用于接收()操作-MessageSource.receive()和PollableChannel.receive(超时).
因此,它们只能应用于SourcePollingChannelAdapter或民调消费者.
此类类实现以下方法:
-
beforeReceive(对象源)该方法在Object.receive()方法。 它允许你检查并重新配置源。 返回false取消该轮询(类似于PollSkipAdvice前面提到过)。 -
Message<?> afterReceive(Message<?> 结果,对象源)该方法以接收()方法。 同样,你可以重新配置源代码或采取任何作(可能取决于结果,结果可能包括零如果源头没有发出任何信息)。 你甚至可以回复不同的消息
|
螺纹安全
如果 |
|
建议链排序
你应该了解初始化过程中建议链是如何处理的。 |
简单活跃Idle接收消息建议
这个建议是一个简单的实现接收信息建议. 与动态周期触发器它根据上一次投票是否产生消息,调整轮询频率。轮询器还必须有相同的引用动态周期触发器.
|
重要:异步切换
简单活跃Idle接收消息建议根据接收()结果。 这只有在轮询器线程中调用建议时才有效。如果轮询器有任务执行者. 如果你想在轮询结果后使用异步作,建议稍后进行异步切换,可能使用执行者频道. |
CompoundTriggerAdvice
该建议允许根据轮询是否返回消息,选择两个触发器之一。考虑一个轮询器使用克隆触发器.克隆触发器实例是不可变的,因此一旦构建完毕,就无法更改。考虑一个用例,我们想用cron表达式每小时触发一次轮询,但如果没有收到消息,则每分钟轮询一次,当检索到消息时,恢复使用cron表达式。
建议(和轮询器)使用了一个复合触发器为此目的。触发器是主要触发器可以是克隆触发器. 当建议检测到未收到消息时,会将次级触发器添加到复合触发器. 当复合触发器实例下一个执行时间调用方法时,如果存在,则委派给次级触发器。否则,它委派给主触发器。
轮询器还必须有相同的引用复合触发器.
下例展示了每分钟退回的每小时cron表达式的配置:
<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
<bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
<int:poller trigger="compoundTrigger">
<int:advice-chain>
<bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
<constructor-arg ref="compoundTrigger"/>
<constructor-arg ref="secondary"/>
</bean>
</int:advice-chain>
</int:poller>
</int:inbound-channel-adapter>
<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
<constructor-arg ref="primary" />
</bean>
<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
<constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>
<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
<constructor-arg value="60000" />
</bean>
|
重要:异步切换
CompoundTriggerAdvice根据接收()结果。 这只有在轮询器线程中调用建议时才有效。如果轮询器有任务执行者. 如果你想在轮询结果后使用异步作,建议稍后进行异步切换,可能使用执行者频道. |
仅限MessageSource的建议
有些建议可能仅适用于MessageSource.receive()而且它们对Pollable频道. 为此,一个消息源变异器接口(是接收信息建议)仍然存在。更多信息请参见“入站通道适配器:轮询多台服务器和目录”。
通道适配器
通道适配器是一种消息端点,使单个发送者或接收者能够连接到消息通道。Spring Integration 提供多种适配器,支持各种传输方式,如 JMS、文件、HTTP、Web 服务、邮件等。本指南后续章节将讨论每个适配器。不过,本章重点关注简单但灵活的方法调用通道适配器支持。适配器既有入站适配器,也有出站适配器,每个适配器都可以配置核心命名空间中的 XML 元素。只要你有一个可以作为源或目的地调用的方法,这些适配器为扩展 Spring 集成提供了便捷的方式。
配置入站通道适配器
一入站信道适配器元素(aSourcePollingChannelAdapter在 Java 配置中,可以调用 Spring 管理对象上的任何方法,并向消息频道在将方法输出转换为消息. 当适配器的订阅被激活时,轮询器会尝试接收源端的消息。轮询器会以任务调度器根据提供的配置。要配置单个通道适配器的轮询间隔或cron表达式,可以提供一个带有调度属性的“poller”元素,如“固定速率”或“cron”。以下示例定义了两个入站信道适配器实例:
@Bean
public IntegrationFlow source1() {
return IntegrationFlow.from(() -> new GenericMessage<>(...),
e -> e.poller(p -> p.fixedRate(5000)))
...
.get();
}
@Bean
public IntegrationFlow source2() {
return IntegrationFlow.from(() -> new GenericMessage<>(...),
e -> e.poller(p -> p.cron("30 * 9-17 * * MON-FRI")))
...
.get();
}
public class SourceService {
@InboundChannelAdapter(channel = "channel1", poller = @Poller(fixedRate = "5000"))
Object method1() {
...
}
@InboundChannelAdapter(channel = "channel2", poller = @Poller(cron = "30 * 9-17 * * MON-FRI"))
Object method2() {
...
}
}
@Bean
fun messageSourceFlow() =
integrationFlow( { GenericMessage<>(...) },
{ poller { it.fixedRate(5000) } }) {
...
}
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
<int:poller fixed-rate="5000"/>
</int:inbound-channel-adapter>
<int:inbound-channel-adapter ref="source2" method="method2" channel="channel2">
<int:poller cron="30 * 9-17 * * MON-FRI"/>
</int:channel-adapter>
另见通道适配器表达式和脚本。
| 如果没有提供轮询器,则必须在上下文中注册一个单一默认轮询器。更多细节请参见端点命名空间支持。 |
|
重要信息:轮询器配置
所有的
在第一种配置中,轮询任务每次轮询调用一次,在每个任务(轮询)中,基于
注意,没有 然而,在 但是,如果你确定方法可以返回 null,并且需要根据每次轮询对尽可能多的来源进行轮询,那么你应该明确设置
从5.5版本开始,a 更多信息请参见全局默认轮询器。 |
配置出站通道适配器
一出站通道适配器元素(a@ServiceActivator对于 Java 配置来说)也可以连接消息频道对任何应与发送到该信道的消息有效载荷一起调用的POJO消费者方法。
以下示例展示了如何定义出站通道适配器:
@Bean
public IntegrationFlow outboundChannelAdapterFlow(MyPojo myPojo) {
return f -> f
.handle(myPojo, "handle");
}
public class MyPojo {
@ServiceActivator(channel = "channel1")
void handle(Object payload) {
...
}
}
@Bean
fun outboundChannelAdapterFlow(myPojo: MyPojo) =
integrationFlow {
handle(myPojo, "handle")
}
<int:outbound-channel-adapter channel="channel1" ref="target" method="handle"/>
<beans:bean id="target" class="org.MyPojo"/>
如果被适应的信道是Pollable频道,你必须提供一个轮询子元素(该@Poller关于@ServiceActivator),如下示例所示:
public class MyPojo {
@ServiceActivator(channel = "channel1", poller = @Poller(fixedRate = "3000"))
void handle(Object payload) {
...
}
}
<int:outbound-channel-adapter channel="channel2" ref="target" method="handle">
<int:poller fixed-rate="3000" />
</int:outbound-channel-adapter>
<beans:bean id="target" class="org.MyPojo"/>
你应该用裁判如果POJO消费者实现可以被其他方式重用,属性<出站通道适配器>定义。
然而,如果消费者实现仅由单一定义引用<出站通道适配器>你可以将其定义为内豆,如下示例所示:
<int:outbound-channel-adapter channel="channel" method="handle">
<beans:bean class="org.Foo"/>
</int:outbound-channel-adapter>
同时使用裁判属性和内部处理程序定义在同一个<出站通道适配器>不允许配置,因为这会产生歧义。
这种配置会导致抛出异常。 |
任何通道适配器都可以在没有渠道在这种情况下,它隐式地创建了一个实例直达频道.
创建频道名称与身份证属性<入站通道适配器>或<出站通道适配器>元素。
因此,如果渠道不提供,身份证是必需的。
通道适配器表达式与脚本
与许多其他 Spring 集成组件一样,<入站通道适配器>和<出站通道适配器>同时支持SpEL表达式评估。
要使用 SpEL,应在 'expression' 属性中提供表达字符串,而不是提供用于调用 bean 的方法调用的 'ref' 和 'method' 属性。
当一个表达式被评估时,它遵循与方法调用相同的契约,其中:对<入站通道适配器>只要评估结果为非空值,就会生成消息,而<出站通道适配器>必须等同于空回归方法调用。
从 Spring Integration 3.0 开始,一个<int:入站通道适配器/>也可以配置为 SpEL<表达/>(甚至是<脚本/>) 子元素,适用于需要比简单“expression”属性更复杂作时。如果你提供一个脚本作为资源通过使用位置属性,你也可以设置刷新-检查-延迟允许定期刷新资源。如果你希望每次轮询都检查脚本,你需要将此设置与轮询器的触发器协调,如下示例所示:
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
<int:poller max-messages-per-poll="1" fixed-delay="5000"/>
<script:script lang="ruby" location="Foo.rb" refresh-check-delay="5000"/>
</int:inbound-channel-adapter>
另见缓存秒属性ReloadableResourceBundleExpressionSource当使用<表达/>子元素。有关表达式的更多信息,请参见 Spring 表达式语言(SpEL)。关于脚本,请参见 Groovy 支持和脚本支持
这<int:入站通道适配器/> (SourcePollingChannelAdapter)是一个端点,通过定期触发以轮询某个底层信息来启动消息流消息源. 由于在轮询时没有消息对象,表达式和脚本无法访问根节点消息,因此没有大多数其他消息SpEL表达式中可用的有效载荷或头部属性。脚本可以生成并返回完整的消息带有头部和有效载荷的对象,或者仅有一个有效载荷,框架将有效载入带有基本头部的消息中。 |
消息桥接器
消息桥接是一个相对简单的端点,连接两个消息通道或通道适配器。例如,你可能想连接Pollable频道转给订阅频道这样订阅端点无需担心任何轮询配置。相反,消息桥提供轮询配置。
通过在两个信道之间提供一个中介轮询器,你可以使用消息桥来限制进站消息的进站速度。轮询器的触发器决定消息到达第二个信道的速率,轮询器的maxMessagesPerPoll属性强制执行吞吐量的限制。
消息桥的另一个有效用途是连接两个不同的系统。在这种情况下,Spring Integration的作用仅限于连接这些系统并在必要时管理轮询器。更常见的是两个系统之间至少有一个变换器,用于格式转换。在这种情况下,通道可以作为变换器端点的“输入通道”和“输出通道”提供。如果不需要数据格式转换,消息桥确实可能足够。
用XML配置桥接器
你可以使用<桥>元素用于在两个消息通道或通道适配器之间建立消息桥。为此,提供输入通道和输出通道属性,如下例所示:
<int:bridge input-channel="input" output-channel="output"/>
如上所述,消息桥接的一个常见用例是连接Pollable频道转给订阅频道. 在执行此功能时,消息桥也可以作为节流器:
<int:bridge input-channel="pollable" output-channel="subscribable">
<int:poller max-messages-per-poll="10" fixed-rate="5000"/>
</int:bridge>
你可以使用类似连接通道适配器的机制。以下示例展示了简单的“回声”连接标准和学生来自 Spring Integration 的适配器流Namespace:
<int-stream:stdin-channel-adapter id="stdin"/>
<int-stream:stdout-channel-adapter id="stdout"/>
<int:bridge id="echo" input-channel="stdin" output-channel="stdout"/>
类似配置适用于其他(可能更实用的)通道适配器桥,如文件到JMS或邮件到文件。后续章节将介绍各种通道适配器。
| 如果桥接器上没有定义“输出通道”,则使用入站消息提供的回复通道(如有)。如果既没有输出也没有回复通道,则会抛出异常。 |
用 Java 配置配置桥接器
以下示例展示了如何通过使用@BridgeFrom注解:
@Bean
public PollableChannel polled() {
return new QueueChannel();
}
@Bean
@BridgeFrom(value = "polled", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "10"))
public SubscribableChannel direct() {
return new DirectChannel();
}
以下示例展示了如何通过使用@BridgeTo注解:
@Bean
@BridgeTo(value = "direct", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "10"))
public PollableChannel polled() {
return new QueueChannel();
}
@Bean
public SubscribableChannel direct() {
return new DirectChannel();
}
或者,你也可以使用桥接处理者,如下示例所示:
@Bean
@ServiceActivator(inputChannel = "polled",
poller = @Poller(fixedRate = "5000", maxMessagesPerPoll = "10"))
public BridgeHandler bridge() {
BridgeHandler bridge = new BridgeHandler();
bridge.setOutputChannelName("direct");
return bridge;
}