文件支持

文件支持

Spring Integration 的文件支持通过专用词汇表扩展了 Spring Integration 核心,以处理读取、写入和转换文件。spring-doc.cadn.net.cn

您需要将此依赖项包含在您的项目中:spring-doc.cadn.net.cn

专家
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-file</artifactId>
    <version>6.0.9</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-file:6.0.9"

它提供了一个命名空间,使元素能够定义专用于文件的通道适配器,并支持可以将文件内容读取到字符串或字节数组中的转换器。spring-doc.cadn.net.cn

本节介绍FileReadingMessageSourceFileWritingMessageHandler以及如何将它们配置为 bean。 它还讨论了通过特定于文件的实现来处理文件的支持Transformer. 最后,它解释了特定于文件的命名空间。spring-doc.cadn.net.cn

读取文件

一个FileReadingMessageSource可用于使用文件系统中的文件。 这是MessageSource从文件系统目录创建消息。 以下示例演示如何配置FileReadingMessageSource:spring-doc.cadn.net.cn

<bean id="pollableFileSource"
    class="org.springframework.integration.file.FileReadingMessageSource"
    p:directory="${input.directory}"/>

要防止为某些文件创建消息,您可以提供FileListFilter. 默认情况下,我们使用以下过滤器:spring-doc.cadn.net.cn

IgnoreHiddenFileListFilter确保不处理隐藏文件。 请注意,隐藏的确切定义取决于系统。 例如,在基于 UNIX 的系统上,以句点字符开头的文件被视为隐藏。 另一方面,Microsoft Windows 有一个专用的文件属性来指示隐藏文件。spring-doc.cadn.net.cn

4.2 版引入了IgnoreHiddenFileListFilter. 在以前的版本中,包含隐藏文件。 在默认配置中,IgnoreHiddenFileListFilter首先触发,然后是AcceptOnceFileListFilter.spring-doc.cadn.net.cn

AcceptOnceFileListFilter确保仅从目录中选取一次文件。spring-doc.cadn.net.cn

AcceptOnceFileListFilter将其状态存储在内存中。 如果您希望状态在系统重启后继续存在,您可以使用FileSystemPersistentAcceptOnceFileListFilter. 此过滤器将接受的文件名存储在MetadataStore实现(请参阅元数据存储)。 此过滤器与文件名和修改时间匹配。spring-doc.cadn.net.cn

从 4.0 版开始,此过滤器需要ConcurrentMetadataStore. 与共享数据存储(例如Redis使用RedisMetadataStore),它允许在多个应用程序实例之间或跨多个服务器使用的网络文件共享共享过滤器键。spring-doc.cadn.net.cn

从版本 4.1.5 开始,此过滤器具有一个新属性 (flushOnUpdate),这会导致它在每次更新时刷新元数据存储(如果存储实现Flushable).spring-doc.cadn.net.cn

持久文件列表过滤器现在具有布尔属性forRecursion. 将此属性设置为true,也设置alwaysAcceptDirectories,这意味着出站网关上的递归作 (lsmget) 现在每次都会遍历完整的目录树。 这是为了解决未检测到目录树深处更改的问题。 另外forRecursion=true导致文件的完整路径用作元数据存储键;这解决了如果具有相同名称的文件多次出现在不同目录中,则过滤器无法正常工作的问题。 重要提示:这意味着对于顶级目录下的文件,将找不到持久元数据存储中的现有密钥。 因此,该属性是false默认情况下;这可能会在将来的版本中更改。spring-doc.cadn.net.cn

以下示例配置FileReadingMessageSource使用过滤器:spring-doc.cadn.net.cn

<bean id="pollableFileSource"
    class="org.springframework.integration.file.FileReadingMessageSource"
    p:inputDirectory="${input.directory}"
    p:filter-ref="customFilterBean"/>

读取文件的一个常见问题是,在文件准备就绪之前可能会被检测到(即,其他进程可能仍在写入文件)。 默认值AcceptOnceFileListFilter并不能阻止这一点。 在大多数情况下,如果文件写入过程在每个文件准备好读取后立即重命名,则可以防止这种情况发生。 一个filename-patternfilename-regex过滤器,该过滤器仅接受已准备就绪的文件(可能基于已知后缀),使用默认值AcceptOnceFileListFilter,允许这种情况。 这CompositeFileListFilter启用组合,如以下示例所示:spring-doc.cadn.net.cn

<bean id="pollableFileSource"
    class="org.springframework.integration.file.FileReadingMessageSource"
    p:inputDirectory="${input.directory}"
    p:filter-ref="compositeFilter"/>

<bean id="compositeFilter"
    class="org.springframework.integration.file.filters.CompositeFileListFilter">
    <constructor-arg>
        <list>
            <bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
            <bean class="o.s.i.file.filters.RegexPatternFileListFilter">
                <constructor-arg value="^test.*$"/>
            </bean>
        </list>
    </constructor-arg>
</bean>

如果无法使用临时名称创建文件并重命名为最终名称,Spring Integration 提供了另一种选择。 4.2 版本添加了LastModifiedFileListFilter. 此过滤器可以配置age属性,以便过滤器仅传递早于此值的文件。 年龄默认为 60 秒,但您应该选择足够大的年龄以避免提前拾取文件(例如,由于网络故障)。 以下示例演示如何配置LastModifiedFileListFilter:spring-doc.cadn.net.cn

<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
    <property name="age" value="120" />
</bean>

从 4.3.7 版本开始,ChainFileListFilter(的扩展CompositeFileListFilter) 的引入,以允许后续筛选器应仅看到上一个筛选器的结果的情况。 (使用CompositeFileListFilter,所有过滤器都会看到所有文件,但它只传递已通过所有过滤器的文件)。 需要新行为的一个示例是LastModifiedFileListFilterAcceptOnceFileListFilter,当我们不想接受文件时,直到经过一段时间。 使用CompositeFileListFilter,因为AcceptOnceFileListFilter在第一次通过时查看所有文件,当另一个过滤器通过时,它不会再传递它。 这CompositeFileListFilter当模式过滤器与查找辅助文件以指示文件传输完成的自定义过滤器结合使用时,方法非常有用。 模式筛选器可能只传递主文件(例如something.txt),但“完成”过滤器需要查看(例如)something.done存在。spring-doc.cadn.net.cn

假设我们有文件a.txt,a.doneb.txt.spring-doc.cadn.net.cn

模式过滤器仅通过a.txtb.txt,而“完成”过滤器会看到所有三个文件并仅通过a.txt. 复合Filter的最终结果是,只有a.txt被释放。spring-doc.cadn.net.cn

使用ChainFileListFilter,如果链中的任何过滤器返回空列表,则不会调用其余过滤器。

