此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1Spring中文文档

此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1Spring中文文档

在版本 4.1.2 中添加了 ,其命名空间支持在版本 4.2 中添加了。 根据 将文本文件拆分为单独的行。 默认情况下,拆分器在从文件中读取行时,使用 an 一次发出一行。 将该属性设置为使其在将所有行作为消息发出之前将所有行读入内存。 一个用例可能是,如果要在发送任何包含行的消息之前检测文件上的 I/O 错误。 但是,它仅适用于相对较短的文件。FileSplitterFileSplitterBufferedReader.readLine()IteratoriteratorfalseSpring中文文档

入站有效负载可以是 、(路径)、或 。 其他有效载荷类型保持不变。FileStringFileInputStreamReaderSpring中文文档

以下列表显示了配置 :FileSplitterSpring中文文档

@SpringBootApplication
public class FileSplitterApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileSplitterApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow fileSplitterFlow() {
        return IntegrationFlow
            .from(Files.inboundAdapter(tmpDir.getRoot())
                 .filter(new ChainFileListFilter<File>()
                        .addFilter(new AcceptOnceFileListFilter<>())
                        .addFilter(new ExpressionFileListFilter<>(
                             new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
            .split(Files.splitter()
                     .markers()
                     .charset(StandardCharsets.US_ASCII)
                     .firstLineAsHeader("fileHeader")
                     .applySequence(true))
            .channel(c -> c.queue("fileSplittingResultChannel"))
            .get();
    }

}
@Bean
fun fileSplitterFlow() =
    integrationFlow(
        Files.inboundAdapter(tmpDir.getRoot())
            .filter(
                ChainFileListFilter<File?>()
                    .addFilter(AcceptOnceFileListFilter())
                    .addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
            )
    ) {
        split(
            Files.splitter()
                .markers()
                .charset(StandardCharsets.US_ASCII)
                .firstLineAsHeader("fileHeader")
                .applySequence(true)
        )
        channel { queue("fileSplittingResultChannel") }
    }
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
    FileSplitter splitter = new FileSplitter(true, true);
    splitter.setApplySequence(true);
    splitter.setOutputChannel(outputChannel);
    return splitter;
}
<int-file:splitter id="splitter" (1)
    iterator=""                  (2)
    markers=""                   (3)
    markers-json=""              (4)
    apply-sequence=""            (5)
    requires-reply=""            (6)
    charset=""                   (7)
    first-line-as-header=""      (8)
    input-channel=""             (9)
    output-channel=""            (10)
    send-timeout=""              (11)
    auto-startup=""              (12)
    order=""                     (13)
    phase="" />                  (14)
1 拆分器的 Bean 名称。
2 设置为(默认值)以使用迭代器或在发送行之前将文件加载到内存中。truefalse
3 设置为在文件数据之前和之后发出文件开头和文件结尾标记消息。 标记是带有有效负载(属性中带有 和 值)的消息。 在筛选某些行的下游流中按顺序处理文件时,可以使用标记。 它们使下游处理能够知道文件何时已完全处理。 此外,包含或添加到这些消息的标头。 标记包括行数。 如果文件为空,则发出 only 和 标记,并作为 . 默认值为 。 默认情况下,为 。 另请参阅(下一个属性)。trueFileSplitter.FileMarkerSTARTENDmarkfile_markerSTARTENDENDSTARTEND0lineCountfalsetrueapply-sequencefalsemarkers-json
4 如果为 true,请将其设置为将对象转换为 JSON 字符串。 (使用下面)。markerstrueFileMarkerSimpleJsonSerializer
5 设置为禁用邮件中包含和标头。 默认值为 ,除非是 。 当 和 是 时,标记包含在测序中。 当 和 是 时,标头设置为 ,因为大小未知。falsesequenceSizesequenceNumbertruemarkerstruetruemarkerstruetrueiteratortruesequenceSize0
6 设置为在文件中没有行时引发 a。 默认值为 。trueRequiresReplyExceptionfalse
7 设置将文本数据读取到有效负载时要使用的字符集名称。 默认值为平台字符集。String
8 在为其余行发出的消息中作为标题携带的第一行的标头名称。 从 5.0 版开始。
9 设置用于向拆分器发送消息的输入通道。
10 设置将消息发送到的输出通道。
11 设置发送超时。 仅当 can 阻止 (例如完整的 .output-channelQueueChannel
12 设置为禁用在刷新上下文时自动启动拆分器。 默认值为 。falsetrue
13 如果 是 .input-channel<publish-subscribe-channel/>
14 设置拆分器的启动阶段(在 时使用)。auto-startuptrue

还会将任何基于文本的内容拆分为几行。 从版本 4.3 开始,当与 FTP 或 SFTP 流入站通道适配器或使用该选项检索文件的 FTP 或 SFTP 出站网关结合使用时,拆分器会在文件完全使用时自动关闭支持流的会话 有关这些工具的详细信息,请参阅 FTP 流入站通道适配器SFTP 流入站通道适配器以及 FTP 出站网关SFTP 出站网关FileSplitterInputStreamstreamSpring中文文档

使用 Java 配置时,可以使用其他构造函数,如以下示例所示:Spring中文文档

public FileSplitter(boolean iterator, boolean markers, boolean markersJson)

如果为 true,则标记表示为 JSON 字符串(使用 )。markersJsonSimpleJsonSerializerSpring中文文档

版本 5.0 引入了指定第一行内容为标题(例如 CSV 文件中的列名)的选项。 传递给此属性的参数是标头名称,在该名称下,第一行作为标题在为其余行发出的消息中携带。 此行不包含在序列标头中(如果为 true),也不包含在与 关联的 中。 注意:从版本 5.5 开始,lineCount' 也作为消息的标头包含在内,因为可以序列化为 JSON。 如果文件仅包含标题行,则该文件将被视为空文件,因此,在拆分期间仅发出实例(如果启用了标记,则不会发出任何消息)。 默认情况下(如果未设置标头名称),第一行将被视为数据,并成为第一条发出消息的有效负载。firstLineAsHeaderapplySequencelineCountFileMarker.ENDFileHeaders.LINE_COUNTFileMarker.ENDFileMarkerFileMarkerSpring中文文档

如果需要从文件内容中提取标头的更复杂逻辑(不是第一行,不是整行内容,不是某个特定标头,等等),请考虑在 . 请注意,已移动到标题的行可能会从正常内容流程的下游进行筛选。FileSplitterSpring中文文档

1 拆分器的 Bean 名称。
2 设置为(默认值)以使用迭代器或在发送行之前将文件加载到内存中。truefalse
3 设置为在文件数据之前和之后发出文件开头和文件结尾标记消息。 标记是带有有效负载(属性中带有 和 值)的消息。 在筛选某些行的下游流中按顺序处理文件时,可以使用标记。 它们使下游处理能够知道文件何时已完全处理。 此外,包含或添加到这些消息的标头。 标记包括行数。 如果文件为空,则发出 only 和 标记,并作为 . 默认值为 。 默认情况下,为 。 另请参阅(下一个属性)。trueFileSplitter.FileMarkerSTARTENDmarkfile_markerSTARTENDENDSTARTEND0lineCountfalsetrueapply-sequencefalsemarkers-json
4 如果为 true,请将其设置为将对象转换为 JSON 字符串。 (使用下面)。markerstrueFileMarkerSimpleJsonSerializer
5 设置为禁用邮件中包含和标头。 默认值为 ,除非是 。 当 和 是 时,标记包含在测序中。 当 和 是 时,标头设置为 ,因为大小未知。falsesequenceSizesequenceNumbertruemarkerstruetruemarkerstruetrueiteratortruesequenceSize0
6 设置为在文件中没有行时引发 a。 默认值为 。trueRequiresReplyExceptionfalse
7 设置将文本数据读取到有效负载时要使用的字符集名称。 默认值为平台字符集。String
8 在为其余行发出的消息中作为标题携带的第一行的标头名称。 从 5.0 版开始。
9 设置用于向拆分器发送消息的输入通道。
10 设置将消息发送到的输出通道。
11 设置发送超时。 仅当 can 阻止 (例如完整的 .output-channelQueueChannel
12 设置为禁用在刷新上下文时自动启动拆分器。 默认值为 。falsetrue
13 如果 是 .input-channel<publish-subscribe-channel/>
14 设置拆分器的启动阶段(在 时使用)。auto-startuptrue

幂等下游处理拆分文件

如果为 true,则拆分器会在标头中添加行号(如果为 true,则标记计为行)。 线路号可以与幂等接收器一起使用,以避免在重新启动后重新处理线路。apply-sequenceSEQUENCE_NUMBERmarkersSpring中文文档

@Bean
public ConcurrentMetadataStore store() {
    return new ZookeeperMetadataStore();
}

@Bean
public MetadataStoreSelector selector() {
    return new MetadataStoreSelector(
            message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
                    .getAbsolutePath(),
            message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
                    .toString(),
            store())
                    .compareValues(
                            (oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    return new IdempotentReceiverInterceptor(selector());
}

@Bean
public IntegrationFlow flow() {
    ...
    .split(new FileSplitter())
    ...
    .handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
    ...
}