对于最新的稳定版本,请使用 Spring Integration 6.4.0! |
Channel Adapter
通道适配器是一个消息端点,它允许将单个发送方或接收方连接到消息通道。 Spring 集成提供了许多适配器来支持各种传输,例如 JMS、文件、HTTP、Web 服务、邮件等。 本参考指南的后续章节将讨论每个适配器。 但是,本章重点介绍简单但灵活的方法调用通道适配器支持。 有入站和出站适配器,每个适配器都可以使用 core 命名空间中提供的 XML 元素进行配置。 这些提供了一种扩展 Spring Integration 的简单方法,只要你有一个可以作为源或目标调用的方法。
配置入站通道适配器
元素(在 Java 配置中为 a)可以调用 Spring 管理对象上的任何方法,并在将方法的输出转换为 a 后将非 null 返回值发送到 。
激活适配器的订阅后,轮询器会尝试从源接收消息。
根据提供的配置,使用 调度 Poller。
要为单个通道适配器配置轮询间隔或 cron 表达式,你可以提供一个 'poller' 元素,其中包含一个调度属性,例如 'fixed-rate' 或 'cron'。
以下示例定义了两个实例:inbound-channel-adapter
SourcePollingChannelAdapter
MessageChannel
Message
TaskScheduler
inbound-channel-adapter
-
Java DSL
-
Java
-
Kotlin DSL
-
XML
@Bean
public IntegrationFlow source1() {
return IntegrationFlow.from(() -> new GenericMessage<>(...),
e -> e.poller(p -> p.fixedRate(5000)))
...
.get();
}
@Bean
public IntegrationFlow source2() {
return IntegrationFlow.from(() -> new GenericMessage<>(...),
e -> e.poller(p -> p.cron("30 * 9-17 * * MON-FRI")))
...
.get();
}
public class SourceService {
@InboundChannelAdapter(channel = "channel1", poller = @Poller(fixedRate = "5000"))
Object method1() {
...
}
@InboundChannelAdapter(channel = "channel2", poller = @Poller(cron = "30 * 9-17 * * MON-FRI"))
Object method2() {
...
}
}
@Bean
fun messageSourceFlow() =
integrationFlow( { GenericMessage<>(...) },
{ poller { it.fixedRate(5000) } }) {
...
}
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
<int:poller fixed-rate="5000"/>
</int:inbound-channel-adapter>
<int:inbound-channel-adapter ref="source2" method="method2" channel="channel2">
<int:poller cron="30 * 9-17 * * MON-FRI"/>
</int:channel-adapter>
另请参阅 Channel Adapter 表达式和脚本。
如果未提供 Poller,则必须在上下文中注册单个默认 Poller。 有关更多详细信息,请参阅 Endpoint Namespace Support 。 |
轮询终端节点的默认触发器是具有 1 秒固定延迟期的实例。PeriodicTrigger |
重要:轮询器配置
所有类型都由 a 提供支持,这意味着它们包含一个 poller 配置,该配置根据 Poller 中指定的配置轮询(以调用生成成为有效负载的值的自定义方法)。
以下示例显示了两个 Poller 的配置:
在第一个配置中,轮询任务每次轮询调用一次,并且在每个任务 (poll) 期间,根据属性值调用方法(导致消息生成)一次。
在第二种配置中,轮询任务每次轮询调用 10 次,或者直到它返回 'null',因此每次轮询可能会生成 10 条消息,而每次轮询的间隔为 1 秒。
但是,如果配置类似于以下示例,会发生什么情况:
请注意,没有指定。
正如我们稍后介绍的那样,(例如, , , , 和其他)中相同的 poller 配置将具有 for for 的默认值,这意味着“除非轮询方法返回null(可能是因为)中没有更多消息,否则不间断地执行轮询任务”,然后休眠一秒钟。 但是,在 中,它有点不同。
的默认值为 ,除非您将其显式设置为负值(如 )。
这确保了 Poller 可以对生命周期事件(例如启动和停止)做出反应,并防止它在自定义方法的实现可能永远不会返回 null 并且恰好是不可中断的情况下可能在无限循环中旋转。 但是,如果您确定您的方法可以返回 null,并且需要在每次轮询中轮询尽可能多的可用源,则应显式设置为负值,如下例所示:
从版本 5.5 开始,值 for 具有特殊含义 - 完全跳过调用,这可能被视为暂停此入站通道适配器,直到稍后将 for 更改为非零值,例如通过 Control Bus。 从版本 6.2 开始,和 可以配置为 ISO 8601 Duration 格式,例如 ,等。
此外,基础的 an 还以与 和 类似的值格式公开。 另请参阅 Global Default Poller 了解更多信息。 |
配置出站通道适配器
元素(用于 Java 配置)还可以将 a 连接到任何 POJO 消费者方法,该方法应该使用发送到该通道的消息的有效负载来调用。
以下示例说明如何定义出站通道适配器:outbound-channel-adapter
@ServiceActivator
MessageChannel
-
Java DSL
-
Java
-
Kotlin DSL
-
XML
@Bean
public IntegrationFlow outboundChannelAdapterFlow(MyPojo myPojo) {
return f -> f
.handle(myPojo, "handle");
}
public class MyPojo {
@ServiceActivator(channel = "channel1")
void handle(Object payload) {
...
}
}
@Bean
fun outboundChannelAdapterFlow(myPojo: MyPojo) =
integrationFlow {
handle(myPojo, "handle")
}
<int:outbound-channel-adapter channel="channel1" ref="target" method="handle"/>
<beans:bean id="target" class="org.MyPojo"/>
如果要适配的通道是 ,则必须提供 poller 子元素( 上的 子注释 ),如下例所示:PollableChannel
@Poller
@ServiceActivator
-
Java
-
XML
public class MyPojo {
@ServiceActivator(channel = "channel1", poller = @Poller(fixedRate = "3000"))
void handle(Object payload) {
...
}
}
<int:outbound-channel-adapter channel="channel2" ref="target" method="handle">
<int:poller fixed-rate="3000" />
</int:outbound-channel-adapter>
<beans:bean id="target" class="org.MyPojo"/>
如果 POJO 使用者实现可以在其他定义中重用,则应使用属性。
但是,如果使用者实现仅由的单个定义引用,则可以将其定义为内部 bean,如下例所示:ref
<outbound-channel-adapter>
<outbound-channel-adapter>
<int:outbound-channel-adapter channel="channel" method="handle">
<beans:bean class="org.Foo"/>
</int:outbound-channel-adapter>
不允许在同一配置中同时使用 attribute 和 inner handler definition,因为它会产生不明确的条件。
此类配置会导致引发异常。ref <outbound-channel-adapter> |
可以在没有引用的情况下创建任何通道适配器,在这种情况下,它会隐式创建 .
创建的频道名称与 or 元素的属性匹配。
因此,如果未提供,则需要。channel
DirectChannel
id
<inbound-channel-adapter>
<outbound-channel-adapter>
channel
id
通道适配器表达式和脚本
与许多其他 Spring 集成组件一样,和 也提供了对 SPEL 表达式求值的支持。
要使用 SPEL,请在'expression'属性中提供表达式字符串,而不是提供用于在 Bean 上调用方法的'ref'和'method'属性。
计算表达式时,它遵循与 method-invocation 相同的协定,其中:每当评估结果为非 null 值时,an 的表达式都会生成一条消息,而 an 的表达式必须等效于返回 void 的方法调用。<inbound-channel-adapter>
<outbound-channel-adapter>
<inbound-channel-adapter>
<outbound-channel-adapter>
从 Spring Integration 3.0 开始,还可以使用 SPEL (甚至 a )子元素进行配置,因为需要比使用简单的 'expression' 属性所能实现的更复杂的事情。
如果使用 属性以脚本形式提供脚本,则还可以设置 ,从而允许定期刷新资源。
如果希望在每次轮询时检查脚本,则需要将此设置与 Poller 的触发器协调,如下例所示:<int:inbound-channel-adapter/>
<expression/>
<script/>
Resource
location
refresh-check-delay
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
<int:poller max-messages-per-poll="1" fixed-delay="5000"/>
<script:script lang="ruby" location="Foo.rb" refresh-check-delay="5000"/>
</int:inbound-channel-adapter>
另请参阅 when using the sub-element 上的属性。
有关表达式的更多信息,请参阅 Spring 表达式语言 (SpEL)。
有关脚本,请参阅 Groovy 支持和脚本支持。cacheSeconds
ReloadableResourceBundleExpressionSource
<expression/>
() 是一个端点,它通过定期触发以轮询某些底层 .
由于在轮询时没有消息对象,因此表达式和脚本无权访问 root ,因此大多数其他消息传递 SpEL 表达式中没有可用的有效负载或标头属性。
该脚本可以生成并返回带有 Headers 和 payload 的完整对象,也可以只生成并返回一个 payload,该 payload 由框架添加到具有基本 Headers 的消息中。<int:inbound-channel-adapter/> SourcePollingChannelAdapter MessageSource Message Message |