5.0 版引入了ExpressionFileListFilter以对文件执行 SpEL 表达式作为上下文求值根对象。 为此,用于文件处理的所有 XML 组件(本地和远程)以及现有的filter属性,已提供filter-expression选项,如以下示例所示:spring-doc.cadn.net.cn

<int-file:inbound-channel-adapter
        directory="${inputdir}"
        filter-expression="name matches '.text'"
        auto-startup="false"/>

5.0.5 版引入了DiscardAwareFileListFilter对被拒绝的文件感兴趣的实现。 为此,应通过addDiscardCallback(Consumer<File>). 在框架中,此功能是从FileReadingMessageSource.WatchServiceDirectoryScanner,结合LastModifiedFileListFilter. 与常规不同DirectoryScannerWatchService提供根据目标文件系统上的事件进行处理的文件。 在轮询具有这些文件的内部队列时,LastModifiedFileListFilter可能会丢弃它们,因为它们相对于其配置的来说太年轻了age. 因此,我们丢失了该文件以备将来可能考虑。discard 回调钩子允许我们将文件保留在内部队列中,以便可以根据age在随后的民意调查中。 这CompositeFileListFilter还实现了一个DiscardAwareFileListFilter并将丢弃回调填充到其所有DiscardAwareFileListFilter代表。spring-doc.cadn.net.cn

因为CompositeFileListFilter将文件与所有委托进行匹配,则discardCallback可以对同一文件多次调用。

从 5.1 版本开始,FileReadingMessageSource不检查目录是否存在,并且直到其start()被调用(通常通过包装SourcePollingChannelAdapter). 以前,没有简单的方法可以防止在引用目录时(例如从测试中引用目录)或稍后应用权限时出现作系统权限错误。spring-doc.cadn.net.cn

邮件头

