SFTP 适配器
SFTP 适配器
Spring 集成支持通过 SFTP 进行文件传输操作。
安全文件传输协议 (SFTP) 是一种网络协议,可让您通过任何可靠的流在 Internet 上的两台计算机之间传输文件。
SFTP 协议需要一个安全通道(如 SSH),并在整个 SFTP 会话中对客户端身份的可见性。
Spring 集成通过提供三个客户端端点来支持通过 SFTP 发送和接收文件:入站通道适配器、出站通道适配器和出站网关。 它还提供了方便的命名空间配置来定义这些客户端组件。
从版本 6.0 开始,过时的 JCraft JSch 客户端已被现代 Apache MINA SSHD 框架取代。
这导致框架组件发生了很多重大变化。
但是,在大多数情况下,这样的迁移隐藏在 Spring Integration API 后面。
最剧烈的变化发生在 a 上,它现在基于 并公开了一些 if 它的配置属性。DefaultSftpSessionFactory org.apache.sshd.client.SshClient |
您需要将此依赖项包含在您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-sftp</artifactId>
<version>6.1.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-sftp:6.1.9"
要在 xml 配置中包含 SFTP 命名空间,请在 root 元素上包含以下属性:
xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
xsi:schemaLocation="http://www.springframework.org/schema/integration/sftp
https://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd"
SFTP 会话工厂
从版本 3.0 开始,默认情况下不再缓存会话。 请参阅 SFTP 会话缓存。 |
在配置 SFTP 适配器之前,必须配置 SFTP 会话工厂。 您可以使用常规 bean 定义配置 SFTP 会话工厂,如下例所示:
<beans:bean id="sftpSessionFactory"
class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
<beans:property name="host" value="localhost"/>
<beans:property name="privateKey" value="classpath:META-INF/keys/sftpTest"/>
<beans:property name="privateKeyPassphrase" value="springIntegration"/>
<beans:property name="port" value="22"/>
<beans:property name="user" value="kermit"/>
</beans:bean>
每次适配器从其 请求会话对象时,都会创建一个新的 SFTP 会话。
实际上,SFTP Session Factory 依赖于 Apache MINA SSHD 库来提供 SFTP 功能。SessionFactory
但是, Spring 集成还支持 SFTP 会话的缓存。 有关更多信息,请参阅 SFTP 会话缓存。
可以使用外部配置或扩展的 .
例如,库中的扩展可用于提供对 HTTP/SOCKS 代理的支持。DefaultSftpSessionFactory SshClient org.eclipse.jgit.internal.transport.sshd.JGitSshClient org.eclipse.jgit:org.eclipse.jgit.ssh.apache |
支持通过与服务器的连接使用多个通道(操作)。
默认情况下, Spring 集成会话工厂为每个通道使用单独的物理连接。
从 Spring Integration 3.0 开始,您可以配置会话工厂(使用布尔构造函数 arg - default )以使用与服务器的单个连接,并在该单个连接上创建多个实例。 使用此功能时,必须将 session factory 包装在缓存 session factory 中,如下所述,以便在操作完成时不会物理关闭连接。 如果重置缓存,则仅当最后一个通道关闭时,会话才会断开连接。 如果在新操作获取会话时发现连接已断开连接,则会刷新连接。 |
现在您需要做的就是将此 SFTP 会话工厂注入到您的适配器中。
为 SFTP 会话工厂提供值的更实用的方法是使用 Spring 的属性占位符支持。 |
配置属性
以下列表描述了 DefaultSftpSessionFactory
公开的所有属性。
isSharedSession
(constructor argument)::当 ,则 single 用于所有请求的实例。
它默认为 .true
SftpClient
SftpSession
false
sftpVersionSelector
::用于 SFTP 协议选择的实例。
默认值为 .SftpVersionSelector
SftpVersionSelector.CURRENT
host
::要连接到的主机的 URL。
必填。
hostConfig
::作为 user/host/port 选项的替代实例。
可以使用代理跳转属性进行配置。org.apache.sshd.client.config.hosts.HostConfigEntry
port
::应通过其建立 SFTP 连接的端口。
如果未指定,则此值默认为 。
如果指定,则此属性必须为正数。22
user
::要使用的远程用户。
必填。
knownHostsResource
::用于主机密钥存储库的 An。
资源的内容必须与 OpenSSH 文件的格式相同,并且是必需的,如果为 false,则必须预先填充。org.springframework.core.io.Resource
known_hosts
allowUnknownKeys
password
::用于对远程主机进行身份验证的密码。
如果未提供密码,则该属性是必需的。privateKey
privateKey
::表示用于对远程主机进行身份验证的私钥的位置。
如果未提供 the,则 property 是必需的。org.springframework.core.io.Resource
privateKey
password
privateKeyPassphrase
::私钥的密码。
如果设置 ,则不允许 。
密码是从该对象获取的。
自选。userInfo
privateKeyPassphrase
timeout
::timeout 属性用作套接字超时参数,以及默认连接超时。
默认为 ,这意味着不会发生超时。0
allowUnknownKeys
::设置为 以允许连接到具有未知(或更改)键的主机。
它的默认值是 'false'。
如果 ,则需要预填充的文件。true
false
knownHosts
userInteraction
::身份验证期间使用的自定义。org.apache.sshd.client.auth.keyboard.UserInteraction
委派 Session Factory
版本 4.2 引入了 ,它允许在运行时选择实际的会话工厂。
在调用 SFTP 端点之前,您可以调用工厂以将密钥与当前线程关联。
然后,该键用于查找要使用的实际会话工厂。
您可以通过 call after use 清除密钥。DelegatingSessionFactory
setThreadKey()
clearThreadKey()
我们添加了便捷的方法,以便您可以更轻松地从消息流执行此操作,如下例所示:
<bean id="dsf" class="org.springframework.integration.file.remote.session.DelegatingSessionFactory">
<constructor-arg>
<bean class="o.s.i.file.remote.session.DefaultSessionFactoryLocator">
<!-- delegate factories here -->
</bean>
</constructor-arg>
</bean>
<int:service-activator input-channel="in" output-channel="c1"
expression="@dsf.setThreadKey(#root, headers['factoryToUse'])" />
<int-sftp:outbound-gateway request-channel="c1" reply-channel="c2" ... />
<int:service-activator input-channel="c2" output-channel="out"
expression="@dsf.clearThreadKey(#root)" />
使用会话缓存时(请参阅 SFTP 会话缓存),应缓存每个委托。
您无法缓存 itself.DelegatingSessionFactory |
从版本 5.0.7 开始,可以与 结合使用以轮询多个服务器;请参见入站通道适配器:轮询多个服务器和目录。DelegatingSessionFactory
RotatingServerAdvice
SFTP 会话缓存
从 Spring 集成版本 3.0 开始,默认情况下不再缓存会话。
终端节点不再支持该属性。
如果要缓存会话,则必须使用 (请参阅下一个示例)。cache-sessions CachingSessionFactory |
在 3.0 之前的版本中,默认情况下会自动缓存会话。
可以使用一个属性来禁用自动缓存,但该解决方案没有提供配置其他会话缓存属性的方法。
例如,您不能限制创建的会话数。
为了支持该要求和其他配置选项,我们添加了一个 .
它提供 和 properties.
顾名思义,该属性控制工厂在其缓存中维护多少个活动会话(默认是无界的)。
如果已达到阈值,则任何获取另一个会话的尝试都将阻止,直到其中一个缓存会话变为可用或会话的等待时间到期(默认等待时间为 )。
该属性启用等待时间的配置。cache-sessions
CachingSessionFactory
sessionCacheSize
sessionWaitTimeout
sessionCacheSize
sessionCacheSize
Integer.MAX_VALUE
sessionWaitTimeout
如果你希望缓存你的会话,请配置你的默认会话工厂(如前所述),然后将其包装在一个你可以提供这些附加属性的实例中。
以下示例显示了如何执行此操作:CachingSessionFactory
<bean id="sftpSessionFactory"
class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
<property name="host" value="localhost"/>
</bean>
<bean id="cachingSessionFactory"
class="org.springframework.integration.file.remote.session.CachingSessionFactory">
<constructor-arg ref="sftpSessionFactory"/>
<constructor-arg value="10"/>
<property name="sessionWaitTimeout" value="1000"/>
</bean>
前面的示例创建一个 设置为 且设置为 1 秒(1000 毫秒)。CachingSessionFactory
sessionCacheSize
10
sessionWaitTimeout
从 Spring 集成版本 3.0 开始,提供了一个方法。
调用时,所有空闲会话将立即关闭,而正在使用的会话将在返回到缓存时关闭。
使用 时,通道将关闭,并且共享会话仅在最后一个通道关闭时关闭。
对会话的新请求会根据需要建立新会话。CachingConnectionFactory
resetCache()
isSharedSession=true
从版本 5.1 开始,具有新属性 。
如果为 true,则将通过对空路径执行命令来测试会话,以确保它仍然处于活动状态;否则,将从缓存中删除它;如果缓存中没有活动会话,则会创建一个新会话。CachingSessionFactory
testSession
REALPATH
用RemoteFileTemplate
Spring 集成版本 3.0 提供了对对象的新抽象。
该模板提供了发送、检索(作为)、删除和重命名文件的方法。
此外,我们还提供了一种方法,让调用方对会话运行多个操作。
在所有情况下,模板都会可靠地关闭会话。
有关更多信息,请参阅 RemoteFileTemplate
的 Javadoc SFTP 有一个子类:SftpRemoteFileTemplate
。SftpSession
InputStream
execute
我们在版本 4.1 中添加了其他方法,包括 .
它提供对底层 的访问,从而允许访问低级 API。getClientInstance()
ChannelSftp
版本 5.0 引入了该方法。
此方法允许在同一线程 bounded 的范围内调用多个调用。
当您需要执行 as one unit of work 的多个高级操作时,这非常有用。
例如,将它与命令实现一起使用,其中我们对提供的目录中的每个文件执行操作,并对其子目录递归执行操作。
有关更多信息,请参阅 Javadoc。RemoteFileOperations.invoke(OperationsCallback<F, T> action)
RemoteFileOperations
Session
RemoteFileTemplate
AbstractRemoteFileOutboundGateway
mput
put
SFTP 入站通道适配器
SFTP 入站通道适配器是一个特殊的侦听器,它连接到服务器并侦听远程目录事件(例如正在创建新文件),此时它将启动文件传输。 以下示例说明如何配置 SFTP 入站通道适配器:
<int-sftp:inbound-channel-adapter id="sftpAdapterAutoCreate"
session-factory="sftpSessionFactory"
channel="requestChannel"
filename-pattern="*.txt"
remote-directory="/foo/bar"
preserve-timestamp="true"
local-directory="file:target/foo"
auto-create-local-directory="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
delete-remote-files="false">
<int:poller fixed-rate="1000"/>
</int-sftp:inbound-channel-adapter>
前面的配置示例显示了如何为各种属性提供值,包括以下内容:
-
local-directory
:文件将要传输到的位置 -
remote-directory
:要从中传输文件的远程源目录 -
session-factory
:对我们之前配置的 bean 的引用
默认情况下,传输的文件与原始文件同名。
如果要覆盖此行为,可以设置该属性,该属性允许您提供 SPEL 表达式来生成本地文件的名称。
与出站网关和适配器不同,其中 SPEL 评估上下文的根对象是 ,此入站适配器在评估时还没有消息,因为这是它最终以传输的文件作为其有效负载生成的消息。
因此,SPEL 评估上下文的根对象是远程文件的原始名称(a )。local-filename-generator-expression
Message
String
入站通道适配器首先将文件检索到本地目录,然后根据 Poller 配置发出每个文件。
从版本 5.0 开始,当需要检索新文件时,您可以限制从 SFTP 服务器获取的文件数。
当目标文件很大或在具有持久文件列表过滤器的集群系统中运行时,这可能是有益的,本节稍后将讨论。
用于此目的。
负值(默认值)表示没有限制,并且将检索所有匹配的文件。
有关更多信息,请参见入站通道适配器:控制远程文件获取。
从版本 5.0 开始,您还可以通过设置 attribute 来为 提供自定义实现。max-fetch-size
DirectoryScanner
inbound-channel-adapter
scanner
从 Spring Integration 3.0 开始,您可以指定属性(默认为 )。
当 时,本地文件的修改时间戳设置为从服务器检索的值。
否则,它将设置为当前时间。preserve-timestamp
false
true
从版本 4.2 开始,您可以指定 instead 而不是 ,这样您就可以动态确定每个轮询的目录 — 例如, .remote-directory-expression
remote-directory
remote-directory-expression="@myBean.determineRemoteDir()"
有时,基于 via 属性指定的简单模式的文件过滤可能还不够。
如果是这种情况,则可以使用该属性指定正则表达式(例如 )。
如果需要完全控制,则可以使用该属性提供对 的自定义实现的引用,该实现是用于筛选文件列表的策略接口。
此筛选器确定要检索的远程文件。
您还可以使用 .filename-pattern
filename-regex
filename-regex=".*\.test$"
filter
org.springframework.integration.file.filters.FileListFilter
AcceptOnceFileListFilter
CompositeFileListFilter
将其状态存储在内存中。
如果您希望该状态在系统重启后继续存在,请考虑改用 。
此筛选条件将接受的文件名存储在策略的实例中(请参阅 元数据存储)。
此过滤器匹配文件名和远程修改时间。AcceptOnceFileListFilter
SftpPersistentAcceptOnceFileListFilter
MetadataStore
从 4.0 版本开始,此过滤器需要一个 .
当与共享数据存储一起使用时(例如与 ),这允许在多个应用程序或服务器实例之间共享筛选键。ConcurrentMetadataStore
Redis
RedisMetadataStore
从版本 5.0 开始,默认情况下,将对 .
此过滤器还与 XML 配置中的 or 选项一起应用,以及 Java DSL 中的 through。
您可以使用 (或 ) 处理任何其他用例。SftpPersistentAcceptOnceFileListFilter
SimpleMetadataStore
SftpInboundFileSynchronizer
regex
pattern
SftpInboundChannelAdapterSpec
CompositeFileListFilter
ChainFileListFilter
上面的讨论是指在检索文件之前过滤文件。 检索文件后,将对文件系统上的文件应用额外的过滤器。 默认情况下,这是一个 'AcceptOnceFileListFilter',如本节所述,它将状态保留在内存中,并且不考虑文件的修改时间。 除非您的应用程序在处理后删除文件,否则默认情况下,适配器会在应用程序重新启动后重新处理磁盘上的文件。
此外,如果将 配置为使用 a 并且远程文件时间戳更改(导致重新获取),则默认本地过滤器不允许处理此新文件。filter
SftpPersistentAcceptOnceFileListFilter
有关此筛选器及其使用方法的更多信息,请参阅远程持久性文件列表筛选器。
您可以使用该属性来配置本地文件系统过滤器的行为。
从版本 4.3.8 开始,默认配置 a。
此筛选条件将接受的文件名和修改后的时间戳存储在策略的实例中(请参阅元数据存储),并检测对本地文件修改时间的更改。
默认值是将状态存储在内存中的 a。local-filter
FileSystemPersistentAcceptOnceFileListFilter
MetadataStore
MetadataStore
SimpleMetadataStore
从版本 4.1.5 开始,这些过滤器有一个名为 的新属性,该属性使它们刷新
元数据存储(如果存储实现)。flushOnUpdate
Flushable
此外,如果您使用分布式(例如 Redis 元数据存储),则可以拥有同一适配器或应用程序的多个实例,并确保一个且只有一个实例处理一个文件。MetadataStore |
实际的本地过滤器是包含提供的过滤器和模式过滤器的 a,该过滤器阻止处理正在下载的文件(基于 )。
下载带有此后缀的文件(默认为 ),并在传输完成后将文件重命名为其最终名称,使其对过滤器“可见”。CompositeFileListFilter
temporary-file-suffix
.writing
有关这些属性的更多详细信息,请参阅 schema.
SFTP 入站通道适配器是轮询使用者。
因此,你必须配置一个 Poller (全局默认值或本地元素)。
将文件传输到本地目录后,将生成一条 payload type 为的消息,并将其发送到由属性标识的通道。java.io.File
channel
详细了解文件筛选和大文件
有时,刚刚出现在受监视 (远程) 目录中的文件并不完整。
通常,此类文件使用某个临时扩展名(例如在 名为 的 file 上)写入,然后在写入过程完成后重命名。
在大多数情况下,开发人员只对完整的文件感兴趣,并且只想筛选这些文件。
要处理这些情况,您可以使用 、 和 属性提供的筛选支持。
如果您需要自定义过滤器实现,可以通过设置 attribute 在适配器中包含引用。
以下示例显示了如何执行此操作:.writing
something.txt.writing
filename-pattern
filename-regex
filter
filter
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="receiveChannel"
session-factory="sftpSessionFactory"
filter="customFilter"
local-directory="file:/local-test-dir"
remote-directory="/remote-test-dir">
<int:poller fixed-rate="1000" max-messages-per-poll="10" task-executor="executor"/>
</int-sftp:inbound-channel-adapter>
<bean id="customFilter" class="org.foo.CustomFilter"/>
从故障中恢复
您应该了解适配器的架构。
文件同步器获取文件,并为每个同步文件发出一条消息。
如前所述,涉及两个过滤器。
属性 (和模式) 引用远程 (SFTP) 文件列表,以避免获取已获取的文件。
使用 来确定哪些文件将作为消息发送。FileReadingMessageSource
filter
FileReadingMessageSource
local-filter
同步器列出远程文件并查阅其过滤器。
然后传输文件。
如果在文件传输过程中发生 IO 错误,则会删除已添加到过滤器的任何文件,以便它们有资格在下次轮询时重新获取。
仅当过滤器实现 (如 ) 时,这才适用。ReversibleFileListFilter
AcceptOnceFileListFilter
如果在同步文件后,处理文件的下游流发生错误,则不会自动回滚过滤器,因此默认情况下不会重新处理失败的文件。
如果您希望在失败后重新处理此类文件,可以使用类似于以下内容的配置,以便于从过滤器中删除失败的文件:
<int-sftp:inbound-channel-adapter id="sftpAdapter"
session-factory="sftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/sftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-sftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />
上述配置适用于任何 .ResettableFileListFilter
从版本 5.0 开始,入站通道适配器可以根据生成的本地文件名在本地构建子目录。
那也可以是远程子路径。
为了能够递归读取本地目录以根据层次结构支持进行修改,您现在可以根据算法为 internal 提供新的目录。
有关更多信息,请参见AbstractInboundFileSynchronizingMessageSource.setScanner()。
此外,您现在可以将 切换到 -based by using 选项。
它还为所有实例配置了 ID,以便对 local directory 中的任何修改做出反应。
前面显示的 reprocessing 示例基于 的内置功能,该功能使用从本地目录中删除文件时 ()。
有关更多信息,请参阅 WatchServiceDirectoryScanner
。FileReadingMessageSource
RecursiveDirectoryScanner
Files.walk()
AbstractInboundFileSynchronizingMessageSource
WatchService
DirectoryScanner
setUseWatchService()
WatchEventType
FileReadingMessageSource.WatchServiceDirectoryScanner
ResettableFileListFilter.remove()
StandardWatchEventKinds.ENTRY_DELETE
使用 Java 配置进行配置
以下 Spring Boot 应用程序显示了如何使用 Java 配置入站适配器的示例:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost("localhost");
factory.setPort(port);
factory.setUser("foo");
factory.setPassword("foo");
factory.setAllowUnknownKeys(true);
factory.setTestSession(true);
return new CachingSessionFactory<>(factory);
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
source.setLocalDirectory(new File("sftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
使用 Java DSL 进行配置
Spring 下面的 Boot 应用程序显示了如何使用 Java DSL 配置入站适配器的示例:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow sftpInboundFlow() {
return IntegrationFlow
.from(Sftp.inboundAdapter(this.sftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilenameExpression("#this.toUpperCase() + '.a'")
.localDirectory(new File("sftp-inbound")),
e -> e.id("sftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}
SFTP 流入站频道适配器
版本 4.3 引入了流入站通道适配器。
此适配器生成 payloads 为 类型的 message ,允许您获取文件而无需写入本地文件系统。
由于会话保持打开状态,因此使用应用程序负责在使用文件时关闭会话。
会话在标头 () 中提供。
标准框架组件(如 和 )会自动关闭会话。
有关这些组件的更多信息,请参见File Splitter 和 Stream Transformer。
以下示例显示如何配置 SFTP 流入站通道适配器:InputStream
closeableResource
IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE
FileSplitter
StreamTransformer
<int-sftp:inbound-streaming-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="sessionFactory"
filename-pattern="*.txt"
filename-regex=".*\.txt"
filter="filter"
filter-expression="@myFilterBean.check(#root)"
remote-file-separator="/"
comparator="comparator"
max-fetch-size="1"
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000" />
</int-sftp:inbound-streaming-channel-adapter>
您只能使用 、 、 或 中的一个。filename-pattern
filename-regex
filter
filter-expression
从版本 5.0 开始,默认情况下,适配器通过使用 based on the in-memory 来防止远程文件重复。
默认情况下,此过滤器也与文件名模式(或 regex)一起应用。
如果需要允许重复项,可以使用 .
您可以使用 (或 ) 处理任何其他使用案例。
后面显示的 Java 配置显示了一种在处理后删除远程文件以避免重复的技术。SftpStreamingMessageSource SftpPersistentAcceptOnceFileListFilter SimpleMetadataStore AcceptAllFileListFilter CompositeFileListFilter ChainFileListFilter |
有关 的更多信息,请参见远程持久性文件列表过滤器。SftpPersistentAcceptOnceFileListFilter
当需要提取时,你可以使用该属性来限制每次轮询时提取的文件数。
将其设置为在集群环境中运行时使用持久过滤器。
有关更多信息,请参见入站通道适配器:控制远程文件获取。max-fetch-size
1
适配器将远程目录和文件名放在 headers ( 和 , ) 中。
从版本 5.0 开始,标头提供额外的远程文件信息(以 JSON 格式)。
如果在 to 上设置属性,则标头包含一个对象。
您可以使用该方法访问底层提供的对象。
当您使用 XML 配置时,该属性不可用,但您可以通过将 the 注入到您的一个配置类中来设置它。
另请参阅远程文件信息。FileHeaders.REMOTE_DIRECTORY
FileHeaders.REMOTE_FILE
FileHeaders.REMOTE_FILE_INFO
fileInfoJson
SftpStreamingMessageSource
false
SftpFileInfo
SftpClient.DirEntry
SftpClient
SftpFileInfo.getFileInfo()
fileInfoJson
SftpStreamingMessageSource
使用 Java 配置进行配置
以下 Spring Boot 应用程序显示了如何使用 Java 配置入站适配器的示例:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource<InputStream> ftpMessageSource() {
SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
messageSource.setRemoteDirectory("sftpSource/");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}
@Bean
public SftpRemoteFileTemplate template() {
return new SftpRemoteFileTemplate(sftpSessionFactory());
}
@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}
}
请注意,在此示例中,转换器下游的消息处理程序具有一个,该处理程序在处理后删除远程文件。advice
入站通道适配器:轮询多个服务器和目录
从版本 5.0.7 开始,可以使用;当配置为 Poller Advice 时,入站适配器可以轮询多个服务器和目录。
配置建议并将其正常添加到 Poller 的建议链中。
A 用于选择服务器,有关更多信息,请参阅 Delegating Session Factory。
通知配置由对象列表组成。RotatingServerAdvice
DelegatingSessionFactory
RotationPolicy.KeyDirectory
@Bean
public RotatingServerAdvice advice() {
List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}
此建议将轮询服务器上的目录,直到没有新文件存在,然后移动到 directory ,然后移动到 server 上的 directory ,依此类推。foo
one
bar
baz
two
可以使用构造函数 arg 修改此默认行为:fair
@Bean
public RotatingServerAdvice advice() {
...
return new RotatingServerAdvice(delegatingSf(), keyDirectories, true);
}
在这种情况下,无论上一个 poll 是否返回文件,通知都将移动到下一个服务器/目录。
或者,您也可以提供自己的信息以根据需要重新配置消息源:RotationPolicy
public interface RotationPolicy {
void beforeReceive(MessageSource<?> source);
void afterReceive(boolean messageReceived, MessageSource<?> source);
}
和
@Bean
public RotatingServerAdvice advice() {
return new RotatingServerAdvice(myRotationPolicy());
}
属性 ( 在 synchronizer 上) 现在可以包含变量。
这允许将从不同目录中检索到的文件下载到本地的类似目录:local-filename-generator-expression
localFilenameGeneratorExpression
#remoteDirectory
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from(Sftp.inboundAdapter(sf())
.filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
.localDirectory(new File(tmpDir))
.localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
.remoteDirectory("."),
e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
.channel(MessageChannels.queue("files"))
.get();
}
使用此建议时,不要在 Poller 上配置 a;有关更多信息,请参见Conditional Pollers for Message Sources。TaskExecutor |
入站通道适配器:控制远程文件获取
在配置入站通道适配器时,应考虑两个属性。与所有 Poller 一样,可用于限制每次轮询时发出的消息数(如果已准备好超过配置的值)。 (自版本 5.0 起)可以限制一次从远程服务器检索的文件数。max-messages-per-poll
max-fetch-size
以下场景假定起始状态为空的本地目录:
-
max-messages-per-poll=2
and:适配器获取一个文件,发出该文件,获取下一个文件,然后发出该文件。 然后它会休眠,直到下一次轮询。max-fetch-size=1
-
max-messages-per-poll=2
和 ):适配器获取这两个文件,然后发出每个文件。max-fetch-size=2
-
max-messages-per-poll=2
和:适配器最多获取 4 个文件(如果可用)并发出前两个文件(如果至少有两个)。 接下来的两个文件将在下一次轮询时发出。max-fetch-size=4
-
max-messages-per-poll=2
和 not specified:适配器获取所有远程文件并发出前两个(如果至少有两个)。 后续文件将在后续轮询时发出(一次两个)。 当所有文件都被消耗掉时,将再次尝试远程获取以获取任何新文件。max-fetch-size
当您部署应用程序的多个实例时,我们建议设置一个小的 ,以避免一个实例“抓取”所有文件并耗尽其他实例。max-fetch-size |
另一个用途是当您想要停止获取远程文件但继续处理已获取的文件时。
在 上设置属性(以编程方式、通过 JMX 或通过控制总线)可以有效地阻止适配器获取更多文件,但允许 Poller 继续为以前获取的文件发出消息。
如果在更改属性时 poller 处于活动状态,则更改将在下一次轮询时生效。max-fetch-size
maxFetchSize
MessageSource
从版本 5.1 开始,可以为同步器提供 .
这在限制使用 获取的文件数量时非常有用。Comparator<?>
maxFetchSize
SFTP 出站通道适配器
SFTP 出站通道适配器是一个特殊的适配器,它连接到远程目录,并为它作为传入的有效负载接收的每个文件启动文件传输。
它还支持文件的多种表示形式,因此您不仅限于对象。
与 FTP 出站适配器类似,SFTP 出站通道适配器支持以下有效负载:MessageHandler
Message
File
-
java.io.File
:实际文件对象 -
byte[]
:表示文件内容的字节数组 -
java.lang.String
:表示文件内容的文本 -
java.io.InputStream
:要传输到远程文件的数据流 -
org.springframework.core.io.Resource
:用于将数据传输到远程文件的资源
以下示例说明如何配置 SFTP 出站通道适配器:
<int-sftp:outbound-channel-adapter id="sftpOutboundAdapter"
session-factory="sftpSessionFactory"
channel="inputChannel"
charset="UTF-8"
remote-file-separator="/"
remote-directory="foo/bar"
remote-filename-generator-expression="payload.getName() + '-mysuffix'"
filename-generator="fileNameGenerator"
use-temporary-filename="true"
chmod="600"
mode="REPLACE"/>
有关这些属性的更多详细信息,请参阅 schema.
SPEL 和 SFTP 出站适配器
与 Spring 集成中的许多其他组件一样,在配置 SFTP 出站通道适配器时,可以通过指定两个属性来使用 Spring 表达式语言(SPEL):和(如前所述)。
表达式评估上下文将消息作为其根对象,这允许您使用表达式,这些表达式可以根据消息中的数据(来自“payload”或“headers”)动态计算文件名或现有目录路径。
在前面的示例中,我们使用 expression 值定义属性,该值根据文件名的原始名称计算文件名,同时还附加后缀:'-mysuffix'。remote-directory-expression
remote-filename-generator-expression
remote-filename-generator-expression
从版本 4.1 开始,您可以指定何时传输文件。
默认情况下,现有文件将被覆盖。
模式由枚举定义,其中包括以下值:mode
FileExistsMode
-
REPLACE
(默认) -
REPLACE_IF_MODIFIED
-
APPEND
-
APPEND_NO_FLUSH
-
IGNORE
-
FAIL
使用 和 ,不会传输文件。 导致引发异常,同时静默忽略传输(尽管生成了日志条目)。IGNORE
FAIL
FAIL
IGNORE
DEBUG
版本 4.3 引入了该属性,您可以使用该属性在上传后更改远程文件权限。
您可以使用传统的 Unix 八进制格式(例如,仅允许文件所有者进行读写操作)。
使用 java 配置适配器时,可以使用 或 。chmod
600
setChmodOctal("600")
setChmod(0600)
避免部分写入的文件
处理文件传输时的一个常见问题是处理部分文件的可能性。 文件可能在传输实际完成之前就出现在文件系统中。
为了解决这个问题, Spring 集成 SFTP 适配器使用一种通用算法,其中文件以临时名称传输,并在完全传输后重命名。
默认情况下,正在传输的每个文件都会显示在文件系统中,并带有一个附加后缀,默认情况下,该后缀为 .
您可以通过设置属性来更改。.writing
temporary-file-suffix
但是,在某些情况下,您可能不想使用此技术(例如,如果服务器不允许重命名文件)。
对于此类情况,您可以通过设置为 (默认为 ) 来禁用此功能。
当此属性为 时,文件将以其最终名称写入,并且使用应用程序需要一些其他机制来检测文件是否已完全上传,然后再访问它。use-temporary-file-name
false
true
false
使用 Java 配置进行配置
Spring 下面的 Boot 应用程序显示了如何使用 Java 配置出站适配器的示例:
@SpringBootApplication
@IntegrationComponentScan
public class SftpJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToSftp(new File("/foo/bar.txt"));
}
@Bean
public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost("localhost");
factory.setPort(port);
factory.setUser("foo");
factory.setPassword("foo");
factory.setAllowUnknownKeys(true);
factory.setTestSession(true);
return new CachingSessionFactory<SftpClient.DirEntry>(factory);
}
@Bean
@ServiceActivator(inputChannel = "toSftpChannel")
public MessageHandler handler() {
SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpressionString("headers['remote-target-dir']");
handler.setFileNameGenerator(new FileNameGenerator() {
@Override
public String generateFileName(Message<?> message) {
return "handlerContent.test";
}
});
return handler;
}
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "toSftpChannel")
void sendToSftp(File file);
}
}
使用 Java DSL 进行配置
Spring 下面的 Boot 应用程序显示了如何使用 Java DSL 配置出站适配器的示例:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow sftpOutboundFlow() {
return IntegrationFlow.from("toSftpChannel")
.handle(Sftp.outboundAdapter(this.sftpSessionFactory, FileExistsMode.FAIL)
.useTemporaryFileName(false)
.remoteDirectory("/foo")
).get();
}
}
SFTP 出站网关
SFTP 出站网关提供了一组有限的命令,允许您与远程 SFTP 服务器进行交互:
-
ls
(列出文件) -
nlst
(列出文件名) -
get
(检索文件) -
mget
(检索多个文件) -
rm
(删除文件) -
mv
(移动和重命名文件) -
put
(发送文件) -
mput
(发送多个文件)
使用命令ls
ls
列出远程文件并支持以下选项:
-
-1
:检索文件名列表。 默认值是检索对象列表FileInfo
-
-a
:包括所有文件(包括以“.”开头的文件) -
-f
:不对列表进行排序 -
-dirs
:包括目录(默认排除) -
-links
:包括符号链接(默认排除) -
-R
:递归列出远程目录
此外,文件名筛选的提供方式与 .inbound-channel-adapter
操作产生的消息有效负载是文件名列表或对象列表(取决于是否使用 switch)。
这些对象提供修改时间、权限等信息。ls
FileInfo
-1
标头中提供了命令操作的远程目录。ls
file_remoteDirectory
使用递归选项 () 时,它包括任何子目录元素,并表示文件的相对路径(相对于远程目录)。
如果使用该选项,则每个递归目录也会作为列表中的一个元素返回。
在这种情况下,我们建议您不要使用该选项,因为您将无法区分文件和目录,而使用对象时可以这样做。-R
fileName
-dirs
-1
FileInfo
如果 remote path to list 以符号开头,则 SFTP 会将其视为绝对路径;without - 作为当前用户 home 中的相对路径。/
使用命令nlst
版本 5 引入了对该命令的支持。nlst
nlst
列出远程文件名,并且仅支持一个选项:
-
-f
:不对列表进行排序
操作生成的消息有效负载是文件名列表。nlst
标头包含命令所作用于的远程目录。file_remoteDirectory
nlst
SFTP 协议不提供列出名称的功能。
此命令等效于带有 option 的命令,为方便起见,在此处添加。ls
-1
使用命令get
get
检索远程文件并支持以下选项:
-
-P
:保留远程文件的时间戳。 -
-stream
:将远程文件作为流检索。 -
-D
:传输成功后删除远程文件。 如果忽略传输,则不会删除远程文件,因为 is 和本地文件已存在。FileExistsMode
IGNORE
header 包含远程目录,header 包含文件名。file_remoteDirectory
file_remoteFile
操作生成的消息负载是表示检索到的文件的对象。
如果使用该选项,则有效负载为 an 而不是 .
对于文本文件,一个常见的用例是将此操作与文件拆分器或流转换器结合使用。
将远程文件作为流使用时,您负责在使用流后关闭 。
为方便起见,在 header 中提供了 the,并提供便捷的方法:get
File
-stream
InputStream
File
Session
Session
closeableResource
IntegrationMessageHeaderAccessor
Closeable closeable = new IntegrationMessageHeaderAccessor(message).getCloseableResource();
if (closeable != null) {
closeable.close();
}
框架组件(如 File Splitter 和 Stream Transformer)在传输数据后自动关闭会话。
以下示例演示如何将文件作为流使用:
<int-sftp:outbound-gateway session-factory="ftpSessionFactory"
request-channel="inboundGetStream"
command="get"
command-options="-stream"
expression="payload"
remote-directory="ftpTarget"
reply-channel="stream" />
<int-file:splitter input-channel="stream" output-channel="lines" />
如果您在自定义组件中使用输入流,则必须关闭 .
你可以在自定义代码中执行此操作,也可以将消息的副本路由到 a 并使用 SPEL,如下例所示:Session service-activator |
<int:service-activator input-channel="closeSession"
expression="headers['closeableResource'].close()" />
使用命令mget
mget
根据模式检索多个远程文件,并支持以下选项:
-
-P
:保留远程文件的时间戳。 -
-R
:递归检索整个目录树。 -
-x
:如果没有文件与模式匹配,则引发异常(否则,返回空列表)。 -
-D
:传输成功后删除每个远程文件。 如果忽略传输,则不会删除远程文件,因为 is 和本地文件已存在。FileExistsMode
IGNORE
操作生成的消息有效负载是一个对象(即一个对象,每个对象表示一个检索到的文件)。mget
List<File>
List
File
从版本 5.0 开始,如果 is ,则输出消息的有效负载不再包含由于文件已存在而未获取的文件。
以前,该数组包含所有文件,包括已存在的文件。FileExistsMode IGNORE |
您使用的表达式 determine the remote path 应生成一个结果,该结果以例如,fetches the complete tree under 结尾。*
myfiles/*
myfiles
从版本 5.0 开始,您可以将 recursive 与 模式结合使用,以定期在本地同步整个远程目录树。
此模式将本地文件的上次修改时间戳设置为远程文件的时间戳,而不考虑 (preserve timestamp) 选项。MGET
FileExistsMode.REPLACE_IF_MODIFIED
-P
使用递归时的注意事项 (
-R )该模式将被忽略并被假定。
默认情况下,将检索整个远程树。
但是,您可以通过提供 .
您还可以通过这种方式筛选树中的目录。
A 可以通过 reference 或 by 或 attribute 提供。
例如,检索 remote directory 和 subdirectory 中以 结尾的所有文件。
但是,我们将在本说明之后介绍可用的替代方法。 如果筛选一个子目录,则不会对该子目录执行额外的遍历。 不允许使用该选项(递归使用递归获取目录树,并且目录本身不能包含在列表中)。 通常,您将在 中使用该变量,以便在本地保留远程目录结构。 |
持久性文件列表过滤器现在具有 boolean 属性 。
将此属性设置为 , 还会设置 ,这意味着出站网关( 和 )上的递归操作现在每次都将始终遍历整个目录树。
这是为了解决未检测到目录树深处更改的问题。
此外,还会导致将文件的完整路径用作元数据存储键;这解决了以下问题:如果具有相同名称的文件在不同目录中多次出现,则过滤器无法正常工作。
重要说明:这意味着对于顶级目录下的文件,将无法找到持久性元数据存储中的现有键。
因此,默认情况下,该属性为;这可能会在未来版本中更改。forRecursion
true
alwaysAcceptDirectories
ls
mget
forRecursion=true
false
从版本 5.0 开始,您可以通过将 和 设置为 来配置 和 以始终传递目录。
这样做允许对简单模式进行递归,如下例所示:SftpSimplePatternFileListFilter
SftpRegexPatternFileListFilter
alwaysAcceptDirectorties
true
<bean id="starDotTxtFilter"
class="org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter">
<constructor-arg value="*.txt" />
<property name="alwaysAcceptDirectories" value="true" />
</bean>
<bean id="dotStarDotTxtFilter"
class="org.springframework.integration.sftp.filters.SftpRegexPatternFileListFilter">
<constructor-arg value="^.*\.txt$" />
<property name="alwaysAcceptDirectories" value="true" />
</bean>
您可以使用网关上的属性来提供这些过滤器之一。filter
使用命令put
put
将文件发送到远程服务器。
消息的有效负载可以是 、 或 。
一个 (或表达式) 用于命名远程文件。
其他可用属性包括 ,及其等效项:和 .
有关更多信息,请参阅 schema documentation.java.io.File
byte[]
String
remote-filename-generator
remote-directory
temporary-remote-directory
*-expression
use-temporary-file-name
auto-create-directory
操作生成的消息有效负载是 a,其中包含传输后服务器上文件的完整路径。put
String
版本 4.3 引入了该属性,该属性在上传后更改远程文件权限。
您可以使用传统的 Unix 八进制格式(例如,仅允许文件所有者进行读写操作)。
使用 java 配置适配器时,可以使用 .chmod
600
setChmod(0600)
使用命令mput
mput
将多个文件发送到服务器并支持以下选项:
-
-R
: Recursive — 发送目录和子目录中的所有文件(可能已过滤)
消息有效负载必须是表示本地目录的 (或 )。
从版本 5.1 开始,还支持 or 的集合。java.io.File
String
File
String
支持与 put
命令相同的属性。
此外,您还可以使用 、 、 或 之一筛选本地目录中的文件。
只要子目录本身通过过滤器,过滤器就可以使用递归。
未通过过滤器的子目录不会递归。mput-pattern
mput-regex
mput-filter
mput-filter-expression
操作生成的消息有效负载是一个对象(即传输产生的远程文件路径)。mput
List<String>
List
版本 4.3 引入了该属性,该属性允许您在上传后更改远程文件权限。
您可以使用传统的 Unix 八进制格式(例如,仅允许文件所有者进行读写操作)。
使用 Java 配置适配器时,可以使用 或 。chmod
600
setChmodOctal("600")
setChmod(0600)
使用命令rm
该命令没有选项。rm
如果 remove 操作成功,则生成的消息负载为 .
否则,消息负载为 。
header 包含远程目录,header 包含文件名。Boolean.TRUE
Boolean.FALSE
file_remoteDirectory
file_remoteFile
使用命令mv
该命令没有选项。mv
该属性定义 “from” 路径,该属性定义 “to” 路径。
默认情况下,该 是 .
此表达式的计算结果不得为 null 或空 .
如有必要,将创建所需的任何远程目录。
结果消息的负载为 .
header 包含原始远程目录,header 包含文件名。
标头包含新路径。expression
rename-expression
rename-expression
headers['file_renameTo']
String
Boolean.TRUE
file_remoteDirectory
file_remoteFile
file_renameTo
从版本 5.5.6 开始,为方便起见,可以在命令中使用 。
如果 “from” 文件不是完整的文件路径,则 result of 将用作远程目录。
这同样适用于“to”文件,例如,如果任务只是重命名某个目录中的远程文件。remoteDirectoryExpression
mv
remoteDirectoryExpression
其他命令信息
和 命令支持该属性。
它定义了一个 SPEL 表达式,用于在传输期间生成本地文件的名称。
评估上下文的根对象是请求消息。
该变量也可用。
它特别适用于(例如:)。get
mget
local-filename-generator-expression
remoteFileName
mget
local-filename-generator-expression="#remoteFileName.toUpperCase() + headers.foo"
和 命令支持该属性。
它定义了一个 SPEL 表达式,用于在传输期间生成本地目录的名称。
评估上下文的根对象是请求消息。
该变量也可用。
它对 mget 特别有用(例如:)。
此属性与该属性互斥。get
mget
local-directory-expression
remoteDirectory
local-directory-expression="'/tmp/local/' + #remoteDirectory.toUpperCase() + headers.myheader"
local-directory
对于所有命令,网关的 'expression' 属性保存命令操作的路径。
对于该命令,表达式的计算结果可能为 ,表示检索所有文件、 以及以 .mget
*
somedirectory/*
*
以下示例显示了为命令配置的网关:ls
<int-ftp:outbound-gateway id="gateway1"
session-factory="ftpSessionFactory"
request-channel="inbound1"
command="ls"
command-options="-1"
expression="payload"
reply-channel="toSplitter"/>
发送到通道的消息的有效负载是一个对象列表,每个对象都包含一个文件名。
如果省略 ,则有效负载将是对象列表。
您可以将选项作为空格分隔的列表(例如,)。toSplitter
String
command-options="-1"
FileInfo
command-options="-1 -dirs -links"
从版本 4.2 开始, , , 和 命令支持属性(使用命名空间支持时)。
这会影响本地文件存在( 和 )或远程文件存在( 和 )时的行为。
支持的模式包括 、 、 和 。
为了向后兼容,和 operations 的默认模式为 .
对于 和 operations,默认值为 .GET
MGET
PUT
MPUT
FileExistsMode
mode
GET
MGET
PUT
MPUT
REPLACE
APPEND
FAIL
IGNORE
PUT
MPUT
REPLACE
GET
MGET
FAIL
使用 Java 配置进行配置
以下 Spring Boot 应用程序显示了如何使用 Java 配置出站网关的示例:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return new SftpOutboundGateway(ftpSessionFactory(), "ls", "'my_remote_dir/'");
}
}
使用 Java DSL 进行配置
Spring 下面的 Boot 应用程序显示了如何使用 Java DSL 配置出站网关的示例:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
factory.setTestSession(true);
return new CachingSessionFactory<>(sf);
}
@Bean
public QueueChannelSpec remoteFileOutputChannel() {
return MessageChannels.queue();
}
@Bean
public IntegrationFlow sftpMGetFlow() {
return IntegrationFlow.from("sftpMgetInputChannel")
.handle(Sftp.outboundGateway(sftpSessionFactory(),
AbstractRemoteFileOutboundGateway.Command.MGET, "payload")
.options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE)
.regexFileNameFilter("(subSftpSource|.*1.txt)")
.localDirectoryExpression("'myDir/' + #remoteDirectory")
.localFilenameExpression("#remoteFileName.replaceFirst('sftpSource', 'localTarget')"))
.channel("remoteFileOutputChannel")
.get();
}
}
出站网关部分成功 ( 和mget
mput
)
对多个文件执行操作(使用 和 )时,在传输一个或多个文件后的一段时间内可能会发生异常。
在这种情况下(从版本 4.2 开始),会抛出 a 。
除了通常的属性 ( 和 ),此异常还具有两个附加属性:mget
mput
PartialSuccessException
MessagingException
failedMessage
cause
-
partialResults
:传输成功的结果。 -
derivedInput
:从请求消息生成的文件列表(例如要传输的本地文件)。mput
这些属性允许您确定哪些文件已成功传输,哪些文件未成功传输。
在 recursive 的情况下,可能具有嵌套实例。mput
PartialSuccessException
PartialSuccessException
请考虑以下目录结构:
root/
|- file1.txt
|- subdir/
| - file2.txt
| - file3.txt
|- zoo.txt
如果异常发生在 ,则网关引发的 具有 、 和 和 的 。
它是另一个具有 of 和 and 的 。file3.txt
PartialSuccessException
derivedInput
file1.txt
subdir
zoo.txt
partialResults
file1.txt
cause
PartialSuccessException
derivedInput
file2.txt
file3.txt
partialResults
file2.txt
MessageSessionCallback 回调
从 Spring 集成版本 4.2 开始,你可以使用带有 () 的实现来对 with context 执行任何操作。
您可以将其用于任何非标准或低级 SFTP 操作(或多个),例如允许从集成流定义或功能接口 (lambda) 实施注入进行访问。
以下示例使用 lambda:MessageSessionCallback<F, T>
<int-sftp:outbound-gateway/>
SftpOutboundGateway
Session<SftpClient.DirEntry>
requestMessage
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler sftpOutboundGateway(SessionFactory<SftpClient.DirEntry> sessionFactory) {
return new SftpOutboundGateway(sessionFactory,
(session, requestMessage) -> session.list(requestMessage.getPayload()));
}
另一个示例可能是对正在发送或检索的文件数据进行预处理或后处理。
使用 XML 配置时,会提供一个允许您指定 Bean 名称的属性。<int-sftp:outbound-gateway/>
session-callback
MessageSessionCallback
这与 和 属性互斥。
使用 Java 进行配置时,该类提供不同的构造函数。session-callback command expression SftpOutboundGateway |
Apache Mina SFTP 服务器事件
版本 5.2 中添加的 ,侦听某些 Apache Mina SFTP 服务器事件,并将它们发布为可以由任何 bean、bean 方法或事件入站通道适配器接收的 s。ApacheMinaSftpEventListener
ApplicationEvent
ApplicationListener
@EventListener
目前,支持的事件包括:
-
SessionOpenedEvent
- 已打开客户端会话 -
DirectoryCreatedEvent
- 已创建目录 -
FileWrittenEvent
- 文件已写入 -
PathMovedEvent
- 文件或目录已重命名 -
PathRemovedEvent
- 文件或目录已被删除 -
SessionClosedEvent
- 客户端已断开连接
它们中的每一个都是 ;您可以配置单个侦听器来接收所有事件类型。
每个事件的属性都是 ,您可以从中获取客户端地址等信息;在 Abstract 事件上提供了一个方便的方法。ApacheMinaSftpEvent
source
ServerSession
getSession()
要使用侦听器(必须是 Spring bean)配置服务器,只需将其添加到:SftpSubsystemFactory
server = SshServer.setUpDefaultServer();
...
SftpSubsystemFactory sftpFactory = new SftpSubsystemFactory();
sftpFactory.addSftpEventListener(apacheMinaSftpEventListenerBean);
...
要使用 Spring 集成事件适配器来使用这些事件:
@Bean
public ApplicationEventListeningMessageProducer eventsAdapter() {
ApplicationEventListeningMessageProducer producer =
new ApplicationEventListeningMessageProducer();
producer.setEventTypes(ApacheMinaSftpEvent.class);
producer.setOutputChannel(eventChannel());
return producer;
}
远程文件信息
从版本 5.2 开始,(SFTP Outbound Gateway) 的 (SFTP Streaming Inbound Channel Adapter)、(SFTP Inbound Channel Adapter) 和 “read” 命令在消息中提供额外的标头,以生成有关远程文件的信息:SftpStreamingMessageSource
SftpInboundFileSynchronizingMessageSource
SftpOutboundGateway
-
FileHeaders.REMOTE_HOST_PORT
- 在文件传输操作期间,远程会话已连接到的 host:port 对; -
FileHeaders.REMOTE_DIRECTORY
- 已执行操作的远程目录; -
FileHeaders.REMOTE_FILE
- 远程文件名;仅适用于单文件操作。
由于 不会针对远程文件生成消息,而是使用本地副本,因此在同步操作期间,它会以 URI 样式 () 将有关远程文件的信息存储在 (可以在外部配置) 中。
轮询本地文件时,将检索此元数据。
删除本地文件时,建议删除其元数据条目。
为此,它提供了一个回调。
此外,还有一个 to be used 在元数据键中。
当这些组件之间共享同一实例时,建议将此前缀与基于 -的实施中使用的前缀不同,以避免条目覆盖,因为两者都会筛选并对元数据入口键使用相同的本地文件名。SftpInboundFileSynchronizingMessageSource
AbstractInboundFileSynchronizer
MetadataStore
protocol://host:port/remoteDirectory#remoteFileName
SftpInboundFileSynchronizingMessageSource
AbstractInboundFileSynchronizer
removeRemoteFileMetadata()
setMetadataStorePrefix()
MetadataStore
FileListFilter
MetadataStore
AbstractInboundFileSynchronizer