读取文件
一个FileReadingMessageSource
可用于使用文件系统中的文件。
这是MessageSource
,从文件系统目录创建消息。
以下示例显示如何配置FileReadingMessageSource
:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:directory="${input.directory}"/>
要防止为某些文件创建消息,您可以提供FileListFilter
.
默认情况下,我们使用以下筛选器:
-
IgnoreHiddenFileListFilter
-
AcceptOnceFileListFilter
这IgnoreHiddenFileListFilter
确保不处理隐藏文件。
请注意,hidden 的确切定义取决于系统。
例如,在基于 UNIX 的系统上,以句点字符开头的文件被视为隐藏文件。
另一方面,Microsoft Windows 有一个专用的 file 属性来指示隐藏文件。
版本 4.2 引入了 |
这AcceptOnceFileListFilter
确保仅从目录中选取文件一次。
这 从 4.0 版本开始,此过滤器需要一个 从 4.1.5 版本开始,此过滤器具有一个新属性 ( |
持久文件列表过滤器现在具有布尔属性forRecursion
.
将此属性设置为true
,还会设置alwaysAcceptDirectories
,这意味着出站网关 (ls
和mget
) 现在每次都始终遍历完整的目录树。
这是为了解决未检测到目录树深处更改的问题。
另外forRecursion=true
使文件的完整路径用作元数据存储键;这解决了以下问题:如果具有相同名称的文件在不同目录中多次出现,则过滤器无法正常工作。
重要说明:这意味着对于顶级目录下的文件,将无法找到持久性元数据存储中的现有键。
因此,该属性为false
默认情况下;这可能会在未来版本中更改。
以下示例将FileReadingMessageSource
使用滤镜:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="customFilterBean"/>
读取文件的常见问题是,文件可能在准备就绪之前被检测到(即,其他进程可能仍在写入该文件)。
默认的AcceptOnceFileListFilter
不会阻止这种情况。
在大多数情况下,如果文件写入过程在准备好读取每个文件后立即重命名每个文件,则可以防止这种情况。
一个filename-pattern
或filename-regex
过滤器,它只接受准备好的文件(可能基于已知的后缀),由默认的AcceptOnceFileListFilter
允许这种情况。
这CompositeFileListFilter
启用合成,如下例所示:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="compositeFilter"/>
<bean id="compositeFilter"
class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
<bean class="o.s.i.file.filters.RegexPatternFileListFilter">
<constructor-arg value="^test.*$"/>
</bean>
</list>
</constructor-arg>
</bean>
如果无法使用临时名称创建文件并重命名为最终名称,则 Spring 集成提供了另一种选择。
版本 4.2 添加了LastModifiedFileListFilter
.
此过滤器可以使用age
属性,以便过滤器仅传递早于此值的文件。
该年龄默认为 60 秒,但您应该选择一个足够大的年龄,以避免过早获取文件(例如,由于网络故障)。
以下示例显示如何配置LastModifiedFileListFilter
:
<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
<property name="age" value="120" />
</bean>
从版本 4.3.7 开始,ChainFileListFilter
(的扩展CompositeFileListFilter
) 以允许后续过滤器只能看到前一个过滤器的结果。
(使用CompositeFileListFilter
,则所有筛选器都会查看所有文件,但它仅传递已通过所有筛选器的文件。
需要新行为的一个示例是LastModifiedFileListFilter
和AcceptOnceFileListFilter
,当我们在一段时间后才希望接受文件时。
使用CompositeFileListFilter
,由于AcceptOnceFileListFilter
在第一个传递时看到所有文件,当另一个过滤器稍后传递时,它不会传递它。
这CompositeFileListFilter
当模式过滤器与自定义过滤器结合使用时,方法非常有用,该过滤器查找辅助文件以指示文件传输已完成。
模式过滤器可能只传递主文件(例如something.txt
),但 “done” 过滤器需要查看 if(例如)something.done
存在。
假设我们有文件a.txt
,a.done
和b.txt
.
模式筛选器仅通过a.txt
和b.txt
,而 “done” 过滤器只能看到所有三个文件并传递a.txt
.
复合过滤器的最终结果是,只有a.txt
发布。
使用ChainFileListFilter ,如果链中的任何过滤器返回空列表,则不会调用其余过滤器。 |
版本 5.0 引入了ExpressionFileListFilter
对文件执行 SPEL 表达式作为上下文评估根对象。
为此,所有用于文件处理的 XML 组件(本地和远程)以及现有的filter
属性,已随filter-expression
选项,如下例所示:
<int-file:inbound-channel-adapter
directory="${inputdir}"
filter-expression="name matches '.text'"
auto-startup="false"/>
版本 5.0.5 引入了DiscardAwareFileListFilter
对 rejected files 感兴趣的实现。
为此,这样的过滤器实现应该通过addDiscardCallback(Consumer<File>)
.
在框架中,此功能从FileReadingMessageSource.WatchServiceDirectoryScanner
,与LastModifiedFileListFilter
.
与常规DirectoryScanner
这WatchService
根据目标文件系统上的事件提供要处理的文件。
在轮询包含这些文件的内部队列时,LastModifiedFileListFilter
可能会丢弃它们,因为它们相对于其配置的age
.
因此,我们将丢失该文件以备将来可能的考虑。
discard 回调钩子允许我们将文件保留在内部队列中,以便可以根据age
在随后的民意调查中。
这CompositeFileListFilter
还实现了一个DiscardAwareFileListFilter
并将 discard 回调填充到其所有DiscardAwareFileListFilter
代表。
因为CompositeFileListFilter 将文件与所有委托匹配,则discardCallback 可能会对同一个文件调用多次。 |
从版本 5.1 开始,FileReadingMessageSource
不检查目录是否存在,并且在其start()
调用(通常通过包装SourcePollingChannelAdapter
).
以前,在引用目录时(例如,从测试中引用)或稍后应用权限时,没有简单的方法来防止作系统权限错误。
消息报头
从版本 5.0 开始,FileReadingMessageSource
(除了payload
作为轮询File
) 将以下标头填充到出站Message
:
-
FileHeaders.FILENAME
:这File.getName()
的 file 中。 可用于后续的重命名或复制逻辑。 -
FileHeaders.ORIGINAL_FILE
:这File
对象本身。 通常,当我们丢失原始File
对象。 但是,为了与任何其他自定义用例保持一致和方便,此标头可用于访问原始文件。 -
FileHeaders.RELATIVE_PATH
:引入了一个新的标头,用于表示文件路径相对于扫描的根目录的部分。 当要求在其他地方恢复源目录层次结构时,此标头可能很有用。 为此,DefaultFileNameGenerator
(请参阅“'生成文件名)可以配置为使用此标头。
目录扫描和轮询
这FileReadingMessageSource
不会立即为目录中的文件生成消息。
它使用内部队列来存储scanner
.
这scanEachPoll
选项用于确保在每次轮询时使用最新的输入目录内容刷新内部队列。
默认情况下 (scanEachPoll = false
)、FileReadingMessageSource
清空其队列,然后再次扫描目录。
此默认行为对于减少对目录中大量文件的扫描特别有用。
但是,在需要自定义排序的情况下,请务必考虑将此标志设置为true
.
文件的处理顺序可能与预期不符。
默认情况下,队列中的文件以其自然 (path
) 订单。
通过扫描添加的新文件,即使队列中已有文件,也会入到适当的位置,以保持该自然顺序。
要自定义顺序,FileReadingMessageSource
可以接受Comparator<File>
作为构造函数参数。
它由内部 (PriorityBlockingQueue
) 以根据业务要求对其内容进行重新排序。
因此,要按特定顺序处理文件,您应该为FileReadingMessageSource
而不是对自定义生成的列表进行排序DirectoryScanner
.
引入 5.0 版RecursiveDirectoryScanner
执行文件树访问。
该实现基于Files.walk(Path start, int maxDepth, FileVisitOption… options)
功能性。
根目录 (DirectoryScanner.listFiles(File)
) 参数。
所有其他子目录包含和排除项都基于目标FileListFilter
实现。
例如,SimplePatternFileListFilter
默认情况下,会过滤掉目录。
看AbstractDirectoryAwareFileListFilter
及其实现了解更多信息。
从版本 5.5 开始,FileInboundChannelAdapterSpec 的 Java DSL 具有recursive(boolean) 选项以使用RecursiveDirectoryScanner 在目标FileReadingMessageSource 而不是默认的。 |
命名空间支持
通过使用特定于文件的命名空间,可以简化文件读取的配置。 为此,请使用以下模板:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>
在此命名空间中,您可以减少FileReadingMessageSource
并将其包装在入站 Channel Adapter 中,如下所示:
<int-file:inbound-channel-adapter id="filesIn1"
directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>
<int-file:inbound-channel-adapter id="filesIn2"
directory="file:${input.directory}"
filter="customFilterBean" />
<int-file:inbound-channel-adapter id="filesIn3"
directory="file:${input.directory}"
filename-pattern="test*" />
<int-file:inbound-channel-adapter id="filesIn4"
directory="file:${input.directory}"
filename-regex="test[0-9]+\.txt" />
第一个通道适配器示例依赖于默认的FileListFilter
实现:
-
IgnoreHiddenFileListFilter
(不处理隐藏文件) -
AcceptOnceFileListFilter
(防止重复)
因此,您也可以省略prevent-duplicates
和ignore-hidden
属性,因为它们是true
默认情况下。
Spring Integration 4.2 引入了 |
第二个通道适配器示例使用自定义过滤器,第三个示例使用filename-pattern
属性以添加AntPathMatcher
based 过滤器,第四个使用filename-regex
属性,将基于正则表达式模式的筛选条件添加到FileReadingMessageSource
.
这filename-pattern
和filename-regex
属性都与常规的filter
reference 属性。
但是,您可以使用filter
属性来引用CompositeFileListFilter
它组合了任意数量的过滤器,包括一个或多个基于模式的过滤器,以满足您的特定需求。
当多个进程从同一目录读取数据时,您可能希望锁定文件以防止它们被并发选取。
为此,您可以使用FileLocker
.
有一个java.nio
,但也可以实现自己的锁定方案。
这nio
可以按如下方式注入 Locker:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:nio-locker/>
</int-file:inbound-channel-adapter>
您可以按如下方式配置自定义保险箱:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
当文件入站适配器配置了locker时,它负责在允许接收文件之前获取锁。
它不承担解锁文件的责任。
如果已处理文件并保持锁挂起,则存在内存泄漏。
如果这是一个问题,您应该调用FileLocker.unlock(File file) 你自己在适当的时候。 |
当筛选和锁定文件还不够时,您可能需要完全控制文件的列出方式。
要实现此类要求,您可以使用DirectoryScanner
.
此扫描程序可让您准确确定每个轮询中列出的文件。
这也是 Spring Integration 在内部用来连接FileListFilter
instances 和FileLocker
到FileReadingMessageSource
.
您可以注入自定义DirectoryScanner
到<int-file:inbound-channel-adapter/>
在scanner
属性,如下例所示:
<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
scanner="customDirectoryScanner"/>
这样做可以让您完全自由地选择排序、列表和锁定策略。
了解过滤器(包括patterns
,regex
,prevent-duplicates
等)和locker
实例实际上由scanner
.
在适配器上设置的这些属性中的任何一个随后都会注入到内部的scanner
.
对于外部scanner
,则FileReadingMessageSource
.
必须在该自定义中指定它们(如果需要)DirectoryScanner
.
换句话说,如果您注入scanner
到FileReadingMessageSource
,您应该提供filter
和locker
在那个scanner
,而不是在FileReadingMessageSource
.
默认情况下,DefaultDirectoryScanner 使用IgnoreHiddenFileListFilter 以及一个AcceptOnceFileListFilter .
为了防止使用它们,您可以配置自己的过滤器(例如AcceptAllFileListFilter ),甚至将其设置为null . |
WatchServiceDirectoryScanner
这FileReadingMessageSource.WatchServiceDirectoryScanner
在将新文件添加到目录时依赖于文件系统事件。
在初始化期间,将注册目录以生成事件。
初始文件列表也是在初始化期间构建的。
在遍历目录树时,遇到的任何子目录也会被注册以生成事件。
在第一次轮询时,将返回遍历目录的初始文件列表。
在后续轮询中,将返回来自新创建事件的文件。
如果添加了新的子目录,则其 creation 事件用于遍历新子树以查找现有文件并注册找到的任何新子目录。
存在问题WatchKey 当其内部事件queue 不会在目录修改事件发生时被程序耗尽。
如果超出队列大小,则StandardWatchEventKinds.OVERFLOW ,表示部分文件系统事件可能会丢失。
在这种情况下,将完全重新扫描根目录。
为避免重复,请考虑使用适当的FileListFilter (例如AcceptOnceFileListFilter ) 或在处理完成时删除文件。 |
这WatchServiceDirectoryScanner
可以通过FileReadingMessageSource.use-watch-service
选项,它与scanner
选择。
内部FileReadingMessageSource.WatchServiceDirectoryScanner
实例为提供的directory
.
此外,现在WatchService
轮询逻辑可以跟踪StandardWatchEventKinds.ENTRY_MODIFY
和StandardWatchEventKinds.ENTRY_DELETE
.
如果您需要跟踪现有文件和新文件的修改,则应实现ENTRY_MODIFY
events 逻辑FileListFilter
.
否则,将以相同的方式处理这些事件中的文件。
这ResettableFileListFilter
implementation 拾取ENTRY_DELETE
事件。
因此,他们的文件是为remove()
操作。
启用此事件后,过滤器(如AcceptOnceFileListFilter
删除文件。
因此,如果出现同名文件,它将通过过滤器并作为消息发送。
为此,watch-events
属性 (FileReadingMessageSource.setWatchEvents(WatchEventType… watchEvents)
) 已引入。
(WatchEventType
是FileReadingMessageSource
.)
使用此选项,我们可以对新文件使用一个下游流逻辑,对修改后的文件使用其他一些 logic 。
以下示例显示了如何为同一目录中的 create 和 modify 事件配置不同的逻辑:
值得一提的是,ENTRY_DELETE
事件涉及被监视目录的子目录的重命名作。
更具体地说,ENTRY_DELETE
event 的 event(与上一个目录名称相关)位于ENTRY_CREATE
事件,该事件通知新的(重命名的)目录。
在某些作系统(如 Windows)上,ENTRY_DELETE
event 必须注册才能处理这种情况。
否则,在文件资源管理器中重命名监视的子目录可能会导致在该子目录中无法检测到新文件。
<int-file:inbound-channel-adapter id="newFiles"
directory="${input.directory}"
use-watch-service="true"/>
<int-file:inbound-channel-adapter id="modifiedFiles"
directory="${input.directory}"
use-watch-service="true"
filter="acceptAllFilter"
watch-events="MODIFY"/> <!-- The default is CREATE. -->
从版本 6.1 开始,FileReadingMessageSource
公开了两个新的WatchService
-related 选项:
-
watchMaxDepth
- 一个参数Files.walkFileTree(Path root, Set attributes, int maxDepth, FileVisitor visitor)
应用程序接口; -
watchDirPredicate
-一个Predicate<Path>
要测试是否应遍历扫描树中的目录并将其注册到WatchService
和配置的 watch 事件类型。
限制内存消耗
您可以使用HeadDirectoryScanner
以限制内存中保留的文件数。
这在扫描大型目录时非常有用。
使用 XML 配置时,可以通过设置queue-size
入站通道适配器上的属性。
在版本 4.2 之前,此设置与任何其他过滤器的使用不兼容。
任何其他筛选条件(包括prevent-duplicates="true"
) 覆盖用于限制大小的筛选条件。
使用 通常,不使用 |
使用 Java 配置进行配置
以下 Spring Boot 应用程序显示了如何使用 Java 配置配置出站适配器的示例:
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel fileInputChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File(INBOUND_PATH));
source.setFilter(new SimplePatternFileListFilter("*.txt"));
return source;
}
@Bean
@Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
public FileToStringTransformer fileToStringTransformer() {
return new FileToStringTransformer();
}
}
使用 Java DSL 进行配置
Spring 下面的 Boot 应用程序显示了如何使用 Java DSL 配置出站适配器的示例:
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileReadingFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(new File(INBOUND_PATH))
.patternFilter("*.txt"),
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(Files.toStringTransformer())
.channel("processFileChannel")
.get();
}
}
'tail'ing 文件
另一个常见的用例是从文件的末尾(或尾部)获取 'lines',并在添加新行时捕获新行。
提供了两种实现。
第一个OSDelegatingFileTailingMessageProducer
使用本机tail
命令(在具有 COMMAND 的作系统上)。
这通常是这些平台上最有效的实现。
对于没有tail
command 的 app 中,第二个实现ApacheCommonsFileTailingMessageProducer
使用 Apachecommons-io
Tailer
类。
在这两种情况下,文件系统事件(例如文件不可用和其他事件)都发布为ApplicationEvent
实例。
此类事件的示例包括:
[message=tail: cannot open '/tmp/somefile' for reading:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become inaccessible:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has appeared;
following end of new file, file=/tmp/somefile]
例如,在旋转文件时,可能会发生前面示例中显示的事件序列。
从版本 5.0 开始,FileTailingIdleEvent
在idleEventInterval
.
以下示例显示了此类事件的外观:
[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
并非所有支持tail 命令提供这些状态消息。 |
从这些终端节点发出的消息具有以下标头:
-
FileHeaders.ORIGINAL_FILE
:这File
对象 -
FileHeaders.FILENAME
:文件名 (File.getName()
)
在 5.0 版之前的版本中,FileHeaders.FILENAME header 包含文件绝对路径的字符串表示形式。
现在,您可以通过调用getAbsolutePath() 在原始文件头上。 |
以下示例使用默认选项 ('-F -n 0',表示从当前端开始遵循文件名) 创建本机适配器。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
task-executor="exec"
file="/tmp/foo"/>
以下示例使用 '-F -n +0' 选项(表示遵循文件名,发出所有现有行)创建本机适配器。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
native-options="-F -n +0"
task-executor="exec"
file-delay=10000
file="/tmp/foo"/>
如果tail
命令失败(在某些平台上,缺少文件会导致tail
失败,即使使用-F
指定),则每 10 秒重试一次该命令。
默认情况下,本机适配器从标准输出捕获并将内容作为消息发送。
他们还从标准错误中捕获以引发事件。
从版本 4.3.6 开始,您可以通过设置enable-status-reader
自false
,如下例所示:
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
enable-status-reader="false"
task-executor="exec"
file="/tmp/foo"/>
在以下示例中,IdleEventInterval
设置为5000
,这意味着,如果 5 秒内没有写入任何行,则FileTailingIdleEvent
每 5 秒触发一次:
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
idle-event-interval="5000"
task-executor="exec"
file="/tmp/somefile"/>
当您需要停止适配器时,这可能很有用。
以下示例创建一个 Apachecommons-io
Tailer
适配器,它每两秒检查一次文件是否有新行,并每十秒检查一次是否存在缺失的文件:
<int-file:tail-inbound-channel-adapter id="apache"
channel="input"
task-executor="exec"
file="/tmp/bar"
delay="2000"
end="false" (1)
reopen="true" (2)
file-delay="10000"/>
1 | 文件从开头 (end="false" ) 而不是 end(这是默认值)。 |
2 | 将为每个块重新打开文件(默认为保持文件打开)。 |
指定delay ,end 或reopen attributes 强制使用 Apachecommons-io 适配器,并使native-options 属性不可用。 |
处理不完整的数据
文件传输方案中的一个常见问题是如何确定传输已完成,以便不会开始读取不完整的文件。
解决此问题的一种常见方法是使用临时名称编写文件,然后以原子方式将其重命名为最终名称。
此技术与遮盖临时文件不被使用者选取的过滤器一起,提供了一个强大的解决方案。
这种技术被写入文件(本地或远程)的 Spring 集成组件使用。
默认情况下,它们会附加.writing
添加到文件名中,并在传输完成后将其删除。