从 5.0 版开始,FileReadingMessageSource(除了payload作为轮询的File) 将以下标头填充到出站Message:spring-doc.cadn.net.cn

  • FileHeaders.FILENAME:这File.getName()要发送的文件。 可用于后续重命名或复制逻辑。spring-doc.cadn.net.cn

  • FileHeaders.ORIGINAL_FILE:这File对象本身。 通常,当我们丢失原始组件时,此标头会自动由框架组件(例如拆分器转换器)填充File对象。 但是,为了与任何其他自定义用例的一致性和便利性,此标头对于访问原始文件非常有用。spring-doc.cadn.net.cn

  • FileHeaders.RELATIVE_PATH:引入了一个新标头,用于表示文件路径相对于扫描根目录的部分。 当需要在其他位置还原源目录层次结构时,此标头可能很有用。 为此,该DefaultFileNameGenerator(参见“'生成文件名)可以配置为使用此标头。spring-doc.cadn.net.cn

目录扫描和轮询

FileReadingMessageSource不会立即为目录中的文件生成消息。 它使用内部队列来存储scanner. 这scanEachPoll选项用于确保在每次轮询时使用最新的输入目录内容刷新内部队列。 默认情况下 (scanEachPoll = false)、FileReadingMessageSource清空其队列,然后再扫描目录。 此默认行为对于减少目录中大量文件的扫描特别有用。 但是,在需要自定义排序的情况下,请务必考虑将此标志设置为true. 文件的处理顺序可能与预期不同。 默认情况下,队列中的文件以其自然 (path) 订单。 扫描添加的新文件(即使队列已有文件)也会插入到适当的位置以保持该自然顺序。 要自定义订单,请FileReadingMessageSource可以接受Comparator<File>作为构造函数参数。 它由内部 (PriorityBlockingQueue)根据业务需求重新排序其内容。 因此,要按特定顺序处理文件,您应该提供一个比较器FileReadingMessageSource而不是对自定义生成的列表进行排序DirectoryScanner.spring-doc.cadn.net.cn

5.0 版推出RecursiveDirectoryScanner以执行文件树访问。 实现基于Files.walk(Path start, int maxDepth, FileVisitOption…​ options)功能性。 根目录 (DirectoryScanner.listFiles(File)) 参数从结果中排除。 所有其他子目录包含和排除均基于目标FileListFilter实现。 例如,SimplePatternFileListFilter默认过滤掉目录。 看AbstractDirectoryAwareFileListFilter及其实现以获取更多信息。spring-doc.cadn.net.cn

从 5.5 版本开始,FileInboundChannelAdapterSpec的 Java DSL 有一个方便的recursive(boolean)使用RecursiveDirectoryScanner在目标中FileReadingMessageSource而不是默认的。

命名空间支持

使用特定于文件的命名空间可以简化文件读取的配置。 为此,请使用以下模板:spring-doc.cadn.net.cn

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-file="http://www.springframework.org/schema/integration/file"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/file
    https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>

在此命名空间中,您可以减少FileReadingMessageSource并将其包装在入站通道适配器中,如下所示:spring-doc.cadn.net.cn

<int-file:inbound-channel-adapter id="filesIn1"
    directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>

<int-file:inbound-channel-adapter id="filesIn2"
    directory="file:${input.directory}"
    filter="customFilterBean" />

<int-file:inbound-channel-adapter id="filesIn3"
    directory="file:${input.directory}"
    filename-pattern="test*" />

<int-file:inbound-channel-adapter id="filesIn4"
    directory="file:${input.directory}"
    filename-regex="test[0-9]+\.txt" />

第一个通道适配器示例依赖于默认的FileListFilter实现:spring-doc.cadn.net.cn

因此,您也可以省略prevent-duplicatesignore-hidden属性,因为它们是true默认情况下。spring-doc.cadn.net.cn

Spring Integration 4.2 引入了ignore-hidden属性。 在以前的版本中,包含隐藏文件。spring-doc.cadn.net.cn

第二个通道适配器示例使用自定义过滤器,第三个使用自定义过滤器filename-pattern属性添加AntPathMatcher基于过滤器,第四个使用该过滤器的filename-regex属性,将基于正则表达式模式的过滤器添加到FileReadingMessageSource. 这filename-patternfilename-regex每个属性都与常规filterreference 属性。 但是,您可以使用filter属性来引用CompositeFileListFilter它结合了任意数量的过滤器,包括一个或多个基于模式的过滤器,以满足您的特定需求。spring-doc.cadn.net.cn

当多个进程从同一目录读取时,您可能需要锁定文件以防止它们被并发选取。 为此,您可以使用FileLocker. 有一个java.nio基于的实现可用,但也可以实现您自己的锁定方案。 这nio储物柜可以按如下方式注入:spring-doc.cadn.net.cn

<int-file:inbound-channel-adapter id="filesIn"
    directory="file:${input.directory}" prevent-duplicates="true">
    <int-file:nio-locker/>
</int-file:inbound-channel-adapter>

您可以按如下方式配置自定义储物柜:spring-doc.cadn.net.cn

<int-file:inbound-channel-adapter id="filesIn"
    directory="file:${input.directory}" prevent-duplicates="true">
    <int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
当文件入站适配器配置了储物柜时,它负责在允许接收文件之前获取锁。 它不承担解锁文件的责任。 如果您已经处理了文件并保持锁悬挂,则存在内存泄漏。 如果这是一个问题,您应该调用FileLocker.unlock(File file)在适当的时间自己。

如果过滤和锁定文件还不够,您可能需要控制文件的完全列出方式。 要实现此类需求,您可以使用DirectoryScanner. 此扫描程序可让您准确确定每次轮询中列出的文件。 这也是 Spring Integration 内部用于连接的接口FileListFilterinstances 和FileLockerFileReadingMessageSource. 您可以注入自定义DirectoryScanner进入<int-file:inbound-channel-adapter/>scanner属性,如以下示例所示:spring-doc.cadn.net.cn

<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
     scanner="customDirectoryScanner"/>

这样做使您可以完全自由地选择排序、列表和锁定策略。spring-doc.cadn.net.cn

同样重要的是要了解过滤器(包括patterns,regex,prevent-duplicates等)和locker实例实际上由scanner. 在适配器上设置的任何这些属性随后都会注入到内部scanner. 对于外部scanner,则禁止在FileReadingMessageSource. 必须在该自定义中指定(如果需要)DirectoryScanner. 换句话说,如果您注入scanner进入FileReadingMessageSource,您应该提供filterlocker关于这一点scanner,而不是在FileReadingMessageSource.spring-doc.cadn.net.cn

默认情况下,DefaultDirectoryScanner使用IgnoreHiddenFileListFilterAcceptOnceFileListFilter. 若要防止使用它们,您可以配置自己的过滤器(例如AcceptAllFileListFilter),甚至将其设置为null.

WatchServiceDirectoryScanner

FileReadingMessageSource.WatchServiceDirectoryScanner当将新文件添加到目录时依赖于文件系统事件。 在初始化期间,将注册目录以生成事件。 初始文件列表也是在初始化期间构建的。 在遍历目录树时,遇到的任何子目录也会被注册以生成事件。 在第一次轮询时,返回遍历目录的初始文件列表。 在后续轮询中,将返回来自新创建事件的文件。 如果添加了新的子目录,则其创建事件将用于遍历新子树以查找现有文件并注册找到的任何新子目录。spring-doc.cadn.net.cn

有一个问题WatchKey当其内部事件queue不会在目录修改事件发生时被程序清空。 如果超过队列大小,则StandardWatchEventKinds.OVERFLOW发出以指示某些文件系统事件可能丢失。 在这种情况下,将完全重新扫描根目录。 为避免重复,请考虑使用适当的FileListFilter(例如AcceptOnceFileListFilter) 或在处理完成后删除文件。

WatchServiceDirectoryScanner可以通过FileReadingMessageSource.use-watch-service选项,它与scanner选择。 内部FileReadingMessageSource.WatchServiceDirectoryScanner实例为提供的directory.spring-doc.cadn.net.cn

此外,现在WatchService轮询逻辑可以跟踪StandardWatchEventKinds.ENTRY_MODIFYStandardWatchEventKinds.ENTRY_DELETE.spring-doc.cadn.net.cn

如果您需要跟踪现有文件和新文件的修改,则应实现ENTRY_MODIFYevents 逻辑中的FileListFilter. 否则,将以相同的方式处理这些事件中的文件。spring-doc.cadn.net.cn

ResettableFileListFilter实现会选择ENTRY_DELETE事件。 因此,他们的文件是为remove()操作。 启用此事件后,过滤器(例如AcceptOnceFileListFilter删除该文件。 因此,如果出现同名文件,则该文件将通过筛选器并作为消息发送。spring-doc.cadn.net.cn

为此,该watch-events属性 (FileReadingMessageSource.setWatchEvents(WatchEventType…​ watchEvents))已被引入。 (WatchEventTypeFileReadingMessageSource.) 有了这样的选项,我们可以对新文件使用一个下游流逻辑,对修改后的文件使用其他逻辑。 以下示例展示了如何为同一目录中的创建和修改事件配置不同的逻辑:spring-doc.cadn.net.cn

值得一提的是,ENTRY_DELETE事件涉及监视目录子目录的重命名作。 更具体地说,ENTRY_DELETE事件,与上一个目录名称相关,位于ENTRY_CREATE通知新(重命名)目录的事件。 在某些作系统(如 Windows)上,ENTRY_DELETE必须注册事件来处理这种情况。 否则,在文件资源管理器中重命名监视的子目录可能会导致在该子目录中未检测到新文件。spring-doc.cadn.net.cn

<int-file:inbound-channel-adapter id="newFiles"
     directory="${input.directory}"
     use-watch-service="true"/>

<int-file:inbound-channel-adapter id="modifiedFiles"
     directory="${input.directory}"
     use-watch-service="true"
     filter="acceptAllFilter"
     watch-events="MODIFY"/> <!-- The default is CREATE. -->

限制内存消耗

您可以使用HeadDirectoryScanner以限制内存中保留的文件数。 这在扫描大型目录时非常有用。 对于 XML 配置,这是通过设置queue-size入站通道适配器上的属性。spring-doc.cadn.net.cn

在 4.2 版之前,此设置与任何其他过滤器的使用不兼容。任何其他过滤器(包括prevent-duplicates="true") 覆盖了用于限制大小的过滤器。spring-doc.cadn.net.cn

使用HeadDirectoryScannerAcceptOnceFileListFilter. 由于在轮询决策期间会咨询所有过滤器,因此AcceptOnceFileListFilter不知道其他过滤器可能正在临时过滤文件。即使之前被HeadDirectoryScanner.HeadFilter现在可用,则AcceptOnceFileListFilter过滤它们。spring-doc.cadn.net.cn

通常,而不是使用AcceptOnceFileListFilter在这种情况下,您应该删除已处理的文件,以便以前过滤的文件在将来的轮询中可用。spring-doc.cadn.net.cn

使用 Java 配置进行配置

以下 Spring Boot 应用程序显示了如何使用 Java 配置配置出站适配器的示例:spring-doc.cadn.net.cn

@SpringBootApplication
public class FileReadingJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileReadingJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public MessageChannel fileInputChannel() {
        return new DirectChannel();
    }

    @Bean
    @InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
    public MessageSource<File> fileReadingMessageSource() {
         FileReadingMessageSource source = new FileReadingMessageSource();
         source.setDirectory(new File(INBOUND_PATH));
         source.setFilter(new SimplePatternFileListFilter("*.txt"));
         return source;
    }

    @Bean
    @Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
    public FileToStringTransformer fileToStringTransformer() {
        return new FileToStringTransformer();
    }

}

使用 Java DSL 进行配置

以下 Spring Boot 应用程序显示了如何使用 Java DSL 配置出站适配器的示例:spring-doc.cadn.net.cn

@SpringBootApplication
public class FileReadingJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileReadingJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow fileReadingFlow() {
         return IntegrationFlow
                  .from(Files.inboundAdapter(new File(INBOUND_PATH))
                              .patternFilter("*.txt"),
                          e -> e.poller(Pollers.fixedDelay(1000)))
                  .transform(Files.toStringTransformer())
                  .channel("processFileChannel")
                  .get();
    }

}

