文件支持

文件支持

Spring 集成的文件支持通过专用词汇表扩展了 Spring 集成核心,以处理读取、写入和转换文件。spring-doc.cn

您需要将此依赖项包含在您的项目中:spring-doc.cn

Maven 系列
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-file</artifactId>
    <version>6.0.9</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-file:6.0.9"

它提供了一个名称空间,该名称空间允许元素定义专用于文件的通道适配器,并支持可以将文件内容读取到字符串或字节数组中的转换器。spring-doc.cn

本节解释了 和 的工作原理,以及如何将它们配置为 bean。 它还讨论了通过特定于文件的实现来处理文件的支持。 最后,它解释了特定于文件的命名空间。FileReadingMessageSourceFileWritingMessageHandlerTransformerspring-doc.cn

读取文件

A 可用于使用文件系统中的文件。 这是从文件系统目录创建消息的实现。 以下示例显示如何配置 :FileReadingMessageSourceMessageSourceFileReadingMessageSourcespring-doc.cn

<bean id="pollableFileSource"
    class="org.springframework.integration.file.FileReadingMessageSource"
    p:directory="${input.directory}"/>

要防止为某些文件创建消息,您可以提供 . 默认情况下,我们使用以下筛选器:FileListFilterspring-doc.cn

可确保不处理隐藏文件。 请注意,hidden 的确切定义取决于系统。 例如,在基于 UNIX 的系统上,以句点字符开头的文件被视为隐藏文件。 另一方面,Microsoft Windows 有一个专用的 file 属性来指示隐藏文件。IgnoreHiddenFileListFilterspring-doc.cn

版本 4.2 引入了 . 在以前的版本中,包含隐藏文件。 使用默认配置时,首先触发 ,然后触发 .IgnoreHiddenFileListFilterIgnoreHiddenFileListFilterAcceptOnceFileListFilterspring-doc.cn

这可确保仅从目录中选取一次文件。AcceptOnceFileListFilterspring-doc.cn

将其状态存储在内存中。 如果您希望该状态在系统重启后仍然存在,可以使用 . 此过滤器将接受的文件名存储在实施中(请参阅 元数据存储)。 此筛选条件与文件名和修改时间匹配。AcceptOnceFileListFilterFileSystemPersistentAcceptOnceFileListFilterMetadataStorespring-doc.cn

从 4.0 版本开始,此过滤器需要一个 . 当与共享数据存储一起使用时(例如与 ),它允许在多个应用程序实例之间或在多个服务器使用的网络文件共享之间共享筛选键。ConcurrentMetadataStoreRedisRedisMetadataStorespring-doc.cn

从版本 4.1.5 开始,此过滤器具有一个新属性 (),该属性会导致它在每次更新时刷新元数据存储(如果存储实现)。flushOnUpdateFlushablespring-doc.cn

持久性文件列表过滤器现在具有 boolean 属性 。 将此属性设置为 , 还会设置 ,这意味着出站网关( 和 )上的递归操作现在每次都将始终遍历整个目录树。 这是为了解决未检测到目录树深处更改的问题。 此外,还会导致将文件的完整路径用作元数据存储键;这解决了以下问题:如果具有相同名称的文件在不同目录中多次出现,则过滤器无法正常工作。 重要说明:这意味着对于顶级目录下的文件,将无法找到持久性元数据存储中的现有键。 因此,默认情况下,该属性为;这可能会在未来版本中更改。forRecursiontruealwaysAcceptDirectorieslsmgetforRecursion=truefalsespring-doc.cn

以下示例使用 filter 配置 a:FileReadingMessageSourcespring-doc.cn

<bean id="pollableFileSource"
    class="org.springframework.integration.file.FileReadingMessageSource"
    p:inputDirectory="${input.directory}"
    p:filter-ref="customFilterBean"/>

读取文件的常见问题是,文件可能在准备就绪之前被检测到(即,其他进程可能仍在写入该文件)。 默认值不会阻止此操作。 在大多数情况下,如果文件写入过程在准备好读取每个文件后立即重命名每个文件,则可以防止这种情况。 一个 or 过滤器只接受准备好的文件(可能基于已知后缀),用 default 组成,允许这种情况。 启用合成,如下例所示:AcceptOnceFileListFilterfilename-patternfilename-regexAcceptOnceFileListFilterCompositeFileListFilterspring-doc.cn

<bean id="pollableFileSource"
    class="org.springframework.integration.file.FileReadingMessageSource"
    p:inputDirectory="${input.directory}"
    p:filter-ref="compositeFilter"/>

<bean id="compositeFilter"
    class="org.springframework.integration.file.filters.CompositeFileListFilter">
    <constructor-arg>
        <list>
            <bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
            <bean class="o.s.i.file.filters.RegexPatternFileListFilter">
                <constructor-arg value="^test.*$"/>
            </bean>
        </list>
    </constructor-arg>
</bean>

如果无法使用临时名称创建文件并重命名为最终名称,则 Spring 集成提供了另一种选择。 版本 4.2 添加了 . 可以使用属性配置此过滤器,以便过滤器仅传递早于此值的文件。 该年龄默认为 60 秒,但您应该选择一个足够大的年龄,以避免过早获取文件(例如,由于网络故障)。 以下示例显示如何配置 :LastModifiedFileListFilterageLastModifiedFileListFilterspring-doc.cn

<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
    <property name="age" value="120" />
</bean>

从版本 4.3.7 开始,引入了 a (扩展 ) 以允许后续过滤器只能看到前一个过滤器的结果。 (使用 ,所有过滤器都会看到所有文件,但它只传递已通过所有过滤器的文件)。 需要新行为的一个示例是 和 的组合,当我们在经过一段时间之前不希望接受文件时。 使用 ,由于 it 在第一次传递时看到所有文件,因此当另一个过滤器稍后传递时,它不会传递它。 当模式筛选器与自定义筛选器结合使用时,该方法非常有用,自定义筛选器查找辅助文件以指示文件传输已完成。 模式过滤器可能只传递主文件(例如 ),但 “done” 过滤器需要查看(例如)是否存在。ChainFileListFilterCompositeFileListFilterCompositeFileListFilterLastModifiedFileListFilterAcceptOnceFileListFilterCompositeFileListFilterAcceptOnceFileListFilterCompositeFileListFiltersomething.txtsomething.donespring-doc.cn

假设我们有文件 、 、 和 .a.txta.doneb.txtspring-doc.cn

模式过滤器仅传递 和 ,而 “done” 过滤器看到所有三个文件并仅传递 。 复合筛选器的最终结果是 only 被释放。a.txtb.txta.txta.txtspring-doc.cn

使用 ,如果链中的任何过滤器返回空列表,则不会调用其余过滤器。ChainFileListFilter

