FTP 入站通道适配器是一个特殊的侦听器,它连接到 FTP 服务器并侦听远程目录事件(例如,创建了新文件),此时它将启动文件传输。
以下示例说明如何配置 :inbound-channel-adapter
<int-ftp:inbound-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="ftpSessionFactory"
auto-create-local-directory="true"
delete-remote-files="true"
filename-pattern="*.txt"
remote-directory="some/remote/path"
remote-file-separator="/"
preserve-timestamp="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
local-directory=".">
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
如前面的配置所示,您可以使用 element 配置 FTP 入站通道适配器,同时还为各种属性提供值,例如 , (基于简单的模式匹配,而不是正则表达式)和对 .inbound-channel-adapter
local-directory
filename-pattern
session-factory
默认情况下,传输的文件与原始文件同名。
如果要覆盖此行为,可以设置该属性,该属性允许您提供 SPEL 表达式来生成本地文件的名称。
与出站网关和适配器不同,其中 SPEL 评估上下文的根对象是 ,此入站适配器在评估时还没有消息,因为这是它最终以传输的文件作为其有效负载生成的。
因此,SPEL 评估上下文的根对象是远程文件的原始名称(a )。local-filename-generator-expression
Message
String
入站通道适配器首先检索本地目录的对象,然后根据 Poller 配置发出每个文件。
从版本 5.0 开始,您现在可以限制在需要检索新文件时从 FTP 服务器获取的文件数。
当目标文件非常大时,或者在具有持久文件列表过滤器的集群系统中运行时,这可能非常有用,稍后将讨论。
用于此目的。
负值(默认值)表示没有限制,并且将检索所有匹配的文件。
有关更多信息,请参见入站通道适配器:控制远程文件获取。
从版本 5.0 开始,您还可以通过设置 attribute 来为 提供自定义实现。File
max-fetch-size
DirectoryScanner
inbound-channel-adapter
scanner
从 Spring Integration 3.0 开始,您可以指定属性(其默认值为 )。
当 时,本地文件的修改时间戳设置为从服务器检索的值。
否则,它将设置为当前时间。preserve-timestamp
false
true
从版本 4.2 开始,您可以指定 而不是 ,让您动态确定每个轮询的目录 — 例如, .remote-directory-expression
remote-directory
remote-directory-expression="@myBean.determineRemoteDir()"
从版本 4.3 开始,您可以省略 and 属性。
它们默认为 .
在这种情况下,根据 FTP 协议,客户端工作目录将用作默认远程目录。remote-directory
remote-directory-expression
null
有时,基于使用该属性指定的简单模式的文件过滤可能还不够。
如果是这种情况,则可以使用该属性指定正则表达式(如 )。
此外,如果需要完全控制,则可以使用该属性并提供对 的任何自定义实现的引用,这是一个用于筛选文件列表的策略接口。
此筛选器确定要检索的远程文件。
您还可以使用 .filename-pattern
filename-regex
filename-regex=".*\.test$"
filter
o.s.i.file.filters.FileListFilter
AcceptOnceFileListFilter
CompositeFileListFilter
将其状态存储在内存中。
如果您希望该状态在系统重启后继续存在,请考虑改用 。
此筛选条件将接受的文件名存储在策略的实例中(请参阅 元数据存储)。
此过滤器匹配文件名和远程修改时间。AcceptOnceFileListFilter
FtpPersistentAcceptOnceFileListFilter
MetadataStore
从 4.0 版本开始,此过滤器需要一个 .
当与共享数据存储一起使用时(例如与 ),它允许在多个应用程序或服务器实例之间共享筛选键。ConcurrentMetadataStore
Redis
RedisMetadataStore
从版本 5.0 开始,默认情况下,将对 .
此过滤器还与 XML 配置中的 or 选项以及 Java DSL 中的 一起应用。
任何其他用例都可以使用 (或 ) 进行管理。FtpPersistentAcceptOnceFileListFilter
SimpleMetadataStore
FtpInboundFileSynchronizer
regex
pattern
FtpInboundChannelAdapterSpec
CompositeFileListFilter
ChainFileListFilter
前面的讨论是指在检索文件之前筛选文件。
检索文件后,将对文件系统上的文件应用额外的过滤器。
默认情况下,如前所述,这是一个在内存中保留状态并且不考虑文件的修改时间。
除非您的应用程序在处理后删除文件,否则默认情况下,适配器将在应用程序重新启动后重新处理磁盘上的文件。AcceptOnceFileListFilter
此外,如果您将 配置为使用 a 并且远程文件时间戳更改(导致它被重新获取),则默认本地过滤器不允许处理此新文件。filter
FtpPersistentAcceptOnceFileListFilter
有关此筛选器及其使用方法的更多信息,请参阅远程持久性文件列表筛选器。
您可以使用该属性来配置本地文件系统过滤器的行为。
从版本 4.3.8 开始,默认配置 a。
此筛选条件将接受的文件名和修改后的时间戳存储在策略的实例中(请参阅元数据存储),并检测对本地文件修改时间的更改。
默认值为 a ,它将状态存储在内存中。local-filter
FileSystemPersistentAcceptOnceFileListFilter
MetadataStore
MetadataStore
SimpleMetadataStore
从 4.1.5 版本开始,这些过滤器有一个新属性 (),使它们刷新
元数据存储(如果存储实现)。flushOnUpdate
Flushable
此外,如果使用分布式 (如 Redis) ,则可以拥有同一适配器或应用程序的多个实例,并确保每个文件只处理一次。MetadataStore |
实际的本地过滤器是包含提供的过滤器和模式过滤器的 a,该过滤器阻止处理正在下载的文件(基于 )。
下载带有此后缀的文件(默认值为 ),并在传输完成后将文件重命名为其最终名称,使其对过滤器“可见”。CompositeFileListFilter
temporary-file-suffix
.writing
该属性允许您配置文件分隔符,以便在默认 '/' 不适用于您的特定环境时使用。remote-file-separator
请参阅 架构 以了解有关这些属性的更多详细信息。
您还应该了解 FTP 入站通道适配器是轮询使用者。
因此,你必须配置一个 Poller (通过使用 global default 或 local sub-element)。
传输文件后,将生成一条以 a 作为有效负荷的消息,并将其发送到由属性标识的通道。java.io.File
channel
从版本 6.2 开始,您可以使用 .
可以使用属性配置此过滤器,以便过滤器仅传递早于此值的文件。
该年龄默认为 60 秒,但您应该选择一个足够大的年龄,以避免过早获取文件(例如,由于网络故障)。
有关详细信息,请查看其 Javadoc。FtpLastModifiedFileListFilter
age
此外,如果使用分布式 (如 Redis) ,则可以拥有同一适配器或应用程序的多个实例,并确保每个文件只处理一次。MetadataStore |
详细了解文件筛选和不完整文件
有时,刚刚出现在受监视(远程)目录中的文件并不完整。
通常,此类文件是使用临时扩展名(如 )写入的,然后在写入过程完成后重命名。
在大多数情况下,您只对完整的文件感兴趣,并且只想筛选完整的文件。
要处理这些情况,您可以使用 、 和 属性提供的筛选支持。
以下示例使用自定义筛选器实现:somefile.txt.writing
filename-pattern
filename-regex
filter
<int-ftp:inbound-channel-adapter
channel="ftpChannel"
session-factory="ftpSessionFactory"
filter="customFilter"
local-directory="file:/my_transfers">
remote-directory="some/remote/path"
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
<bean id="customFilter" class="org.example.CustomFilter"/>
入站 FTP 适配器的轮询器配置说明
入站 FTP 适配器的作业包括两个任务:
-
与远程服务器通信,以便将文件从远程目录传输到本地目录。
-
对于每个传输的文件,生成一条消息,将该文件作为有效负载,并将其发送到由 'channel' 属性标识的通道。 这就是为什么它们被称为 “'channel adapters'” 而不仅仅是 “'adapters'”。 此类适配器的主要工作是生成要发送到消息通道的消息。 从本质上讲,第二个任务的优先级是,如果您的本地目录已经有一个或多个文件,则它首先从这些文件生成消息。 只有当所有本地文件都已处理完时,它才会启动远程通信以检索更多文件。
此外,在 Poller 上配置触发器时,应密切注意该属性。
其默认值适用于所有实例 (包括 FTP)。
这意味着,一旦处理了一个文件,它就会等待由触发器配置确定的下一个执行时间。
如果 中恰好有一个或多个文件位于 ,它将在启动与远程 FTP 服务器的通信之前处理这些文件。
此外,如果设置为 (默认),则它一次只处理一个文件,其间隔由触发器定义,本质上是 “one-poll === one-file” 工作。max-messages-per-poll
1
SourcePollingChannelAdapter
local-directory
max-messages-per-poll
1
对于典型的文件传输使用案例,您很可能希望发生相反的行为:处理每次轮询可以处理的所有文件,然后才等待下一次轮询。
如果是这种情况,请设置为 -1。
然后,在每次轮询时,适配器会尝试生成尽可能多的消息。
换句话说,它处理本地目录中的所有内容,然后连接到远程目录以传输所有可用内容,以便在本地处理。
只有这样,poll 操作才被视为完成,并且 Poller 等待下一次执行时间。max-messages-per-poll
您也可以将 'max-messages-per-poll' 值设置为正值,该值表示每次轮询要从文件创建的消息的上限。
例如,值 of 表示在每次轮询时,它尝试处理不超过 10 个文件。10
从故障中恢复
了解适配器的架构非常重要。
有一个文件同步器用于获取文件,还有一个为每个文件发出一条消息
synchronized 文件。
如前所述,涉及两个过滤器。
属性(和模式)是指远程 (FTP) 文件列表,以避免获取已经
被获取。
用于确定哪些文件将作为消息发送。FileReadingMessageSource
filter
local-filter
FileReadingMessageSource
同步器列出远程文件并查阅其过滤器。
然后传输文件。
如果在文件传输过程中发生 IO 错误,则会删除已添加到筛选器的任何文件,以便它们
有资格在下次轮询时重新获取。
这仅适用于过滤器实现 (如 )。ReversibleFileListFilter
AcceptOnceFileListFilter
如果在同步文件后,处理文件的下游流发生错误,则不会自动回滚过滤器,因此默认情况下不会重新处理失败的文件。
如果您希望在失败后重新处理此类文件,您可以使用类似于以下内容的配置来方便 从过滤器中删除失败的文件:
<int-ftp:inbound-channel-adapter id="ftpAdapter"
session-factory="ftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/ftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-ftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />
上述配置适用于任何 .ResettableFileListFilter
从版本 5.0 开始,入站通道适配器可以在本地构建与生成的本地文件名相对应的子目录。
那也可以是远程子路径。
为了能够递归读取本地目录以根据层次结构支持进行修改,您现在可以根据算法为 internal 提供新的目录。
有关更多信息,请参见AbstractInboundFileSynchronizingMessageSource.setScanner()。
此外,您现在可以将 切换到 -based by using 选项。
它还为所有实例进行了配置,以对 local directory 中的任何修改做出反应。
前面显示的 reprocessing 示例基于 to perform when the file is deleted () 的内置功能。
有关更多信息,请参阅 WatchServiceDirectoryScanner
。FileReadingMessageSource
RecursiveDirectoryScanner
Files.walk()
AbstractInboundFileSynchronizingMessageSource
WatchService
DirectoryScanner
setUseWatchService()
WatchEventType
FileReadingMessageSource.WatchServiceDirectoryScanner
ResettableFileListFilter.remove()
StandardWatchEventKinds.ENTRY_DELETE
使用 Java 配置进行配置
以下 Spring Boot 应用程序显示了如何使用 Java 配置配置入站适配器的示例:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "ftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> ftpMessageSource() {
FtpInboundFileSynchronizingMessageSource source =
new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
source.setLocalDirectory(new File("ftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
使用 Java DSL 进行配置
Spring 下面的 Boot 应用程序显示了如何使用 Java DSL 配置入站适配器的示例:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlow
.from(Ftp.inboundAdapter(this.ftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilename(f -> f.toUpperCase() + ".a")
.localDirectory(new File("d:\\ftp_files")),
e -> e.id("ftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}