对于最新的稳定版本,请使用 Spring Integration 6.4.0! |
分配器
拆分器是一个组件,其作用是将消息划分为多个部分,并发送生成的消息以进行独立处理。 很多时候,他们是包含聚合器的管道中的上游生产者。
编程模型
用于执行拆分的 API 由一个基类 .
它是一种封装拆分器通用功能的实现,例如在生成的消息上填写相应的消息标头 (, 和 )。
通过此填充,可以跟踪消息及其处理结果(在典型情况下,这些标头将复制到由各种转换终端节点生成的消息中)。
然后,这些值可以由组合消息处理器等使用。AbstractMessageSplitter
MessageHandler
CORRELATION_ID
SEQUENCE_SIZE
SEQUENCE_NUMBER
以下示例显示了以下摘录:AbstractMessageSplitter
public abstract class AbstractMessageSplitter
extends AbstractReplyProducingMessageConsumer {
...
protected abstract Object splitMessage(Message<?> message);
}
要在应用程序中实现特定的拆分器,可以扩展并实现该方法,其中包含用于拆分消息的逻辑。
返回值可以是以下值之一:AbstractMessageSplitter
splitMessage
-
A 或一个消息数组或迭代消息的 (或 )。 在这种情况下,消息将作为消息发送(在 之后 ,并填充)。 使用此方法可以为您提供更多控制 — 例如,在拆分过程中填充自定义消息标头。
Collection
Iterable
Iterator
CORRELATION_ID
SEQUENCE_SIZE
SEQUENCE_NUMBER
-
非消息对象的 A 或数组,或迭代非消息对象的 (或 )。 它的工作方式与前一种情况类似,不同之处在于每个 collection 元素都用作消息有效负载。 使用此方法,您可以专注于域对象,而不必考虑消息传送系统,并生成更易于测试的代码。
Collection
Iterable
Iterator
-
一个或非 message 对象(但不是集合或数组)。 它的工作方式与前面的情况类似,只是只发送了一条消息。
Message
在 Spring 集成中,任何 POJO 都可以实现拆分算法,前提是它定义了一个接受单个参数并具有返回值的方法。
在这种情况下,该方法的返回值将如前所述进行解释。
input 参数可以是 a 或简单的 POJO。
在后一种情况下,拆分器接收传入消息的有效负载。
我们推荐这种方法,因为它将代码与 Spring Integration API 分离,并且通常更容易测试。Message
迭代器
从版本 4.1 开始,支持 to split 的类型。
请注意,在 (或 ) 的情况下,我们无权访问基础项目的数量,并且 header 设置为 .
这意味着 default 的 an 将不起作用,并且不会释放 from 的组;它将保持为 。
在这种情况下,您应该使用适当的自定义或依赖 或 .AbstractMessageSplitter
Iterator
value
Iterator
Iterable
SEQUENCE_SIZE
0
SequenceSizeReleaseStrategy
<aggregator>
CORRELATION_ID
splitter
incomplete
ReleaseStrategy
send-partial-result-on-expiry
group-timeout
MessageGroupStoreReaper
从版本 5.0 开始,如果可能,它提供了允许确定 and 对象大小的方法。
例如,可以确定底层对象的大小。
从版本 5.0.9 开始,此方法还会正确返回 .AbstractMessageSplitter
protected obtainSizeIfPossible()
Iterable
Iterator
XPathMessageSplitter
NodeList
com.fasterxml.jackson.core.TreeNode
对象有助于避免在拆分之前在内存中构建整个集合。
例如,当使用迭代或流从某些外部系统(例如 DataBase 或 FTP )填充基础项时。Iterator
MGET
Stream 和 Flux
从版本 5.0 开始,支持 Java 和 Reactive Streams 类型进行拆分。
在这种情况下,目标是基于其迭代功能构建的。AbstractMessageSplitter
Stream
Publisher
value
Iterator
此外,如果分流器的输出通道是 的实例,则会产生一个结果而不是 ,并且输出通道订阅了此实例,以便对下游流量需求进行基于背压的拆分。ReactiveStreamsSubscribableChannel
AbstractMessageSplitter
Flux
Iterator
Flux
从版本 5.2 开始,splitter 支持发送 split 函数返回空容器(集合、数组、流等)的请求消息的选项。
在这种情况下,没有要迭代的项目来发送到 .
拆分结果仍作为流结束指示器。discardChannel
Flux
outputChannel
null
使用 Java、Groovy 和 Kotlin DSL 配置 Splitter
基于 a 及其具有 DSL 配置的可迭代有效负载的简单拆分器示例:Message
-
Java DSL
-
Kotlin DSL
-
Groovy DSL
@Bean
public IntegrationFlow someFlow() {
return f -> f.split(Message.class, Message::getPayload);
}
@Bean
fun someFlow() =
integrationFlow {
split<Message<*>> { it.payload }
}
@Bean
someFlow() {
integrationFlow {
splitWith {
expectedType Message<?>
function { it.payload }
}
}
}
有关这些 DSL 的更多信息,请参阅相应的章节:
使用 XML 配置 Splitter
可以通过 XML 配置拆分器,如下所示:
<int:channel id="inputChannel"/>
<int:splitter id="splitter" (1)
ref="splitterBean" (2)
method="split" (3)
input-channel="inputChannel" (4)
output-channel="outputChannel" (5)
discard-channel="discardChannel" /> (6)
<int:channel id="outputChannel"/>
<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
1 | 拆分器的 ID 是可选的。 |
2 | 对在应用程序上下文中定义的 Bean 的引用。
Bean 必须实现拆分逻辑,如前面的部分所述。
自选。
如果未提供对 bean 的引用,则假定到达 上的消息的有效负载是 的实现,并且默认的拆分逻辑应用于集合,将每个单独的元素合并到消息中并将其发送到 。input-channel java.util.Collection output-channel |
3 | 实现拆分逻辑的方法(在 Bean 上定义)。 自选。 |
4 | 分路器的 input 通道。 必填。 |
5 | 拆分器将拆分传入消息的结果发送到的通道。 可选(因为传入消息可以自己指定回复通道)。 |
6 | 在切分结果为空的情况下,请求消息发送到的频道。
可选(它们将在 result 的情况下停止)。null |
如果自定义 splitter 实现可以在其他定义中引用,我们建议使用属性。
但是,如果自定义拆分器处理程序实现的范围应限定为 的单个定义,则可以配置内部 Bean 定义,如下例所示:ref
<splitter>
<splitter>
<int:splitter id="testSplitter" input-channel="inChannel" method="split"
output-channel="outChannel">
<beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
不允许在同一配置中同时使用 attribute 和 inner handler definition,因为它会产生不明确的条件并导致引发异常。ref <int:splitter> |
如果该属性引用了扩展的 bean(例如框架本身提供的拆分器),则通过将输出通道直接注入处理程序来优化配置。
在这种情况下,每个都必须是一个单独的 bean 实例(或-scoped bean)或使用内部配置类型。
但是,仅当未在拆分器 XML 定义中提供任何特定于拆分器的属性时,此优化才适用。
如果您无意中从多个 bean 引用了相同的消息处理程序,则会收到配置异常。ref AbstractMessageProducingHandler ref prototype <bean/> |
使用注释配置 Splitter
该注释适用于需要 type 或 message payload 类型的方法,并且该方法的返回值应为任何类型的 a。
如果返回的值不是实际对象,则每个项目都包装在 a 中作为 .
每个结果都发送到定义 的端点的指定 output 通道。@Splitter
Message
Collection
Message
Message
Message
Message
@Splitter
以下示例显示如何使用注释配置拆分器:@Splitter
@Splitter
List<LineItem> extractItems(Order order) {
return order.getItems()
}