版本 5.0 引入了对文件执行 SPEL 表达式作为上下文评估根对象。 为此,所有用于文件处理的 XML 组件(本地和远程)以及现有属性都提供了该选项,如下例所示:ExpressionFileListFilterfilterfilter-expressionspring-doc.cn

<int-file:inbound-channel-adapter
        directory="${inputdir}"
        filter-expression="name matches '.text'"
        auto-startup="false"/>

版本 5.0.5 引入了对被拒绝文件感兴趣的实现。 为此,应通过 . 在框架中,此功能从 中与 结合使用。 与常规 不同,它根据目标文件系统上的事件提供用于处理的文件。 在用这些文件轮询内部队列时,可能会丢弃它们,因为它们相对于其配置的太年轻。 因此,我们将丢失该文件以备将来可能的考虑。 discard 回调钩子允许我们将文件保留在内部队列中,以便在后续轮询中可以对其进行检查。 它还实现了一个 并填充了一个 discard 回调到其所有委托。DiscardAwareFileListFilteraddDiscardCallback(Consumer<File>)FileReadingMessageSource.WatchServiceDirectoryScannerLastModifiedFileListFilterDirectoryScannerWatchServiceLastModifiedFileListFilterageageCompositeFileListFilterDiscardAwareFileListFilterDiscardAwareFileListFilterspring-doc.cn

由于文件与所有委托匹配,因此可以对同一文件多次调用 。CompositeFileListFilterdiscardCallback

从版本 5.1 开始,它不会检查目录是否存在,并且在调用它之前不会创建它(通常通过 wrapping )。 以前,在引用目录时(例如,从测试中引用)或稍后应用权限时,没有简单的方法来防止操作系统权限错误。FileReadingMessageSourcestart()SourcePollingChannelAdapterspring-doc.cn

消息报头

