SFTP 入站通道适配器是一个特殊的侦听器,它连接到服务器并侦听远程目录事件(例如正在创建新文件),此时它将启动文件传输。 以下示例说明如何配置 SFTP 入站通道适配器:spring-doc.cn

<int-sftp:inbound-channel-adapter id="sftpAdapterAutoCreate"
              session-factory="sftpSessionFactory"
            channel="requestChannel"
            filename-pattern="*.txt"
            remote-directory="/foo/bar"
            preserve-timestamp="true"
            local-directory="file:target/foo"
            auto-create-local-directory="true"
            local-filename-generator-expression="#this.toUpperCase() + '.a'"
            scanner="myDirScanner"
            local-filter="myFilter"
            temporary-file-suffix=".writing"
            max-fetch-size="-1"
            delete-remote-files="false">
        <int:poller fixed-rate="1000"/>
</int-sftp:inbound-channel-adapter>

前面的配置示例显示了如何为各种属性提供值,包括以下内容:spring-doc.cn

  • local-directory:文件将要传输到的位置spring-doc.cn

  • remote-directory:要从中传输文件的远程源目录spring-doc.cn

  • session-factory:对我们之前配置的 bean 的引用spring-doc.cn

默认情况下,传输的文件与原始文件同名。 如果要覆盖此行为,可以设置该属性,该属性允许您提供 SPEL 表达式来生成本地文件的名称。 与出站网关和适配器不同,其中 SPEL 评估上下文的根对象是 ,此入站适配器在评估时还没有消息,因为这是它最终以传输的文件作为其有效负载生成的消息。 因此,SPEL 评估上下文的根对象是远程文件的原始名称(a )。local-filename-generator-expressionMessageStringspring-doc.cn

入站通道适配器首先将文件检索到本地目录,然后根据 Poller 配置发出每个文件。 从版本 5.0 开始,当需要检索新文件时,您可以限制从 SFTP 服务器获取的文件数。 当目标文件很大或在具有持久文件列表过滤器的集群系统中运行时,这可能是有益的,本节稍后将讨论。 用于此目的。 负值(默认值)表示没有限制,并且将检索所有匹配的文件。 有关更多信息,请参见入站通道适配器:控制远程文件获取。 从版本 5.0 开始,您还可以通过设置 attribute 来为 提供自定义实现。max-fetch-sizeDirectoryScannerinbound-channel-adapterscannerspring-doc.cn

从 Spring Integration 3.0 开始,您可以指定属性(默认为 )。 当 时,本地文件的修改时间戳设置为从服务器检索的值。 否则,它将设置为当前时间。preserve-timestampfalsetruespring-doc.cn

从版本 4.2 开始,您可以指定 instead 而不是 ,这样您就可以动态确定每个轮询的目录 — 例如, .remote-directory-expressionremote-directoryremote-directory-expression="@myBean.determineRemoteDir()"spring-doc.cn

有时,基于 via 属性指定的简单模式的文件过滤可能还不够。 如果是这种情况,则可以使用该属性指定正则表达式(例如 )。 如果需要完全控制,则可以使用该属性提供对 的自定义实现的引用,该实现是用于筛选文件列表的策略接口。 此筛选器确定要检索的远程文件。 您还可以使用 .filename-patternfilename-regexfilename-regex=".*\.test$"filterorg.springframework.integration.file.filters.FileListFilterAcceptOnceFileListFilterCompositeFileListFilterspring-doc.cn

将其状态存储在内存中。 如果您希望该状态在系统重启后继续存在,请考虑改用 。 此筛选条件将接受的文件名存储在策略的实例中(请参阅 元数据存储)。 此过滤器匹配文件名和远程修改时间。AcceptOnceFileListFilterSftpPersistentAcceptOnceFileListFilterMetadataStorespring-doc.cn

从 4.0 版本开始,此过滤器需要一个 . 当与共享数据存储一起使用时(例如与 ),这允许在多个应用程序或服务器实例之间共享筛选键。ConcurrentMetadataStoreRedisRedisMetadataStorespring-doc.cn

从版本 5.0 开始,默认情况下,将对 . 此过滤器还与 XML 配置中的 or 选项一起应用,以及 Java DSL 中的 through。 您可以使用 (或 ) 处理任何其他用例。SftpPersistentAcceptOnceFileListFilterSimpleMetadataStoreSftpInboundFileSynchronizerregexpatternSftpInboundChannelAdapterSpecCompositeFileListFilterChainFileListFilterspring-doc.cn

