对于最新的稳定版本,请使用 Spring Integration 6.4.0! |
SFTP 入站通道适配器
SFTP 入站通道适配器是一个特殊的侦听器,它连接到服务器并侦听远程目录事件(例如正在创建新文件),此时它将启动文件传输。 以下示例说明如何配置 SFTP 入站通道适配器:
<int-sftp:inbound-channel-adapter id="sftpAdapterAutoCreate"
session-factory="sftpSessionFactory"
channel="requestChannel"
filename-pattern="*.txt"
remote-directory="/foo/bar"
preserve-timestamp="true"
local-directory="file:target/foo"
auto-create-local-directory="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
delete-remote-files="false">
<int:poller fixed-rate="1000"/>
</int-sftp:inbound-channel-adapter>
前面的配置示例显示了如何为各种属性提供值,包括以下内容:
-
local-directory
:文件将要传输到的位置 -
remote-directory
:要从中传输文件的远程源目录 -
session-factory
:对我们之前配置的 bean 的引用
默认情况下,传输的文件与原始文件同名。
如果要覆盖此行为,可以设置该属性,该属性允许您提供 SPEL 表达式来生成本地文件的名称。
与出站网关和适配器不同,其中 SPEL 评估上下文的根对象是 ,此入站适配器在评估时还没有消息,因为这是它最终以传输的文件作为其有效负载生成的消息。
因此,SPEL 评估上下文的根对象是远程文件的原始名称(a )。local-filename-generator-expression
Message
String
入站通道适配器首先将文件检索到本地目录,然后根据 Poller 配置发出每个文件。
从版本 5.0 开始,当需要检索新文件时,您可以限制从 SFTP 服务器获取的文件数。
当目标文件很大或在具有持久文件列表过滤器的集群系统中运行时,这可能是有益的,本节稍后将讨论。
用于此目的。
负值(默认值)表示没有限制,并且将检索所有匹配的文件。
有关更多信息,请参见入站通道适配器:控制远程文件获取。
从版本 5.0 开始,您还可以通过设置 attribute 来为 提供自定义实现。max-fetch-size
DirectoryScanner
inbound-channel-adapter
scanner
从 Spring Integration 3.0 开始,您可以指定属性(默认为 )。
当 时,本地文件的修改时间戳设置为从服务器检索的值。
否则,它将设置为当前时间。preserve-timestamp
false
true
从版本 4.2 开始,您可以指定 instead 而不是 ,这样您就可以动态确定每个轮询的目录 — 例如, .remote-directory-expression
remote-directory
remote-directory-expression="@myBean.determineRemoteDir()"
有时,基于 via 属性指定的简单模式的文件过滤可能还不够。
如果是这种情况,则可以使用该属性指定正则表达式(例如 )。
如果需要完全控制,则可以使用该属性提供对 的自定义实现的引用,该实现是用于筛选文件列表的策略接口。
此筛选器确定要检索的远程文件。
您还可以使用 .filename-pattern
filename-regex
filename-regex=".*\.test$"
filter
org.springframework.integration.file.filters.FileListFilter
AcceptOnceFileListFilter
CompositeFileListFilter
将其状态存储在内存中。
如果您希望该状态在系统重启后继续存在,请考虑改用 。
此筛选条件将接受的文件名存储在策略的实例中(请参阅 元数据存储)。
此过滤器匹配文件名和远程修改时间。AcceptOnceFileListFilter
SftpPersistentAcceptOnceFileListFilter
MetadataStore
从 4.0 版本开始,此过滤器需要一个 .
当与共享数据存储一起使用时(例如与 ),这允许在多个应用程序或服务器实例之间共享筛选键。ConcurrentMetadataStore
Redis
RedisMetadataStore
从版本 5.0 开始,默认情况下,将对 .
此过滤器还与 XML 配置中的 or 选项一起应用,以及 Java DSL 中的 through。
您可以使用 (或 ) 处理任何其他用例。SftpPersistentAcceptOnceFileListFilter
SimpleMetadataStore
SftpInboundFileSynchronizer
regex
pattern
SftpInboundChannelAdapterSpec
CompositeFileListFilter
ChainFileListFilter
上面的讨论是指在检索文件之前过滤文件。 检索文件后,将对文件系统上的文件应用额外的过滤器。 默认情况下,这是一个 'AcceptOnceFileListFilter',如本节所述,它将状态保留在内存中,并且不考虑文件的修改时间。 除非您的应用程序在处理后删除文件,否则默认情况下,适配器会在应用程序重新启动后重新处理磁盘上的文件。
此外,如果将 配置为使用 a 并且远程文件时间戳更改(导致重新获取),则默认本地过滤器不允许处理此新文件。filter
SftpPersistentAcceptOnceFileListFilter
有关此筛选器及其使用方法的更多信息,请参阅远程持久性文件列表筛选器。
您可以使用该属性来配置本地文件系统过滤器的行为。
从版本 4.3.8 开始,默认配置 a。
此筛选条件将接受的文件名和修改后的时间戳存储在策略的实例中(请参阅元数据存储),并检测对本地文件修改时间的更改。
默认值是将状态存储在内存中的 a。local-filter
FileSystemPersistentAcceptOnceFileListFilter
MetadataStore
MetadataStore
SimpleMetadataStore
从版本 4.1.5 开始,这些过滤器有一个名为 的新属性,该属性使它们刷新
元数据存储(如果存储实现)。flushOnUpdate
Flushable
此外,如果您使用分布式(例如 Redis 元数据存储),则可以拥有同一适配器或应用程序的多个实例,并确保一个且只有一个实例处理一个文件。MetadataStore |
实际的本地过滤器是包含提供的过滤器和模式过滤器的 a,该过滤器阻止处理正在下载的文件(基于 )。
下载带有此后缀的文件(默认为 ),并在传输完成后将文件重命名为其最终名称,使其对过滤器“可见”。CompositeFileListFilter
temporary-file-suffix
.writing
有关这些属性的更多详细信息,请参阅 schema.
SFTP 入站通道适配器是轮询使用者。
因此,你必须配置一个 Poller (全局默认值或本地元素)。
将文件传输到本地目录后,将生成一条 payload type 为的消息,并将其发送到由属性标识的通道。java.io.File
channel
从版本 6.2 开始,您可以使用 根据上次修改的策略筛选 SFTP 文件。
可以使用属性配置此过滤器,以便过滤器仅传递早于此值的文件。
该年龄默认为 60 秒,但您应该选择一个足够大的年龄,以避免过早获取文件(例如,由于网络故障)。
有关详细信息,请查看其 Javadoc。SftpLastModifiedFileListFilter
age
详细了解文件筛选和大文件
有时,刚刚出现在受监视 (远程) 目录中的文件并不完整。
通常,此类文件使用某个临时扩展名(例如在 名为 的 file 上)写入,然后在写入过程完成后重命名。
在大多数情况下,开发人员只对完整的文件感兴趣,并且只想筛选这些文件。
要处理这些情况,您可以使用 、 和 属性提供的筛选支持。
如果您需要自定义过滤器实现,可以通过设置 attribute 在适配器中包含引用。
以下示例显示了如何执行此操作:.writing
something.txt.writing
filename-pattern
filename-regex
filter
filter
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="receiveChannel"
session-factory="sftpSessionFactory"
filter="customFilter"
local-directory="file:/local-test-dir"
remote-directory="/remote-test-dir">
<int:poller fixed-rate="1000" max-messages-per-poll="10" task-executor="executor"/>
</int-sftp:inbound-channel-adapter>
<bean id="customFilter" class="org.foo.CustomFilter"/>
从故障中恢复
您应该了解适配器的架构。
文件同步器获取文件,并为每个同步文件发出一条消息。
如前所述,涉及两个过滤器。
属性 (和模式) 引用远程 (SFTP) 文件列表,以避免获取已获取的文件。
使用 来确定哪些文件将作为消息发送。FileReadingMessageSource
filter
FileReadingMessageSource
local-filter
同步器列出远程文件并查阅其过滤器。
然后传输文件。
如果在文件传输过程中发生 IO 错误,则会删除已添加到过滤器的任何文件,以便它们有资格在下次轮询时重新获取。
仅当过滤器实现 (如 ) 时,这才适用。ReversibleFileListFilter
AcceptOnceFileListFilter
如果在同步文件后,处理文件的下游流发生错误,则不会自动回滚过滤器,因此默认情况下不会重新处理失败的文件。
如果您希望在失败后重新处理此类文件,可以使用类似于以下内容的配置,以便于从过滤器中删除失败的文件:
<int-sftp:inbound-channel-adapter id="sftpAdapter"
session-factory="sftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/sftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-sftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />
上述配置适用于任何 .ResettableFileListFilter
从版本 5.0 开始,入站通道适配器可以根据生成的本地文件名在本地构建子目录。
那也可以是远程子路径。
为了能够递归读取本地目录以根据层次结构支持进行修改,您现在可以根据算法为 internal 提供新的目录。
有关更多信息,请参见AbstractInboundFileSynchronizingMessageSource.setScanner()。
此外,您现在可以将 切换到 -based by using 选项。
它还为所有实例配置了 ID,以便对 local directory 中的任何修改做出反应。
前面显示的 reprocessing 示例基于 的内置功能,该功能使用从本地目录中删除文件时 ()。
有关更多信息,请参阅 WatchServiceDirectoryScanner
。FileReadingMessageSource
RecursiveDirectoryScanner
Files.walk()
AbstractInboundFileSynchronizingMessageSource
WatchService
DirectoryScanner
setUseWatchService()
WatchEventType
FileReadingMessageSource.WatchServiceDirectoryScanner
ResettableFileListFilter.remove()
StandardWatchEventKinds.ENTRY_DELETE
使用 Java 配置进行配置
以下 Spring Boot 应用程序显示了如何使用 Java 配置入站适配器的示例:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost("localhost");
factory.setPort(port);
factory.setUser("foo");
factory.setPassword("foo");
factory.setAllowUnknownKeys(true);
factory.setTestSession(true);
return new CachingSessionFactory<>(factory);
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
source.setLocalDirectory(new File("sftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
使用 Java DSL 进行配置
Spring 下面的 Boot 应用程序显示了如何使用 Java DSL 配置入站适配器的示例:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow sftpInboundFlow() {
return IntegrationFlow
.from(Sftp.inboundAdapter(this.sftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilenameExpression("#this.toUpperCase() + '.a'")
.localDirectory(new File("sftp-inbound")),
e -> e.id("sftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}