从版本 5.0 开始, (除了 as a polled ) 将以下标头填充到 outbound :FileReadingMessageSourcepayloadFileMessagespring-doc.cn

  • FileHeaders.FILENAME:要发送的文件。 可用于后续的重命名或复制逻辑。File.getName()spring-doc.cn

  • FileHeaders.ORIGINAL_FILE:对象本身。 通常,当我们丢失原始对象时,框架组件(例如拆分器转换器)会自动填充此标头。 但是,为了与任何其他自定义用例保持一致和方便,此标头可用于访问原始文件。FileFilespring-doc.cn

  • FileHeaders.RELATIVE_PATH:引入了一个新的标头,用于表示文件路径相对于扫描的根目录的部分。 当要求在其他地方恢复源目录层次结构时,此标头可能很有用。 为此,可以将 (请参阅“'生成文件名) 配置为使用此标头。DefaultFileNameGeneratorspring-doc.cn

目录扫描和轮询

不会立即为目录中的文件生成消息。 它使用内部队列来存储由 . 该选项用于确保在每次轮询时使用最新的输入目录内容刷新内部队列。 默认情况下 () ,它会在再次扫描目录之前清空其队列。 此默认行为对于减少对目录中大量文件的扫描特别有用。 但是,在需要自定义排序的情况下,请务必考虑将此标志设置为 的效果。 文件的处理顺序可能与预期不符。 默认情况下,队列中的文件按其自然 () 顺序处理。 通过扫描添加的新文件,即使队列中已有文件,也会入到适当的位置,以保持该自然顺序。 要自定义顺序,可以接受 a 作为构造函数参数。 内部 () 使用它根据业务要求对其内容重新排序。 因此,要按特定顺序处理文件,您应该提供一个比较器,而不是对自定义生成的列表进行排序。FileReadingMessageSourcescannerscanEachPollscanEachPoll = falseFileReadingMessageSourcetruepathFileReadingMessageSourceComparator<File>PriorityBlockingQueueFileReadingMessageSourceDirectoryScannerspring-doc.cn

引入了版本 5.0 以执行文件树访问。 实现基于功能。 根目录 () 参数将从结果中排除。 所有其他子目录包含和排除都基于目标实施。 例如,默认情况下会筛选出目录。 有关更多信息,请参见AbstractDirectoryAwareFileListFilter及其实现。RecursiveDirectoryScannerFiles.walk(Path start, int maxDepth, FileVisitOption…​ options)DirectoryScanner.listFiles(File)FileListFilterSimplePatternFileListFilterspring-doc.cn

从版本 5.5 开始,Java DSL 有一个方便的选项,可以在目标中使用 a 而不是默认选项。FileInboundChannelAdapterSpecrecursive(boolean)RecursiveDirectoryScannerFileReadingMessageSource

命名空间支持

通过使用特定于文件的命名空间,可以简化文件读取的配置。 为此,请使用以下模板:spring-doc.cn

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-file="http://www.springframework.org/schema/integration/file"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/file
    https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>

在此命名空间中,你可以减少并将其包装在入站 Channel Adapter 中,如下所示:FileReadingMessageSourcespring-doc.cn

<int-file:inbound-channel-adapter id="filesIn1"
    directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>

<int-file:inbound-channel-adapter id="filesIn2"
    directory="file:${input.directory}"
    filter="customFilterBean" />

<int-file:inbound-channel-adapter id="filesIn3"
    directory="file:${input.directory}"
    filename-pattern="test*" />

<int-file:inbound-channel-adapter id="filesIn4"
    directory="file:${input.directory}"
    filename-regex="test[0-9]+\.txt" />

第一个 channel adapter 示例依赖于默认实现:FileListFilterspring-doc.cn

因此,您也可以省略 and 属性,因为它们默认是 and 属性。prevent-duplicatesignore-hiddentruespring-doc.cn

Spring Integration 4.2 引入了该属性。 在以前的版本中,包含隐藏文件。ignore-hiddenspring-doc.cn

第二个通道适配器示例使用自定义过滤器,第三个示例使用属性添加基于模式的过滤器,第四个示例使用该属性将基于正则表达式模式的过滤器添加到 . 和 属性都与常规 reference 属性互斥。 但是,您可以使用该属性来引用 的实例,该实例组合了任意数量的过滤器,包括一个或多个基于模式的过滤器,以满足您的特定需求。filename-patternAntPathMatcherfilename-regexFileReadingMessageSourcefilename-patternfilename-regexfilterfilterCompositeFileListFilterspring-doc.cn

当多个进程从同一目录读取数据时,您可能希望锁定文件以防止它们被并发选取。 为此,您可以使用 . 有一个基于 的实现可用,但也可以实现您自己的锁定方案。 储物柜可以按如下方式注入:FileLockerjava.nioniospring-doc.cn

<int-file:inbound-channel-adapter id="filesIn"
    directory="file:${input.directory}" prevent-duplicates="true">
    <int-file:nio-locker/>
</int-file:inbound-channel-adapter>

您可以按如下方式配置自定义保险箱:spring-doc.cn

<int-file:inbound-channel-adapter id="filesIn"
    directory="file:${input.directory}" prevent-duplicates="true">
    <int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
当文件入站适配器配置了locker时,它负责在允许接收文件之前获取锁。 它不承担解锁文件的责任。 如果已处理文件并保持锁挂起,则存在内存泄漏。 如果这是一个问题,您应该在适当的时候打电话给自己。FileLocker.unlock(File file)

当筛选和锁定文件还不够时,您可能需要完全控制文件的列出方式。 要实现此类要求,您可以使用 . 此扫描程序可让您准确确定每个轮询中列出的文件。 这也是 Spring 集成在内部用来连接实例和 . 您可以将自定义注入 on 属性,如下例所示:DirectoryScannerFileListFilterFileLockerFileReadingMessageSourceDirectoryScanner<int-file:inbound-channel-adapter/>scannerspring-doc.cn

<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
     scanner="customDirectoryScanner"/>

这样做可以让您完全自由地选择排序、列表和锁定策略。spring-doc.cn

了解过滤器(包括 、 、 、 等)和实例实际上由 . 在适配器上设置的这些属性中的任何一个随后都会注入到内部 . 对于 external ,禁止在 . 必须在该自定义上指定它们(如果需要)。 换句话说,如果将 a 注入到 中,则应在 上提供 ,而不是在 上。patternsregexprevent-duplicateslockerscannerscannerscannerFileReadingMessageSourceDirectoryScannerscannerFileReadingMessageSourcefilterlockerscannerFileReadingMessageSourcespring-doc.cn

默认情况下,使用 an 和 . 要防止使用它们,您可以配置自己的过滤器(如 ),甚至将其设置为 。DefaultDirectoryScannerIgnoreHiddenFileListFilterAcceptOnceFileListFilterAcceptAllFileListFilternull

WatchServiceDirectoryScanner

当新文件添加到目录时,它依赖于文件系统事件。 在初始化期间,将注册目录以生成事件。 初始文件列表也是在初始化期间构建的。 在遍历目录树时,遇到的任何子目录也会被注册以生成事件。 在第一次轮询时,将返回遍历目录的初始文件列表。 在后续轮询中,将返回来自新创建事件的文件。 如果添加了新的子目录,则其 creation 事件用于遍历新子树以查找现有文件并注册找到的任何新子目录。FileReadingMessageSource.WatchServiceDirectoryScannerspring-doc.cn

当其内部事件没有在目录修改事件发生时,程序不会尽快耗尽其内部事件,则存在问题。 如果超过队列大小,则发出 a 以指示某些文件系统事件可能会丢失。 在这种情况下,将完全重新扫描根目录。 为避免重复,请考虑使用适当的 (如 ) 或在处理完成时删除文件。WatchKeyqueueStandardWatchEventKinds.OVERFLOWFileListFilterAcceptOnceFileListFilter

可以通过 option 启用 ,该选项与 option 互斥。 为提供的 .WatchServiceDirectoryScannerFileReadingMessageSource.use-watch-servicescannerFileReadingMessageSource.WatchServiceDirectoryScannerdirectoryspring-doc.cn

此外,现在轮询逻辑可以跟踪 和 。WatchServiceStandardWatchEventKinds.ENTRY_MODIFYStandardWatchEventKinds.ENTRY_DELETEspring-doc.cn

如果需要跟踪现有文件和新文件的修改,则应在 . 否则,将以相同的方式处理这些事件中的文件。ENTRY_MODIFYFileListFilterspring-doc.cn

implementations 选取事件。 因此,将为操作提供其文件。 启用此事件后,过滤器(如 )将删除该文件。 因此,如果出现同名文件,它将通过过滤器并作为消息发送。ResettableFileListFilterENTRY_DELETEremove()AcceptOnceFileListFilterspring-doc.cn

为此,引入了属性 ()。 ( 是 中的公共内部枚举。 使用此选项,我们可以对新文件使用一个下游流逻辑,对修改后的文件使用其他一些 logic 。 以下示例显示了如何为同一目录中的 create 和 modify 事件配置不同的逻辑:watch-eventsFileReadingMessageSource.setWatchEvents(WatchEventType…​ watchEvents)WatchEventTypeFileReadingMessageSourcespring-doc.cn

值得一提的是,该事件涉及到被监视目录的 sub-directory 的 rename 操作。 更具体地说,与先前目录名称相关的 event 位于通知新(重命名)目录的 event 之前。 在某些操作系统(如 Windows)上,必须注册事件才能处理这种情况。 否则,在文件资源管理器中重命名监视的子目录可能会导致在该子目录中无法检测到新文件。ENTRY_DELETEENTRY_DELETEENTRY_CREATEENTRY_DELETEspring-doc.cn

<int-file:inbound-channel-adapter id="newFiles"
     directory="${input.directory}"
     use-watch-service="true"/>

<int-file:inbound-channel-adapter id="modifiedFiles"
     directory="${input.directory}"
     use-watch-service="true"
     filter="acceptAllFilter"
     watch-events="MODIFY"/> <!-- The default is CREATE. -->

限制内存消耗

您可以使用 a 来限制内存中保留的文件数。 这在扫描大型目录时非常有用。 使用 XML 配置时,可以通过在入站通道适配器上设置属性来启用此功能。HeadDirectoryScannerqueue-sizespring-doc.cn

在版本 4.2 之前,此设置与任何其他过滤器的使用不兼容。 任何其他过滤器(包括 )都会覆盖用于限制大小的过滤器。prevent-duplicates="true"spring-doc.cn

使用 a 与 不兼容。 由于在轮询决策期间会查询所有筛选器,因此 不知道其他筛选器可能正在临时筛选文件。 即使以前由 筛选的文件现在可用,也会对其进行筛选。HeadDirectoryScannerAcceptOnceFileListFilterAcceptOnceFileListFilterHeadDirectoryScanner.HeadFilterAcceptOnceFileListFilterspring-doc.cn

通常,在这种情况下,您应该删除已处理的文件,以便以前过滤的文件在将来的轮询中可用,而不是使用 an。AcceptOnceFileListFilterspring-doc.cn

使用 Java 配置进行配置

以下 Spring Boot 应用程序显示了如何使用 Java 配置配置出站适配器的示例:spring-doc.cn

@SpringBootApplication
public class FileReadingJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileReadingJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public MessageChannel fileInputChannel() {
        return new DirectChannel();
    }

    @Bean
    @InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
    public MessageSource<File> fileReadingMessageSource() {
         FileReadingMessageSource source = new FileReadingMessageSource();
         source.setDirectory(new File(INBOUND_PATH));
         source.setFilter(new SimplePatternFileListFilter("*.txt"));
         return source;
    }

    @Bean
    @Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
    public FileToStringTransformer fileToStringTransformer() {
        return new FileToStringTransformer();
    }

}

使用 Java DSL 进行配置

Spring 下面的 Boot 应用程序显示了如何使用 Java DSL 配置出站适配器的示例:spring-doc.cn

@SpringBootApplication
public class FileReadingJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileReadingJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow fileReadingFlow() {
         return IntegrationFlow
                  .from(Files.inboundAdapter(new File(INBOUND_PATH))
                              .patternFilter("*.txt"),
                          e -> e.poller(Pollers.fixedDelay(1000)))
                  .transform(Files.toStringTransformer())
                  .channel("processFileChannel")
                  .get();
    }

}