'tailing'文件

另一个流行的用例是从文件的末尾(或尾部)获取“行”,并在添加新行时捕获新行。 提供了两种实现。 第一个,OSDelegatingFileTailingMessageProducer,使用本机tail命令(在具有命令的作系统上)。 这通常是这些平台上最有效的实现。 对于没有tail命令,第二个实现,ApacheCommonsFileTailingMessageProducer,使用 Apachecommons-io Tailer类。spring-doc.cadn.net.cn

在这两种情况下,文件系统事件(例如文件不可用和其他事件)都发布为ApplicationEvent实例使用正常的 Spring 事件发布机制。 此类事件的示例包括:spring-doc.cadn.net.cn

[message=tail: cannot open '/tmp/somefile' for reading:
               No such file or directory, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has become inaccessible:
               No such file or directory, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has appeared;
               following end of new file, file=/tmp/somefile]

例如,在轮换文件时,可能会发生上述示例中显示的事件序列。spring-doc.cadn.net.cn

从 5.0 版开始,一个FileTailingIdleEvent在文件中没有数据时发出idleEventInterval. 以下示例显示了此类事件的外观:spring-doc.cadn.net.cn

[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
并非所有支持tail命令提供这些状态消息。

从这些终结点发出的消息具有以下标头:spring-doc.cadn.net.cn

在 5.0 之前的版本中,FileHeaders.FILENAMEheader 包含文件绝对路径的字符串表示。 现在,您可以通过调用getAbsolutePath()在原始文件头上。

以下示例使用默认选项(“-F -n 0”,表示跟随当前末尾的文件名)创建本机适配器。spring-doc.cadn.net.cn

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	task-executor="exec"
	file="/tmp/foo"/>

以下示例创建一个带有“-F -n +0”选项的本机适配器(意味着遵循文件名,发出所有现有行)。spring-doc.cadn.net.cn

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	native-options="-F -n +0"
	task-executor="exec"
	file-delay=10000
	file="/tmp/foo"/>

如果tail命令失败(在某些平台上,丢失的文件会导致tail失败,即使有-F指定),则每 10 秒重试一次命令。spring-doc.cadn.net.cn

默认情况下,本机适配器从标准输出中捕获内容,并将内容作为消息发送。 它们还捕获标准误差以引发事件。 从版本 4.3.6 开始,您可以通过将enable-status-readerfalse,如以下示例所示:spring-doc.cadn.net.cn

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	enable-status-reader="false"
	task-executor="exec"
	file="/tmp/foo"/>

在以下示例中,IdleEventInterval设置为5000,这意味着,如果五秒钟内没有写任何行,FileTailingIdleEvent每五秒触发一次:spring-doc.cadn.net.cn

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	idle-event-interval="5000"
	task-executor="exec"
	file="/tmp/somefile"/>

当您需要停止适配器时,这会很有用。spring-doc.cadn.net.cn

以下示例创建 Apachecommons-io Tailer适配器,每两秒检查一次文件中的新行,并每十秒检查是否存在丢失的文件:spring-doc.cadn.net.cn

<int-file:tail-inbound-channel-adapter id="apache"
	channel="input"
	task-executor="exec"
	file="/tmp/bar"
	delay="2000"
	end="false"             (1)
	reopen="true"           (2)
	file-delay="10000"/>
1 该文件从头开始就尾随 (end="false") 而不是结束(这是默认值)。
2 每个块都会重新打开该文件(默认值是保持文件打开)。
指定delay,endreopenattributes 强制使用 Apachecommons-io适配器,并使native-options属性不可用。

处理不完整的数据

文件传输方案中的一个常见问题是如何确定传输是否完成,以便您不会开始读取不完整的文件。 解决此问题的常用技术是使用临时名称编写文件,然后以原子方式将其重命名为最终名称。 这种技术与屏蔽临时文件不被消费者获取的过滤器一起提供了一个强大的解决方案。 此技术由写入文件(本地或远程)的 Spring Integration 组件使用。 默认情况下,它们将.writing添加到文件名,并在传输完成后将其删除。spring-doc.cadn.net.cn

另一种常见的技术是编写第二个“标记”文件以指示文件传输完成。 在这种情况下,您不应考虑somefile.txt(例如)可以使用到somefile.txt.complete也存在。 Spring Integration 5.0 版引入了新的过滤器来支持此机制。 为文件系统(FileSystemMarkerFilePresentFileListFilter)、FTPSFTP。 它们是可配置的,因此标记文件可以具有任何名称,尽管它通常与正在传输的文件相关。 有关更多信息,请参阅 Javadocspring-doc.cadn.net.cn

写入文件

要将消息写入文件系统,您可以使用FileWritingMessageHandler. 此类可以处理以下有效负载类型:spring-doc.cadn.net.cn

对于字符串有效负载,您可以配置编码和字符集。spring-doc.cadn.net.cn

为了简化作,您可以配置FileWritingMessageHandler作为出站通道适配器或出站网关的一部分,使用 XML 命名空间。spring-doc.cadn.net.cn

从 4.3 版开始,您可以指定写入文件时要使用的缓冲区大小。spring-doc.cadn.net.cn

从 5.1 版开始,您可以提供BiConsumer<File, Message<?>> newFileCallback如果您使用FileExistsMode.APPENDFileExistsMode.APPEND_NO_FLUSH并且必须创建一个新文件。 此回调接收新创建的文件和触发该文件的消息。 例如,此回调可用于编写消息标头中定义的 CSV 标头。spring-doc.cadn.net.cn

生成文件名

在最简单的形式中,FileWritingMessageHandler只需要一个目标目录来写入文件。 要写入的文件的名称由处理程序的FileNameGenerator. 默认实现查找键与定义为FileHeaders.FILENAME.spring-doc.cadn.net.cn

或者,您可以指定要根据消息计算的表达式以生成文件名,例如headers['myCustomHeader'] + '.something'. 表达式的计算结果必须为String. 为方便起见,该DefaultFileNameGenerator还提供setHeaderName方法,允许您显式指定其值用作文件名的消息头。spring-doc.cadn.net.cn

设置完成后,DefaultFileNameGenerator采用以下解析步骤来确定给定消息有效负载的文件名:spring-doc.cadn.net.cn

  1. 根据消息计算表达式,如果结果为非空String,将其用作文件名。spring-doc.cadn.net.cn

  2. 否则,如果有效负载是java.io.File,使用File对象的文件名。spring-doc.cadn.net.cn

  3. 否则,请使用附加的消息 ID 。msg作为文件名。spring-doc.cadn.net.cn

使用 XML 命名空间支持时,文件出站通道适配器和文件出站网关都支持以下互斥配置属性:spring-doc.cadn.net.cn

写入文件时,使用临时文件后缀(其默认值为.writing). 在写入文件时,它会附加到文件名中。 要自定义后缀,您可以将temporary-file-suffix文件出站通道适配器和文件出站网关上的属性。spring-doc.cadn.net.cn

使用APPEND文件modetemporary-file-suffix属性被忽略,因为数据直接附加到文件中。

从 4.2.5 版开始,生成的文件名(由于filename-generatorfilename-generator-expressionevaluation)可以表示子路径和目标文件名。 它用作File(File parent, String child)如故。 但是,过去我们没有创建 (mkdirs()) 目录,仅假设文件名。 当我们需要恢复文件系统树以匹配源目录时,这种方法很有用——例如,当解压缩存档并按原始顺序保存目标目录中的所有文件时。spring-doc.cadn.net.cn

指定输出目录

文件出站通道适配器和文件出站网关都提供了两个互斥的配置属性来指定输出目录:spring-doc.cadn.net.cn

Spring Integration 2.2 引入了directory-expression属性。
使用directory属性

当您使用directory属性,则输出目录设置为固定值,该值在FileWritingMessageHandler已初始化。如果未指定此属性,则必须使用directory-expression属性。spring-doc.cadn.net.cn

使用directory-expression属性

如果您想获得完整的 SpEL 支持,您可以使用directory-expression属性。 此属性接受针对正在处理的每条消息计算的 SpEL 表达式。因此,在动态指定输出文件目录时,您可以完全访问消息的有效负载及其标头。spring-doc.cadn.net.cn

SpEL 表达式必须解析为String,java.io.Fileorg.springframework.core.io.Resource. (后者被计算为File无论如何。此外,由此产生的StringFile必须指向一个目录。如果未指定directory-expression属性,则必须将directory属性。spring-doc.cadn.net.cn

使用auto-create-directory属性

默认情况下,如果目标目录不存在,则会自动创建相应的目标目录和任何不存在的父目录。要防止这种行为,您可以将auto-create-directory属性设置为false. 此属性适用于directorydirectory-expression属性。spring-doc.cadn.net.cn

使用directory属性和auto-create-directoryfalse,从 Spring Integration 2.2 开始进行了以下更改:spring-doc.cadn.net.cn

现在,不再在初始化适配器时检查目标目录是否存在,而是对正在处理的每条消息执行此检查。spring-doc.cadn.net.cn

此外,如果auto-create-directorytrue并且在处理消息之间删除了目录,则为正在处理的每条消息重新创建目录。spring-doc.cadn.net.cn

处理现有目标文件

当您写入文件并且目标文件已存在时,默认行为是覆盖该目标文件。您可以通过将mode属性。存在以下选项:spring-doc.cadn.net.cn

Spring Integration 2.2 引入了mode属性和APPEND,FAILIGNORE选项。
REPLACE

如果目标文件已经存在,则将其覆盖。如果mode属性,这是写入文件时的默认行为。spring-doc.cadn.net.cn

REPLACE_IF_MODIFIED

如果目标文件已存在,则仅当上次修改的时间戳与源文件的时间戳不同时,才会覆盖该文件。 为File有效载荷, 有效载荷lastModified时间与现有文件进行比较。对于其他有效负载,FileHeaders.SET_MODIFIED (file_setModified) 标头与现有文件进行比较。如果标头缺失或具有不是Number,则始终替换该文件。spring-doc.cadn.net.cn

APPEND

此模式允许您将消息内容附加到现有文件,而不是每次都创建新文件。请注意,此属性与temporary-file-suffix属性,因为当它将内容追加到现有文件时,适配器不再使用临时文件。该文件在每条消息后关闭。spring-doc.cadn.net.cn

APPEND_NO_FLUSH

此选项具有与APPEND,但数据不会刷新,并且文件不会在每条消息后关闭。这可以提供显着的性能,但在发生故障时存在数据丢失的风险。 看使用时刷新文件APPEND_NO_FLUSH了解更多信息。spring-doc.cadn.net.cn

FAIL

如果目标文件存在,则MessageHandlingException被抛出。spring-doc.cadn.net.cn

IGNORE

如果目标文件存在,则以静默方式忽略消息有效负载。spring-doc.cadn.net.cn

使用临时文件后缀时(默认值为.writing)、IGNORE如果存在最终文件名或临时文件名,则选项适用。

使用时刷新文件APPEND_NO_FLUSH

APPEND_NO_FLUSH模式是在 4.3 版中添加的。使用它可以提高性能,因为文件不会在每条消息后关闭。但是,这可能会导致发生故障时数据丢失。spring-doc.cadn.net.cn

Spring Integration 提供了几种刷新策略来减轻这种数据丢失:spring-doc.cadn.net.cn

  • flushInterval. 如果在此时间段内未写入文件,则会自动刷新该文件。这是近似值,可能高达1.33x这一次(平均1.167x).spring-doc.cadn.net.cn

  • 将包含正则表达式的消息发送到消息处理程序的trigger方法。 具有与模式匹配的绝对路径名的文件将被刷新。spring-doc.cadn.net.cn

  • 为处理程序提供自定义MessageFlushPredicate实现来修改将消息发送到trigger方法。spring-doc.cadn.net.cn

  • 调用处理程序的flushIfNeeded方法通过传入自定义FileWritingMessageHandler.FlushPredicateFileWritingMessageHandler.MessageFlushPredicate实现。spring-doc.cadn.net.cn

为每个打开的文件调用谓词。有关更多信息,请参阅这些接口的 Javadoc。请注意,从 V5.0 开始,谓词方法提供了另一个参数:如果是新的或以前关闭的,则当前文件首次写入的时间。spring-doc.cadn.net.cn

使用时flushInterval,则间隔从上次写入开始。仅当文件在间隔内处于空闲状态时,才会刷新该文件。从版本 4.3.7 开始,附加属性 (flushWhenIdle) 可以设置为false,这意味着间隔从第一次写入先前刷新的(或新)文件开始。spring-doc.cadn.net.cn

文件时间戳

默认情况下,目标文件的lastModifiedtimestamp 是创建文件的时间(就地重命名保留当前时间戳除外)。 从 4.3 版开始,您现在可以配置preserve-timestamp(或setPreserveTimestamp(true)使用 Java 配置时)。 为File有效负载,这会将时间戳从入站文件传输到出站文件(无论是否需要副本)。 对于其他有效负载,如果FileHeaders.SET_MODIFIED标头 (file_setModified) 存在,则用于设置目标文件的lastModifiedtimestamp,只要标头是Number.spring-doc.cadn.net.cn

文件权限

从 V5.0 开始,将文件写入支持 Posix 权限的文件系统时,可以在出站通道适配器或网关上指定这些权限。 该属性是一个整数,通常以熟悉的八进制格式提供,例如0640,这意味着所有者具有读/写权限,组具有只读权限,而其他人没有访问权限。spring-doc.cadn.net.cn

文件出站通道适配器

以下示例配置文件出站通道适配器:spring-doc.cadn.net.cn

<int-file:outbound-channel-adapter id="filesOut" directory="${input.directory.property}"/>

基于命名空间的配置还支持delete-source-files属性。 如果设置为true,则在写入目标后触发删除原始源文件。 该标志的默认值为false. 以下示例演示如何将其设置为true:spring-doc.cadn.net.cn

<int-file:outbound-channel-adapter id="filesOut"
    directory="${output.directory}"
    delete-source-files="true"/>
delete-source-files属性仅当入站消息具有File有效负载或如果FileHeaders.ORIGINAL_FILEheader 值包含源File实例或String表示原始文件路径。

从 4.2 版开始,FileWritingMessageHandler支持append-new-line选择。 如果设置为true,则在写入消息后将向文件附加一行新行。 默认属性值为false. 以下示例演示如何使用append-new-line选择:spring-doc.cadn.net.cn

<int-file:outbound-channel-adapter id="newlineAdapter"
	append-new-line="true"
    directory="${output.directory}"/>

出站网关

如果要继续根据写入的文件处理消息,可以使用outbound-gateway相反。 它的作用类似于outbound-channel-adapter. 但是,在写入文件后,它还将其作为消息的有效负载发送到回复通道。spring-doc.cadn.net.cn

以下示例配置出站网关:spring-doc.cadn.net.cn

<int-file:outbound-gateway id="mover" request-channel="moveInput"
    reply-channel="output"
    directory="${output.directory}"
    mode="REPLACE" delete-source-files="true"/>

如前所述,您还可以指定mode属性,它定义了如何处理目标文件已存在的情况的行为。 有关更多详细信息,请参阅处理现有目标文件。 通常,当使用文件出站网关时,结果文件会作为回复通道上的消息负载返回。spring-doc.cadn.net.cn

这也适用于指定IGNORE模式。 在这种情况下,将返回预先存在的目标文件。 如果请求消息的有效负载是文件,则您仍然可以通过消息头访问该原始文件。 参见FileHeaders.ORIGINAL_FILEspring-doc.cadn.net.cn

“outbound-gateway”在您希望首先移动文件然后通过处理管道发送它的情况下效果很好。 在这种情况下,您可以连接文件命名空间的inbound-channel-adapter元素添加到outbound-gateway,然后连接该网关的reply-channel到管道的开头。

如果您有更详细的要求或需要支持其他有效负载类型作为输入以转换为文件内容,则可以扩展FileWritingMessageHandler,但更好的选择是依赖Transformer.spring-doc.cadn.net.cn

使用 Java 配置进行配置

以下 Spring Boot 应用程序显示了如何使用 Java 配置配置入站适配器的示例:spring-doc.cadn.net.cn

@SpringBootApplication
@IntegrationComponentScan
public class FileWritingJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                      new SpringApplicationBuilder(FileWritingJavaApplication.class)
                              .web(false)
                              .run(args);
             MyGateway gateway = context.getBean(MyGateway.class);
             gateway.writeToFile("foo.txt", new File(tmpDir.getRoot(), "fileWritingFlow"), "foo");
    }

    @Bean
    @ServiceActivator(inputChannel = "writeToFileChannel")
    public MessageHandler fileWritingMessageHandler() {
         Expression directoryExpression = new SpelExpressionParser().parseExpression("headers.directory");
         FileWritingMessageHandler handler = new FileWritingMessageHandler(directoryExpression);
         handler.setFileExistsMode(FileExistsMode.APPEND);
         return handler;
    }

    @MessagingGateway(defaultRequestChannel = "writeToFileChannel")
    public interface MyGateway {

        void writeToFile(@Header(FileHeaders.FILENAME) String fileName,
                       @Header(FileHeaders.FILENAME) File directory, String data);

    }
}

