消息端点
本章的第一部分涵盖了一些背景理论,并揭示了相当多的关于驱动 Spring Integration 的各种消息传递组件的底层 API。 如果您想真正了解幕后发生的事情,这些信息可能会有所帮助。 但是,如果您想启动并运行各种元素的简化基于命名空间的配置,请随时跳到 Endpoint Namespace Support (终端节点命名空间支持)。
如概述中所述,消息终端节点负责将各种消息收发组件连接到通道。 在接下来的几章中,我们将介绍许多使用消息的不同组件。 其中一些还能够发送回复消息。 发送消息非常简单。 如前面的 Message Channels 中所示,您可以向消息通道发送消息。 但是,接收稍微复杂一些。 主要原因是有两种类型的使用者:轮询使用者和事件驱动使用者。
在这两者中,事件驱动型消费者要简单得多。
无需管理和调度单独的 Poller 线程,它们本质上是具有回调方法的侦听器。
当连接到 Spring Integration 的可订阅消息通道之一时,这个简单的选项效果很好。
但是,当连接到缓冲的、可轮询的消息通道时,某些组件必须调度和管理轮询线程。
Spring 集成提供了两种不同的端点实现来容纳这两种类型的消费者。
因此,消费者自己只需要实现回调接口即可。
当需要轮询时,终端节点充当使用者实例的容器。
其好处类似于使用容器来托管消息驱动的 bean,但是,由于这些使用者是在ApplicationContext
,它更类似于 Spring 自己的MessageListener
器皿。
消息处理程序
Spring 集成的MessageHandler
interface 由框架中的许多组件实现。
换句话说,这不是公共 API 的一部分,您通常不会实现MessageHandler
径直。
尽管如此,消息使用者使用它来实际处理使用的消息,因此了解此策略接口确实有助于理解使用者的整体角色。
接口定义如下:
public interface MessageHandler {
void handleMessage(Message<?> message);
}
尽管它很简单,但这个接口为以下章节中介绍的大多数组件(路由器、转换器、拆分器、聚合器、服务激活器等)提供了基础。 这些组件各自对它们处理的消息执行非常不同的功能,但实际接收消息的要求是相同的,轮询和事件驱动行为之间的选择也是相同的。 Spring 集成提供了两个端点实现,它们托管这些基于回调的处理程序,并让它们连接到消息通道。
事件驱动型消费者
因为它是两者中更简单的,所以我们首先介绍事件驱动的使用者终端节点。
您可能还记得,SubscribableChannel
接口提供了一个subscribe()
方法,并且该方法接受MessageHandler
参数(如SubscribableChannel
).
下面的清单显示了subscribe
方法:
subscribableChannel.subscribe(messageHandler);
由于订阅通道的处理程序不必主动轮询该通道,因此这是一个事件驱动的消费者,并且 Spring 集成提供的实现接受SubscribableChannel
以及MessageHandler
,如下例所示:
SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);
EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);
轮询消费者
Spring 集成还提供了一个PollingConsumer
,并且可以以相同的方式实例化,只是通道必须实现PollableChannel
,如下例所示:
PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);
PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
轮询使用者还有许多其他配置选项。 以下示例显示如何设置触发器:
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(30)));
这PeriodicTrigger
通常使用简单的区间 (Duration
),但也支持initialDelay
属性和布尔值fixedRate
属性(默认值为false
— 即没有固定延迟)。
以下示例设置这两个属性:
PeriodicTrigger trigger = new PeriodicTrigger(Duration.ofSeconds(1));
trigger.setInitialDelay(Duration.ofSeconds(5));
trigger.setFixedRate(true);
前面示例中的三个设置的结果是一个等待 5 秒,然后每秒触发一次的触发器。
这CronTrigger
需要有效的 cron 表达式。
有关详细信息,请参阅 Javadoc。
以下示例将新的CronTrigger
:
CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");
上一个示例中定义的触发器的结果是星期一到星期五每 10 秒触发一次的触发器。
轮询终端节点的默认触发器是PeriodicTrigger 具有 1 秒固定延迟周期的实例。 |
除了触发器之外,您还可以指定其他两个与轮询相关的配置属性:maxMessagesPerPoll
和receiveTimeout
.
以下示例说明如何设置这两个属性:
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);
这maxMessagesPerPoll
属性指定在给定轮询作中要接收的最大消息数。
这意味着 Poller 继续调用receive()
而无需等待,直到null
或达到最大值。
例如,如果 Poller 有一个 10 秒的间隔触发器,并且maxMessagesPerPoll
设置25
,并且它正在轮询队列中有 100 条消息的通道,则可以在 40 秒内检索所有 100 条消息。
它抓取 25 个,等待 10 秒,抓取下一个 25 个,依此类推。
如果maxMessagesPerPoll
配置了负值,则MessageSource.receive()
在单个轮询周期内调用,直到返回null
.
从版本 5.5 开始,一个0
值具有特殊含义 - 跳过MessageSource.receive()
调用,这可能被视为此轮询端点的暂停,直到maxMessagesPerPoll
稍后更改为非零值,例如通过 Control Bus。
这receiveTimeout
property 指定 Poller 在调用 receive作时如果没有可用的消息时应等待的时间。
例如,考虑两个表面上看起来相似但实际上完全不同的选项:第一个选项的间隔触发为 5 秒,接收超时为 50 毫秒,而第二个选项的间隔触发为 50 毫秒,接收超时为 5 秒。
第一个接收消息的时间可能比它在通道上接受的晚 4950 毫秒(如果该消息在其 poll 调用之一返回后立即到达)。
另一方面,第二个配置永远不会错过超过 50 毫秒的消息。
区别在于第二个选项需要线程等待。
但是,因此,它可以更快地响应到达的消息。
这种技术称为 “长轮询”,可用于在轮询源上模拟事件驱动的行为。
轮询消费者也可以委托给 SpringTaskExecutor
,如下例所示:
PollingConsumer consumer = new PollingConsumer(channel, handler);
TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);
此外,PollingConsumer
具有一个名为adviceChain
.
此属性允许您指定List
的 AOP 建议,用于处理其他横切关注点,包括交易。
这些建议围绕doPoll()
方法。
有关更深入的信息,请参阅 Endpoint Namespace Support 下的有关 AOP 建议链和事务支持的部分。
另请参阅@Poller
annotation Javadocs 和相应的 Messaging Annotations Support 部分。
Java DSL 还提供了一个.poller()
endpoint 配置选项及其各自的Pollers
厂。
前面的示例显示了依赖项查找。
但是,请记住,这些使用者通常配置为 Spring bean 定义。
事实上, Spring 集成还提供了一个FactoryBean
叫ConsumerEndpointFactoryBean
,这将根据 Channel 的类型创建适当的 Consumer 类型。
此外,Spring 集成具有完整的 XML 名称空间支持,以进一步隐藏这些细节。
本指南介绍了基于命名空间的配置,因为介绍了每种组件类型。
许多MessageHandler implementations 可以生成回复消息。
如前所述,与接收消息相比,发送消息是微不足道的。
但是,何时发送回复消息以及发送多少回复消息取决于处理程序类型。
例如,聚合器等待大量消息到达,并且通常配置为拆分器的下游使用者,该拆分器可以为其处理的每条消息生成多个回复。
使用 namespace 配置时,您不需要严格了解所有详细信息。
但是,仍然值得知道的是,这些组件中的几个共享一个公共基类AbstractReplyProducingMessageHandler ,并且它提供了一个setOutputChannel(..) 方法。 |
终端节点命名空间支持
在本参考手册中,您可以找到终端节点元素的特定配置示例,例如 router、transformer、service-activator 等。
其中大多数都支持input-channel
属性,并且许多都支持output-channel
属性。
解析后,这些 endpoint 元素会生成PollingConsumer
或EventDrivenConsumer
,具体取决于input-channel
引用:PollableChannel
或SubscribableChannel
分别。
当通道是可轮询的时,轮询行为基于 endpoint 元素的poller
sub-element 及其属性。
下面列出了poller
:
<int:poller cron="" (1)
default="false" (2)
error-channel="" (3)
fixed-delay="" (4)
fixed-rate="" (5)
initial-delay="" (6)
id="" (7)
max-messages-per-poll="" (8)
receive-timeout="" (9)
ref="" (10)
task-executor="" (11)
time-unit="MILLISECONDS" (12)
trigger=""> (13)
<int:advice-chain /> (14)
<int:transactional /> (15)
</int:poller>
1 | 提供使用 Cron 表达式配置 Poller 的功能。
底层实现使用org.springframework.scheduling.support.CronTrigger .
如果设置了此属性,则不必指定以下任何属性:fixed-delay ,trigger ,fixed-rate 和ref . |
2 | 通过将此属性设置为true ,你可以只定义一个全局默认 Poller。
如果在应用程序上下文中定义了多个默认 Poller,则会引发异常。
连接到PollableChannel (PollingConsumer ) 或任何SourcePollingChannelAdapter 没有显式配置的 Poller 然后使用全局默认 Poller。
它默认为false .
自选。 |
3 | 标识如果此 Poller 的调用失败,则向其发送错误消息的通道。
要完全禁止异常,您可以提供对nullChannel .
自选。 |
4 | 固定延迟触发器使用PeriodicTrigger 在被窝里。
数值为time-unit 或者可以是 duration 格式(从版本 6.2 开始),例如PT10S ,P1D .
如果设置了此属性,则不必指定以下任何属性:fixed-rate ,trigger ,cron 和ref . |
5 | 固定速率触发器使用PeriodicTrigger 在被窝里。
数值为time-unit 或者可以是 duration 格式(从版本 6.2 开始),例如PT10S ,P1D .
如果设置了此属性,则不必指定以下任何属性:fixed-delay ,trigger ,cron 和ref . |
6 | 的PeriodicTrigger Under the covers(从 6.2 版本开始)。
数值为time-unit 或者可以是 duration 格式,例如PT10S ,P1D . |
7 | 引用 poller 的基础 bean 定义的 ID,其类型为org.springframework.integration.scheduling.PollerMetadata .
这id 属性是顶级 Poller 元素所必需的,除非它是默认的 Poller (default="true" ). |
8 | 有关更多信息,请参阅Configuring An Inbound Channel Adapter。
如果未指定,则默认值取决于上下文。
如果您使用PollingConsumer ,则此属性默认为-1 .
但是,如果您使用SourcePollingChannelAdapter 这max-messages-per-poll 属性默认为1 .
自选。 |
9 | 在基础类上设置 ValuePollerMetadata .
如果未指定,则默认为 1000(毫秒)。
自选。 |
10 | Bean 引用另一个顶级 Poller 的 Pod 引用。
这ref 属性不得出现在顶级poller 元素。
但是,如果设置了此属性,则不必指定以下任何属性:fixed-rate ,trigger ,cron 和fixed-delay . |
11 | 提供引用自定义任务执行程序的功能。 有关更多信息,请参阅 TaskExecutor 支持。 自选。 |
12 | 此属性指定java.util.concurrent.TimeUnit enum 值org.springframework.scheduling.support.PeriodicTrigger .
因此,此属性只能与fixed-delay 或fixed-rate 属性。
如果与任一cron 或trigger reference 属性,则会导致失败。
支持的PeriodicTrigger 是毫秒。
因此,唯一可用的选项是毫秒和秒。
如果未提供此值,则任何fixed-delay 或fixed-rate value 被解释为毫秒。
基本上,此枚举为基于秒的 interval 触发器值提供了便利。
对于每小时、每天和每月设置,我们建议使用cron trigger 来代替。 |
13 | 对任何 Spring 配置的 bean 的引用,该 bean 实现了org.springframework.scheduling.Trigger 接口。
但是,如果设置了此属性,则不必指定以下任何属性:fixed-delay ,fixed-rate ,cron 和ref .
自选。 |
14 | 允许指定额外的 AOP 建议来处理其他横切关注点。 有关更多信息,请参阅 交易 。 自选。 |
15 | Poller 可以成为事务性的。 有关更多信息,请参阅 AOP Advice 链。 自选。 |
例子
可以按如下方式配置具有 1 秒间隔的简单基于间隔的 Poller :
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller fixed-rate="1000"/>
</int:transformer>
作为使用fixed-rate
属性,您还可以使用fixed-delay
属性。
对于基于 Cron 表达式的轮询器,请使用cron
属性,如下例所示:
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>
如果输入通道是PollableChannel
,则需要 Poller 配置。
具体来说,如前所述,trigger
是PollingConsumer
类。
因此,如果省略poller
子元素,则可能会引发异常。
如果你试图在连接到不可轮询通道的元素上配置 Poller,也可能引发异常。
也可以创建顶级 poller,在这种情况下,只有一个ref
attribute 是必需的,如下例所示:
<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller ref="weekdayPoller"/>
</int:transformer>
这ref 属性只允许在内部 Poller 定义上。
在顶级 Poller 上定义此属性会导致在应用程序上下文初始化期间引发配置异常。 |
全局默认轮询器
为了进一步简化配置,你可以定义一个全局默认 Poller。
XML DSL 中的单个顶级 Poller 组件可能具有default
属性设置为true
.
对于 Java 配置,一个PollerMetadata
bean 的PollerMetadata.DEFAULT_POLLER
在这种情况下,必须声明 name。
在这种情况下,任何具有PollableChannel
对于其 input channel,该通道在同一ApplicationContext
,并且没有明确配置poller
使用该默认值。
下面的示例展示了这样的 poller 和使用它的转换器:
-
Java DSL
-
Java
-
Kotlin DSL
-
XML
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
return IntegrationFlow.from(MessageChannels.queue("pollable"))
.transform(transformer) // No 'poller' attribute because there is a default global poller
.channel("output")
.get();
}
@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
@Bean
public QueueChannel pollable() {
return new QueueChannel();
}
// No 'poller' attribute because there is a default global poller
@Transformer(inputChannel = "pollable", outputChannel = "output")
public Object transform(Object payload) {
...
}
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller() =
PollerMetadata()
.also {
it.maxMessagesPerPoll = 5
it.trigger = PeriodicTrigger(3000)
}
@Bean
fun convertFlow() =
integrationFlow(MessageChannels.queue("pollable")) {
transform(transformer) // No 'poller' attribute because there is a default global poller
channel("output")
}
<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-delay="3000"/>
<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output"/>
事务支持
Spring 集成还为 pollers 提供了事务支持,以便每个接收和转发作都可以作为原子工作单元执行。
要为 Poller 配置事务,请添加<transactional/>
sub-元素。
以下示例显示了可用属性:
<int:poller fixed-delay="1000">
<int:transactional transaction-manager="txManager"
propagation="REQUIRED"
isolation="REPEATABLE_READ"
timeout="10000"
read-only="false"/>
</int:poller>
有关更多信息,请参阅 轮询器事务支持。
AOP 建议链
由于 Spring 事务支持依赖于代理机制,因此TransactionInterceptor
(AOP Advice) 处理由 Poller 启动的消息流的事务性行为时,有时必须提供额外的建议来处理与 Poller 关联的其他横切行为。
为此,poller
定义了一个advice-chain
元素,该元素允许您在实现MethodInterceptor
接口。
以下示例演示如何定义advice-chain
对于poller
:
<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
method="good" output-channel="output">
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<int:advice-chain>
<ref bean="adviceA" />
<beans:bean class="org.something.SampleAdvice" />
<ref bean="txAdvice" />
</int:advice-chain>
</int:poller>
</int:service-activator>
有关如何实施MethodInterceptor
接口,请参阅 Spring Framework Reference Guide 的 AOP 部分。
建议链也可以应用于没有任何事务配置的 Poller,从而增强 Poller 启动的消息流的行为。
使用通知链时,<transactional/> 不能指定 child 元素。
相反,声明一个<tx:advice/> bean 并将其添加到<advice-chain/> .
有关完整的配置详细信息,请参阅 Poller Transaction Support 。 |
TaskExecutor 支持
轮询线程可以由 Spring 的TaskExecutor
抽象化。
这将为一个终端节点或一组终端节点启用并发。
从 Spring 3.0 开始,核心 Spring Framework 有一个task
namespace 及其<executor/>
元素支持创建简单的线程池执行程序。
该元素接受常见并发设置的属性,例如 pool-size 和 queue-capacity。
配置线程池执行程序可以对终端节点在负载下的性能产生重大影响。
这些设置适用于每个终端节点,因为终端节点的性能是要考虑的主要因素之一(另一个主要因素是终端节点订阅的通道上的预期卷)。
要为配置了 XML 命名空间支持的轮询终端节点启用并发,请提供task-executor
引用其<poller/>
元素,然后提供以下示例中所示的一个或多个属性:
<int:poller task-executor="pool" fixed-rate="1000"/>
<task:executor id="pool"
pool-size="5-25"
queue-capacity="20"
keep-alive="120"/>
如果您不提供 task-executor,则在调用者的线程中调用使用者的处理程序。
请注意,调用方通常是默认的TaskScheduler
(请参见配置 Task Scheduler)。
您还应该记住,task-executor
属性可以提供对 Spring 的TaskExecutor
接口。
这executor
元素是为了方便起见而提供的。
如前面在轮询使用者的 background 部分中提到的,您还可以以模拟事件驱动行为的方式配置轮询使用者。
在触发器中使用较长的接收超时和较短的间隔,您可以确保对到达的消息做出非常及时的反应,即使在轮询的消息源上也是如此。
请注意,这仅适用于具有超时的阻塞 wait 调用的源。
例如,文件 poller 不会阻塞。
每receive()
call 会立即返回,并且要么包含新文件,要么不包含新文件。
因此,即使 Poller 包含长receive-timeout
,该值永远不会在这种情况下使用。
另一方面,当使用 Spring Integration 自己的基于队列的通道时,超时值确实有机会参与。
以下示例显示了轮询使用者如何几乎即时地接收消息:
<int:service-activator input-channel="someQueueChannel"
output-channel="output">
<int:poller receive-timeout="30000" fixed-rate="10"/>
</int:service-activator>
使用这种方法不会带来太多开销,因为在内部,它只不过是一个定时等待线程,它不需要像(例如)颠簸、无限 while 循环那样多的 CPU 资源使用。
在运行时更改轮询率
当使用fixed-delay
或fixed-rate
属性,则默认实现使用PeriodicTrigger
实例。
这PeriodicTrigger
是核心 Spring Framework 的一部分。
它仅接受 interval 作为构造函数参数。
因此,无法在运行时更改它。
但是,您可以定义自己的org.springframework.scheduling.Trigger
接口。
您甚至可以使用PeriodicTrigger
作为起点。
然后,您可以为间隔 (period) 添加 setter,甚至可以在触发器本身中嵌入自己的限制逻辑。
这period
属性与每次对nextExecutionTime
以安排下一次轮询。
要在 Poller 中使用此自定义触发器,请在应用程序上下文中声明自定义触发器的 bean 定义,并使用trigger
属性,该属性引用自定义触发器 Bean 实例。
现在,您可以获取对触发器 Bean 的引用,并更改轮询之间的轮询间隔。
有关示例,请参阅 Spring Integration Samples 项目。
它包含一个名为dynamic-poller
,它使用自定义触发器并演示在运行时更改轮询间隔的功能。
该示例提供了一个自定义触发器,用于实现org.springframework.scheduling.Trigger
接口。
该示例的触发器基于 Spring 的PeriodicTrigger
实现。
但是,自定义触发器的字段不是最终的,并且属性具有显式 getter 和 setter,允许您在运行时动态更改轮询周期。
不过,请务必注意,由于 Trigger 方法是nextExecutionTime() ,则根据现有配置,对动态触发器的任何更改在下次轮询之前不会生效。
无法强制触发器在当前配置的下一次执行时间之前触发。 |
负载类型转换
在本参考手册中,您还可以看到接受消息或任何任意Object
作为输入参数。
在Object
,这样的参数被映射到消息有效负载或有效负载或 Headers 的一部分(当使用 Spring 表达式语言时)。
但是,端点方法的 input 参数类型有时与有效负载或其部分的类型不匹配。
在此方案中,我们需要执行类型转换。
Spring 集成提供了一种注册类型转换器(通过使用 SpringConversionService
) 在其自己的名为integrationConversionService
.
一旦使用 Spring 集成基础结构定义了第一个转换器,就会自动创建该 bean。
要注册转换器,您可以实现org.springframework.core.convert.converter.Converter
,org.springframework.core.convert.converter.GenericConverter
或org.springframework.core.convert.converter.ConverterFactory
.
这Converter
implementation 是最简单的,并且从单一类型转换为另一种类型。
对于更复杂的作,例如转换为类层次结构,您可以实现GenericConverter
可能还有一个ConditionalConverter
.
这些选项允许您完全访问from
和to
类型描述符,支持复杂的转换。
例如,如果你有一个名为Something
即转化的目标(parameter type、channel 数据类型等),您有两个具体的实现,称为Thing1
和Thing
,并且您希望根据输入类型转换为其中一种,则GenericConverter
会很合适。
有关更多信息,请参阅以下接口的 Javadoc:
实现转换器后,可以使用方便的命名空间支持注册它,如下例所示:
<int:converter ref="sampleConverter"/>
<bean id="sampleConverter" class="foo.bar.TestConverter"/>
或者,你可以使用内部 Bean,如下例所示:
<int:converter>
<bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>
从 Spring 集成 4.0 开始,你可以使用 Comments 来创建前面的配置,如下例所示:
@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {
public Number convert(Boolean source) {
return source ? 1 : 0;
}
}
或者,您也可以使用@Configuration
annotation 中,如下例所示:
@Configuration
@EnableIntegration
public class ContextConfiguration {
@Bean
@IntegrationConverter
public SerializingConverter serializingConverter() {
return new SerializingConverter();
}
}
在配置应用程序上下文时,Spring Framework 允许您添加 相比之下, 但是,如果您确实想使用 Spring
在这种情况下,由 |
内容类型转换
从版本 5.0 开始,默认情况下,方法调用机制基于org.springframework.messaging.handler.invocation.InvocableHandlerMethod
基础设施。
其HandlerMethodArgumentResolver
实现(例如PayloadArgumentResolver
和MessageMethodArgumentResolver
) 可以使用MessageConverter
abstraction 来转换传入的payload
添加到 Target Method 参数类型。
转换可以基于contentType
消息标头。
为此, Spring 集成提供了ConfigurableCompositeMessageConverter
,它委托给要调用的已注册转换器列表,直到其中一个转换器返回非 null 结果。
默认情况下,此转换器提供(按严格顺序):
-
MappingJackson2MessageConverter
如果 Jackson 处理器存在于 Classpath 中
请参阅 Javadoc(在前面的列表中链接)以了解有关其用途和适当性的更多信息contentType
值进行转化。
这ConfigurableCompositeMessageConverter
是因为它可以与任何其他MessageConverter
implementations,包括或排除前面提到的 default converters。
它也可以在应用程序上下文中注册为适当的 bean,覆盖默认转换器,如下例所示:
@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
List<MessageConverter> converters =
Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
new JavaSerializationMessageConverter());
return new ConfigurableCompositeMessageConverter(converters);
}
这两个新转换器在 defaults 之前注册在 composite 中。
您也不能使用ConfigurableCompositeMessageConverter
但请提供您自己的MessageConverter
通过注册一个名称为integrationArgumentResolverMessageConverter
(通过设置IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME
属性)。
这MessageConverter 基于 -based(包括contentType header) 转换不可用。
在这种情况下,只有上面 Payload Type Conversion 中提到的常规类到类转换可用。 |
异步轮询
如果希望轮询是异步的,Poller 可以选择指定task-executor
属性,该属性指向任何TaskExecutor
bean(Spring 3.0 通过task
命名空间)。
但是,在使用TaskExecutor
.
问题在于有两个配置,即 Poller 和TaskExecutor
.
他们必须彼此协调一致。
否则,您最终可能会造成人为的内存泄漏。
请考虑以下配置:
<int:channel id="publishChannel">
<int:queue />
</int:channel>
<int:service-activator input-channel="publishChannel" ref="myService">
<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>
<task:executor id="taskExecutor" pool-size="20" />
前面的配置演示了 Out-of-tune 配置。
默认情况下,任务执行程序具有无界任务队列。 即使所有线程都被阻塞,Poller 也会继续调度新任务,等待新消息到达或超时过期。 假设有 20 个线程执行任务,超时时间为 5 秒,因此它们以每秒 4 个的速率执行。 但是,新任务以每秒 20 个的速率调度,因此任务执行程序中的内部队列以每秒 16 个的速度增长(当进程空闲时),因此我们存在内存泄漏。
处理此问题的方法之一是将queue-capacity
任务执行程序的属性。
即使 0 也是一个合理的值。
您还可以通过设置设置rejection-policy
Task Executor 的属性(例如,更改为DISCARD
).
换句话说,在配置时必须了解某些细节TaskExecutor
.
有关该主题的更多详细信息,请参阅 Spring 参考手册中的“任务执行和调度”。
端点内部 Bean
许多端点是复合 bean。
这包括所有使用者和所有轮询的入站通道适配器。
使用者(轮询或事件驱动)委托给MessageHandler
.
轮询的适配器通过委托给MessageSource
.
通常,获取对委托 Bean 的引用很有用,这可能是为了在运行时更改配置或用于测试。
这些 bean 可以从ApplicationContext
具有众所周知的名字。MessageHandler
实例使用类似于someConsumer.handler
(其中 'consumer' 是端点的id
属性)。MessageSource
实例使用 Bean ID 注册,类似于somePolledAdapter.source
,其中 'somePolledAdapter' 是适配器的 ID。
上述内容仅适用于框架组件本身。 您可以改用内部 Bean 定义,如下例所示:
<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
output-channel = "outChannel" method="foo">
<beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>
该 bean 的处理方式与声明的任何内部 bean 相同,并且未在应用程序上下文中注册。
如果您希望以其他方式访问此 bean,请在顶层使用id
并使用ref
属性。
有关更多信息,请参阅 Spring 文档。