'tail'ing 文件

另一个常见的用例是从文件的末尾(或尾部)获取 'lines',并在添加新行时捕获新行。 提供了两种实现。 第一个 , 使用 native 命令 (在具有 native 命令的 os 上)。 这通常是这些平台上最有效的实现。 对于没有命令的操作系统,第二个实现 使用 Apache 类。OSDelegatingFileTailingMessageProducertailtailApacheCommonsFileTailingMessageProducercommons-ioTailerspring-doc.cn

在这两种情况下,文件系统事件(例如文件不可用和其他事件)都使用正常的 Spring 事件发布机制作为实例发布。 此类事件的示例包括:ApplicationEventspring-doc.cn

[message=tail: cannot open '/tmp/somefile' for reading:
               No such file or directory, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has become inaccessible:
               No such file or directory, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has appeared;
               following end of new file, file=/tmp/somefile]

例如,在旋转文件时,可能会发生前面示例中显示的事件序列。spring-doc.cn

从版本 5.0 开始,在 期间,当文件中没有数据时,会发出 a 。 以下示例显示了此类事件的外观:FileTailingIdleEventidleEventIntervalspring-doc.cn

[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
并非所有支持命令的平台都提供这些状态消息。tail

从这些终端节点发出的消息具有以下标头:spring-doc.cn

在版本 5.0 之前的版本中,标头包含文件绝对路径的字符串表示形式。 现在,您可以通过调用原始文件头来获取该字符串表示形式。FileHeaders.FILENAMEgetAbsolutePath()

以下示例使用默认选项 ('-F -n 0',表示从当前端开始遵循文件名) 创建本机适配器。spring-doc.cn

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	task-executor="exec"
	file="/tmp/foo"/>

以下示例使用 '-F -n +0' 选项(表示遵循文件名,发出所有现有行)创建本机适配器。spring-doc.cn

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	native-options="-F -n +0"
	task-executor="exec"
	file-delay=10000
	file="/tmp/foo"/>

如果命令失败(在某些平台上,缺少文件会导致失败,即使指定了命令),则每 10 秒重试一次该命令。tailtail-Fspring-doc.cn

默认情况下,本机适配器从标准输出捕获并将内容作为消息发送。 他们还从标准错误中捕获以引发事件。 从版本 4.3.6 开始,您可以通过将 设置为 ,从而丢弃标准错误事件,如下例所示:enable-status-readerfalsespring-doc.cn

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	enable-status-reader="false"
	task-executor="exec"
	file="/tmp/foo"/>

在以下示例中,设置为 ,这意味着,如果 5 秒内未写入任何行,则每 5 秒触发一次:IdleEventInterval5000FileTailingIdleEventspring-doc.cn

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	idle-event-interval="5000"
	task-executor="exec"
	file="/tmp/somefile"/>

当您需要停止适配器时,这可能很有用。spring-doc.cn

下面的示例创建一个 Apache 适配器,该适配器每 2 秒检查一次文件是否有新行,并每 10 秒检查一次是否存在缺失的文件:commons-ioTailerspring-doc.cn

<int-file:tail-inbound-channel-adapter id="apache"
	channel="input"
	task-executor="exec"
	file="/tmp/bar"
	delay="2000"
	end="false"             (1)
	reopen="true"           (2)
	file-delay="10000"/>
1 文件从开头 () 而不是结尾(这是默认值)开始拖尾。end="false"
2 将为每个块重新打开文件(默认为保持文件打开)。
指定 、 或 属性会强制使用 Apache 适配器,并使该属性不可用。delayendreopencommons-ionative-options

处理不完整的数据

文件传输方案中的一个常见问题是如何确定传输已完成,以便不会开始读取不完整的文件。 解决此问题的一种常见方法是使用临时名称编写文件,然后以原子方式将其重命名为最终名称。 此技术与遮盖临时文件不被使用者选取的过滤器一起,提供了一个强大的解决方案。 这种技术被写入文件(本地或远程)的 Spring 集成组件使用。 默认情况下,它们会附加到文件名,并在传输完成后将其删除。.writingspring-doc.cn

另一种常见的技术是编写第二个 “marker” 文件以指示文件传输已完成。 在这种情况下,您不应考虑 (例如) 在 也存在之前可供使用。 Spring 集成版本 5.0 引入了新的过滤器来支持这种机制。 为文件系统 ()、FTPSFTP 提供了实现。 它们是可配置的,因此标记文件可以具有任何名称,尽管它通常与正在传输的文件相关。 有关更多信息,请参阅 Javadocsomefile.txtsomefile.txt.completeFileSystemMarkerFilePresentFileListFilterspring-doc.cn

写入文件

要将消息写入文件系统,可以使用 FileWritingMessageHandler。 此类可以处理以下有效负载类型:spring-doc.cn

对于 String 负载,您可以配置编码和 charset。spring-doc.cn

为了简化操作,您可以使用 XML 名称空间将 作为出站通道适配器或出站网关的一部分进行配置。FileWritingMessageHandlerspring-doc.cn

从版本 4.3 开始,您可以指定写入文件时要使用的缓冲区大小。spring-doc.cn

从版本 5.1 开始,您可以提供一个 如果使用 or 并且必须创建新文件,则会触发该 URL。 此回调接收新创建的文件以及触发该文件的消息。 例如,此回调可用于编写消息标头中定义的 CSV 标头。BiConsumer<File, Message<?>>newFileCallbackFileExistsMode.APPENDFileExistsMode.APPEND_NO_FLUSHspring-doc.cn

生成文件名

在最简单的形式中,只需要一个用于写入文件的目标目录。 要写入的文件的名称由处理程序的 FileNameGenerator 确定。 默认实现查找其键与定义为 FileHeaders.FILENAME 的常量匹配的消息标头。FileWritingMessageHandlerspring-doc.cn

或者,您可以指定要根据消息计算的表达式以生成文件名 — 例如 . 表达式的计算结果必须为 . 为方便起见,还提供了该方法,允许您显式指定其值将用作文件名的消息头。headers['myCustomHeader'] + '.something'StringDefaultFileNameGeneratorsetHeaderNamespring-doc.cn

设置完成后,将采用以下解析步骤来确定给定消息有效负载的文件名:DefaultFileNameGeneratorspring-doc.cn

  1. 根据消息评估表达式,如果结果是 非空 ,则将其用作文件名。Stringspring-doc.cn

  2. 否则,如果有效负载是 a ,请使用对象的文件名。java.io.FileFilespring-doc.cn

  3. 否则,请使用附加了 .作为文件名。msgspring-doc.cn

当您使用 XML 名称空间支持时,文件出站通道适配器和文件出站网关都支持以下互斥的配置属性:spring-doc.cn

  • filename-generator(对 implementation 的引用)FileNameGeneratorspring-doc.cn

  • filename-generator-expression(计算结果为String)spring-doc.cn

写入文件时,将使用临时文件后缀(其默认值为 )。 在写入文件时,它会附加到文件名中。 要自定义后缀,您可以在文件出站通道适配器和文件出站网关上设置该属性。.writingtemporary-file-suffixspring-doc.cn

使用 file 时,将忽略该属性,因为数据会直接附加到文件中。APPENDmodetemporary-file-suffix

从版本 4.2.5 开始,生成的文件名(作为 result of 或 evaluation)可以与目标文件名一起表示子路径。 它像以前一样用作第二个构造函数参数。 但是,在过去,我们没有为子路径创建 () 目录,只假设文件名。 当我们需要恢复文件系统树以匹配源目录时,这种方法非常有用,例如,解压缩存档并按原始顺序保存目标目录中的所有文件时。filename-generatorfilename-generator-expressionFile(File parent, String child)mkdirs()spring-doc.cn

指定输出目录

文件出站通道适配器和文件出站网关都提供了两个互斥的配置属性,用于指定输出目录:spring-doc.cn

Spring Integration 2.2 引入了该属性。directory-expression
使用属性directory

使用该属性时,输出目录将设置为固定值,该值在初始化时设置。 如果未指定此属性,则必须使用该属性。directoryFileWritingMessageHandlerdirectory-expressionspring-doc.cn

使用属性directory-expression

如果你想获得完整的 SPEL 支持,你可以使用该属性。 此属性接受一个 SPEL 表达式,该表达式针对正在处理的每条消息进行评估。 因此,当您动态指定输出文件目录时,您对消息的有效负载及其标头具有完全访问权限。directory-expressionspring-doc.cn

SPEL 表达式必须解析为 a 、 或 。 (后者无论如何都会被评估为 a。 此外,生成的 or 必须指向目录。 如果未指定 attribute,则必须设置 attribute。Stringjava.io.Fileorg.springframework.core.io.ResourceFileStringFiledirectory-expressiondirectoryspring-doc.cn

使用属性auto-create-directory

默认情况下,如果目标目录不存在,则会自动创建相应的目标目录和任何不存在的父目录。 要防止这种行为,可以将该属性设置为 。 此属性适用于 和 属性。auto-create-directoryfalsedirectorydirectory-expressionspring-doc.cn

使用属性 and is 时,从 Spring Integration 2.2 开始进行了以下更改:directoryauto-create-directoryfalsespring-doc.cn

现在,对正在处理的每条消息执行此检查,而不是在初始化适配器时检查目标目录是否存在。spring-doc.cn

此外,如果 is 且在消息处理之间删除了目录,则会为正在处理的每条消息重新创建目录。auto-create-directorytruespring-doc.cn

处理现有目标文件

当您写入文件并且目标文件已存在时,默认行为是覆盖该目标文件。 您可以通过在相关文件出站组件上设置属性来更改此行为。 存在以下选项:modespring-doc.cn

Spring Integration 2.2 引入了属性以及 、 和 选项。modeAPPENDFAILIGNORE
REPLACE

如果目标文件已存在,则将其覆盖。 如果未指定该属性,则这是写入文件时的默认行为。modespring-doc.cn

REPLACE_IF_MODIFIED

如果目标文件已存在,则仅当上次修改的时间戳与源文件的时间戳不同时,才会覆盖该目标文件。 对于有效负载,将有效负载时间与现有文件进行比较。 对于其他有效负载,将 () 标头与现有文件进行比较。 如果标头缺失或具有非 a 值,则始终会替换该文件。FilelastModifiedFileHeaders.SET_MODIFIEDfile_setModifiedNumberspring-doc.cn

APPEND

此模式允许您将消息内容附加到现有文件,而不是每次都创建新文件。 请注意,此属性与该属性是互斥的,因为当它将内容附加到现有文件时,适配器不再使用临时文件。 文件在每条消息后关闭。temporary-file-suffixspring-doc.cn

APPEND_NO_FLUSH

此选项的语义与 相同,但不会刷新数据,也不会在每条消息后关闭文件。 这可以提供显著的性能,但在发生故障时有丢失数据的风险。 有关更多信息,请参阅使用 APPEND_NO_FLUSH 时刷新文件APPENDspring-doc.cn

FAIL

如果目标文件存在,则引发 MessageHandlingExceptionspring-doc.cn

IGNORE

如果目标文件存在,则消息负载将被静默忽略。spring-doc.cn

使用临时文件后缀(默认值为 )时,如果最终文件名或临时文件名存在,则此选项适用。.writingIGNORE

使用 时刷新文件APPEND_NO_FLUSH

该模式是在版本 4.3 中添加的。 使用它可以提高性能,因为文件不会在每条消息后关闭。 但是,如果发生故障,这可能会导致数据丢失。APPEND_NO_FLUSHspring-doc.cn

Spring 集成提供了几种刷新策略来减轻这种数据丢失:spring-doc.cn

  • 用。 如果文件在这段时间内未写入,则会自动刷新该文件。 这是近似值,可能到这个时间为止(平均值为 )。flushInterval1.33x1.167xspring-doc.cn

  • 将包含正则表达式的消息发送到消息处理程序的方法。 具有与模式匹配的绝对路径名的文件将被刷新。triggerspring-doc.cn

  • 为处理程序提供自定义实现,以修改将消息发送到该方法时执行的操作。MessageFlushPredicatetriggerspring-doc.cn

  • 通过传入 custom 或 implementation 来调用处理程序的方法之一。flushIfNeededFileWritingMessageHandler.FlushPredicateFileWritingMessageHandler.MessageFlushPredicatespring-doc.cn

将为每个打开的文件调用谓词。 有关更多信息,请参阅这些接口的 Javadoc。 请注意,从版本 5.0 开始,谓词方法提供了另一个参数:当前文件的首次写入时间(如果是新的或以前的关闭)。spring-doc.cn

使用 时,间隔从最后一次写入开始。 仅当文件在时间间隔内处于空闲状态时,才会刷新该文件。 从版本 4.3.7 开始,可以将附加属性 () 设置为 ,这意味着间隔从第一次写入以前刷新(或新)文件开始。flushIntervalflushWhenIdlefalsespring-doc.cn

文件时间戳

默认情况下,目标文件的时间戳是创建文件的时间(就地重命名保留当前时间戳除外)。 从版本 4.3 开始,您现在可以配置 (或使用 Java 配置时)。 对于有效负载,这会将时间戳从入站文件传输到出站文件(无论是否需要副本)。 对于其他负载,如果存在标头 (),则只要标头是 .lastModifiedpreserve-timestampsetPreserveTimestamp(true)FileFileHeaders.SET_MODIFIEDfile_setModifiedlastModifiedNumberspring-doc.cn

文件权限

从版本 5.0 开始,在将文件写入支持 Posix 权限的文件系统时,您可以在出站通道适配器或网关上指定这些权限。 该属性是一个整数,通常以熟悉的八进制格式提供,例如,表示所有者具有读/写权限,组具有只读权限,而其他人没有访问权限。0640spring-doc.cn

文件出站通道适配器

下面的示例配置文件出站通道适配器:spring-doc.cn

<int-file:outbound-channel-adapter id="filesOut" directory="${input.directory.property}"/>

基于命名空间的配置还支持属性。 如果设置为 ,则会在写入目标后触发删除原始源文件。 该标志的默认值为 . 以下示例显示如何将其设置为 :delete-source-filestruefalsetruespring-doc.cn

<int-file:outbound-channel-adapter id="filesOut"
    directory="${output.directory}"
    delete-source-files="true"/>
仅当入站消息具有有效负载,或者标头值包含源实例或表示原始文件路径时,该属性才有效。delete-source-filesFileFileHeaders.ORIGINAL_FILEFileString

从版本 4.2 开始,支持 an 选项。 如果设置为 ,则在写入消息后,会向文件附加一个新行。 默认属性值为 . 以下示例演示如何使用该选项:FileWritingMessageHandlerappend-new-linetruefalseappend-new-linespring-doc.cn

<int-file:outbound-channel-adapter id="newlineAdapter"
	append-new-line="true"
    directory="${output.directory}"/>

出站网关

如果要继续根据写入的文件处理消息,可以改用 。 它的作用类似于 . 但是,在写入文件后,它还会将其作为消息的有效负载发送到回复通道。outbound-gatewayoutbound-channel-adapterspring-doc.cn

以下示例配置出站网关:spring-doc.cn

<int-file:outbound-gateway id="mover" request-channel="moveInput"
    reply-channel="output"
    directory="${output.directory}"
    mode="REPLACE" delete-source-files="true"/>

如前所述,您还可以指定 attribute,该属性定义如何处理目标文件已存在的情况的行为。 有关更多详细信息,请参阅 Dealing with Existing Destination Files。 通常,在使用文件出站网关时,结果文件将作为回复通道上的消息负载返回。modespring-doc.cn

这在指定模式时也适用。 在这种情况下,将返回预先存在的目标文件。 如果请求消息的负载是一个文件,您仍然可以通过消息标头访问该原始文件。 请参阅 FileHeaders.ORIGINAL_FILEIGNOREspring-doc.cn

“outbound-gateway” 在您希望首先移动文件,然后通过处理管道发送文件的情况下效果很好。 在这种情况下,您可以将文件命名空间的元素连接到 ,然后将该网关的元素连接到管道的开头。inbound-channel-adapteroutbound-gatewayreply-channel

如果您有更详细的要求或需要支持其他有效负载类型作为要转换为文件内容的输入,则可以扩展 ,但更好的选择是依赖 TransformerFileWritingMessageHandlerspring-doc.cn

使用 Java 配置进行配置

Spring 下面的 Boot 应用程序显示了如何使用 Java 配置配置入站适配器的示例:spring-doc.cn

@SpringBootApplication
@IntegrationComponentScan
public class FileWritingJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                      new SpringApplicationBuilder(FileWritingJavaApplication.class)
                              .web(false)
                              .run(args);
             MyGateway gateway = context.getBean(MyGateway.class);
             gateway.writeToFile("foo.txt", new File(tmpDir.getRoot(), "fileWritingFlow"), "foo");
    }

    @Bean
    @ServiceActivator(inputChannel = "writeToFileChannel")
    public MessageHandler fileWritingMessageHandler() {
         Expression directoryExpression = new SpelExpressionParser().parseExpression("headers.directory");
         FileWritingMessageHandler handler = new FileWritingMessageHandler(directoryExpression);
         handler.setFileExistsMode(FileExistsMode.APPEND);
         return handler;
    }

    @MessagingGateway(defaultRequestChannel = "writeToFileChannel")
    public interface MyGateway {

        void writeToFile(@Header(FileHeaders.FILENAME) String fileName,
                       @Header(FileHeaders.FILENAME) File directory, String data);

    }
}

