FTP/FTPS适配器
FTP/FTPS 适配器
Spring 集成支持使用 FTP 和 FTPS 进行文件传输操作。
文件传输协议 (FTP) 是一种简单的网络协议,可让您在 Internet 上的两台计算机之间传输文件。 FTPS 代表“FTP over SSL”。
您需要将此依赖项包含在您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ftp</artifactId>
<version>6.0.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-ftp:6.0.9"
在 FTP 通信方面,有两个参与者:客户端和服务器。 要使用 FTP 或 FTPS 传输文件,请使用启动与运行 FTP 服务器的远程计算机的连接的客户端。 建立连接后,客户端可以选择发送或接收文件副本。
Spring 集成通过提供三个客户端端点来支持通过 FTP 或 FTPS 发送和接收文件:入站通道适配器、出站通道适配器和出站网关。 它还为定义这些客户端组件提供了方便的基于命名空间的配置选项。
要使用 FTP 命名空间,请将以下内容添加到 XML 文件的标头中:
xmlns:int-ftp="http://www.springframework.org/schema/integration/ftp"
xsi:schemaLocation="http://www.springframework.org/schema/integration/ftp
https://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd"
FTP 会话工厂
Spring 集成提供了可用于创建 FTP(或 FTPS)会话的工厂。
默认工厂
从版本 3.0 开始,默认情况下不再缓存会话。 请参阅 FTP 会话缓存。 |
在配置 FTP 适配器之前,必须配置 FTP 会话工厂。
您可以使用常规 bean 定义配置 FTP 会话工厂,其中实现类为 。
以下示例显示了基本配置:o.s.i.ftp.session.DefaultFtpSessionFactory
<bean id="ftpClientFactory"
class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
<property name="host" value="localhost"/>
<property name="port" value="22"/>
<property name="username" value="kermit"/>
<property name="password" value="frog"/>
<property name="clientMode" value="0"/>
<property name="fileType" value="2"/>
<property name="bufferSize" value="100000"/>
</bean>
对于 FTPS 连接,您可以改用。o.s.i.ftp.session.DefaultFtpsSessionFactory
以下示例显示了完整的配置:
<bean id="ftpClientFactory"
class="org.springframework.integration.ftp.session.DefaultFtpsSessionFactory">
<property name="host" value="localhost"/>
<property name="port" value="22"/>
<property name="username" value="oleg"/>
<property name="password" value="password"/>
<property name="clientMode" value="1"/>
<property name="fileType" value="2"/>
<property name="useClientMode" value="true"/>
<property name="cipherSuites" value="a,b.c"/>
<property name="keyManager" ref="keyManager"/>
<property name="protocol" value="SSL"/>
<property name="trustManager" ref="trustManager"/>
<property name="prot" value="P"/>
<property name="needClientAuth" value="true"/>
<property name="authValue" value="oleg"/>
<property name="sessionCreation" value="true"/>
<property name="protocols" value="SSL, TLS"/>
<property name="implicit" value="true"/>
</bean>
如果您遇到连接问题,并且想要跟踪会话创建并查看轮询了哪些会话,则可以通过将 Logger 设置为级别(例如,)。TRACE log4j.category.org.springframework.integration.file=TRACE |
现在你只需要将这些 session factories 注入到你的适配器中。 适配器使用的协议 (FTP 或 FTPS) 取决于已注入适配器的会话工厂的类型。
为 FTP 或 FTPS 会话工厂提供值的更实用的方法是使用 Spring 的属性占位符支持(参见 https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#beans-factory-placeholderconfigurer)。 |
高级配置
DefaultFtpSessionFactory
提供了对底层客户端 API 的抽象,该 API(自 Spring Integration 2.0 起)是 Apache Commons Net。
这样,您就无需了解 .
会话工厂上公开了几个常见属性(从版本 4.0 开始,现在包括 、 和 )。
但是,您有时需要访问较低级别的配置以实现更高级的配置(例如设置活动模式的端口范围)。
为此,(所有 FTP 会话工厂的基类)以以下清单中所示的两种后处理方法的形式公开钩子:org.apache.commons.net.ftp.FTPClient
connectTimeout
defaultTimeout
dataTimeout
FTPClient
AbstractFtpSessionFactory
/**
* Will handle additional initialization after client.connect() method was invoked,
* but before any action on the client has been taken
*/
protected void postProcessClientAfterConnect(T t) throws IOException {
// NOOP
}
/**
* Will handle additional initialization before client.connect() method was invoked.
*/
protected void postProcessClientBeforeConnect(T client) throws IOException {
// NOOP
}
如您所见,这两种方法没有默认实现。
但是,通过扩展 ,您可以覆盖这些方法以提供 的更高级配置 ,如下例所示:DefaultFtpSessionFactory
FTPClient
public class AdvancedFtpSessionFactory extends DefaultFtpSessionFactory {
protected void postProcessClientBeforeConnect(FTPClient ftpClient) throws IOException {
ftpClient.setActivePortRange(4000, 5000);
}
}
FTPS 和共享 SSLSession
当使用 FTP over SSL 或 TLS 时,某些服务器要求在控制和数据连接上使用相同的 FTP。
这是为了防止 “窃取” 数据连接。
有关更多信息,请参阅 https://scarybeastsecurity.blogspot.cz/2009/02/vsftpd-210-released.html。SSLSession
目前,Apache FTPSClient 不支持此功能。 请参阅 NET-408。
以下解决方案由 Stack Overflow 提供,它使用 上的反射,因此它可能不适用于其他 JVM。
堆栈溢出答案于 2015 年提交,Spring 集成团队已在 JDK 1.8.0_112 上测试了该解决方案。sun.security.ssl.SSLSessionContextImpl
以下示例说明如何创建 FTPS 会话:
@Bean
public DefaultFtpsSessionFactory sf() {
DefaultFtpsSessionFactory sf = new DefaultFtpsSessionFactory() {
@Override
protected FTPSClient createClientInstance() {
return new SharedSSLFTPSClient();
}
};
sf.setHost("...");
sf.setPort(21);
sf.setUsername("...");
sf.setPassword("...");
sf.setNeedClientAuth(true);
return sf;
}
private static final class SharedSSLFTPSClient extends FTPSClient {
@Override
protected void _prepareDataSocket_(final Socket socket) throws IOException {
if (socket instanceof SSLSocket) {
// Control socket is SSL
final SSLSession session = ((SSLSocket) _socket_).getSession();
final SSLSessionContext context = session.getSessionContext();
context.setSessionCacheSize(0); // you might want to limit the cache
try {
final Field sessionHostPortCache = context.getClass()
.getDeclaredField("sessionHostPortCache");
sessionHostPortCache.setAccessible(true);
final Object cache = sessionHostPortCache.get(context);
final Method method = cache.getClass().getDeclaredMethod("put", Object.class,
Object.class);
method.setAccessible(true);
String key = String.format("%s:%s", socket.getInetAddress().getHostName(),
String.valueOf(socket.getPort())).toLowerCase(Locale.ROOT);
method.invoke(cache, key, session);
key = String.format("%s:%s", socket.getInetAddress().getHostAddress(),
String.valueOf(socket.getPort())).toLowerCase(Locale.ROOT);
method.invoke(cache, key, session);
}
catch (NoSuchFieldException e) {
// Not running in expected JRE
logger.warn("No field sessionHostPortCache in SSLSessionContext", e);
}
catch (Exception e) {
// Not running in expected JRE
logger.warn(e.getMessage());
}
}
}
}
委派 Session Factory
版本 4.2 引入了 ,它允许在运行时选择实际的会话工厂。
在调用 FTP 端点之前,请调用工厂以将密钥与当前线程关联。
然后,该键用于查找要使用的实际 session factory。
您可以通过 call after use 清除密钥。DelegatingSessionFactory
setThreadKey()
clearThreadKey()
我们添加了便捷的方法,以便您可以轻松地从消息流中使用委托会话工厂。
下面的示例展示了如何声明一个委托的 session factory:
<bean id="dsf" class="org.springframework.integration.file.remote.session.DelegatingSessionFactory">
<constructor-arg>
<bean class="o.s.i.file.remote.session.DefaultSessionFactoryLocator">
<!-- delegate factories here -->
</bean>
</constructor-arg>
</bean>
<int:service-activator input-channel="in" output-channel="c1"
expression="@dsf.setThreadKey(#root, headers['factoryToUse'])" />
<int-ftp:outbound-gateway request-channel="c1" reply-channel="c2" ... />
<int:service-activator input-channel="c2" output-channel="out"
expression="@dsf.clearThreadKey(#root)" />
当您使用会话缓存时(请参阅 FTP 会话缓存),应缓存每个委托。
您无法缓存 itself.DelegatingSessionFactory |
从版本 5.0.7 开始,可以与 结合使用以轮询多个服务器;请参见入站通道适配器:轮询多个服务器和目录。DelegatingSessionFactory
RotatingServerAdvice
FTP 入站通道适配器
FTP 入站通道适配器是一个特殊的侦听器,它连接到 FTP 服务器并侦听远程目录事件(例如,创建了新文件),此时它将启动文件传输。
以下示例说明如何配置 :inbound-channel-adapter
<int-ftp:inbound-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="ftpSessionFactory"
auto-create-local-directory="true"
delete-remote-files="true"
filename-pattern="*.txt"
remote-directory="some/remote/path"
remote-file-separator="/"
preserve-timestamp="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
local-directory=".">
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
如前面的配置所示,您可以使用 element 配置 FTP 入站通道适配器,同时还为各种属性提供值,例如 , (基于简单的模式匹配,而不是正则表达式)和对 .inbound-channel-adapter
local-directory
filename-pattern
session-factory
默认情况下,传输的文件与原始文件同名。
如果要覆盖此行为,可以设置该属性,该属性允许您提供 SPEL 表达式来生成本地文件的名称。
与出站网关和适配器不同,其中 SPEL 评估上下文的根对象是 ,此入站适配器在评估时还没有消息,因为这是它最终以传输的文件作为其有效负载生成的。
因此,SPEL 评估上下文的根对象是远程文件的原始名称(a )。local-filename-generator-expression
Message
String
入站通道适配器首先检索本地目录的对象,然后根据 Poller 配置发出每个文件。
从版本 5.0 开始,您现在可以限制在需要检索新文件时从 FTP 服务器获取的文件数。
当目标文件非常大时,或者在具有持久文件列表过滤器的集群系统中运行时,这可能非常有用,稍后将讨论。
用于此目的。
负值(默认值)表示没有限制,并且将检索所有匹配的文件。
有关更多信息,请参见入站通道适配器:控制远程文件获取。
从版本 5.0 开始,您还可以通过设置 attribute 来为 提供自定义实现。File
max-fetch-size
DirectoryScanner
inbound-channel-adapter
scanner
从 Spring Integration 3.0 开始,您可以指定属性(其默认值为 )。
当 时,本地文件的修改时间戳设置为从服务器检索的值。
否则,它将设置为当前时间。preserve-timestamp
false
true
从版本 4.2 开始,您可以指定 而不是 ,让您动态确定每个轮询的目录 — 例如, .remote-directory-expression
remote-directory
remote-directory-expression="@myBean.determineRemoteDir()"
从版本 4.3 开始,您可以省略 and 属性。
它们默认为 .
在这种情况下,根据 FTP 协议,客户端工作目录将用作默认远程目录。remote-directory
remote-directory-expression
null
有时,基于使用该属性指定的简单模式的文件过滤可能还不够。
如果是这种情况,则可以使用该属性指定正则表达式(如 )。
此外,如果需要完全控制,则可以使用该属性并提供对 的任何自定义实现的引用,这是一个用于筛选文件列表的策略接口。
此筛选器确定要检索的远程文件。
您还可以使用 .filename-pattern
filename-regex
filename-regex=".*\.test$"
filter
o.s.i.file.filters.FileListFilter
AcceptOnceFileListFilter
CompositeFileListFilter
将其状态存储在内存中。
如果您希望该状态在系统重启后继续存在,请考虑改用 。
此筛选条件将接受的文件名存储在策略的实例中(请参阅 元数据存储)。
此过滤器匹配文件名和远程修改时间。AcceptOnceFileListFilter
FtpPersistentAcceptOnceFileListFilter
MetadataStore
从 4.0 版本开始,此过滤器需要一个 .
当与共享数据存储一起使用时(例如与 ),它允许在多个应用程序或服务器实例之间共享筛选键。ConcurrentMetadataStore
Redis
RedisMetadataStore
从版本 5.0 开始,默认情况下,将对 .
此过滤器还与 XML 配置中的 or 选项以及 Java DSL 中的 一起应用。
任何其他用例都可以使用 (或 ) 进行管理。FtpPersistentAcceptOnceFileListFilter
SimpleMetadataStore
FtpInboundFileSynchronizer
regex
pattern
FtpInboundChannelAdapterSpec
CompositeFileListFilter
ChainFileListFilter
前面的讨论是指在检索文件之前筛选文件。
检索文件后,将对文件系统上的文件应用额外的过滤器。
默认情况下,如前所述,这是一个在内存中保留状态并且不考虑文件的修改时间。
除非您的应用程序在处理后删除文件,否则默认情况下,适配器将在应用程序重新启动后重新处理磁盘上的文件。AcceptOnceFileListFilter
此外,如果您将 配置为使用 a 并且远程文件时间戳更改(导致它被重新获取),则默认本地过滤器不允许处理此新文件。filter
FtpPersistentAcceptOnceFileListFilter
有关此筛选器及其使用方法的更多信息,请参阅远程持久性文件列表筛选器。
您可以使用该属性来配置本地文件系统过滤器的行为。
从版本 4.3.8 开始,默认配置 a。
此筛选条件将接受的文件名和修改后的时间戳存储在策略的实例中(请参阅元数据存储),并检测对本地文件修改时间的更改。
默认值为 a ,它将状态存储在内存中。local-filter
FileSystemPersistentAcceptOnceFileListFilter
MetadataStore
MetadataStore
SimpleMetadataStore
从 4.1.5 版本开始,这些过滤器有一个新属性 (),使它们刷新
元数据存储(如果存储实现)。flushOnUpdate
Flushable
此外,如果使用分布式 (如 Redis) ,则可以拥有同一适配器或应用程序的多个实例,并确保每个文件只处理一次。MetadataStore |
实际的本地过滤器是包含提供的过滤器和模式过滤器的 a,该过滤器阻止处理正在下载的文件(基于 )。
下载带有此后缀的文件(默认值为 ),并在传输完成后将文件重命名为其最终名称,使其对过滤器“可见”。CompositeFileListFilter
temporary-file-suffix
.writing
该属性允许您配置文件分隔符,以便在默认 '/' 不适用于您的特定环境时使用。remote-file-separator
请参阅 架构 以了解有关这些属性的更多详细信息。
您还应该了解 FTP 入站通道适配器是轮询使用者。
因此,你必须配置一个 Poller (通过使用 global default 或 local sub-element)。
传输文件后,将生成一条以 a 作为有效负荷的消息,并将其发送到由属性标识的通道。java.io.File
channel
详细了解文件筛选和不完整文件
有时,刚刚出现在受监视(远程)目录中的文件并不完整。
通常,此类文件是使用临时扩展名(如 )写入的,然后在写入过程完成后重命名。
在大多数情况下,您只对完整的文件感兴趣,并且只想筛选完整的文件。
要处理这些情况,您可以使用 、 和 属性提供的筛选支持。
以下示例使用自定义筛选器实现:somefile.txt.writing
filename-pattern
filename-regex
filter
<int-ftp:inbound-channel-adapter
channel="ftpChannel"
session-factory="ftpSessionFactory"
filter="customFilter"
local-directory="file:/my_transfers">
remote-directory="some/remote/path"
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
<bean id="customFilter" class="org.example.CustomFilter"/>
入站 FTP 适配器的轮询器配置说明
入站 FTP 适配器的作业包括两个任务:
-
与远程服务器通信,以便将文件从远程目录传输到本地目录。
-
对于每个传输的文件,生成一条消息,将该文件作为有效负载,并将其发送到由 'channel' 属性标识的通道。 这就是为什么它们被称为 “'channel adapters'” 而不仅仅是 “'adapters'”。 此类适配器的主要工作是生成要发送到消息通道的消息。 从本质上讲,第二个任务的优先级是,如果您的本地目录已经有一个或多个文件,则它首先从这些文件生成消息。 只有当所有本地文件都已处理完时,它才会启动远程通信以检索更多文件。
此外,在 Poller 上配置触发器时,应密切注意该属性。
其默认值适用于所有实例 (包括 FTP)。
这意味着,一旦处理了一个文件,它就会等待由触发器配置确定的下一个执行时间。
如果 中恰好有一个或多个文件位于 ,它将在启动与远程 FTP 服务器的通信之前处理这些文件。
此外,如果设置为 (默认),则它一次只处理一个文件,其间隔由触发器定义,本质上是 “one-poll === one-file” 工作。max-messages-per-poll
1
SourcePollingChannelAdapter
local-directory
max-messages-per-poll
1
对于典型的文件传输使用案例,您很可能希望发生相反的行为:处理每次轮询可以处理的所有文件,然后才等待下一次轮询。
如果是这种情况,请设置为 -1。
然后,在每次轮询时,适配器会尝试生成尽可能多的消息。
换句话说,它处理本地目录中的所有内容,然后连接到远程目录以传输所有可用内容,以便在本地处理。
只有这样,poll 操作才被视为完成,并且 Poller 等待下一次执行时间。max-messages-per-poll
您也可以将 'max-messages-per-poll' 值设置为正值,该值表示每次轮询要从文件创建的消息的上限。
例如,值 of 表示在每次轮询时,它尝试处理不超过 10 个文件。10
从故障中恢复
了解适配器的架构非常重要。
有一个文件同步器用于获取文件,还有一个为每个文件发出一条消息
synchronized 文件。
如前所述,涉及两个过滤器。
属性(和模式)是指远程 (FTP) 文件列表,以避免获取已经
被获取。
用于确定哪些文件将作为消息发送。FileReadingMessageSource
filter
local-filter
FileReadingMessageSource
同步器列出远程文件并查阅其过滤器。
然后传输文件。
如果在文件传输过程中发生 IO 错误,则会删除已添加到筛选器的任何文件,以便它们
有资格在下次轮询时重新获取。
这仅适用于过滤器实现 (如 )。ReversibleFileListFilter
AcceptOnceFileListFilter
如果在同步文件后,处理文件的下游流发生错误,则不会自动回滚过滤器,因此默认情况下不会重新处理失败的文件。
如果您希望在失败后重新处理此类文件,您可以使用类似于以下内容的配置来方便 从过滤器中删除失败的文件:
<int-ftp:inbound-channel-adapter id="ftpAdapter"
session-factory="ftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/ftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-ftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />
上述配置适用于任何 .ResettableFileListFilter
从版本 5.0 开始,入站通道适配器可以在本地构建与生成的本地文件名相对应的子目录。
那也可以是远程子路径。
为了能够递归读取本地目录以根据层次结构支持进行修改,您现在可以根据算法为 internal 提供新的目录。
有关更多信息,请参见AbstractInboundFileSynchronizingMessageSource.setScanner()。
此外,您现在可以将 切换到 -based by using 选项。
它还为所有实例进行了配置,以对 local directory 中的任何修改做出反应。
前面显示的 reprocessing 示例基于 to perform when the file is deleted () 的内置功能。
有关更多信息,请参阅 WatchServiceDirectoryScanner
。FileReadingMessageSource
RecursiveDirectoryScanner
Files.walk()
AbstractInboundFileSynchronizingMessageSource
WatchService
DirectoryScanner
setUseWatchService()
WatchEventType
FileReadingMessageSource.WatchServiceDirectoryScanner
ResettableFileListFilter.remove()
StandardWatchEventKinds.ENTRY_DELETE
使用 Java 配置进行配置
以下 Spring Boot 应用程序显示了如何使用 Java 配置配置入站适配器的示例:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "ftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> ftpMessageSource() {
FtpInboundFileSynchronizingMessageSource source =
new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
source.setLocalDirectory(new File("ftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
使用 Java DSL 进行配置
Spring 下面的 Boot 应用程序显示了如何使用 Java DSL 配置入站适配器的示例:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlow
.from(Ftp.inboundAdapter(this.ftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilename(f -> f.toUpperCase() + ".a")
.localDirectory(new File("d:\\ftp_files")),
e -> e.id("ftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}
FTP 流入站通道适配器
版本 4.3 引入了流入站通道适配器。
此适配器生成 payloads 类型的 message ,让文件在不写入
local 文件系统。
由于会话保持打开状态,因此使用应用程序负责在文件
消耗。
会话在标头 () 中提供。
标准框架组件(如 和 )会自动关闭会话。
有关这些组件的更多信息,请参见File Splitter 和 Stream Transformer。
以下示例说明如何配置 :InputStream
closeableResource
IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE
FileSplitter
StreamTransformer
inbound-streaming-channel-adapter
<int-ftp:inbound-streaming-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="sessionFactory"
filename-pattern="*.txt"
filename-regex=".*\.txt"
filter="filter"
filter-expression="@myFilterBean.check(#root)"
remote-file-separator="/"
comparator="comparator"
max-fetch-size="1"
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000" />
</int-ftp:inbound-streaming-channel-adapter>
只允许 、 、 或 中的一个。filename-pattern
filename-regex
filter
filter-expression
从版本 5.0 开始,默认情况下,适配器会根据内存中的 .
默认情况下,此过滤器也与文件名模式(或 regex)一起应用。
如果需要允许重复项,可以使用 .
任何其他用例都可以由 (或 ) 处理。
Java 配置(本文档后面部分)显示了一种在处理后删除远程文件以避免重复的技术。FtpStreamingMessageSource FtpPersistentAcceptOnceFileListFilter SimpleMetadataStore AcceptAllFileListFilter CompositeFileListFilter ChainFileListFilter |
有关 的更多信息,请参见远程持久性文件列表过滤器。FtpPersistentAcceptOnceFileListFilter
使用 该属性来限制在需要 fetch 时每次轮询时获取的文件数。
将其设置为在集群环境中运行时使用持久过滤器。
有关更多信息,请参见入站通道适配器:控制远程文件获取。max-fetch-size
1
适配器将远程目录和文件名分别放在 和 headers 中。
从版本 5.0 开始,标头提供额外的远程文件信息(默认以 JSON 表示)。
如果在 to 上设置属性,则标头包含一个对象。
可以使用该方法访问底层 Apache Net 库提供的对象。
当您使用 XML 配置时,该属性不可用,但您可以通过将 the 注入到您的一个配置类中来设置它。
另请参阅远程文件信息。FileHeaders.REMOTE_DIRECTORY
FileHeaders.REMOTE_FILE
FileHeaders.REMOTE_FILE_INFO
fileInfoJson
FtpStreamingMessageSource
false
FtpFileInfo
FTPFile
FtpFileInfo.getFileInfo()
fileInfoJson
FtpStreamingMessageSource
从版本 5.1 开始,的泛型类型为 .
以前,它是 .
这是因为排序现在在处理的早期执行,在过滤和应用之前执行。comparator
FTPFile
AbstractFileInfo<FTPFile>
maxFetch
使用 Java 配置进行配置
Spring 下面的 Boot 应用程序显示了如何使用 Java 配置配置入站适配器的示例:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource<InputStream> ftpMessageSource() {
FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template());
messageSource.setRemoteDirectory("ftpSource/");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}
@Bean
public FtpRemoteFileTemplate template() {
return new FtpRemoteFileTemplate(ftpSessionFactory());
}
@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}
}
请注意,在此示例中,转换器下游的消息处理程序具有一个,该处理程序在处理后删除远程文件。advice
入站通道适配器:轮询多个服务器和目录
从版本 5.0.7 开始,可以使用;当配置为 Poller Advice 时,入站适配器可以轮询多个服务器和目录。
配置建议并将其正常添加到 Poller 的建议链中。
A 用于选择服务器,有关更多信息,请参阅 Delegating Session Factory。
通知配置由对象列表组成。RotatingServerAdvice
DelegatingSessionFactory
RotationPolicy.KeyDirectory
@Bean
public RotatingServerAdvice advice() {
List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}
此建议将轮询服务器上的目录,直到没有新文件存在,然后移动到 directory ,然后移动到 server 上的 directory ,依此类推。foo
one
bar
baz
two
可以使用构造函数 arg 修改此默认行为:fair
@Bean
public RotatingServerAdvice advice() {
...
return new RotatingServerAdvice(delegatingSf(), keyDirectories, true);
}
在这种情况下,无论上一个 poll 是否返回文件,通知都将移动到下一个服务器/目录。
或者,您也可以提供自己的信息以根据需要重新配置消息源:RotationPolicy
public interface RotationPolicy {
void beforeReceive(MessageSource<?> source);
void afterReceive(boolean messageReceived, MessageSource<?> source);
}
和
@Bean
public RotatingServerAdvice advice() {
return new RotatingServerAdvice(myRotationPolicy());
}
属性 ( 在 synchronizer 上) 现在可以包含变量。
这允许将从不同目录中检索到的文件下载到本地的类似目录:local-filename-generator-expression
localFilenameGeneratorExpression
#remoteDirectory
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from(Ftp.inboundAdapter(sf())
.filter(new FtpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
.localDirectory(new File(tmpDir))
.localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
.remoteDirectory("."),
e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
.channel(MessageChannels.queue("files"))
.get();
}
使用此建议时,不要在 Poller 上配置 a;有关更多信息,请参见Conditional Pollers for Message Sources。TaskExecutor |
入站通道适配器:控制远程文件获取
在配置入站通道适配器时,应考虑两个属性。与所有 Poller 一样,可用于限制每次轮询时发出的消息数(如果已准备好超过配置的值)。 (自版本 5.0 起)可以限制一次从远程服务器检索的文件数。max-messages-per-poll
max-fetch-size
以下场景假定起始状态为空的本地目录:
-
max-messages-per-poll=2
and:适配器获取一个文件,发出它,获取下一个文件,发出它,然后休眠直到下一次轮询。max-fetch-size=1
-
max-messages-per-poll=2
和 ):适配器获取这两个文件,然后发出每个文件。max-fetch-size=2
-
max-messages-per-poll=2
和:适配器最多获取四个文件(如果可用)并发出前两个文件(如果至少有两个)。 接下来的两个文件将在下一次轮询时发出。max-fetch-size=4
-
max-messages-per-poll=2
和 not specified:适配器获取所有远程文件并发出前两个(如果至少有两个)。 后续文件将在后续轮询时发出(一次两个)。 当所有文件都被消耗时,将再次尝试远程获取以获取任何新文件。max-fetch-size
当您部署应用程序的多个实例时,我们建议使用小型 ,以避免一个实例“抓取”所有文件并耗尽其他实例。max-fetch-size |
另一个用途是,如果您想停止获取远程文件,但继续处理已获取的文件。
在(以编程方式、使用 JMX 或控制总线)上设置该属性可以有效地阻止适配器获取更多文件,但允许 Poller 继续为以前获取的文件发出消息。
如果在更改属性时 poller 处于活动状态,则更改将在下一次轮询时生效。max-fetch-size
maxFetchSize
MessageSource
从版本 5.1 开始,可以为同步器提供 .
这在限制使用 获取的文件数量时非常有用。Comparator<FTPFile>
maxFetchSize
FTP 出站通道适配器
FTP 出站通道适配器依赖于连接到 FTP 服务器的实现,并为它在传入消息的有效负载中接收的每个文件启动 FTP 传输。
它还支持文件的多种表示形式,因此您不仅限于 -typed payloads。
FTP 出站通道适配器支持以下有效负载:MessageHandler
java.io.File
-
java.io.File
:实际文件对象 -
byte[]
:表示文件内容的字节数组 -
java.lang.String
:表示文件内容的文本 -
java.io.InputStream
:要传输到远程文件的数据流 -
org.springframework.core.io.Resource
:用于将数据传输到远程文件的资源
以下示例说明如何配置 :outbound-channel-adapter
<int-ftp:outbound-channel-adapter id="ftpOutbound"
channel="ftpChannel"
session-factory="ftpSessionFactory"
charset="UTF-8"
remote-file-separator="/"
auto-create-directory="true"
remote-directory-expression="headers['remote_dir']"
temporary-remote-directory-expression="headers['temp_remote_dir']"
filename-generator="fileNameGenerator"
use-temporary-filename="true"
chmod="600"
mode="REPLACE"/>
前面的配置显示了如何使用元素配置 FTP 出站通道适配器,同时还为各种属性提供值,例如(策略接口的实现)、对 a 的引用和其他属性。
您还可以查看一些属性示例,这些属性允许您使用 SPEL 配置设置,例如、 、 和 (前面的示例中所示的 SPEL 替代项)。
与任何允许使用 SPEL 的组件一样,可以通过“payload”和“headers”变量访问有效负载和消息 Headers。
请参阅 架构 有关可用属性的更多详细信息。outbound-channel-adapter
filename-generator
o.s.i.file.FileNameGenerator
session-factory
*expression
remote-directory-expression
temporary-remote-directory-expression
remote-filename-generator-expression
filename-generator
默认情况下,如果未指定文件名生成器,则 Spring Integration 使用 。 根据 中的标头(如果存在)的值确定文件名,或者,如果 Message 的有效负载已经是 ,则它使用该文件的原始名称。o.s.i.file.DefaultFileNameGenerator DefaultFileNameGenerator file_name MessageHeaders java.io.File |
定义某些值 (如 ) 可能取决于平台或 FTP 服务器。
例如,正如 https://forum.spring.io/showthread.php?p=333478&posted=1#post333478 上报告的那样,在某些平台上,您必须在目录定义的末尾添加斜杠(例如,而不是 )。remote-directory remote-directory="/thing1/thing2/" remote-directory="/thing1/thing2" |
从版本 4.1 开始,您可以指定 when transfer the file.
默认情况下,现有文件将被覆盖。
模式由枚举定义,其中包括以下值:mode
FileExistsMode
-
REPLACE
(默认) -
REPLACE_IF_MODIFIED
-
APPEND
-
APPEND_NO_FLUSH
-
IGNORE
-
FAIL
IGNORE
,并且不要传输文件。 导致引发异常,同时静默忽略传输(尽管生成了日志条目)。FAIL
FAIL
IGNORE
DEBUG
版本 5.2 引入了该属性,您可以使用该属性在上传后更改远程文件权限。
您可以使用传统的 Unix 八进制格式(例如,仅允许文件所有者进行读写操作)。
使用 java 配置适配器时,可以使用 或 。
仅当您的 FTP 服务器支持该子命令时才适用。chmod
600
setChmodOctal("600")
setChmod(0600)
SITE CHMOD
避免部分写入的文件
处理文件传输时出现的常见问题之一是处理部分文件的可能性。 也就是说,文件可能在传输实际完成之前就出现在文件系统中。
为了解决这个问题, Spring 集成 FTP 适配器使用一种通用算法:文件以临时名称传输,然后在完全传输后重命名。
默认情况下,正在传输的每个文件都会在文件系统中显示一个附加后缀,默认情况下,该后缀为 .
您可以通过设置属性来更改此后缀。.writing
temporary-file-suffix
但是,在某些情况下,您可能不想使用此技术(例如,如果服务器不允许重命名文件)。
对于此类情况,您可以通过设置为 (默认为 ) 来禁用此功能。
当此属性为 时,文件将以其最终名称写入,并且使用应用程序需要某种其他机制来检测文件是否已完全上传,然后再访问它。use-temporary-file-name
false
true
false
使用 Java 配置进行配置
以下 Spring Boot 应用程序显示了如何使用 Java 配置配置出站适配器的示例:
@SpringBootApplication
@IntegrationComponentScan
public class FtpJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToFtp(new File("/foo/bar.txt"));
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
FtpMessageHandler handler = new FtpMessageHandler(ftpSessionFactory());
handler.setRemoteDirectoryExpressionString("headers['remote-target-dir']");
handler.setFileNameGenerator(new FileNameGenerator() {
@Override
public String generateFileName(Message<?> message) {
return "handlerContent.test";
}
});
return handler;
}
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "toFtpChannel")
void sendToFtp(File file);
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序显示了如何使用 Java DSL 配置出站适配器的示例:
@SpringBootApplication
@IntegrationComponentScan
public class FtpJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToFtp(new File("/foo/bar.txt"));
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
public IntegrationFlow ftpOutboundFlow() {
return IntegrationFlow.from("toFtpChannel")
.handle(Ftp.outboundAdapter(ftpSessionFactory(), FileExistsMode.FAIL)
.useTemporaryFileName(false)
.fileNameExpression("headers['" + FileHeaders.FILENAME + "']")
.remoteDirectory(this.ftpServer.getTargetFtpDirectory().getName())
).get();
}
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "toFtpChannel")
void sendToFtp(File file);
}
}
FTP 出站网关
FTP 出站网关提供一组有限的命令来与远程 FTP 或 FTPS 服务器进行交互。 支持的命令包括:
-
ls
(列出文件) -
nlst
(列出文件名) -
get
(检索文件) -
mget
(检索文件) -
rm
(删除文件) -
mv
(移动/重命名文件) -
put
(发送文件) -
mput
(发送多个文件)
使用命令ls
ls
列出远程文件并支持以下选项:
-
-1
:检索文件名列表。 默认为检索对象列表。FileInfo
-
-a
:包括所有文件(包括以“.”开头的文件) -
-f
:不对列表进行排序 -
-dirs
:包括目录(默认情况下排除它们) -
-links
:包括符号链接(默认情况下,它们被排除在外) -
-R
:递归列出远程目录
此外,还提供文件名筛选,其方式与 .
请参阅 FTP 入站通道适配器。inbound-channel-adapter
操作生成的消息有效负载是文件名列表或对象列表。
这些对象提供修改时间、权限和其他详细信息等信息。ls
FileInfo
标头中提供了命令操作的远程目录。ls
file_remoteDirectory
使用递归选项 () 时,包括任何子目录元素,这些元素表示文件的相对路径(相对于远程目录)。
如果包含该选项,则每个递归目录也将作为列表中的一个元素返回。
在这种情况下,建议您不要使用该选项,因为您将无法区分文件和目录,而您可以对对象进行区分。-R
fileName
-dirs
-1
FileInfo
从版本 4.3 开始,对 and 方法的支持。
因此,您可以省略该属性。
为方便起见,Java 配置有两个没有参数的构造函数。
或 、 、 和 命令被视为客户端工作目录,根据 FTP 协议。
所有其他命令都必须随 一起提供,以便根据请求消息评估远程路径。
您可以在扩展和实现回调时使用函数设置工作目录。FtpSession
null
list()
listNames()
expression
expression
LS
NLST
PUT
MPUT
null
expression
FTPClient.changeWorkingDirectory()
DefaultFtpSessionFactory
postProcessClientAfterConnect()
使用命令nlst
版本 5 引入了对该命令的支持。nlst
nlst
列出远程文件名,并且仅支持一个选项:
-
-f
:不对列表进行排序
操作生成的消息有效负载是文件名列表。nlst
标头中提供了命令操作的远程目录。nlst
file_remoteDirectory
与使用该命令的 ls
命令的选项不同,该命令将命令发送到目标 FTP 服务器。
当服务器不支持时(例如,由于安全限制),此命令很有用。
操作的结果是没有其他详细信息的名称。
因此,框架无法确定实体是否为目录,例如,无法执行筛选或递归列表。-1
LIST
nlst
NLST
LIST
nlst
使用命令get
get
检索远程文件。
它支持以下选项:
-
-P
:保留远程文件的时间戳。 -
-stream
:将远程文件作为流检索。 -
-D
:传输成功后删除远程文件。 如果忽略传输,则不会删除远程文件,因为 is 和本地文件已存在。FileExistsMode
IGNORE
标头提供远程目录名称,标头提供文件名。file_remoteDirectory
file_remoteFile
操作生成的消息有效负载是表示检索到的文件的对象,或者当您使用该选项时。
该选项允许将文件作为流检索。
对于文本文件,一个常见的用例是将此操作与文件拆分器或流转换器结合使用。
将远程文件作为流使用时,您负责在使用流后关闭 。
为方便起见,标头中提供了 the,您可以使用 便利 方法访问以下示例显示了如何使用便利方法:get
File
InputStream
-stream
-stream
Session
Session
closeableResource
IntegrationMessageHeaderAccessor
Closeable closeable = new IntegrationMessageHeaderAccessor(message).getCloseableResource();
if (closeable != null) {
closeable.close();
}
以下示例演示如何将文件作为流使用:
<int-ftp:outbound-gateway session-factory="ftpSessionFactory"
request-channel="inboundGetStream"
command="get"
command-options="-stream"
expression="payload"
remote-directory="ftpTarget"
reply-channel="stream" />
<int-file:splitter input-channel="stream" output-channel="lines" />
如果您在自定义组件中使用输入流,则必须关闭 .
你可以在自定义代码中执行此操作,也可以通过将消息的副本路由到 a 并使用 SPEL 来实现,如下例所示:Session service-activator |
<int:service-activator input-channel="closeSession"
expression="headers['closeableResource'].close()" />
使用命令mget
mget
根据模式检索多个远程文件,并支持以下选项:
-
-P
:保留远程文件的时间戳。 -
-R
:递归检索整个目录树。 -
-x
:如果没有与模式匹配的文件,则引发异常(否则返回空列表)。 -
-D
:传输成功后删除每个远程文件。 如果忽略传输,则不会删除远程文件,因为 is 和本地文件已存在。FileExistsMode
IGNORE
操作生成的消息有效负载是一个对象(即一个对象,每个对象表示一个检索到的文件)。mget
List<File>
List
File
从版本 5.0 开始,如果 is ,则输出消息的有效负载不再包含由于文件已存在而未获取的文件。
以前,该列表包含所有文件,包括已存在的文件。FileExistsMode IGNORE |
用于确定远程路径的表达式应产生以 - 结尾的结果 - 例如 somedir/
将获取 下的完整树。somedir
从版本 5.0 开始,可以使用 recursive 与新模式相结合,定期在本地同步整个远程目录树。
此模式将本地文件的上次修改时间戳替换为远程时间戳,而不管 (preserve timestamp) 选项如何。mget
FileExistsMode.REPLACE_IF_MODIFIED
-P
使用递归 (
-R )该模式将被忽略,并被假定为。
默认情况下,将检索整个远程树。
但是,可以通过提供 .
树中的目录也可以通过这种方式进行筛选。
A 可以通过 reference、by 或 by 属性提供。
例如,检索远程目录和子目录中以 结尾的所有文件。
但是,下一个示例显示了一个替代方法,即 5.0 版本可用。 如果过滤了子目录,则不会对该子目录执行额外的遍历。 该选项是不允许的(递归使用递归来获取目录树,因此目录本身不能包含在列表中)。 通常,您将在 中使用该变量,以便在本地保留远程目录结构。 |
持久性文件列表过滤器现在具有 boolean 属性 。
将此属性设置为 , 还会设置 ,这意味着出站网关( 和 )上的递归操作现在每次都将始终遍历整个目录树。
这是为了解决未检测到目录树深处更改的问题。
此外,还会导致将文件的完整路径用作元数据存储键;这解决了以下问题:如果具有相同名称的文件在不同目录中多次出现,则过滤器无法正常工作。
重要说明:这意味着对于顶级目录下的文件,将无法找到持久性元数据存储中的现有键。
因此,默认情况下,该属性为;这可能会在未来版本中更改。forRecursion
true
alwaysAcceptDirectories
ls
mget
forRecursion=true
false
从版本 5.0 开始,可以通过将属性设置为 和 来将 and 配置为始终传递目录。
这样做允许对简单模式进行递归,如下例所示:FtpSimplePatternFileListFilter
FtpRegexPatternFileListFilter
alwaysAcceptDirectories
true
<bean id="starDotTxtFilter"
class="org.springframework.integration.ftp.filters.FtpSimplePatternFileListFilter">
<constructor-arg value="*.txt" />
<property name="alwaysAcceptDirectories" value="true" />
</bean>
<bean id="dotStarDotTxtFilter"
class="org.springframework.integration.ftp.filters.FtpRegexPatternFileListFilter">
<constructor-arg value="^.*\.txt$" />
<property name="alwaysAcceptDirectories" value="true" />
</bean>
定义筛选器(如前面示例中的筛选器)后,您可以通过在网关上设置属性来使用一个筛选器。filter
使用命令put
该命令将文件发送到远程服务器。
消息的有效负载可以是 、 或 。
一个 (或表达式) 用于命名远程文件。
其他可用属性包括 、 及其等效项:和 。
有关更多信息,请参阅 schema documentation.put
java.io.File
byte[]
String
remote-filename-generator
remote-directory
temporary-remote-directory
*-expression
use-temporary-file-name
auto-create-directory
操作生成的消息有效负载是 a,表示传输后文件在服务器上的完整路径。put
String
版本 5.2 引入了该属性,该属性在上传后更改远程文件权限。
您可以使用传统的 Unix 八进制格式(例如,仅允许文件所有者进行读写操作)。
使用 java 配置适配器时,可以使用 .
仅当您的 FTP 服务器支持该子命令时才适用。chmod
600
setChmod(0600)
SITE CHMOD
使用命令mput
将多个文件发送到服务器,并且仅支持一个选项:mput
-
-R
:递归的。 发送目录及其子目录中的所有文件(可能已过滤)。
消息有效负载必须是表示本地目录的 (或 )。
从版本 5.1 开始,还支持 or 的集合。java.io.File
String
File
String
此命令支持与 put
命令相同的属性。
此外,可以使用 、 、 或 之一筛选本地目录中的文件。
只要子目录本身通过过滤器,过滤器就可以使用递归。
未通过过滤器的子目录不会递归。mput-pattern
mput-regex
mput-filter
mput-filter-expression
操作生成的消息有效负载是一个对象(即传输产生的远程文件路径)。mput
List<String>
List
版本 5.2 引入了该属性,该属性允许您在上传后更改远程文件权限。
您可以使用传统的 Unix 八进制格式(例如,仅允许文件所有者进行读写操作)。
使用 Java 配置适配器时,可以使用 或 。
仅当您的 FTP 服务器支持该子命令时才适用。chmod
600
setChmodOctal("600")
setChmod(0600)
SITE CHMOD
使用命令rm
该命令将删除文件。rm
该命令没有选项。rm
操作生成的消息有效负载是删除是否成功。
标头提供远程目录,标头提供文件名。rm
Boolean.TRUE
Boolean.FALSE
file_remoteDirectory
file_remoteFile
使用命令mv
该命令可移动文件。mv
该命令没有选项。mv
该属性定义 “from” 路径,该属性定义 “to” 路径。
默认情况下,该 是 .
此表达式的计算结果不得为 null 或空 .
如有必要,将创建任何必要的远程目录。
结果消息的负载为 .
header 提供原始远程目录,header 提供文件名。
新路径位于标头中。expression
rename-expression
rename-expression
headers['file_renameTo']
String
Boolean.TRUE
file_remoteDirectory
file_remoteFile
file_renameTo
从版本 5.5.6 开始,为方便起见,可以在命令中使用 。
如果 “from” 文件不是完整的文件路径,则 result of 将用作远程目录。
这同样适用于“to”文件,例如,如果任务只是重命名某个目录中的远程文件。remoteDirectoryExpression
mv
remoteDirectoryExpression
有关 FTP 出站网关命令的其他信息
和 命令支持该属性。
它定义了一个 SPEL 表达式,用于在传输过程中生成本地文件的名称。
评估上下文的根对象是请求消息。
该变量对 特别有用,也可用 — 例如 .get
mget
local-filename-generator-expression
remoteFileName
mget
local-filename-generator-expression="#remoteFileName.toUpperCase() + headers.something"
和 命令支持该属性。
它定义了一个 SPEL 表达式,用于在传输过程中生成本地目录的名称。
评估上下文的根对象是请求消息 but.
该变量对 特别有用,也可用 — 例如:。
此属性与该属性互斥。get
mget
local-directory-expression
remoteDirectory
mget
local-directory-expression="'/tmp/local/' + #remoteDirectory.toUpperCase() + headers.something"
local-directory
对于所有命令,网关的 'expression' 属性提供命令操作的路径。
对于该命令,表达式的计算结果可能为 '',表示检索所有文件,或者为 'somedirectory/',依此类推。mget
以下示例显示了为命令配置的网关:ls
<int-ftp:outbound-gateway id="gateway1"
session-factory="ftpSessionFactory"
request-channel="inbound1"
command="ls"
command-options="-1"
expression="payload"
reply-channel="toSplitter"/>
发送到通道的消息的有效负载是一个对象列表,每个对象都包含一个文件名。
如果省略了该属性,则它包含对象。
它使用以空格分隔的选项 — 例如 .toSplitter
String
command-options
FileInfo
command-options="-1 -dirs -links"
从版本 4.2 开始, , 和 命令支持属性(使用命名空间支持时)。
这会影响本地文件存在( 和 )或远程文件存在( 和 )时的行为。
支持的模式包括 、 、 和 。
为了向后兼容,和 operations 的默认模式为 .
对于 和 operations,默认值为 .GET
MGET
PUT
MPUT
FileExistsMode
mode
GET
MGET
PUT
MPUT
REPLACE
APPEND
FAIL
IGNORE
PUT
MPUT
REPLACE
GET
MGET
FAIL
从版本 5.0 开始,在 ( in XML) 上提供 ( in XML) 选项。
它允许您在运行时更改客户端工作目录。
根据请求消息计算表达式。
每次网关操作后,都会恢复以前的工作目录。setWorkingDirExpression()
working-dir-expression
FtpOutboundGateway
<int-ftp:outbound-gateway>
使用 Java 配置进行配置
Spring 以下 Boot 应用程序显示了如何使用 Java 配置配置出站网关的示例:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
FtpOutboundGateway ftpOutboundGateway =
new FtpOutboundGateway(ftpSessionFactory(), "ls", "'my_remote_dir/'");
ftpOutboundGateway.setOutputChannelName("lsReplyChannel");
return ftpOutboundGateway;
}
}
使用 Java DSL 进行配置
Spring 下面的 Boot 应用程序显示了如何使用 Java DSL 配置出站网关的示例:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
public FtpOutboundGatewaySpec ftpOutboundGateway() {
return Ftp.outboundGateway(ftpSessionFactory(),
AbstractRemoteFileOutboundGateway.Command.MGET, "payload")
.options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE)
.regexFileNameFilter("(subFtpSource|.*1.txt)")
.localDirectoryExpression("'localDirectory/' + #remoteDirectory")
.localFilenameExpression("#remoteFileName.replaceFirst('ftpSource', 'localTarget')");
}
@Bean
public IntegrationFlow ftpMGetFlow(AbstractRemoteFileOutboundGateway<FTPFile> ftpOutboundGateway) {
return f -> f
.handle(ftpOutboundGateway)
.channel(c -> c.queue("remoteFileOutputChannel"));
}
}
出站网关部分成功 ( 和mget
mput
)
当您对多个文件执行操作(通过使用 和 )时,在传输一个或多个文件后的一段时间内可能会发生异常。
在这种情况下(从版本 4.2 开始),会抛出 a 。
除了通常的属性 ( 和 ),此异常还具有两个附加属性:mget
mput
PartialSuccessException
MessagingException
failedMessage
cause
-
partialResults
:传输成功的结果。 -
derivedInput
:从请求消息生成的文件列表(例如,要传输的本地文件)。mput
这些属性允许您确定哪些文件已成功传输,哪些文件未成功传输。
在 recursive 的情况下,可能有嵌套的出现次数。mput
PartialSuccessException
PartialSuccessException
请考虑以下目录结构:
root/
|- file1.txt
|- subdir/
| - file2.txt
| - file3.txt
|- zoo.txt
如果异常发生在 ,则网关引发的 具有 、 和 和 的 。
它是另一个具有 of 和 and 的 。file3.txt
PartialSuccessException
derivedInput
file1.txt
subdir
zoo.txt
partialResults
file1.txt
cause
PartialSuccessException
derivedInput
file2.txt
file3.txt
partialResults
file2.txt
FTP 会话缓存
从 Spring Integration 3.0 开始,默认情况下不再缓存会话。
终端节点不再支持该属性。
如果您希望缓存会话,则必须使用 a(如下例所示)。cache-sessions CachingSessionFactory |
在 3.0 之前的版本中,默认情况下会自动缓存会话。
可以使用一个属性来禁用自动缓存,但该解决方案没有提供配置其他会话缓存属性的方法。
例如,您无法限制创建的会话数。
为了支持该要求和其他配置选项,添加了 a。
它提供 和 properties.
该属性控制工厂在其缓存中维护的活动会话数(默认值为无界)。
如果已达到阈值,则任何获取另一个会话的尝试都将阻止,直到其中一个缓存会话变为可用或会话的等待时间到期(默认等待时间为 )。
该属性配置该值。cache-sessions
CachingSessionFactory
sessionCacheSize
sessionWaitTimeout
sessionCacheSize
sessionCacheSize
Integer.MAX_VALUE
sessionWaitTimeout
如果要缓存会话,请按照前面所述配置默认会话工厂,然后将其包装在 的实例中,您可以在其中提供这些附加属性。
以下示例显示了如何执行此操作:CachingSessionFactory
<bean id="ftpSessionFactory" class="o.s.i.ftp.session.DefaultFtpSessionFactory">
<property name="host" value="localhost"/>
</bean>
<bean id="cachingSessionFactory" class="o.s.i.file.remote.session.CachingSessionFactory">
<constructor-arg ref="ftpSessionFactory"/>
<constructor-arg value="10"/>
<property name="sessionWaitTimeout" value="1000"/>
</bean>
前面的示例显示了一个 created,设置为 to 并将 设置为 1 秒(其值以毫秒为单位)。CachingSessionFactory
sessionCacheSize
10
sessionWaitTimeout
从 Spring Integration 3.0 开始,它提供了一个方法。
调用时,所有空闲会话将立即关闭,而正在使用的会话将在返回到缓存时关闭。
对会话的新请求会根据需要建立新会话。CachingConnectionFactory
resetCache()
从版本 5.1 开始,具有新属性 。
如果为 true,则将通过发送 NOOP 命令来测试会话,以确保它仍然处于活动状态;否则,将从缓存中删除它;如果缓存中没有活动会话,则会创建一个新会话。CachingSessionFactory
testSession
用RemoteFileTemplate
从 Spring Integration 3.0 开始,在对象上提供了一个新的抽象。
该模板提供了发送、检索(作为)、删除和重命名文件的方法。
此外,还提供了一个方法,允许调用方对会话执行多个操作。
在所有情况下,模板都会可靠地关闭会话。
有关更多信息,请参阅 RemoteFileTemplate
的 Javadoc。
FTP 有一个子类: .FtpSession
InputStream
execute
FtpRemoteFileTemplate
版本 4.1 添加了其他方法,包括 ,它提供对底层的访问,从而允许您访问低级 API。getClientInstance()
FTPClient
并非所有 FTP 服务器都能正确实现该命令。
有些 API 会为不存在的路径返回 positive 结果。
当路径为文件且存在时,该命令会可靠地返回名称。
但是,这不支持检查是否存在空目录,因为当路径为目录时,总是返回空列表。
由于模板不知道路径是否代表目录,因此当路径似乎不存在时(使用 时),它必须执行其他检查。
这会增加开销,需要向服务器发出多个请求。
从版本 4.1.9 开始,该 提供了具有以下选项的属性:STAT <path>
NLST
NLST
NLST
FtpRemoteFileTemplate
FtpRemoteFileTemplate.ExistsMode
-
STAT
:执行 FTP 命令 () 检查路径是否存在。 这是默认设置,并要求 FTP 服务器正确支持该命令(带路径)。STAT
FTPClient.getStatus(path)
STAT
-
NLST
:执行 FTP 命令 — 。 如果要测试的路径是文件的完整路径,请使用此选项。 它不适用于空目录。NLST
FTPClient.listName(path)
-
NLST_AND_DIRS
:首先执行该命令,如果它没有返回任何文件,则回退到使用 临时切换工作目录的技术。 有关更多信息,请参见FtpSession.exists()
。NLST
FTPClient.changeWorkingDirectory(path)
由于我们知道情况总是只寻找一个文件(而不是一个目录),因此我们安全地对 and 组件使用 mode。FileExistsMode.FAIL
NLST
FtpMessageHandler
FtpOutboundGateway
对于任何其他情况,可以扩展 the 以在重写的方法中实现自定义逻辑。FtpRemoteFileTemplate
exist()
从版本 5.0 开始,可以使用新方法。
此方法允许在相同的线程绑定 .
当您需要执行 as one unit of work 的多个高级操作时,这非常有用。
例如,将它与命令实现一起使用,其中我们对提供的目录中的每个文件执行操作,并对其子目录递归执行操作。
有关更多信息,请参阅 Javadoc。RemoteFileOperations.invoke(OperationsCallback<F, T> action)
RemoteFileOperations
Session
RemoteFileTemplate
AbstractRemoteFileOutboundGateway
mput
put
用MessageSessionCallback
从 Spring Integration 4.2 开始,你可以使用带有(在 Java 中)的实现来对上下文执行任何操作。
它可用于任何非标准或低级 FTP 操作,并允许从集成流定义和功能接口 (Lambda) 实施注入进行访问,如下例所示:MessageSessionCallback<F, T>
<int-ftp:outbound-gateway/>
FtpOutboundGateway
Session<FTPFile>
requestMessage
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler ftpOutboundGateway(SessionFactory<FTPFile> sessionFactory) {
return new FtpOutboundGateway(sessionFactory,
(session, requestMessage) -> session.list(requestMessage.getPayload()));
}
另一个示例可能是对正在发送或检索的文件数据进行预处理或后处理。
使用 XML 配置时,会提供一个属性,让您指定 Bean 名称。<int-ftp:outbound-gateway/>
session-callback
MessageSessionCallback
这与 和 属性互斥。
使用 Java 进行配置时,FtpOutboundGateway 类中提供了不同的构造函数。session-callback command expression |
Apache Mina FTP 服务器事件
版本 5.2 中添加的 ,侦听某些 Apache Mina FTP 服务器事件并将它们发布为可以由任何 bean、bean 方法或事件入站通道适配器接收的 s。ApacheMinaFtplet
ApplicationEvent
ApplicationListener
@EventListener
目前,支持的事件包括:
-
SessionOpenedEvent
- 已打开客户端会话 -
DirectoryCreatedEvent
- 已创建目录 -
FileWrittenEvent
- 文件已写入 -
PathMovedEvent
- 文件或目录已重命名 -
PathRemovedEvent
- 文件或目录已被删除 -
SessionClosedEvent
- 客户端已断开连接
它们中的每一个都是 ;您可以配置单个侦听器来接收所有事件类型。
每个事件的属性都是 ,您可以从中获取客户端地址等信息;在 Abstract 事件上提供了一个方便的方法。ApacheMinaFtpEvent
source
FtpSession
getSession()
会话打开/关闭以外的事件具有另一个属性,该属性具有 command 和 arguments 等属性。FtpRequest
要使用侦听器(必须是 Spring bean)配置服务器,请将其添加到服务器工厂中:
FtpServerFactory serverFactory = new FtpServerFactory();
...
ListenerFactory factory = new ListenerFactory();
...
serverFactory.addListener("default", factory.createListener());
serverFactory.setFtplets(new HashMap<>(Collections.singletonMap("springFtplet", apacheMinaFtpletBean)));
server = serverFactory.createServer();
server.start();
要使用 Spring 集成事件适配器来使用这些事件:
@Bean
public ApplicationEventListeningMessageProducer eventsAdapter() {
ApplicationEventListeningMessageProducer producer =
new ApplicationEventListeningMessageProducer();
producer.setEventTypes(ApacheMinaFtpEvent.class);
producer.setOutputChannel(eventChannel());
return producer;
}
远程文件信息
从版本 5.2 开始,(FTP Streaming Inbound Channel Adapter)、(FTP Inbound Channel Adapter) 和 (FTP Outbound Gateway) 的 “read” 命令在消息中提供额外的标头,以生成有关远程文件的信息:FtpStreamingMessageSource
FtpInboundFileSynchronizingMessageSource
FtpOutboundGateway
-
FileHeaders.REMOTE_HOST_PORT
- 在文件传输操作期间,远程会话已连接到的 host:port 对; -
FileHeaders.REMOTE_DIRECTORY
- 已执行操作的远程目录; -
FileHeaders.REMOTE_FILE
- 远程文件名;仅适用于单文件操作。
由于 不会针对远程文件生成消息,而是使用本地副本,因此在同步操作期间,它会以 URI 样式 () 将有关远程文件的信息存储在 (可以在外部配置) 中。
轮询本地文件时,将检索此元数据。
删除本地文件时,建议删除其元数据条目。
为此,它提供了一个回调。
此外,还有一个 to be used 在元数据键中。
当这些组件之间共享同一实例时,建议将此前缀与基于 -的实施中使用的前缀不同,以避免条目覆盖,因为两者都会筛选并对元数据入口键使用相同的本地文件名。FtpInboundFileSynchronizingMessageSource
AbstractInboundFileSynchronizer
MetadataStore
protocol://host:port/remoteDirectory#remoteFileName
FtpInboundFileSynchronizingMessageSource
AbstractInboundFileSynchronizer
removeRemoteFileMetadata()
setMetadataStorePrefix()
MetadataStore
FileListFilter
MetadataStore
AbstractInboundFileSynchronizer