上面的讨论是指在检索文件之前过滤文件。 检索文件后,将对文件系统上的文件应用额外的过滤器。 默认情况下,这是一个 'AcceptOnceFileListFilter',如本节所述,它将状态保留在内存中,并且不考虑文件的修改时间。 除非您的应用程序在处理后删除文件,否则默认情况下,适配器会在应用程序重新启动后重新处理磁盘上的文件。spring-doc.cn

此外,如果将 配置为使用 a 并且远程文件时间戳更改(导致重新获取),则默认本地过滤器不允许处理此新文件。filterSftpPersistentAcceptOnceFileListFilterspring-doc.cn

有关此筛选器及其使用方法的更多信息,请参阅远程持久性文件列表筛选器spring-doc.cn

您可以使用该属性来配置本地文件系统过滤器的行为。 从版本 4.3.8 开始,默认配置 a。 此筛选条件将接受的文件名和修改后的时间戳存储在策略的实例中(请参阅元数据存储),并检测对本地文件修改时间的更改。 默认值是将状态存储在内存中的 a。local-filterFileSystemPersistentAcceptOnceFileListFilterMetadataStoreMetadataStoreSimpleMetadataStorespring-doc.cn

从版本 4.1.5 开始,这些过滤器有一个名为 的新属性,该属性使它们刷新 元数据存储(如果存储实现)。flushOnUpdateFlushablespring-doc.cn

此外,如果您使用分布式(例如 Redis 元数据存储),则可以拥有同一适配器或应用程序的多个实例,并确保一个且只有一个实例处理一个文件。MetadataStore

实际的本地过滤器是包含提供的过滤器和模式过滤器的 a,该过滤器阻止处理正在下载的文件(基于 )。 下载带有此后缀的文件(默认为 ),并在传输完成后将文件重命名为其最终名称,使其对过滤器“可见”。CompositeFileListFiltertemporary-file-suffix.writingspring-doc.cn

有关这些属性的更多详细信息,请参阅 schema.spring-doc.cn

SFTP 入站通道适配器是轮询使用者。 因此,你必须配置一个 Poller (全局默认值或本地元素)。 将文件传输到本地目录后,将生成一条 payload type 为的消息,并将其发送到由属性标识的通道。java.io.Filechannelspring-doc.cn

从版本 6.2 开始,您可以使用 根据上次修改的策略筛选 SFTP 文件。 可以使用属性配置此过滤器,以便过滤器仅传递早于此值的文件。 该年龄默认为 60 秒,但您应该选择一个足够大的年龄,以避免过早获取文件(例如,由于网络故障)。 有关详细信息,请查看其 Javadoc。SftpLastModifiedFileListFilteragespring-doc.cn

此外,如果您使用分布式(例如 Redis 元数据存储),则可以拥有同一适配器或应用程序的多个实例,并确保一个且只有一个实例处理一个文件。MetadataStore

详细了解文件筛选和大文件

有时,刚刚出现在受监视 (远程) 目录中的文件并不完整。 通常,此类文件使用某个临时扩展名(例如在 名为 的 file 上)写入,然后在写入过程完成后重命名。 在大多数情况下,开发人员只对完整的文件感兴趣,并且只想筛选这些文件。 要处理这些情况,您可以使用 、 和 属性提供的筛选支持。 如果您需要自定义过滤器实现,可以通过设置 attribute 在适配器中包含引用。 以下示例显示了如何执行此操作:.writingsomething.txt.writingfilename-patternfilename-regexfilterfilterspring-doc.cn

<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
            channel="receiveChannel"
            session-factory="sftpSessionFactory"
            filter="customFilter"
            local-directory="file:/local-test-dir"
            remote-directory="/remote-test-dir">
        <int:poller fixed-rate="1000" max-messages-per-poll="10" task-executor="executor"/>
</int-sftp:inbound-channel-adapter>

<bean id="customFilter" class="org.foo.CustomFilter"/>

从故障中恢复

您应该了解适配器的架构。 文件同步器获取文件,并为每个同步文件发出一条消息。 如前所述,涉及两个过滤器。 属性 (和模式) 引用远程 (SFTP) 文件列表,以避免获取已获取的文件。 使用 来确定哪些文件将作为消息发送。FileReadingMessageSourcefilterFileReadingMessageSourcelocal-filterspring-doc.cn