使用 Java DSL 进行配置

以下 Spring Boot 应用程序显示了如何使用 Java DSL 配置入站适配器的示例:spring-doc.cadn.net.cn

@SpringBootApplication
public class FileWritingJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                 new SpringApplicationBuilder(FileWritingJavaApplication.class)
                         .web(false)
                         .run(args);
        MessageChannel fileWritingInput = context.getBean("fileWritingInput", MessageChannel.class);
        fileWritingInput.send(new GenericMessage<>("foo"));
    }

    @Bean
   	public IntegrationFlow fileWritingFlow() {
   	    return IntegrationFlow.from("fileWritingInput")
   		        .enrichHeaders(h -> h.header(FileHeaders.FILENAME, "foo.txt")
   		                  .header("directory", new File(tmpDir.getRoot(), "fileWritingFlow")))
   	            .handle(Files.outboundGateway(m -> m.getHeaders().get("directory")))
   	            .channel(MessageChannels.queue("fileWritingResultChannel"))
   	            .get();
    }

}

文件转换器

要将从文件系统读取的数据转换为对象,反之亦然,您需要做一些工作。 与FileReadingMessageSource并且在较小程度上FileWritingMessageHandler,您可能需要自己的机制来完成工作。 为此,您可以实现Transformer接口。 或者,您可以扩展AbstractFilePayloadTransformer对于入站消息。 Spring Integration 提供了一些明显的实现。spring-doc.cadn.net.cn

