此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.4! |
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.4! |
要创建消息通道实例,可以使用 xml 元素或 Java 配置实例,如下所示:<channel/>
DirectChannel
-
Java
-
XML
@Bean
public MessageChannel exampleChannel() {
return new DirectChannel();
}
<int:channel id="exampleChannel"/>
当您使用不带任何子元素的元素时,它会创建一个实例 (a )。<channel/>
DirectChannel
SubscribableChannel
要创建发布-订阅通道,请使用元素(在 Java 中为 the),如下所示:<publish-subscribe-channel/>
PublishSubscribeChannel
-
Java
-
XML
@Bean
public MessageChannel exampleChannel() {
return new PublishSubscribeChannel();
}
<int:publish-subscribe-channel id="exampleChannel"/>
您也可以提供各种子元素来创建任何可轮询的通道类型(如 消息通道实现中所述)。
以下部分显示了每种通道类型的示例。<queue/>
DirectChannel
配置
如前所述, 是默认类型。
下面的清单显示了定义谁:DirectChannel
-
Java
-
XML
@Bean
public MessageChannel directChannel() {
return new DirectChannel();
}
<int:channel id="directChannel"/>
默认通道具有循环负载均衡器,并且还启用了故障转移(有关更多详细信息,请参阅 DirectChannel
)。
要禁用其中一项或两项,请添加子元素( 的构造函数 )并按如下方式配置属性:<dispatcher/>
LoadBalancingStrategy
DirectChannel
-
Java
-
XML
@Bean
public MessageChannel failFastChannel() {
DirectChannel channel = new DirectChannel();
channel.setFailover(false);
return channel;
}
@Bean
public MessageChannel failFastChannel() {
return new DirectChannel(null);
}
<int:channel id="failFastChannel">
<int:dispatcher failover="false"/>
</channel>
<int:channel id="channelWithFixedOrderSequenceFailover">
<int:dispatcher load-balancer="none"/>
</int:channel>
数据类型 Channel 配置
有时,使用者只能处理特定类型的有效负载,这迫使您确保输入消息的有效负载类型。 首先想到的可能是使用消息过滤器。 但是,消息筛选器所能做的只是筛选出不符合使用者要求的消息。 另一种方法是使用基于内容的路由器,并将具有不合规数据类型的消息路由到特定的转换器,以强制转换和转换为所需的数据类型。 这将有效,但完成相同任务的更简单方法是应用 Datatype Channel 模式。 您可以为每个特定的负载数据类型使用单独的数据类型通道。
要创建仅接受包含特定有效负载类型的消息的数据类型通道,请在 channel 元素的属性中提供数据类型的完全限定类名,如下例所示:datatype
-
Java
-
XML
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(Number.class);
return channel;
}
<int:channel id="numberChannel" datatype="java.lang.Number"/>
请注意,对于可分配给通道数据类型的任何类型的类型,类型检查都会通过。
换句话说,前面示例中的 the 将接受有效负载为 或 的消息。
可以将多个类型作为逗号分隔的列表提供,如下例所示:numberChannel
java.lang.Integer
java.lang.Double
-
Java
-
XML
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(String.class, Number.class);
return channel;
}
<int:channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>
因此,前面示例中的 'numberChannel' 只接受数据类型为 .
但是,如果消息的有效负载不是 required 类型,会发生什么情况呢?
这取决于你是否定义了一个名为 Spring 的 Conversion Service 实例的 bean。
如果没有,那么将立即抛出 an。
但是,如果已定义 Bean,则尝试将 Bean 用于将消息的有效负载转换为可接受的类型。java.lang.Number
integrationConversionService
Exception
integrationConversionService
您甚至可以注册自定义转换器。
例如,假设您将一条带有有效负载的消息发送到我们上面配置的 'numberChannel'。
您可以按如下方式处理该消息:String
MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));
通常,这将是一个完全合法的操作。 但是,由于我们使用 Datatype Channel,因此此类操作的结果将生成类似于以下内容的异常:
Exception in thread "main" org.springframework.integration.MessageDeliveryException:
Channel 'numberChannel'
expected one of the following datataypes [class java.lang.Number],
but received [class java.lang.String]
…
发生异常的原因是我们要求有效负载类型为 ,但我们发送了 .
所以我们需要一些东西来将 a 转换为 .
为此,我们可以实现类似于以下示例的转换器:Number
String
String
Number
public static class StringToIntegerConverter implements Converter<String, Integer> {
public Integer convert(String source) {
return Integer.parseInt(source);
}
}
然后,我们可以将其注册为 Integration Conversion Service 的转换器,如下例所示:
-
Java
-
XML
@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt {
return new StringToIntegerConverter();
}
<int:converter ref="strToInt"/>
<bean id="strToInt" class="org.springframework.integration.util.Demo.StringToIntegerConverter"/>
或者在类上标记有用于自动扫描的注释时。StringToIntegerConverter
@Component
当 'converter' 元素被解析时,如果尚未定义 bean,它会创建 bean。
有了该转换器,该操作现在将成功,因为数据类型通道使用该转换器将有效负载转换为 .integrationConversionService
send
String
Integer
有关负载类型转换的更多信息,请参阅负载类型转换。
从版本 4.0 开始,由 调用,它在应用程序上下文中查找转换服务。
要使用其他转换技术,您可以在通道上指定属性。
这必须是对 implementation 的引用。
仅使用方法。
它为转换器提供对消息标头的访问(如果转换可能需要标头中的信息,例如 )。
该方法只能返回已转换的有效负载或完整对象。
如果是后者,则转换器必须小心地从入站消息中复制所有 Headers。integrationConversionService
DefaultDatatypeChannelMessageConverter
message-converter
MessageConverter
fromMessage
content-type
Message
或者,您可以声明 ID 为 的 type ,并且该转换器由具有 .<bean/>
MessageConverter
datatypeChannelMessageConverter
datatype
QueueChannel
配置
要创建 ,请使用 sub-element.
您可以按如下方式指定通道的容量:QueueChannel
<queue/>
-
Java
-
XML
@Bean
public PollableChannel queueChannel() {
return new QueueChannel(25);
}
<int:channel id="queueChannel">
<queue capacity="25"/>
</int:channel>
如果您没有为此子元素的 'capacity' 属性提供值,则生成的队列是无限的。
为避免内存不足等问题,我们强烈建议您为有界队列设置显式值。<queue/> |
持久配置QueueChannel
由于 a 提供了缓冲消息的功能,但默认情况下仅在内存中缓冲,因此它还引入了在系统故障时消息可能会丢失的可能性。
为了降低这种风险,a 可能由策略接口的持续实现提供支持。
有关 和 的更多详细信息,请参阅 Message Store。QueueChannel
QueueChannel
MessageGroupStore
MessageGroupStore
MessageStore
使用该属性时不允许使用该属性。capacity message-store |
当 收到 时,它会将消息添加到消息存储中。
从 轮询 a 时,会将其从邮件存储中删除。QueueChannel
Message
Message
QueueChannel
默认情况下,a 将其消息存储在内存中队列中,这可能会导致前面提到的消息丢失情况。
但是, Spring Integration 提供了持久存储,例如 .QueueChannel
JdbcChannelMessageStore
您可以通过添加属性来为 any 配置消息存储,如下例所示:QueueChannel
message-store
<int:channel id="dbBackedChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
(有关Java/Kotlin配置选项,请参阅以下示例。
Spring 集成 JDBC 模块还为许多流行的数据库提供了模式数据定义语言(DDL)。
这些模式位于该模块的org.springframework.integration.jdbc.store.channel包中()。spring-integration-jdbc
一个重要的功能是,对于任何事务性持久存储(例如 ),只要 Poller 配置了事务,只有在事务成功完成时,才能永久删除从存储中删除的消息。
否则,事务将回滚,并且不会丢失。JdbcChannelMessageStore Message |
随着越来越多的与 “NoSQL” 数据存储相关的 Spring 项目开始为这些存储提供底层支持,可以使用消息存储的许多其他实现。
如果找不到满足您特定需求的接口,您也可以提供自己的接口实现。MessageGroupStore
从版本 4.0 开始,我们建议将实例配置为尽可能使用 。
与一般邮件存储相比,这些存储通常针对此用途进行了优化。
如果 是 a ,则按优先级顺序在 FIFO 中接收消息。
优先级的概念由 message store 实现确定。
例如,以下示例显示了 MongoDB 通道消息存储的 Java 配置:QueueChannel
ChannelMessageStore
ChannelMessageStore
ChannelPriorityMessageStore
-
Java
-
Java DSL
-
Kotlin DSL
@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
store.setPriorityEnabled(true);
return store;
}
@Bean
public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) {
return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue"));
}
@Bean
public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) {
return IntegrationFlow.from((Channels c) ->
c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup"))
....
.get();
}
@Bean
fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) =
integrationFlow {
channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") }
}
注意类。
这是使用 operations 的实现。MessageGroupQueue BlockingQueue MessageGroupStore |
自定义环境的另一个选项由 sub-element 的属性或其特定构造函数提供。
此属性提供对任何 implementation 的引用。
例如,Hazelcast 分布式 IQueue
可以按如下方式进行配置:QueueChannel
ref
<int:queue>
java.util.Queue
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(new Config()
.setProperty("hazelcast.logging.type", "log4j"));
}
@Bean
public PollableChannel distributedQueue() {
return new QueueChannel(hazelcastInstance()
.getQueue("springIntegrationQueue"));
}
如果您没有为此子元素的 'capacity' 属性提供值,则生成的队列是无限的。
为避免内存不足等问题,我们强烈建议您为有界队列设置显式值。<queue/> |
使用该属性时不允许使用该属性。capacity message-store |
一个重要的功能是,对于任何事务性持久存储(例如 ),只要 Poller 配置了事务,只有在事务成功完成时,才能永久删除从存储中删除的消息。
否则,事务将回滚,并且不会丢失。JdbcChannelMessageStore Message |
注意类。
这是使用 operations 的实现。MessageGroupQueue BlockingQueue MessageGroupStore |
PublishSubscribeChannel
配置
要创建 ,请使用 <publish-subscribe-channel/> 元素。
使用此元素时,您还可以指定用于发布消息的 (如果未指定,它将在发件人的线程中发布),如下所示:PublishSubscribeChannel
task-executor
-
Java
-
XML
@Bean
public MessageChannel pubsubChannel() {
return new PublishSubscribeChannel(someExecutor());
}
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>
如果提供 下游的 重排序器或聚合器 ,则可以将通道上的 “apply-sequence” 属性设置为 。
这样做表示通道应在传递消息之前设置 and 消息标头以及相关 ID。
例如,如果有五个订阅者,则 将设置为 ,并且消息的标头值范围为 到 。PublishSubscribeChannel
true
sequence-size
sequence-number
sequence-size
5
sequence-number
1
5
除了 之外,您还可以配置 .
默认情况下,它使用 implementation 将错误从 header 发送到 或 global 实例。
如果未配置 an,则忽略 ,并将异常直接抛出到调用方的线程中。Executor
ErrorHandler
PublishSubscribeChannel
MessagePublishingErrorHandler
MessageChannel
errorChannel
errorChannel
Executor
ErrorHandler
如果提供 或 downstream ,则可以将渠道上的“apply-sequence”属性设置为 。
这样做表示通道应在传递消息之前设置 sequence-size 和 sequence-number 消息头以及相关 ID。
例如,如果有五个订阅者,则 sequence-size 将设置为 ,并且消息将具有 sequence-number 标头值,范围从 到 。Resequencer
Aggregator
PublishSubscribeChannel
true
5
1
5
以下示例显示如何将标头设置为 :apply-sequence
true
-
Java
-
XML
@Bean
public MessageChannel pubsubChannel() {
PublishSubscribeChannel channel = new PublishSubscribeChannel();
channel.setApplySequence(true);
return channel;
}
<int:publish-subscribe-channel id="pubsubChannel" apply-sequence="true"/>
该值是默认的,以便发布-订阅通道可以将完全相同的消息实例发送到多个出站通道。
由于 Spring Integration 强制执行有效负载和 Headers 引用的不变性,因此当标志设置为 时,通道会创建具有相同有效负载引用但不同 Headers 值的新实例。apply-sequence false true Message |
从版本 5.4.3 开始,还可以配置 its 选项,以指示此通道在没有订阅者时不会静默忽略消息。
当没有订阅者时,将引发带有消息的 A,并且此选项设置为 。PublishSubscribeChannel
requireSubscribers
BroadcastingDispatcher
MessageDispatchingException
Dispatcher has no subscribers
true
该值是默认的,以便发布-订阅通道可以将完全相同的消息实例发送到多个出站通道。
由于 Spring Integration 强制执行有效负载和 Headers 引用的不变性,因此当标志设置为 时,通道会创建具有相同有效负载引用但不同 Headers 值的新实例。apply-sequence false true Message |
ExecutorChannel
要创建 ,请添加具有属性的子元素。
该属性的值可以引用上下文中的 any。
例如,这样做可以启用线程池的配置,以便将消息分派给订阅的处理程序。
如前所述,这样做会打破发送方和接收方之间的单线程执行上下文,以便处理程序的调用不会共享任何活动的事务上下文(即,处理程序可能会抛出,但调用已经成功返回)。
下面的示例演示如何使用元素并在属性中指定执行程序:ExecutorChannel
<dispatcher>
task-executor
TaskExecutor
Exception
send
dispatcher
task-executor
-
Java
-
XML
@Bean
public MessageChannel executorChannel() {
return new ExecutorChannel(someExecutor());
}
<int:channel id="executorChannel">
<int:dispatcher task-executor="someExecutor"/>
</int:channel>
和 选项在 <dispatcher/> 子元素上也可用,如前面的
|
和 选项在 <dispatcher/> 子元素上也可用,如前面的
|
PriorityChannel
配置
要创建 ,请使用 sub-element,如下例所示:PriorityChannel
<priority-queue/>
-
Java
-
XML
@Bean
public PollableChannel priorityChannel() {
return new PriorityChannel(20);
}
<int:channel id="priorityChannel">
<int:priority-queue capacity="20"/>
</int:channel>
默认情况下,通道会查询消息的 Headers。
但是,您可以改为提供自定义引用。
另外,请注意 (像其他类型一样) 确实支持 attribute 。
与 一样,它也支持属性。
以下示例演示了所有这些:priority
Comparator
PriorityChannel
datatype
QueueChannel
capacity
-
Java
-
XML
@Bean
public PollableChannel priorityChannel() {
PriorityChannel channel = new PriorityChannel(20, widgetComparator());
channel.setDatatypes(example.Widget.class);
return channel;
}
<int:channel id="priorityChannel" datatype="example.Widget">
<int:priority-queue comparator="widgetComparator"
capacity="10"/>
</int:channel>
从版本 4.0 开始,子元素支持选项 ( ,在这种情况下不允许使用)。
邮件存储必须是 .
目前为 、 和 提供了 的实现。
有关更多信息,请参阅 QueueChannel
Configuration 和 Message Store 。
您可以在 Backing Message Channels 中找到示例配置。priority-channel
message-store
comparator
capacity
PriorityCapableChannelMessageStore
PriorityCapableChannelMessageStore
Redis
JDBC
MongoDB
RendezvousChannel
配置
当队列子元素为 时创建 A 。
它不提供前面描述的配置选项的任何其他配置选项,并且其队列不接受任何容量值,因为它是零容量直接切换队列。
以下示例演示如何声明 :RendezvousChannel
<rendezvous-queue>
RendezvousChannel
-
Java
-
XML
@Bean
public PollableChannel rendezvousChannel() {
return new RendezvousChannel();
}
<int:channel id="rendezvousChannel"/>
<int:rendezvous-queue/>
</int:channel>
通道拦截器配置
消息通道也可能具有拦截器,如 通道拦截器中所述。
子元素可以添加到 (或更具体的元素类型) 。
你可以提供该属性来引用实现该接口的任何 Spring 托管对象,如下例所示:<interceptors/>
<channel/>
ref
ChannelInterceptor
<int:channel id="exampleChannel">
<int:interceptors>
<ref bean="trafficMonitoringInterceptor"/>
</int:interceptors>
</int:channel>
通常,我们建议在单独的位置定义拦截器实现,因为它们通常提供可在多个通道之间重用的常见行为。
全局通道拦截器配置
Channel interceptors 提供了一种简洁明了的方式来为每个单独的 Channel 应用横切行为。 如果应该在多个 channel 上应用相同的行为,则为每个 channel 配置相同的拦截器集将不是最有效的方法。 为了避免重复配置,同时使拦截器能够应用于多个通道, Spring 集成提供了全局拦截器。 请考虑以下一对示例:
<int:channel-interceptor pattern="input*, thing2*, thing1, !cat*" order="3">
<bean class="thing1.thing2SampleInterceptor"/>
</int:channel-interceptor>
<int:channel-interceptor ref="myInterceptor" pattern="input*, thing2*, thing1, !cat*" order="3"/>
<bean id="myInterceptor" class="thing1.thing2SampleInterceptor"/>
每个元素都允许您定义一个全局侦听器,该侦听器应用于与属性定义的任何模式匹配的所有通道。
在前面的情况下,全局拦截器应用于 'thing1' 通道和所有其他以 'thing2' 或 'input' 开头的通道,但不应用于以 'thing3' 开头的通道(从 5.0 版本开始)。<channel-interceptor/>
pattern
将此语法添加到模式中会导致一个可能的(尽管可能不太可能)问题。
如果你有一个名为 bean 的 bean,并且你在通道拦截器的模式中包含了 的模式,那么它不再匹配。
该模式现在匹配所有未命名的 bean。
在这种情况下,您可以使用 .
该模式与名为 .!thing1 !thing1 pattern thing1 ! \ \!thing1 !thing1 |
order 属性允许您管理当给定通道上有多个拦截器时,此拦截器的注入位置。 例如,通道 'inputChannel' 可以在本地配置单独的拦截器(见下文),如下例所示:
<int:channel id="inputChannel">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
一个合理的问题是“相对于本地配置的其他拦截器或通过其他全局拦截器定义,全局拦截器是如何注入的?
当前的实现提供了一种简单的机制来定义拦截器执行的顺序。
属性中的正数确保在任何现有拦截器之后注入拦截器,而负数确保拦截器在现有拦截器之前注入。
这意味着,在前面的示例中,全局拦截器被注入在本地配置的 'wire-tap' 拦截器之后(因为它大于)。
如果有另一个全局拦截器与 matching ,则其顺序将通过比较两个拦截器的属性值来确定。
要在现有拦截器之前注入全局拦截器,请对属性使用负值。order
order
0
pattern
order
order
请注意,和 属性都是可选的。
的默认值为 0,的默认值为 '*'(以匹配所有通道)。order pattern order pattern |
将此语法添加到模式中会导致一个可能的(尽管可能不太可能)问题。
如果你有一个名为 bean 的 bean,并且你在通道拦截器的模式中包含了 的模式,那么它不再匹配。
该模式现在匹配所有未命名的 bean。
在这种情况下,您可以使用 .
该模式与名为 .!thing1 !thing1 pattern thing1 ! \ \!thing1 !thing1 |
请注意,和 属性都是可选的。
的默认值为 0,的默认值为 '*'(以匹配所有通道)。order pattern order pattern |
丝锥
如前所述, Spring 集成提供了一个简单的 wire tap 拦截器。
您可以在元素中的任何通道上配置接线。
这样做对于调试特别有用,并且可以与 Spring 集成的日志记录通道适配器结合使用,如下所示:<interceptors/>
<int:channel id="in">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
<int:logging-channel-adapter id="logger" level="DEBUG"/>
'logging-channel-adapter'还接受'expression'属性,以便您可以根据'payload'和'headers'变量评估 SPEL 表达式。
或者,要记录完整的消息结果,请为 'log-full-message' 属性提供值 。
默认情况下,仅记录有效负载。
将其设置为 enable logging all headers s go to payload.
'expression' 选项提供了最大的灵活性(例如, )。toString() true false true expression="payload.user.name" |
关于 wire tap 和其他类似组件(消息发布配置)的一个常见误解是,它们在本质上是自动异步的。 默认情况下,作为组件的 wire tap 不会异步调用。 相反, Spring 集成专注于配置异步行为的单一统一方法:消息通道。 使消息流的某些部分同步或异步的是在该流中配置的 Message Channel 的类型。 这是消息通道抽象的主要好处之一。 从框架成立之初,我们就一直强调消息通道作为框架的一等公民的需求和价值。 它不仅仅是 EIP 模式的内部隐式实现。 它作为可配置组件完全公开给最终用户。 因此,Wire Tap 组件仅负责执行以下任务:
-
通过点击通道(例如
channelA
) -
抓取每条消息
-
将消息发送到另一个通道(例如
channelB
)
它本质上是桥接模式的变体,但它封装在通道定义中(因此更容易在不中断流的情况下启用和禁用)。 此外,与桥接不同,它基本上是分叉另一个消息流。 该流是同步的还是异步的?答案取决于 'channelB' 的消息通道类型。 我们有以下选项:direct channel、pollable channel 和 executor channel。 最后两个打破了线程边界,使通过此类通道的通信异步,因为将消息从该通道分派到其订阅的处理程序发生在与用于将消息发送到该通道的线程不同的线程上。 这就是使您的 wire-tap 流同步或异步的原因。 它与框架中的其他组件(比如消息发布者)一致,并且通过让您无需提前担心(除了编写线程安全代码)特定代码段应该作为同步还是异步实现,从而增加了一定程度的一致性和简单性。 两个代码段(比如组件 A 和组件 B)在消息通道上的实际连接使它们的协作同步或异步。 你甚至可能希望将来从 synchronous 更改为 asynchronous ,而 message channel 让你无需接触代码即可快速完成。
关于窃听的最后一点是,尽管上面提供了默认情况下不异步的基本原理,但您应该记住,通常希望尽快传递消息。 因此,使用 asynchronous channel 选项作为 wire tap 的出站通道是很常见的。 但是,默认情况下不强制实施异步行为。 如果我们这样做,有许多用例会中断,包括您可能不想打破事务边界。 也许您使用 wire tap 模式进行审计,并且您确实希望在原始事务中发送审计消息。 例如,您可以将 wire tap 连接到 JMS 出站通道适配器。 这样,您可以获得两全其美的效果:1) JMS 消息的发送可以在事务中进行,而 2) 它仍然是一个 “即发即弃” 操作,从而防止主消息流中出现任何明显的延迟。
从版本 4.0 开始,当侦听器(例如 WireTap 类)引用通道时避免循环引用非常重要。
您需要将此类 channel 从当前拦截器拦截的 channels 中排除。
这可以通过适当的模式或编程方式完成。
如果您有引用 的自定义,请考虑实施 .
这样,框架会询问拦截器是否可以根据提供的模式拦截每个候选通道。
您还可以在拦截器方法中添加运行时保护,以确保通道不是拦截器引用的通道。
这同时使用了这两种技术。ChannelInterceptor channel VetoCapableInterceptor WireTap |
从版本 4.3 开始,具有采用 a 而不是 instance 的其他构造函数。
这对于 Java 配置以及使用通道自动创建逻辑时非常方便。
目标 bean 在稍后提供的 中解析,在与
拦截 器。WireTap
channelName
MessageChannel
MessageChannel
channelName
通道解析需要一个 ,因此 wire tap 实例必须是 Spring 管理的 bean。BeanFactory |
这种后期绑定方法还允许使用 Java DSL 配置简化典型的窃听模式,如下例所示:
@Bean
public PollableChannel myChannel() {
return MessageChannels.queue()
.wireTap("loggingFlow.input")
.get();
}
@Bean
public IntegrationFlow loggingFlow() {
return f -> f.log();
}
'logging-channel-adapter'还接受'expression'属性,以便您可以根据'payload'和'headers'变量评估 SPEL 表达式。
或者,要记录完整的消息结果,请为 'log-full-message' 属性提供值 。
默认情况下,仅记录有效负载。
将其设置为 enable logging all headers s go to payload.
'expression' 选项提供了最大的灵活性(例如, )。toString() true false true expression="payload.user.name" |
从版本 4.0 开始,当侦听器(例如 WireTap 类)引用通道时避免循环引用非常重要。
您需要将此类 channel 从当前拦截器拦截的 channels 中排除。
这可以通过适当的模式或编程方式完成。
如果您有引用 的自定义,请考虑实施 .
这样,框架会询问拦截器是否可以根据提供的模式拦截每个候选通道。
您还可以在拦截器方法中添加运行时保护,以确保通道不是拦截器引用的通道。
这同时使用了这两种技术。ChannelInterceptor channel VetoCapableInterceptor WireTap |
通道解析需要一个 ,因此 wire tap 实例必须是 Spring 管理的 bean。BeanFactory |
条件接线器
可以使用 or 属性将 Wire Tap 设为有条件。
该 Bean 引用一个 Bean,该 Bean 可以在运行时确定消息是否应转到 tap 通道。
同样, is a boolean SpEL 表达式,执行相同的目的:如果表达式的计算结果为 ,则消息将发送到 tap 通道。selector
selector-expression
selector
MessageSelector
selector-expression
true
全局 Wire Tap 配置
可以将全局 wire tap 配置为 Global Channel Interceptor Configuration 的特殊情况。
为此,请配置一个 top level 元素。
现在,除了正常的命名空间支持之外,还支持 and 属性,并且其工作方式与它们对 .
以下示例说明如何配置全局 Wire Tap:wire-tap
wire-tap
pattern
order
channel-interceptor
-
Java
-
XML
@Bean
@GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3)
public WireTap wireTap(MessageChannel wiretapChannel) {
return new WireTap(wiretapChannel);
}
<int:wire-tap pattern="input*, thing2*, thing1" order="3" channel="wiretapChannel"/>
全局 Wire Tap 提供了一种在外部配置单通道 Wire Tap 而无需修改现有通道配置的便捷方法。
为此,请将 该属性设置为 target channel name。
例如,您可以使用此技术配置测试用例以验证通道上的消息。pattern |
全局 Wire Tap 提供了一种在外部配置单通道 Wire Tap 而无需修改现有通道配置的便捷方法。
为此,请将 该属性设置为 target channel name。
例如,您可以使用此技术配置测试用例以验证通道上的消息。pattern |