同步器列出远程文件并查阅其过滤器。 然后传输文件。 如果在文件传输过程中发生 IO 错误,则会删除已添加到过滤器的任何文件,以便它们有资格在下次轮询时重新获取。 仅当过滤器实现 (如 ) 时,这才适用。ReversibleFileListFilterAcceptOnceFileListFilterspring-doc.cn

如果在同步文件后,处理文件的下游流发生错误,则不会自动回滚过滤器,因此默认情况下不会重新处理失败的文件。spring-doc.cn

如果您希望在失败后重新处理此类文件,可以使用类似于以下内容的配置,以便于从过滤器中删除失败的文件:spring-doc.cn

<int-sftp:inbound-channel-adapter id="sftpAdapter"
        session-factory="sftpSessionFactory"
        channel="requestChannel"
        remote-directory-expression="'/sftpSource'"
        local-directory="file:myLocalDir"
        auto-create-local-directory="true"
        filename-pattern="*.txt">
    <int:poller fixed-rate="1000">
        <int:transactional synchronization-factory="syncFactory" />
    </int:poller>
</int-sftp:inbound-channel-adapter>

<bean id="acceptOnceFilter"
    class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>

<bean id="transactionManager"
    class="org.springframework.integration.transaction.PseudoTransactionManager" />

上述配置适用于任何 .ResettableFileListFilterspring-doc.cn

从版本 5.0 开始,入站通道适配器可以根据生成的本地文件名在本地构建子目录。 那也可以是远程子路径。 为了能够递归读取本地目录以根据层次结构支持进行修改,您现在可以根据算法为 internal 提供新的目录。 有关更多信息,请参见AbstractInboundFileSynchronizingMessageSource.setScanner()。 此外,您现在可以将 切换到 -based by using 选项。 它还为所有实例配置了 ID,以便对 local directory 中的任何修改做出反应。 前面显示的 reprocessing 示例基于 的内置功能,该功能使用从本地目录中删除文件时 ()。 有关更多信息,请参阅 WatchServiceDirectoryScannerFileReadingMessageSourceRecursiveDirectoryScannerFiles.walk()AbstractInboundFileSynchronizingMessageSourceWatchServiceDirectoryScannersetUseWatchService()WatchEventTypeFileReadingMessageSource.WatchServiceDirectoryScannerResettableFileListFilter.remove()StandardWatchEventKinds.ENTRY_DELETEspring-doc.cn

使用 Java 配置进行配置

以下 Spring Boot 应用程序显示了如何使用 Java 配置入站适配器的示例:spring-doc.cn

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(port);
        factory.setUser("foo");
        factory.setPassword("foo");
        factory.setAllowUnknownKeys(true);
        factory.setTestSession(true);
        return new CachingSessionFactory<>(factory);
    }

    @Bean
    public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
        SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
        fileSynchronizer.setDeleteRemoteFiles(false);
        fileSynchronizer.setRemoteDirectory("foo");
        fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
        return fileSynchronizer;
    }

    @Bean
    @InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
    public MessageSource<File> sftpMessageSource() {
        SftpInboundFileSynchronizingMessageSource source =
                new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
        source.setLocalDirectory(new File("sftp-inbound"));
        source.setAutoCreateLocalDirectory(true);
        source.setLocalFilter(new AcceptOnceFileListFilter<File>());
        source.setMaxFetchSize(1);
        return source;
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

使用 Java DSL 进行配置

Spring 下面的 Boot 应用程序显示了如何使用 Java DSL 配置入站适配器的示例:spring-doc.cn

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow sftpInboundFlow() {
        return IntegrationFlow
            .from(Sftp.inboundAdapter(this.sftpSessionFactory)
                    .preserveTimestamp(true)
                    .remoteDirectory("foo")
                    .regexFilter(".*\\.txt$")
                    .localFilenameExpression("#this.toUpperCase() + '.a'")
                    .localDirectory(new File("sftp-inbound")),
                 e -> e.id("sftpInboundAdapter")
                    .autoStartup(true)
                    .poller(Pollers.fixedDelay(5000)))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
    }
}

处理不完整的数据

提供 用于筛选在远程系统上没有相应标记文件的远程文件。 有关配置信息,请参阅 JavadocSftpSystemMarkerFilePresentFileListFilterspring-doc.cn