使用 Java DSL 进行配置

Spring 下面的 Boot 应用程序显示了如何使用 Java DSL 配置入站适配器的示例:spring-doc.cn

@SpringBootApplication
public class FileWritingJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                 new SpringApplicationBuilder(FileWritingJavaApplication.class)
                         .web(false)
                         .run(args);
        MessageChannel fileWritingInput = context.getBean("fileWritingInput", MessageChannel.class);
        fileWritingInput.send(new GenericMessage<>("foo"));
    }

    @Bean
   	public IntegrationFlow fileWritingFlow() {
   	    return IntegrationFlow.from("fileWritingInput")
   		        .enrichHeaders(h -> h.header(FileHeaders.FILENAME, "foo.txt")
   		                  .header("directory", new File(tmpDir.getRoot(), "fileWritingFlow")))
   	            .handle(Files.outboundGateway(m -> m.getHeaders().get("directory")))
   	            .channel(MessageChannels.queue("fileWritingResultChannel"))
   	            .get();
    }

}

文件转换器

要将从文件系统读取的数据转换为对象,反之亦然,您需要做一些工作。 与 不同 和 在较小程度上 ,您可能需要自己的机制来完成工作。 为此,您可以实现 interface. 或者,您可以扩展 for inbound messages. Spring 集成提供了一些明显的实现。FileReadingMessageSourceFileWritingMessageHandlerTransformerAbstractFilePayloadTransformerspring-doc.cn

