拆分器是一个组件,其作用是将消息划分为多个部分,并发送要独立处理的结果消息。 很多时候,他们是包含聚合器的管道中的上游生产者。Spring中文文档

编程模型

用于执行拆分的 API 由一个基类 . 它是一种实现,它封装了拆分器的常见功能,例如在生成的消息上填充相应的消息头 (、 和 )。 通过此填充可以跟踪消息及其处理结果(在典型方案中,这些标头将复制到由各种转换终结点生成的消息中)。 然后,这些值可以由组合消息处理器使用。AbstractMessageSplitterMessageHandlerCORRELATION_IDSEQUENCE_SIZESEQUENCE_NUMBERSpring中文文档

以下示例显示了以下内容的摘录:AbstractMessageSplitterSpring中文文档

public abstract class AbstractMessageSplitter
    extends AbstractReplyProducingMessageConsumer {
    ...
    protected abstract Object splitMessage(Message<?> message);

}

若要在应用程序中实现特定的拆分器,可以扩展和实现该方法,该方法包含用于拆分消息的逻辑。 返回值可以是下列值之一:AbstractMessageSplittersplitMessageSpring中文文档

  • 消息的数组或循环访问消息的 (或)。 在这种情况下,消息将作为消息发送(在 和 之后)。 使用此方法可以为您提供更多控制权,例如,在拆分过程中填充自定义邮件头。CollectionIterableIteratorCORRELATION_IDSEQUENCE_SIZESEQUENCE_NUMBERSpring中文文档

  • 非消息对象的数组或循环访问非消息对象的 (或 )。 它的工作方式与前面的情况类似,只是每个集合元素都用作消息有效负载。 使用此方法,您可以专注于域对象,而不必考虑消息传递系统,并生成更易于测试的代码。CollectionIterableIteratorSpring中文文档

  • 或非消息对象(但不是集合或数组)。 它的工作方式与前面的情况类似,只是发送了一条消息。MessageSpring中文文档

在 Spring Integration 中,任何 POJO 都可以实现拆分算法,前提是它定义了一个接受单个参数并具有返回值的方法。 在这种情况下,方法的返回值如前所述解释。 输入参数可以是 POJO 或简单的 POJO。 在后一种情况下,拆分器接收传入消息的有效负载。 我们推荐这种方法,因为它将代码与 Spring Integration API 分离,并且通常更易于测试。MessageSpring中文文档

迭代器

从版本 4.1 开始,支持要拆分的类型。 请注意,对于 (或 ),我们无权访问基础项的数量,并且标头设置为 。 这意味着默认值 an 将不起作用,并且不会释放 from 的组;它将保持为 . 在这种情况下,您应该使用适当的自定义或与 或 .AbstractMessageSplitterIteratorvalueIteratorIterableSEQUENCE_SIZE0SequenceSizeReleaseStrategy<aggregator>CORRELATION_IDsplitterincompleteReleaseStrategysend-partial-result-on-expirygroup-timeoutMessageGroupStoreReaperSpring中文文档

从 5.0 版开始,提供了允许确定 和 对象大小的方法(如果可能的话)。 例如,可以确定基础对象的大小。 从版本 5.0.9 开始,此方法还会正确返回 .AbstractMessageSplitterprotected obtainSizeIfPossible()IterableIteratorXPathMessageSplitterNodeListcom.fasterxml.jackson.core.TreeNodeSpring中文文档

对象可用于避免在拆分之前在内存中构建整个集合的需要。 例如,当使用迭代或流从某些外部系统(例如数据库或 FTP)填充基础项目时。IteratorMGETSpring中文文档

流和通量

从 5.0 版开始,支持 Java 和 Reactive Streams 类型进行拆分。 在本例中,目标基于其迭代功能构建。AbstractMessageSplitterStreamPublishervalueIteratorSpring中文文档

此外,如果分路器的输出通道是 的实例,则生成结果而不是 ,并且输出通道订阅了此结果,以便根据下游流量需求进行基于背压的分离。ReactiveStreamsSubscribableChannelAbstractMessageSplitterFluxIteratorFluxSpring中文文档

从 V5.2 开始,拆分器支持发送拆分函数返回空容器(集合、数组、流等)的请求消息的选项。 在本例中,没有要迭代的项来发送到 . 拆分结果仍作为流量结束指示器。discardChannelFluxoutputChannelnullSpring中文文档

使用 Java、Groovy 和 Kotlin DSL 配置拆分器

基于 a 及其具有 DSL 配置的可迭代有效负载的简单拆分器示例:MessageSpring中文文档