请参阅Javadoc 的Transformer接口查看哪些 Spring Integration 类实现了它。 同样,您可以检查Javadoc 的AbstractFilePayloadTransformer查看哪些 Spring Integration 类扩展了它。spring-doc.cadn.net.cn

FileToByteArrayTransformer延伸AbstractFilePayloadTransformer并转换Fileobject 转换为byte[]通过使用 Spring 的FileCopyUtils. 使用一系列转换器通常比将所有转换放在一个类中要好。 在这种情况下,Filebyte[]转换可能是合乎逻辑的第一步。spring-doc.cadn.net.cn

FileToStringTransformer延伸AbstractFilePayloadTransformer转换一个FileString. 如果不出意外的话,这对于调试很有用(考虑将其与窃听器一起使用)。spring-doc.cadn.net.cn

要配置特定于文件的转换器,您可以使用文件命名空间中的相应元素,如以下示例所示:spring-doc.cadn.net.cn

<int-file:file-to-bytes-transformer  input-channel="input" output-channel="output"
    delete-files="true"/>

<int-file:file-to-string-transformer input-channel="input" output-channel="output"
    delete-files="true" charset="UTF-8"/>

delete-files选项向转换器发出信号,表明它应该在转换完成后删除入站文件。 这绝不能替代使用AcceptOnceFileListFilterFileReadingMessageSource在多线程环境中使用(例如,当您通常使用 Spring Integration 时)。spring-doc.cadn.net.cn

