FTP 流入站通道适配器
版本 4.3 引入了流入站通道适配器。
此适配器生成 payloads 类型的 message ,让文件在不写入
local 文件系统。
由于会话保持打开状态,因此使用应用程序负责在文件
消耗。
会话在标头 () 中提供。
标准框架组件(如 和 )会自动关闭会话。
有关这些组件的更多信息,请参见File Splitter 和 Stream Transformer。
以下示例说明如何配置 :InputStream
closeableResource
IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE
FileSplitter
StreamTransformer
inbound-streaming-channel-adapter
<int-ftp:inbound-streaming-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="sessionFactory"
filename-pattern="*.txt"
filename-regex=".*\.txt"
filter="filter"
filter-expression="@myFilterBean.check(#root)"
remote-file-separator="/"
comparator="comparator"
max-fetch-size="1"
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000" />
</int-ftp:inbound-streaming-channel-adapter>
只允许 、 、 或 中的一个。filename-pattern
filename-regex
filter
filter-expression
从版本 5.0 开始,默认情况下,适配器会根据内存中的 .
默认情况下,此过滤器也与文件名模式(或 regex)一起应用。
如果需要允许重复项,可以使用 .
任何其他用例都可以由 (或 ) 处理。
Java 配置(本文档后面部分)显示了一种在处理后删除远程文件以避免重复的技术。FtpStreamingMessageSource FtpPersistentAcceptOnceFileListFilter SimpleMetadataStore AcceptAllFileListFilter CompositeFileListFilter ChainFileListFilter |
有关 的更多信息,请参见远程持久性文件列表过滤器。FtpPersistentAcceptOnceFileListFilter
使用 该属性来限制在需要 fetch 时每次轮询时获取的文件数。
将其设置为在集群环境中运行时使用持久过滤器。
有关更多信息,请参见入站通道适配器:控制远程文件获取。max-fetch-size
1
适配器将远程目录和文件名分别放在 和 headers 中。
从版本 5.0 开始,标头提供额外的远程文件信息(默认以 JSON 表示)。
如果在 to 上设置属性,则标头包含一个对象。
可以使用该方法访问底层 Apache Net 库提供的对象。
当您使用 XML 配置时,该属性不可用,但您可以通过将 the 注入到您的一个配置类中来设置它。
另请参阅远程文件信息。FileHeaders.REMOTE_DIRECTORY
FileHeaders.REMOTE_FILE
FileHeaders.REMOTE_FILE_INFO
fileInfoJson
FtpStreamingMessageSource
false
FtpFileInfo
FTPFile
FtpFileInfo.getFileInfo()
fileInfoJson
FtpStreamingMessageSource
从版本 5.1 开始,的泛型类型为 .
以前,它是 .
这是因为排序现在在处理的早期执行,在过滤和应用之前执行。comparator
FTPFile
AbstractFileInfo<FTPFile>
maxFetch
使用 Java 配置进行配置
Spring 下面的 Boot 应用程序显示了如何使用 Java 配置配置入站适配器的示例:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource<InputStream> ftpMessageSource() {
FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template());
messageSource.setRemoteDirectory("ftpSource/");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}
@Bean
public FtpRemoteFileTemplate template() {
return new FtpRemoteFileTemplate(ftpSessionFactory());
}
@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}
}
请注意,在此示例中,转换器下游的消息处理程序具有一个,该处理程序在处理后删除远程文件。advice