请参阅 Transformer 接口的 Javadoc 以查看哪些 Spring 集成类实现了它。 同样,你可以检查AbstractFilePayloadTransformer类的 Javadoc 以查看哪些 Spring 集成类扩展了它。spring-doc.cn

FileToByteArrayTransformer使用 Spring 的 . 通常,使用一系列 transformer 比将所有转换放在一个类中要好。 在这种情况下,to 转换可能是合乎逻辑的第一步。AbstractFilePayloadTransformerFilebyte[]FileCopyUtilsFilebyte[]spring-doc.cn

FileToStringTransformerextends 将对象转换为 . 如果不出意外,这对调试很有用(考虑将其与 wire tap 一起使用)。AbstractFilePayloadTransformerFileStringspring-doc.cn

要配置特定于文件的转换器,可以使用 file 命名空间中的相应元素,如下例所示:spring-doc.cn

<int-file:file-to-bytes-transformer  input-channel="input" output-channel="output"
    delete-files="true"/>

<int-file:file-to-string-transformer input-channel="input" output-channel="output"
    delete-files="true" charset="UTF-8"/>

该选项向转换器发出信号,指示它应在转换完成后删除入站文件。 这绝不能替代在多线程环境中使用时(例如,当您通常使用 Spring Integration时)。delete-filesAcceptOnceFileListFilterFileReadingMessageSourcespring-doc.cn