文件拆分器

FileSplitter在 4.1.2 版中添加了,并且在 4.2 版中添加了其命名空间支持。 这FileSplitter将文本文件拆分为单独的行,基于BufferedReader.readLine(). 默认情况下,拆分器使用Iterator在从文件中读取一行时一次发出一行。 设置iterator属性设置为false导致它在将所有行作为消息发出之前将其读取到内存中。 这方面的一个用例可能是,如果要在发送任何包含行的消息之前检测文件上的 I/O 错误。 但是,它仅适用于相对较短的文件。spring-doc.cadn.net.cn

入站有效负载可以是File,String(一个File路径)、InputStreamReader. 其他有效负载类型将保持不变。spring-doc.cadn.net.cn

以下列表显示了配置FileSplitter:spring-doc.cadn.net.cn

Java DSL
@SpringBootApplication
public class FileSplitterApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileSplitterApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow fileSplitterFlow() {
        return IntegrationFlow
            .from(Files.inboundAdapter(tmpDir.getRoot())
                 .filter(new ChainFileListFilter<File>()
                        .addFilter(new AcceptOnceFileListFilter<>())
                        .addFilter(new ExpressionFileListFilter<>(
                             new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
            .split(Files.splitter()
                     .markers()
                     .charset(StandardCharsets.US_ASCII)
                     .firstLineAsHeader("fileHeader")
                     .applySequence(true))
            .channel(c -> c.queue("fileSplittingResultChannel"))
            .get();
    }

}
Kotlin DSL
@Bean
fun fileSplitterFlow() =
    integrationFlow(
        Files.inboundAdapter(tmpDir.getRoot())
            .filter(
                ChainFileListFilter<File?>()
                    .addFilter(AcceptOnceFileListFilter())
                    .addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
            )
    ) {
        split(
            Files.splitter()
                .markers()
                .charset(StandardCharsets.US_ASCII)
                .firstLineAsHeader("fileHeader")
                .applySequence(true)
        )
        channel { queue("fileSplittingResultChannel") }
    }
Java
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
    FileSplitter splitter = new FileSplitter(true, true);
    splitter.setApplySequence(true);
    splitter.setOutputChannel(outputChannel);
    return splitter;
}
XML
<int-file:splitter id="splitter" (1)
    iterator=""                  (2)
    markers=""                   (3)
    markers-json=""              (4)
    apply-sequence=""            (5)
    requires-reply=""            (6)
    charset=""                   (7)
    first-line-as-header=""      (8)
    input-channel=""             (9)
    output-channel=""            (10)
    send-timeout=""              (11)
    auto-startup=""              (12)
    order=""                     (13)
    phase="" />                  (14)
1 拆分器的 bean 名称。
2 设置为true(默认)使用迭代器或false在发送行之前将文件加载到内存中。
3 设置为true在文件数据之前和之后发出文件开始和文件结束标记消息。 标记是带有FileSplitter.FileMarker有效负载(使用STARTENDmark属性)。 在过滤某些行的下游流中按顺序处理文件时,可以使用标记。 它们使下游处理能够知道文件何时已完全处理。 此外,一个file_marker包含STARTEND添加到这些消息中。 这END标记包括行数。 如果文件为空,则仅STARTEND标记会以0作为lineCount. 默认值为false. 什么时候true,apply-sequencefalse默认情况下。 也可以看看markers-json(下一个属性)。
4 什么时候markers为 true,则将此设置为true要拥有FileMarker对象转换为 JSON 字符串。 (使用SimpleJsonSerializer下面)。
5 设置为false禁用包含sequenceSizesequenceNumber报头。 默认值为true除非markerstrue. 什么时候truemarkerstrue,标记包含在测序中。 什么时候trueiteratortruesequenceSizeheader 设置为0,因为尺寸未知。
6 设置为true导致RequiresReplyException如果文件中没有行,则引发。 默认值为false.
7 设置将文本数据读入String负载。 默认值是平台字符集。
8 要作为其余行发出的邮件中的标头携带的第一行的标头名称。 从 5.0 版本开始。
9 设置用于向拆分器发送消息的输入通道。
10 设置将消息发送到的输出通道。
11 设置发送超时。 仅适用于output-channel可以阻止 — 例如 fullQueueChannel.
12 设置为false以禁用在刷新上下文时自动启动拆分器。 默认值为true.
13 设置此端点的顺序,如果input-channel是一个<publish-subscribe-channel/>.
14 设置分路器的启动阶段(在以下情况下使用auto-startuptrue).

FileSplitter还拆分任何基于文本的InputStream成行。 从版本 4.3 开始,当与 FTP 或 SFTP 流式入站通道适配器或使用stream选项检索文件时,拆分器会在文件完全使用时自动关闭支持流的会话 有关这些设施的详细信息,请参阅 FTP 流式处理入站通道适配器SFTP 流式处理入站通道适配器以及 FTP 出站网关SFTP 出站网关spring-doc.cadn.net.cn

使用 Java 配置时,可以使用其他构造函数,如以下示例所示:spring-doc.cadn.net.cn

public FileSplitter(boolean iterator, boolean markers, boolean markersJson)

什么时候markersJson为 true,则标记表示为 JSON 字符串(使用SimpleJsonSerializer).spring-doc.cadn.net.cn

5.0 版本引入了firstLineAsHeader选项来指定内容的第一行是标题(例如 CSV 文件中的列名)。 传递给此属性的参数是标头名称,在该标头名称下,第一行作为其余行发出的消息中的标头。 此行不包含在序列标头中(如果applySequence为 true),也不在lineCount关联FileMarker.END. 注意:从版本 5.5 开始,lineCount' 也包含在FileHeaders.LINE_COUNTFileMarker.ENDmessage,因为FileMarker可以序列化为 JSON。 如果文件仅包含标题行,则该文件被视为空,因此仅FileMarker在拆分期间发出实例(如果启用了标记,否则不会发出任何消息)。 默认情况下(如果未设置标头名称),则第一行被视为数据,并成为第一个发出的消息的有效负载。spring-doc.cadn.net.cn

如果您需要有关从文件内容中提取标头的更复杂的逻辑(不是第一行,不是行的全部内容,不是一个特定标头等),请考虑在FileSplitter. 请注意,已移动到标题的行可能会从正常内容进程的下游进行过滤。spring-doc.cadn.net.cn

幂等下游处理拆分文件

什么时候apply-sequence为 true,则拆分器在SEQUENCE_NUMBER标头(当markers为 true,则标记计为行)。 行号可以与幂等接收器一起使用,以避免在重新启动后重新处理行。spring-doc.cadn.net.cn

@Bean
public ConcurrentMetadataStore store() {
    return new ZookeeperMetadataStore();
}

@Bean
public MetadataStoreSelector selector() {
    return new MetadataStoreSelector(
            message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
                    .getAbsolutePath(),
            message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
                    .toString(),
            store())
                    .compareValues(
                            (oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    return new IdempotentReceiverInterceptor(selector());
}

@Bean
public IntegrationFlow flow() {
    ...
    .split(new FileSplitter())
    ...
    .handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
    ...
}

文件聚合器

从 5.5 版开始,一个FileAggregator被引入以涵盖FileSplitter启用 START/END 标记时的用例。 为方便起见,该FileAggregator实现所有三种序列详细信息策略:spring-doc.cadn.net.cn

  • HeaderAttributeCorrelationStrategy使用FileHeaders.FILENAME属性用于关联键计算。 当在FileSplitter,它不会填充序列详细信息标头,因为 START/END 标记消息也包含在序列大小中。 这FileHeaders.FILENAME仍会为发出的每一行填充,包括 START/END 标记消息。spring-doc.cadn.net.cn

  • FileMarkerReleaseStrategy- 检查FileSplitter.FileMarker.Mark.END消息,然后比较FileHeaders.LINE_COUNT标头值,组大小减去2 - FileSplitter.FileMarker实例。 它还实现了方便的GroupConditionProvider联系方式conditionSupplier函数要用于AbstractCorrelatingMessageHandler. 有关详细信息,请参阅消息组条件spring-doc.cadn.net.cn

  • FileAggregatingMessageGroupProcessor只需删除FileSplitter.FileMarker消息,并将其余消息收集到列表有效负载中进行生成。spring-doc.cadn.net.cn

以下列表显示了配置FileAggregator:spring-doc.cadn.net.cn

Java DSL
@Bean
public IntegrationFlow fileSplitterAggregatorFlow(TaskExecutor taskExecutor) {
    return f -> f
            .split(Files.splitter()
                    .markers()
                    .firstLineAsHeader("firstLine"))
            .channel(c -> c.executor(taskExecutor))
            .filter(payload -> !(payload instanceof FileSplitter.FileMarker),
                    e -> e.discardChannel("aggregatorChannel"))
            .<String, String>transform(String::toUpperCase)
            .channel("aggregatorChannel")
            .aggregate(new FileAggregator())
            .channel(c -> c.queue("resultChannel"));
}
Kotlin DSL
@Bean
fun fileSplitterAggregatorFlow(taskExecutor: TaskExecutor?) =
    integrationFlow {
        split(Files.splitter().markers().firstLineAsHeader("firstLine"))
        channel { executor(taskExecutor) }
        filter<Any>({ it !is FileMarker }) { discardChannel("aggregatorChannel") }
        transform(String::toUpperCase)
        channel("aggregatorChannel")
        aggregate(FileAggregator())
        channel { queue("resultChannel") }
    }
Java
@serviceActivator(inputChannel="toAggregateFile")
@Bean
public AggregatorFactoryBean fileAggregator() {
    AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
    aggregator.setProcessorBean(new FileAggregator());
    aggregator.setOutputChannel(outputChannel);
    return aggregator;
}
XML
<int:chain input-channel="input" output-channel="output">
    <int-file:splitter markers="true"/>
    <int:aggregator>
        <bean class="org.springframework.integration.file.aggregator.FileAggregator"/>
    </int:aggregator>
</int:chain>

如果默认行为的FileAggregator不满足目标逻辑,建议使用单个策略配置聚合器端点。 看FileAggregatorJavaDocs 了解更多信息。spring-doc.cadn.net.cn

远程持久文件列表过滤器

入站和流式入站远程文件通道适配器 (FTP,SFTP和其他技术)配置了相应的实现AbstractPersistentFileListFilter默认情况下,配置了内存中的MetadataStore. 要在集群中运行,可以使用共享的MetadataStore(有关详细信息,请参阅元数据存储)。 这些过滤器用于防止多次获取同一文件(除非修改时间更改)。 从 5.2 版开始,在获取文件之前,会立即将文件添加到过滤器中(如果获取失败,则将其反转)。spring-doc.cadn.net.cn

如果发生灾难性故障(例如断电),当前正在获取的文件可能会保留在过滤器中,并且在重新启动应用程序时不会重新获取。 在这种情况下,您需要手动从MetadataStore.

在以前的版本中,在获取任何文件之前都会过滤这些文件,这意味着在发生灾难性故障后,多个文件可能处于此状态。spring-doc.cadn.net.cn

为了促进这种新行为,在FileListFilter.spring-doc.cadn.net.cn

boolean accept(F file);

boolean supportsSingleFileFiltering();

如果过滤器返回truesupportsSingleFileFiltering,它必须实现accept().spring-doc.cadn.net.cn

如果远程过滤器不支持单个文件过滤(例如AbstractMarkerFilePresentFileListFilter),适配器将恢复到以前的行为。spring-doc.cadn.net.cn

如果使用了多个过滤器(使用CompositeFileListFilterChainFileListFilter),则所有委托筛选器都必须支持单个文件筛选,复合筛选器才能支持它。spring-doc.cadn.net.cn

持久文件列表过滤器现在具有布尔属性forRecursion. 将此属性设置为true,也设置alwaysAcceptDirectories,这意味着出站网关上的递归作 (lsmget) 现在每次都会遍历完整的目录树。 这是为了解决未检测到目录树深处更改的问题。 另外forRecursion=true导致文件的完整路径用作元数据存储键;这解决了如果具有相同名称的文件多次出现在不同目录中,则过滤器无法正常工作的问题。 重要提示:这意味着对于顶级目录下的文件,将找不到持久元数据存储中的现有密钥。 因此,该属性是false默认情况下;这可能会在将来的版本中更改。spring-doc.cadn.net.cn