@Bean
public IntegrationFlow someFlow() {
    return f -> f.split(Message.class, Message::getPayload);
}
@Bean
fun someFlow() =
    integrationFlow {
        split<Message<*>> { it.payload }
    }
@Bean
someFlow() {
    integrationFlow {
        splitWith {
		    expectedType Message<?>
		    function { it.payload }
        }
    }
}

有关 DSL 的更多信息,请参阅相应章节:Spring中文文档

使用 XML 配置拆分器

可以通过 XML 配置拆分器,如下所示:Spring中文文档

<int:channel id="inputChannel"/>

<int:splitter id="splitter"           (1)
  ref="splitterBean"                  (2)
  method="split"                      (3)
  input-channel="inputChannel"        (4)
  output-channel="outputChannel"      (5)
  discard-channel="discardChannel" /> (6)

<int:channel id="outputChannel"/>

<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
1 拆分器的 ID 是可选的。
2 对应用程序上下文中定义的 Bean 的引用。 Bean 必须实现拆分逻辑,如上一节所述。 自选。 如果未提供对 Bean 的引用,则假定到达 的消息的有效负载是 的实现,并且默认的拆分逻辑应用于集合,将每个单独的元素合并到消息中并将其发送到 .input-channeljava.util.Collectionoutput-channel
3 实现拆分逻辑的方法(在 Bean 上定义)。 自选。
4 分路器的输入通道。 必填。
5 拆分器将拆分传入消息的结果发送到的通道。 可选(因为传入消息可以自行指定回复通道)。
6 在空拆分结果的情况下,请求消息发送到的通道。 可选(如果出现结果,它们将停止)。null

如果可以在其他定义中引用自定义拆分器实现,则建议使用属性。 但是,如果自定义拆分器处理程序实现的范围应限定为 的单个定义,则可以配置内 Bean 定义,如下例所示:ref<splitter><splitter>Spring中文文档

<int:splitter id="testSplitter" input-channel="inChannel" method="split"
                output-channel="outChannel">
  <beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
不允许在同一配置中同时使用属性和内部处理程序定义,因为这会创建不明确的条件并导致引发异常。ref<int:splitter>
如果属性引用了扩展的 Bean(例如框架本身提供的拆分器),则通过将输出通道直接注入处理程序来优化配置。 在这种情况下,每个实例都必须是单独的 Bean 实例(或 -scoped bean)或使用内部配置类型。 但是,仅当未在拆分器 XML 定义中提供任何特定于拆分器的属性时,此优化才适用。 如果无意中引用了多个 Bean 中的同一消息处理程序,则会出现配置异常。refAbstractMessageProducingHandlerrefprototype<bean/>
1 拆分器的 ID 是可选的。
2 对应用程序上下文中定义的 Bean 的引用。 Bean 必须实现拆分逻辑,如上一节所述。 自选。 如果未提供对 Bean 的引用,则假定到达 的消息的有效负载是 的实现,并且默认的拆分逻辑应用于集合,将每个单独的元素合并到消息中并将其发送到 .input-channeljava.util.Collectionoutput-channel
3 实现拆分逻辑的方法(在 Bean 上定义)。 自选。
4 分路器的输入通道。 必填。
5 拆分器将拆分传入消息的结果发送到的通道。 可选(因为传入消息可以自行指定回复通道)。
6 在空拆分结果的情况下,请求消息发送到的通道。 可选(如果出现结果,它们将停止)。null
不允许在同一配置中同时使用属性和内部处理程序定义,因为这会创建不明确的条件并导致引发异常。ref<int:splitter>
如果属性引用了扩展的 Bean(例如框架本身提供的拆分器),则通过将输出通道直接注入处理程序来优化配置。 在这种情况下,每个实例都必须是单独的 Bean 实例(或 -scoped bean)或使用内部配置类型。 但是,仅当未在拆分器 XML 定义中提供任何特定于拆分器的属性时,此优化才适用。 如果无意中引用了多个 Bean 中的同一消息处理程序,则会出现配置异常。refAbstractMessageProducingHandlerrefprototype<bean/>

使用注释配置拆分器

批注适用于需要类型或消息负载类型的方法,并且该方法的返回值应为任何类型。 如果返回的值不是实际对象,则每个项目都包装在 a 中,作为 的有效负载。 每个结果都发送到定义该终结点的指定输出通道。@SplitterMessageCollectionMessageMessageMessageMessage@SplitterSpring中文文档

下面的示例演示如何使用注释配置拆分器:@SplitterSpring中文文档

@Splitter
List<LineItem> extractItems(Order order) {
    return order.getItems()
}