文件拆分器

它是在版本 4.1.2 中添加的,其命名空间支持是在版本 4.2 中添加的。 根据 将文本文件拆分为单独的行。 默认情况下,拆分器使用 an 在从文件中读取行时一次发出一行。 将属性设置为 to 会导致它在将所有行作为消息发出之前将它们读入内存。 一个用例可能是,您希望在发送任何包含行的消息之前检测文件上的 I/O 错误。 但是,它仅适用于相对较短的文件。FileSplitterFileSplitterBufferedReader.readLine()Iteratoriteratorfalsespring-doc.cn

入站负载可以是 、 (路径)、 或 。 其他负载类型保持不变。FileStringFileInputStreamReaderspring-doc.cn

以下清单显示了配置 :FileSplitterspring-doc.cn

Java DSL
@SpringBootApplication
public class FileSplitterApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileSplitterApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow fileSplitterFlow() {
        return IntegrationFlow
            .from(Files.inboundAdapter(tmpDir.getRoot())
                 .filter(new ChainFileListFilter<File>()
                        .addFilter(new AcceptOnceFileListFilter<>())
                        .addFilter(new ExpressionFileListFilter<>(
                             new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
            .split(Files.splitter()
                     .markers()
                     .charset(StandardCharsets.US_ASCII)
                     .firstLineAsHeader("fileHeader")
                     .applySequence(true))
            .channel(c -> c.queue("fileSplittingResultChannel"))
            .get();
    }

}
Kotlin DSL
@Bean
fun fileSplitterFlow() =
    integrationFlow(
        Files.inboundAdapter(tmpDir.getRoot())
            .filter(
                ChainFileListFilter<File?>()
                    .addFilter(AcceptOnceFileListFilter())
                    .addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
            )
    ) {
        split(
            Files.splitter()
                .markers()
                .charset(StandardCharsets.US_ASCII)
                .firstLineAsHeader("fileHeader")
                .applySequence(true)
        )
        channel { queue("fileSplittingResultChannel") }
    }
Java
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
    FileSplitter splitter = new FileSplitter(true, true);
    splitter.setApplySequence(true);
    splitter.setOutputChannel(outputChannel);
    return splitter;
}
XML 格式
<int-file:splitter id="splitter" (1)
    iterator=""                  (2)
    markers=""                   (3)
    markers-json=""              (4)
    apply-sequence=""            (5)
    requires-reply=""            (6)
    charset=""                   (7)
    first-line-as-header=""      (8)
    input-channel=""             (9)
    output-channel=""            (10)
    send-timeout=""              (11)
    auto-startup=""              (12)
    order=""                     (13)
    phase="" />                  (14)
1 splitter 的 bean 名称。
2 设置为(默认值)以使用迭代器或在发送行之前将文件加载到内存中。truefalse
3 设置为 to 可在文件数据之前和之后发出文件开始和文件结束标记消息。 标记是具有有效负载(属性中包含 and 值)的消息。 在筛选某些行的下游流中按顺序处理文件时,可以使用标记。 它们使下游处理能够知道文件何时被完全处理。 此外,包含这些消息或添加到这些消息的标头。 该标记包括行数。 如果文件为空,则仅发出 和 标记,并将其作为 . 默认值为 . 默认情况下,, 是 。 另请参阅 (next 属性)。trueFileSplitter.FileMarkerSTARTENDmarkfile_markerSTARTENDENDSTARTEND0lineCountfalsetrueapply-sequencefalsemarkers-json
4 当为 true 时,将此项设置为将对象转换为 JSON 字符串。 (使用 under)。markerstrueFileMarkerSimpleJsonSerializer
5 设置为 可禁用在邮件中包含 和 标头。 默认值为 ,除非为 。 当 和 is 时,标记包含在排序中。 当 和 is 时,标头设置为 ,因为大小未知。falsesequenceSizesequenceNumbertruemarkerstruetruemarkerstruetrueiteratortruesequenceSize0
6 设置为 to 会导致在文件中没有行时引发 a。 默认值为 .trueRequiresReplyExceptionfalse
7 设置将文本数据读取到有效负载时要使用的字符集名称。 默认值为 platform charset。String
8 在为其余行发出的消息中作为标题携带的第一行的标题名称。 从 5.0 版本开始。
9 设置用于将消息发送到拆分器的输入通道。
10 设置将消息发送到的输出通道。
11 设置发送超时。 仅当 can 阻止时适用 — 例如完整的 .output-channelQueueChannel
12 设置为 to 以禁用在刷新上下文时自动启动拆分器。 默认值为 .falsetrue
13 如果 是 ,则设置此端点的顺序。input-channel<publish-subscribe-channel/>
14 设置分流器的启动阶段(在 is 时使用)。auto-startuptrue

还会将任何基于文本的内容拆分为多行。 从版本 4.3 开始,当与 FTP 或 SFTP 流入站通道适配器或者使用该选项检索文件的 FTP 或 SFTP 出站网关结合使用时,拆分器会在文件完全使用时自动关闭支持该流的会话 有关这些工具的更多信息,请参阅 FTP Streaming Inbound Channel AdapterSFTP Streaming Inbound Channel Adapter 以及 FTP Outbound GatewaySFTP Outbound GatewayFileSplitterInputStreamstreamspring-doc.cn

使用 Java 配置时,可以使用其他构造函数,如下例所示:spring-doc.cn

public FileSplitter(boolean iterator, boolean markers, boolean markersJson)

当为 true 时,标记表示为 JSON 字符串(使用 a )。markersJsonSimpleJsonSerializerspring-doc.cn

版本 5.0 引入了一个选项,用于指定内容的第一行是标题(例如 CSV 文件中的列名称)。 传递给此属性的参数是标头名称,在该名称下,第一行作为其余行发出的消息中的标头。 此行不包含在序列标题中(如果为 true),也不包含在与 关联的 中。 注意:从版本 5.5 开始,lineCount' 也作为消息的 into 标头包含在内,因为可以序列化为 JSON。 如果文件仅包含 header 行,则该文件将被视为空,因此,在拆分期间仅发出实例(如果启用了标记 — 否则,不会发出任何消息)。 默认情况下(如果未设置标头名称),第一行被视为 data 并成为第一个发出的消息的有效负载。firstLineAsHeaderapplySequencelineCountFileMarker.ENDFileHeaders.LINE_COUNTFileMarker.ENDFileMarkerFileMarkerspring-doc.cn

如果您需要有关从文件内容中提取标头的更复杂的逻辑(不是第一行、不是行的整个内容、不是一个特定的标头等),请考虑在 . 请注意,已移动到标题的行可能会从正常内容流程的下游进行筛选。FileSplitterspring-doc.cn

幂等下游处理拆分文件

当 为 true 时,拆分器会在标题中添加行号(当为 true 时,标记计为行)。 该行号可与幂等接收器一起使用,以避免在重新启动后重新处理行。apply-sequenceSEQUENCE_NUMBERmarkersspring-doc.cn

例如:spring-doc.cn

@Bean
public ConcurrentMetadataStore store() {
    return new ZookeeperMetadataStore();
}

@Bean
public MetadataStoreSelector selector() {
    return new MetadataStoreSelector(
            message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
                    .getAbsolutePath(),
            message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
                    .toString(),
            store())
                    .compareValues(
                            (oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    return new IdempotentReceiverInterceptor(selector());
}

@Bean
public IntegrationFlow flow() {
    ...
    .split(new FileSplitter())
    ...
    .handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
    ...
}

文件聚合器

从版本 5.5 开始,引入了 a 以在启用 START/END 标记时覆盖用例的另一端。 为方便起见,它实现了所有三种 sequence details 策略:FileAggregatorFileSplitterFileAggregatorspring-doc.cn

  • with the attribute 用于相关键计算。 在 上启用标记后,它不会填充序列详细信息标头,因为 START/END 标记消息也包含在序列大小中。 仍会为发出的每一行填充 ,包括 START/END 标记消息。HeaderAttributeCorrelationStrategyFileHeaders.FILENAMEFileSplitterFileHeaders.FILENAMEspring-doc.cn

  • 的 - 检查组中的消息,然后将标头值与组大小减去 - 实例进行比较。 它还实现了一个方便的 contact for 函数,以便在 . 有关更多信息,请参阅 Message Group ConditionFileMarkerReleaseStrategyFileSplitter.FileMarker.Mark.ENDFileHeaders.LINE_COUNT2FileSplitter.FileMarkerGroupConditionProviderconditionSupplierAbstractCorrelatingMessageHandlerspring-doc.cn

  • 只需从组中删除消息,并将其余消息收集到列表有效负载中以生成。FileAggregatingMessageGroupProcessorFileSplitter.FileMarkerspring-doc.cn

以下清单显示了配置 :FileAggregatorspring-doc.cn

Java DSL
@Bean
public IntegrationFlow fileSplitterAggregatorFlow(TaskExecutor taskExecutor) {
    return f -> f
            .split(Files.splitter()
                    .markers()
                    .firstLineAsHeader("firstLine"))
            .channel(c -> c.executor(taskExecutor))
            .filter(payload -> !(payload instanceof FileSplitter.FileMarker),
                    e -> e.discardChannel("aggregatorChannel"))
            .<String, String>transform(String::toUpperCase)
            .channel("aggregatorChannel")
            .aggregate(new FileAggregator())
            .channel(c -> c.queue("resultChannel"));
}
Kotlin DSL
@Bean
fun fileSplitterAggregatorFlow(taskExecutor: TaskExecutor?) =
    integrationFlow {
        split(Files.splitter().markers().firstLineAsHeader("firstLine"))
        channel { executor(taskExecutor) }
        filter<Any>({ it !is FileMarker }) { discardChannel("aggregatorChannel") }
        transform(String::toUpperCase)
        channel("aggregatorChannel")
        aggregate(FileAggregator())
        channel { queue("resultChannel") }
    }
Java
@serviceActivator(inputChannel="toAggregateFile")
@Bean
public AggregatorFactoryBean fileAggregator() {
    AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
    aggregator.setProcessorBean(new FileAggregator());
    aggregator.setOutputChannel(outputChannel);
    return aggregator;
}
XML 格式
<int:chain input-channel="input" output-channel="output">
    <int-file:splitter markers="true"/>
    <int:aggregator>
        <bean class="org.springframework.integration.file.aggregator.FileAggregator"/>
    </int:aggregator>
</int:chain>

如果 的默认行为不满足目标逻辑,则建议使用单个策略配置聚合器终端节点。 有关更多信息,请参阅 JavaDocs。FileAggregatorFileAggregatorspring-doc.cn

远程持久文件列表过滤器

默认情况下,入站和流入站远程文件通道适配器(、 和其他技术)配置了相应的实施,并配置了内存中的 。 要在集群中运行,可以使用共享的过滤器替换这些过滤器(有关更多信息,请参阅元数据存储)。 这些过滤器用于防止多次获取同一文件(除非修改时间更改)。 从版本 5.2 开始,在获取文件之前立即将文件添加到过滤器中(如果获取失败,则撤消)。FTPSFTPAbstractPersistentFileListFilterMetadataStoreMetadataStorespring-doc.cn

如果发生灾难性故障(例如断电),当前正在获取的文件可能会保留在过滤器中,并且在重新启动应用程序时不会重新获取。 在这种情况下,您需要从 .MetadataStore

在以前的版本中,在提取任何文件之前都会过滤文件,这意味着在发生灾难性故障后,多个文件可能处于此状态。spring-doc.cn

为了促进这种新行为,向 .FileListFilterspring-doc.cn

boolean accept(F file);

boolean supportsSingleFileFiltering();

如果过滤器返回 in ,则它必须实现 。truesupportsSingleFileFilteringaccept()spring-doc.cn

如果远程过滤器不支持单个文件过滤(例如 ),则适配器将恢复到以前的行为。AbstractMarkerFilePresentFileListFilterspring-doc.cn

如果使用多个过滤器(使用 或 ),则所有委托过滤器都必须支持单个文件过滤,以便复合过滤器支持它。CompositeFileListFilterChainFileListFilterspring-doc.cn

持久性文件列表过滤器现在具有 boolean 属性 。 将此属性设置为 , 还会设置 ,这意味着出站网关( 和 )上的递归操作现在每次都将始终遍历整个目录树。 这是为了解决未检测到目录树深处更改的问题。 此外,还会导致将文件的完整路径用作元数据存储键;这解决了以下问题:如果具有相同名称的文件在不同目录中多次出现,则过滤器无法正常工作。 重要说明:这意味着对于顶级目录下的文件,将无法找到持久性元数据存储中的现有键。 因此,默认情况下,该属性为;这可能会在未来版本中更改。forRecursiontruealwaysAcceptDirectorieslsmgetforRecursion=truefalsespring-doc.cn