The FTP inbound channel adapter is a special listener that connects to the FTP server and listens for the remote directory events (for example, new file created) at which point it initiates a file transfer.
The following example shows how to configure an inbound-channel-adapter
:
<int-ftp:inbound-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="ftpSessionFactory"
auto-create-local-directory="true"
delete-remote-files="true"
filename-pattern="*.txt"
remote-directory="some/remote/path"
remote-file-separator="/"
preserve-timestamp="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
local-directory=".">
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
As the preceding configuration shows, you can configure an FTP inbound channel adapter by using the inbound-channel-adapter
element while also providing values for various attributes, such as local-directory
, filename-pattern
(which is based on simple pattern matching, not regular expressions), and the reference to a session-factory
.
By default, the transferred file carries the same name as the original file.
If you want to override this behavior, you can set the local-filename-generator-expression
attribute, which lets you provide a SpEL expression to generate the name of the local file.
Unlike outbound gateways and adapters, where the root object of the SpEL evaluation context is a Message
, this inbound adapter does not yet have the message at the time of evaluation, since that’s what it ultimately generates with the transferred file as its payload.
Consequently, the root object of the SpEL evaluation context is the original name of the remote file (a String
).
The inbound channel adapter first retrieves the File
object for a local directory and then emits each file according to the poller configuration.
Starting with version 5.0, you can now limit the number of files fetched from the FTP server when new file retrievals are needed.
This can be beneficial when the target files are very large or when you run in a clustered system with a persistent file list filter, discussed later.
Use max-fetch-size
for this purpose.
A negative value (the default) means no limit and all matching files are retrieved.
See Inbound Channel Adapters: Controlling Remote File Fetching for more information.
Since version 5.0, you can also provide a custom DirectoryScanner
implementation to the inbound-channel-adapter
by setting the scanner
attribute.
Starting with Spring Integration 3.0, you can specify the preserve-timestamp
attribute (its default is false
).
When true
, the local file’s modified timestamp is set to the value retrieved from the server.
Otherwise, it is set to the current time.
Starting with version 4.2, you can specify remote-directory-expression
instead of remote-directory
, letting you dynamically determine the directory on each poll — for example, remote-directory-expression="@myBean.determineRemoteDir()"
.
Starting with version 4.3, you can omit the remote-directory
and remote-directory-expression
attributes.
They default to null
.
In this case, according to the FTP protocol, the client working directory is used as the default remote directory.
Sometimes, file filtering based on the simple pattern specified with the filename-pattern
attribute might not suffice.
If this is the case, you can use the filename-regex
attribute to specify a regular expression (such as filename-regex=".*\.test$"
).
Also, if you need complete control, you can use the filter
attribute and provide a reference to any custom implementation of the o.s.i.file.filters.FileListFilter
, a strategy interface for filtering a list of files.
This filter determines which remote files are retrieved.
You can also combine a pattern-based filter with other filters (such as an AcceptOnceFileListFilter
to avoid synchronizing files that have previously been fetched) by using a CompositeFileListFilter
.
The AcceptOnceFileListFilter
stores its state in memory.
If you wish the state to survive a system restart, consider using the FtpPersistentAcceptOnceFileListFilter
instead.
This filter stores the accepted file names in an instance of the MetadataStore
strategy (see Metadata Store).
This filter matches on the filename and the remote modified time.
Since version 4.0, this filter requires a ConcurrentMetadataStore
.
When used with a shared data store (such as Redis
with the RedisMetadataStore
), it lets filter keys be shared across multiple application or server instances.
Starting with version 5.0, the FtpPersistentAcceptOnceFileListFilter
with in-memory SimpleMetadataStore
is applied by default for the FtpInboundFileSynchronizer
.
This filter is also applied with the regex
or pattern
option in the XML configuration as well as with FtpInboundChannelAdapterSpec
in the Java DSL.
Any other use cases can be managed with CompositeFileListFilter
(or ChainFileListFilter
).
The preceding discussion refers to filtering the files before retrieving them.
Once the files have been retrieved, an additional filter is applied to the files on the file system.
By default, this is an AcceptOnceFileListFilter
which, as discussed earlier, retains state in memory and does not consider the file’s modified time.
Unless your application removes files after processing, the adapter will re-process the files on disk by default after an application restart.
Also, if you configure the filter
to use a FtpPersistentAcceptOnceFileListFilter
and the remote file timestamp changes (causing it to be re-fetched), the default local filter does not let this new file be processed.
For more information about this filter, and how it is used, see Remote Persistent File List Filters.
You can use the local-filter
attribute to configure the behavior of the local file system filter.
Starting with version 4.3.8, a FileSystemPersistentAcceptOnceFileListFilter
is configured by default.
This filter stores the accepted file names and modified timestamp in an instance of the MetadataStore
strategy (see Metadata Store) and detects changes to the local file modified time.
The default MetadataStore
is a SimpleMetadataStore
, which stores state in memory.
Since version 4.1.5, these filters have a new property (flushOnUpdate
) that causes them to flush the
metadata store on every update (if the store implements Flushable
).
Further, if you use a distributed MetadataStore (such as Redis), you can have multiple instances of the same adapter or application and be sure that each file is processed only once.
|
The actual local filter is a CompositeFileListFilter
that contains the supplied filter and a pattern filter that prevents processing files that are in the process of being downloaded (based on the temporary-file-suffix
).
Files are downloaded with this suffix (the default is .writing
), and the file is renamed to its final name when the transfer is complete, making it 'visible' to the filter.
The remote-file-separator
attribute lets you configure a file separator character to use if the default '/' is not applicable for your particular environment.
See the schema for more details on these attributes.
You should also understand that the FTP inbound channel adapter is a polling consumer.
Therefore, you must configure a poller (by using either a global default or a local sub-element).
Once a file has been transferred, a message with a java.io.File
as its payload is generated and sent to the channel identified by the channel
attribute.
Starting with version 6.2, you can filter FTP files based on last-modified strategy using FtpLastModifiedFileListFilter
.
This filter can be configured with an age
property so that only files older than this value are passed by the filter.
The age defaults to 60 seconds, but you should choose an age that is large enough to avoid picking up a file early (due to, say, network glitches).
Look into its Javadoc for more information.
Further, if you use a distributed MetadataStore (such as Redis), you can have multiple instances of the same adapter or application and be sure that each file is processed only once.
|
More on File Filtering and Incomplete Files
Sometimes the file that just appeared in the monitored (remote) directory is not complete.
Typically, such a file is written with a temporary extension (such as somefile.txt.writing
) and is then renamed once the writing process finishes.
In most cases, you are only interested in files that are complete and would like to filter for only files that are complete.
To handle these scenarios, you can use the filtering support provided by the filename-pattern
, filename-regex
, and filter
attributes.
The following example uses a custom filter implementation:
<int-ftp:inbound-channel-adapter
channel="ftpChannel"
session-factory="ftpSessionFactory"
filter="customFilter"
local-directory="file:/my_transfers">
remote-directory="some/remote/path"
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
<bean id="customFilter" class="org.example.CustomFilter"/>
Poller Configuration Notes for the Inbound FTP Adapter
The job of the inbound FTP adapter consists of two tasks:
-
Communicate with a remote server in order to transfer files from a remote directory to a local directory.
-
For each transferred file, generate a message with that file as a payload and send it to the channel identified by the 'channel' attribute. That is why they are called "'channel adapters'" rather than just "'adapters'". The main job of such an adapter is to generate a message to send to a message channel. Essentially, the second task takes precedence in such a way that, if your local directory already has one or more files, it first generates messages from those. Only when all local files have been processed does it initiate the remote communication to retrieve more files.
Also, when configuring a trigger on the poller, you should pay close attention to the max-messages-per-poll
attribute.
Its default value is 1
for all SourcePollingChannelAdapter
instances (including FTP).
This means that, as soon as one file is processed, it waits for the next execution time as determined by your trigger configuration.
If you happened to have one or more files sitting in the local-directory
, it would process those files before it would initiate communication with the remote FTP server.
Also, if the max-messages-per-poll
is set to 1
(the default), it processes only one file at a time with intervals as defined by your trigger, essentially working as “one-poll === one-file”.
For typical file-transfer use cases, you most likely want the opposite behavior: to process all the files you can for each poll and only then wait for the next poll.
If that is the case, set max-messages-per-poll
to -1.
Then, on each poll, the adapter tries to generate as many messages as it possibly can.
In other words, it processes everything in the local directory, and then it connects to the remote directory to transfer everything that is available there to be processed locally.
Only then is the poll operation considered complete, and the poller waits for the next execution time.
You can alternatively set the 'max-messages-per-poll' value to a positive value that indicates the upward limit of messages to be created from files with each poll.
For example, a value of 10
means that, on each poll, it tries to process no more than ten files.
Recovering from Failures
It is important to understand the architecture of the adapter.
There is a file synchronizer that fetches the files and a FileReadingMessageSource
that emits a message for each
synchronized file.
As discussed earlier, two filters are involved.
The filter
attribute (and patterns) refers to the remote (FTP) file list, to avoid fetching files that have already
been fetched.
The local-filter
is used by the FileReadingMessageSource
to determine which files are to be sent as messages.
The synchronizer lists the remote files and consults its filter.
The files are then transferred.
If an IO error occurs during file transfer, any files that have already been added to the filter are removed so that they
are eligible to be re-fetched on the next poll.
This only applies if the filter implements ReversibleFileListFilter
(such as the AcceptOnceFileListFilter
).
If, after synchronizing the files, an error occurs on the downstream flow processing a file, no automatic rollback of the filter occurs, so the failed file is not reprocessed by default.
If you wish to reprocess such files after a failure, you can use configuration similar to the following to facilitate the removal of the failed file from the filter:
<int-ftp:inbound-channel-adapter id="ftpAdapter"
session-factory="ftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/ftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-ftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />
The preceding configuration works for any ResettableFileListFilter
.
Starting with version 5.0, the inbound channel adapter can build sub-directories locally that correspond to the generated local file name.
That can be a remote sub-path as well.
To be able to read a local directory recursively for modification according to the hierarchy support, you can now supply an internal FileReadingMessageSource
with a new RecursiveDirectoryScanner
based on the Files.walk()
algorithm.
See AbstractInboundFileSynchronizingMessageSource.setScanner()
for more information.
Also, you can now switch the AbstractInboundFileSynchronizingMessageSource
to the WatchService
-based DirectoryScanner
by using setUseWatchService()
option.
It is also configured for all the WatchEventType
instances to react to any modifications in local directory.
The reprocessing sample shown earlier is based on the built-in functionality of the FileReadingMessageSource.WatchServiceDirectoryScanner
to perform ResettableFileListFilter.remove()
when the file is deleted (StandardWatchEventKinds.ENTRY_DELETE
) from the local directory.
See WatchServiceDirectoryScanner
for more information.
Configuring with Java Configuration
The following Spring Boot application show an example of how to configure the inbound adapter with Java configuration:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "ftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> ftpMessageSource() {
FtpInboundFileSynchronizingMessageSource source =
new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
source.setLocalDirectory(new File("ftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
Configuring with the Java DSL
The following Spring Boot application shows an example of how to configure the inbound adapter with the Java DSL:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlow
.from(Ftp.inboundAdapter(this.ftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilename(f -> f.toUpperCase() + ".a")
.localDirectory(new File("d:\\ftp_files")),
e -> e.id("ftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}
Dealing With Incomplete Data
The FtpSystemMarkerFilePresentFileListFilter
is provided to filter remote files that do not have a corresponding marker file on the remote system.
See the Javadoc (and browse to the parent classes) for configuration information.