对于最新的稳定版本,请使用 Spring Integration 6.4.0! |
读取文件
A 可用于使用文件系统中的文件。
这是从文件系统目录创建消息的实现。
以下示例显示如何配置 :FileReadingMessageSource
MessageSource
FileReadingMessageSource
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:directory="${input.directory}"/>
要防止为某些文件创建消息,您可以提供 .
默认情况下,我们使用以下筛选器:FileListFilter
-
IgnoreHiddenFileListFilter
-
AcceptOnceFileListFilter
可确保不处理隐藏文件。
请注意,hidden 的确切定义取决于系统。
例如,在基于 UNIX 的系统上,以句点字符开头的文件被视为隐藏文件。
另一方面,Microsoft Windows 有一个专用的 file 属性来指示隐藏文件。IgnoreHiddenFileListFilter
版本 4.2 引入了 .
在以前的版本中,包含隐藏文件。
使用默认配置时,首先触发 ,然后触发 . |
这可确保仅从目录中选取一次文件。AcceptOnceFileListFilter
将其状态存储在内存中。
如果您希望该状态在系统重启后仍然存在,可以使用 .
此过滤器将接受的文件名存储在实施中(请参阅 元数据存储)。
此筛选条件与文件名和修改时间匹配。 从 4.0 版本开始,此过滤器需要一个 .
当与共享数据存储一起使用时(例如与 ),它允许在多个应用程序实例之间或在多个服务器使用的网络文件共享之间共享筛选键。 从版本 4.1.5 开始,此过滤器具有一个新属性 (),该属性会导致它在每次更新时刷新元数据存储(如果存储实现)。 |
持久性文件列表过滤器现在具有 boolean 属性 。
将此属性设置为 , 还会设置 ,这意味着出站网关( 和 )上的递归操作现在每次都将始终遍历整个目录树。
这是为了解决未检测到目录树深处更改的问题。
此外,还会导致将文件的完整路径用作元数据存储键;这解决了以下问题:如果具有相同名称的文件在不同目录中多次出现,则过滤器无法正常工作。
重要说明:这意味着对于顶级目录下的文件,将无法找到持久性元数据存储中的现有键。
因此,默认情况下,该属性为;这可能会在未来版本中更改。forRecursion
true
alwaysAcceptDirectories
ls
mget
forRecursion=true
false
以下示例使用 filter 配置 a:FileReadingMessageSource
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="customFilterBean"/>
读取文件的常见问题是,文件可能在准备就绪之前被检测到(即,其他进程可能仍在写入该文件)。
默认值不会阻止此操作。
在大多数情况下,如果文件写入过程在准备好读取每个文件后立即重命名每个文件,则可以防止这种情况。
一个 or 过滤器只接受准备好的文件(可能基于已知后缀),用 default 组成,允许这种情况。
启用合成,如下例所示:AcceptOnceFileListFilter
filename-pattern
filename-regex
AcceptOnceFileListFilter
CompositeFileListFilter
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="compositeFilter"/>
<bean id="compositeFilter"
class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
<bean class="o.s.i.file.filters.RegexPatternFileListFilter">
<constructor-arg value="^test.*$"/>
</bean>
</list>
</constructor-arg>
</bean>
如果无法使用临时名称创建文件并重命名为最终名称,则 Spring 集成提供了另一种选择。
版本 4.2 添加了 .
可以使用属性配置此过滤器,以便过滤器仅传递早于此值的文件。
该年龄默认为 60 秒,但您应该选择一个足够大的年龄,以避免过早获取文件(例如,由于网络故障)。
以下示例显示如何配置 :LastModifiedFileListFilter
age
LastModifiedFileListFilter
<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
<property name="age" value="120" />
</bean>
从版本 4.3.7 开始,引入了 a (扩展 ) 以允许后续过滤器只能看到前一个过滤器的结果。
(使用 ,所有过滤器都会看到所有文件,但它只传递已通过所有过滤器的文件)。
需要新行为的一个示例是 和 的组合,当我们在经过一段时间之前不希望接受文件时。
使用 ,由于 it 在第一次传递时看到所有文件,因此当另一个过滤器稍后传递时,它不会传递它。
当模式筛选器与自定义筛选器结合使用时,该方法非常有用,自定义筛选器查找辅助文件以指示文件传输已完成。
模式过滤器可能只传递主文件(例如 ),但 “done” 过滤器需要查看(例如)是否存在。ChainFileListFilter
CompositeFileListFilter
CompositeFileListFilter
LastModifiedFileListFilter
AcceptOnceFileListFilter
CompositeFileListFilter
AcceptOnceFileListFilter
CompositeFileListFilter
something.txt
something.done
假设我们有文件 、 、 和 .a.txt
a.done
b.txt
模式过滤器仅传递 和 ,而 “done” 过滤器看到所有三个文件并仅传递 。
复合筛选器的最终结果是 only 被释放。a.txt
b.txt
a.txt
a.txt
使用 ,如果链中的任何过滤器返回空列表,则不会调用其余过滤器。ChainFileListFilter |
版本 5.0 引入了对文件执行 SPEL 表达式作为上下文评估根对象。
为此,所有用于文件处理的 XML 组件(本地和远程)以及现有属性都提供了该选项,如下例所示:ExpressionFileListFilter
filter
filter-expression
<int-file:inbound-channel-adapter
directory="${inputdir}"
filter-expression="name matches '.text'"
auto-startup="false"/>
版本 5.0.5 引入了对被拒绝文件感兴趣的实现。
为此,应通过 .
在框架中,此功能从 中与 结合使用。
与常规 不同,它根据目标文件系统上的事件提供用于处理的文件。
在用这些文件轮询内部队列时,可能会丢弃它们,因为它们相对于其配置的太年轻。
因此,我们将丢失该文件以备将来可能的考虑。
discard 回调钩子允许我们将文件保留在内部队列中,以便在后续轮询中可以对其进行检查。
它还实现了一个 并填充了一个 discard 回调到其所有委托。DiscardAwareFileListFilter
addDiscardCallback(Consumer<File>)
FileReadingMessageSource.WatchServiceDirectoryScanner
LastModifiedFileListFilter
DirectoryScanner
WatchService
LastModifiedFileListFilter
age
age
CompositeFileListFilter
DiscardAwareFileListFilter
DiscardAwareFileListFilter
由于文件与所有委托匹配,因此可以对同一文件多次调用 。CompositeFileListFilter discardCallback |
从版本 5.1 开始,它不会检查目录是否存在,并且在调用它之前不会创建它(通常通过 wrapping )。
以前,在引用目录时(例如,从测试中引用)或稍后应用权限时,没有简单的方法来防止操作系统权限错误。FileReadingMessageSource
start()
SourcePollingChannelAdapter
消息报头
从版本 5.0 开始, (除了 as a polled ) 将以下标头填充到 outbound :FileReadingMessageSource
payload
File
Message
-
FileHeaders.FILENAME
:要发送的文件。 可用于后续的重命名或复制逻辑。File.getName()
-
FileHeaders.ORIGINAL_FILE
:对象本身。 通常,当我们丢失原始对象时,框架组件(例如拆分器或转换器)会自动填充此标头。 但是,为了与任何其他自定义用例保持一致和方便,此标头可用于访问原始文件。File
File
-
FileHeaders.RELATIVE_PATH
:引入了一个新的标头,用于表示文件路径相对于扫描的根目录的部分。 当要求在其他地方恢复源目录层次结构时,此标头可能很有用。 为此,可以将 (请参阅“'生成文件名) 配置为使用此标头。DefaultFileNameGenerator
目录扫描和轮询
不会立即为目录中的文件生成消息。
它使用内部队列来存储由 .
该选项用于确保在每次轮询时使用最新的输入目录内容刷新内部队列。
默认情况下 () ,它会在再次扫描目录之前清空其队列。
此默认行为对于减少对目录中大量文件的扫描特别有用。
但是,在需要自定义排序的情况下,请务必考虑将此标志设置为 的效果。
文件的处理顺序可能与预期不符。
默认情况下,队列中的文件按其自然 () 顺序处理。
通过扫描添加的新文件,即使队列中已有文件,也会入到适当的位置,以保持该自然顺序。
要自定义顺序,可以接受 a 作为构造函数参数。
内部 () 使用它根据业务要求对其内容重新排序。
因此,要按特定顺序处理文件,您应该提供一个比较器,而不是对自定义生成的列表进行排序。FileReadingMessageSource
scanner
scanEachPoll
scanEachPoll = false
FileReadingMessageSource
true
path
FileReadingMessageSource
Comparator<File>
PriorityBlockingQueue
FileReadingMessageSource
DirectoryScanner
引入了版本 5.0 以执行文件树访问。
实现基于功能。
根目录 () 参数将从结果中排除。
所有其他子目录包含和排除都基于目标实施。
例如,默认情况下会筛选出目录。
有关更多信息,请参见AbstractDirectoryAwareFileListFilter
及其实现。RecursiveDirectoryScanner
Files.walk(Path start, int maxDepth, FileVisitOption… options)
DirectoryScanner.listFiles(File)
FileListFilter
SimplePatternFileListFilter
从版本 5.5 开始,Java DSL 有一个方便的选项,可以在目标中使用 a 而不是默认选项。FileInboundChannelAdapterSpec recursive(boolean) RecursiveDirectoryScanner FileReadingMessageSource |
命名空间支持
通过使用特定于文件的命名空间,可以简化文件读取的配置。 为此,请使用以下模板:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>
在此命名空间中,你可以减少并将其包装在入站 Channel Adapter 中,如下所示:FileReadingMessageSource
<int-file:inbound-channel-adapter id="filesIn1"
directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>
<int-file:inbound-channel-adapter id="filesIn2"
directory="file:${input.directory}"
filter="customFilterBean" />
<int-file:inbound-channel-adapter id="filesIn3"
directory="file:${input.directory}"
filename-pattern="test*" />
<int-file:inbound-channel-adapter id="filesIn4"
directory="file:${input.directory}"
filename-regex="test[0-9]+\.txt" />
第一个 channel adapter 示例依赖于默认实现:FileListFilter
-
IgnoreHiddenFileListFilter
(不处理隐藏文件) -
AcceptOnceFileListFilter
(防止重复)
因此,您也可以省略 and 属性,因为它们默认是 and 属性。prevent-duplicates
ignore-hidden
true
Spring Integration 4.2 引入了该属性。
在以前的版本中,包含隐藏文件。 |
第二个通道适配器示例使用自定义过滤器,第三个示例使用属性添加基于模式的过滤器,第四个示例使用该属性将基于正则表达式模式的过滤器添加到 .
和 属性都与常规 reference 属性互斥。
但是,您可以使用该属性来引用 的实例,该实例组合了任意数量的过滤器,包括一个或多个基于模式的过滤器,以满足您的特定需求。filename-pattern
AntPathMatcher
filename-regex
FileReadingMessageSource
filename-pattern
filename-regex
filter
filter
CompositeFileListFilter
当多个进程从同一目录读取数据时,您可能希望锁定文件以防止它们被并发选取。
为此,您可以使用 .
有一个基于 的实现可用,但也可以实现您自己的锁定方案。
储物柜可以按如下方式注入:FileLocker
java.nio
nio
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:nio-locker/>
</int-file:inbound-channel-adapter>
您可以按如下方式配置自定义保险箱:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
当文件入站适配器配置了locker时,它负责在允许接收文件之前获取锁。
它不承担解锁文件的责任。
如果已处理文件并保持锁挂起,则存在内存泄漏。
如果这是一个问题,您应该在适当的时候打电话给自己。FileLocker.unlock(File file) |
当筛选和锁定文件还不够时,您可能需要完全控制文件的列出方式。
要实现此类要求,您可以使用 .
此扫描程序可让您准确确定每个轮询中列出的文件。
这也是 Spring 集成在内部用来连接实例和 .
您可以将自定义注入 on 属性,如下例所示:DirectoryScanner
FileListFilter
FileLocker
FileReadingMessageSource
DirectoryScanner
<int-file:inbound-channel-adapter/>
scanner
<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
scanner="customDirectoryScanner"/>
这样做可以让您完全自由地选择排序、列表和锁定策略。
了解过滤器(包括 、 、 、 等)和实例实际上由 .
在适配器上设置的这些属性中的任何一个随后都会注入到内部 .
对于 external ,禁止在 .
必须在该自定义上指定它们(如果需要)。
换句话说,如果将 a 注入到 中,则应在 上提供 ,而不是在 上。patterns
regex
prevent-duplicates
locker
scanner
scanner
scanner
FileReadingMessageSource
DirectoryScanner
scanner
FileReadingMessageSource
filter
locker
scanner
FileReadingMessageSource
默认情况下,使用 an 和 .
要防止使用它们,您可以配置自己的过滤器(如 ),甚至将其设置为 。DefaultDirectoryScanner IgnoreHiddenFileListFilter AcceptOnceFileListFilter AcceptAllFileListFilter null |
WatchServiceDirectoryScanner
当新文件添加到目录时,它依赖于文件系统事件。
在初始化期间,将注册目录以生成事件。
初始文件列表也是在初始化期间构建的。
在遍历目录树时,遇到的任何子目录也会被注册以生成事件。
在第一次轮询时,将返回遍历目录的初始文件列表。
在后续轮询中,将返回来自新创建事件的文件。
如果添加了新的子目录,则其 creation 事件用于遍历新子树以查找现有文件并注册找到的任何新子目录。FileReadingMessageSource.WatchServiceDirectoryScanner
当其内部事件没有在目录修改事件发生时,程序不会尽快耗尽其内部事件,则存在问题。
如果超过队列大小,则发出 a 以指示某些文件系统事件可能会丢失。
在这种情况下,将完全重新扫描根目录。
为避免重复,请考虑使用适当的 (如 ) 或在处理完成时删除文件。WatchKey queue StandardWatchEventKinds.OVERFLOW FileListFilter AcceptOnceFileListFilter |
可以通过 option 启用 ,该选项与 option 互斥。
为提供的 .WatchServiceDirectoryScanner
FileReadingMessageSource.use-watch-service
scanner
FileReadingMessageSource.WatchServiceDirectoryScanner
directory
此外,现在轮询逻辑可以跟踪 和 。WatchService
StandardWatchEventKinds.ENTRY_MODIFY
StandardWatchEventKinds.ENTRY_DELETE
如果需要跟踪现有文件和新文件的修改,则应在 .
否则,将以相同的方式处理这些事件中的文件。ENTRY_MODIFY
FileListFilter
implementations 选取事件。
因此,将为操作提供其文件。
启用此事件后,过滤器(如 )将删除该文件。
因此,如果出现同名文件,它将通过过滤器并作为消息发送。ResettableFileListFilter
ENTRY_DELETE
remove()
AcceptOnceFileListFilter
为此,引入了属性 ()。
( 是 中的公共内部枚举。
使用此选项,我们可以对新文件使用一个下游流逻辑,对修改后的文件使用其他一些 logic 。
以下示例显示了如何为同一目录中的 create 和 modify 事件配置不同的逻辑:watch-events
FileReadingMessageSource.setWatchEvents(WatchEventType… watchEvents)
WatchEventType
FileReadingMessageSource
值得一提的是,该事件涉及到被监视目录的 sub-directory 的 rename 操作。
更具体地说,与先前目录名称相关的 event 位于通知新(重命名)目录的 event 之前。
在某些操作系统(如 Windows)上,必须注册事件才能处理这种情况。
否则,在文件资源管理器中重命名监视的子目录可能会导致在该子目录中无法检测到新文件。ENTRY_DELETE
ENTRY_DELETE
ENTRY_CREATE
ENTRY_DELETE
<int-file:inbound-channel-adapter id="newFiles"
directory="${input.directory}"
use-watch-service="true"/>
<int-file:inbound-channel-adapter id="modifiedFiles"
directory="${input.directory}"
use-watch-service="true"
filter="acceptAllFilter"
watch-events="MODIFY"/> <!-- The default is CREATE. -->
从版本 6.1 开始,公开了两个新的 -相关选项:FileReadingMessageSource
WatchService
-
watchMaxDepth
- API 的参数;Files.walkFileTree(Path root, Set attributes, int maxDepth, FileVisitor visitor)
-
watchDirPredicate
- a 来测试是否应遍历扫描树中的目录并将其注册到和配置的 watch 事件类型。Predicate<Path>
WatchService
限制内存消耗
您可以使用 a 来限制内存中保留的文件数。
这在扫描大型目录时非常有用。
使用 XML 配置时,可以通过在入站通道适配器上设置属性来启用此功能。HeadDirectoryScanner
queue-size
在版本 4.2 之前,此设置与任何其他过滤器的使用不兼容。
任何其他过滤器(包括 )都会覆盖用于限制大小的过滤器。prevent-duplicates="true"
使用 a 与 不兼容。
由于在轮询决策期间会查询所有筛选器,因此 不知道其他筛选器可能正在临时筛选文件。
即使以前由 筛选的文件现在可用,也会对其进行筛选。 通常,在这种情况下,您应该删除已处理的文件,以便以前过滤的文件在将来的轮询中可用,而不是使用 an。 |
使用 Java 配置进行配置
以下 Spring Boot 应用程序显示了如何使用 Java 配置配置出站适配器的示例:
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel fileInputChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File(INBOUND_PATH));
source.setFilter(new SimplePatternFileListFilter("*.txt"));
return source;
}
@Bean
@Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
public FileToStringTransformer fileToStringTransformer() {
return new FileToStringTransformer();
}
}
使用 Java DSL 进行配置
Spring 下面的 Boot 应用程序显示了如何使用 Java DSL 配置出站适配器的示例:
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileReadingFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(new File(INBOUND_PATH))
.patternFilter("*.txt"),
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(Files.toStringTransformer())
.channel("processFileChannel")
.get();
}
}
'tail'ing 文件
另一个常见的用例是从文件的末尾(或尾部)获取 'lines',并在添加新行时捕获新行。
提供了两种实现。
第一个 , 使用 native 命令 (在具有 native 命令的 os 上)。
这通常是这些平台上最有效的实现。
对于没有命令的操作系统,第二个实现 使用 Apache 类。OSDelegatingFileTailingMessageProducer
tail
tail
ApacheCommonsFileTailingMessageProducer
commons-io
Tailer
在这两种情况下,文件系统事件(例如文件不可用和其他事件)都使用正常的 Spring 事件发布机制作为实例发布。
此类事件的示例包括:ApplicationEvent
[message=tail: cannot open '/tmp/somefile' for reading:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become inaccessible:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has appeared;
following end of new file, file=/tmp/somefile]
例如,在旋转文件时,可能会发生前面示例中显示的事件序列。
从版本 5.0 开始,在 期间,当文件中没有数据时,会发出 a 。
以下示例显示了此类事件的外观:FileTailingIdleEvent
idleEventInterval
[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
并非所有支持命令的平台都提供这些状态消息。tail |
从这些终端节点发出的消息具有以下标头:
-
FileHeaders.ORIGINAL_FILE
:对象File
-
FileHeaders.FILENAME
:文件名 (File.getName()
)
在版本 5.0 之前的版本中,标头包含文件绝对路径的字符串表示形式。
现在,您可以通过调用原始文件头来获取该字符串表示形式。FileHeaders.FILENAME getAbsolutePath() |
以下示例使用默认选项 ('-F -n 0',表示从当前端开始遵循文件名) 创建本机适配器。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
task-executor="exec"
file="/tmp/foo"/>
以下示例使用 '-F -n +0' 选项(表示遵循文件名,发出所有现有行)创建本机适配器。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
native-options="-F -n +0"
task-executor="exec"
file-delay=10000
file="/tmp/foo"/>
如果命令失败(在某些平台上,缺少文件会导致失败,即使指定了命令),则每 10 秒重试一次该命令。tail
tail
-F
默认情况下,本机适配器从标准输出捕获并将内容作为消息发送。
他们还从标准错误中捕获以引发事件。
从版本 4.3.6 开始,您可以通过将 设置为 ,从而丢弃标准错误事件,如下例所示:enable-status-reader
false
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
enable-status-reader="false"
task-executor="exec"
file="/tmp/foo"/>
在以下示例中,设置为 ,这意味着,如果 5 秒内未写入任何行,则每 5 秒触发一次:IdleEventInterval
5000
FileTailingIdleEvent
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
idle-event-interval="5000"
task-executor="exec"
file="/tmp/somefile"/>
当您需要停止适配器时,这可能很有用。
下面的示例创建一个 Apache 适配器,该适配器每 2 秒检查一次文件是否有新行,并每 10 秒检查一次是否存在缺失的文件:commons-io
Tailer
<int-file:tail-inbound-channel-adapter id="apache"
channel="input"
task-executor="exec"
file="/tmp/bar"
delay="2000"
end="false" (1)
reopen="true" (2)
file-delay="10000"/>
1 | 文件从开头 () 而不是结尾(这是默认值)开始拖尾。end="false" |
2 | 将为每个块重新打开文件(默认为保持文件打开)。 |
指定 、 或 属性会强制使用 Apache 适配器,并使该属性不可用。delay end reopen commons-io native-options |
处理不完整的数据
文件传输方案中的一个常见问题是如何确定传输已完成,以便不会开始读取不完整的文件。
解决此问题的一种常见方法是使用临时名称编写文件,然后以原子方式将其重命名为最终名称。
此技术与遮盖临时文件不被使用者选取的过滤器一起,提供了一个强大的解决方案。
这种技术被写入文件(本地或远程)的 Spring 集成组件使用。
默认情况下,它们会附加到文件名,并在传输完成后将其删除。.writing