文件支持
文件支持
Spring Integration 的文件支持通过专用词汇表扩展了 Spring Integration 核心,以处理读取、写入和转换文件。
您需要将此依赖项包含在您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
<version>6.0.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-file:6.0.9"
它提供了一个命名空间,使元素能够定义专用于文件的通道适配器,并支持可以将文件内容读取到字符串或字节数组中的转换器。
本节介绍FileReadingMessageSource
和FileWritingMessageHandler
以及如何将它们配置为 bean。
它还讨论了通过特定于文件的实现来处理文件的支持Transformer
.
最后,它解释了特定于文件的命名空间。
读取文件
一个FileReadingMessageSource
可用于使用文件系统中的文件。
这是MessageSource
从文件系统目录创建消息。
以下示例演示如何配置FileReadingMessageSource
:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:directory="${input.directory}"/>
要防止为某些文件创建消息,您可以提供FileListFilter
.
默认情况下,我们使用以下过滤器:
-
IgnoreHiddenFileListFilter
-
AcceptOnceFileListFilter
这IgnoreHiddenFileListFilter
确保不处理隐藏文件。
请注意,隐藏的确切定义取决于系统。
例如,在基于 UNIX 的系统上,以句点字符开头的文件被视为隐藏。
另一方面,Microsoft Windows 有一个专用的文件属性来指示隐藏文件。
4.2 版引入了 |
这AcceptOnceFileListFilter
确保仅从目录中选取一次文件。
这 从 4.0 版开始,此过滤器需要 从版本 4.1.5 开始,此过滤器具有一个新属性 ( |
持久文件列表过滤器现在具有布尔属性forRecursion
.
将此属性设置为true
,也设置alwaysAcceptDirectories
,这意味着出站网关上的递归作 (ls
和mget
) 现在每次都会遍历完整的目录树。
这是为了解决未检测到目录树深处更改的问题。
另外forRecursion=true
导致文件的完整路径用作元数据存储键;这解决了如果具有相同名称的文件多次出现在不同目录中,则过滤器无法正常工作的问题。
重要提示:这意味着对于顶级目录下的文件,将找不到持久元数据存储中的现有密钥。
因此,该属性是false
默认情况下;这可能会在将来的版本中更改。
以下示例配置FileReadingMessageSource
使用过滤器:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="customFilterBean"/>
读取文件的一个常见问题是,在文件准备就绪之前可能会被检测到(即,其他进程可能仍在写入文件)。
默认值AcceptOnceFileListFilter
并不能阻止这一点。
在大多数情况下,如果文件写入过程在每个文件准备好读取后立即重命名,则可以防止这种情况发生。
一个filename-pattern
或filename-regex
过滤器,该过滤器仅接受已准备就绪的文件(可能基于已知后缀),使用默认值AcceptOnceFileListFilter
,允许这种情况。
这CompositeFileListFilter
启用组合,如以下示例所示:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="compositeFilter"/>
<bean id="compositeFilter"
class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
<bean class="o.s.i.file.filters.RegexPatternFileListFilter">
<constructor-arg value="^test.*$"/>
</bean>
</list>
</constructor-arg>
</bean>
如果无法使用临时名称创建文件并重命名为最终名称,Spring Integration 提供了另一种选择。
4.2 版本添加了LastModifiedFileListFilter
.
此过滤器可以配置age
属性,以便过滤器仅传递早于此值的文件。
年龄默认为 60 秒,但您应该选择足够大的年龄以避免提前拾取文件(例如,由于网络故障)。
以下示例演示如何配置LastModifiedFileListFilter
:
<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
<property name="age" value="120" />
</bean>
从 4.3.7 版本开始,ChainFileListFilter
(的扩展CompositeFileListFilter
) 的引入,以允许后续筛选器应仅看到上一个筛选器的结果的情况。
(使用CompositeFileListFilter
,所有过滤器都会看到所有文件,但它只传递已通过所有过滤器的文件)。
需要新行为的一个示例是LastModifiedFileListFilter
和AcceptOnceFileListFilter
,当我们不想接受文件时,直到经过一段时间。
使用CompositeFileListFilter
,因为AcceptOnceFileListFilter
在第一次通过时查看所有文件,当另一个过滤器通过时,它不会再传递它。
这CompositeFileListFilter
当模式过滤器与查找辅助文件以指示文件传输完成的自定义过滤器结合使用时,方法非常有用。
模式筛选器可能只传递主文件(例如something.txt
),但“完成”过滤器需要查看(例如)something.done
存在。
假设我们有文件a.txt
,a.done
和b.txt
.
模式过滤器仅通过a.txt
和b.txt
,而“完成”过滤器会看到所有三个文件并仅通过a.txt
.
复合Filter的最终结果是,只有a.txt
被释放。
使用ChainFileListFilter ,如果链中的任何过滤器返回空列表,则不会调用其余过滤器。 |
5.0 版引入了ExpressionFileListFilter
以对文件执行 SpEL 表达式作为上下文求值根对象。
为此,用于文件处理的所有 XML 组件(本地和远程)以及现有的filter
属性,已提供filter-expression
选项,如以下示例所示:
<int-file:inbound-channel-adapter
directory="${inputdir}"
filter-expression="name matches '.text'"
auto-startup="false"/>
5.0.5 版引入了DiscardAwareFileListFilter
对被拒绝的文件感兴趣的实现。
为此,应通过addDiscardCallback(Consumer<File>)
.
在框架中,此功能是从FileReadingMessageSource.WatchServiceDirectoryScanner
,结合LastModifiedFileListFilter
.
与常规不同DirectoryScanner
这WatchService
提供根据目标文件系统上的事件进行处理的文件。
在轮询具有这些文件的内部队列时,LastModifiedFileListFilter
可能会丢弃它们,因为它们相对于其配置的来说太年轻了age
. 因此,我们丢失了该文件以备将来可能考虑。discard 回调钩子允许我们将文件保留在内部队列中,以便可以根据age
在随后的民意调查中。 这CompositeFileListFilter
还实现了一个DiscardAwareFileListFilter
并将丢弃回调填充到其所有DiscardAwareFileListFilter
代表。
因为CompositeFileListFilter 将文件与所有委托进行匹配,则discardCallback 可以对同一文件多次调用。 |
从 5.1 版本开始,FileReadingMessageSource
不检查目录是否存在,并且直到其start()
被调用(通常通过包装SourcePollingChannelAdapter
).
以前,没有简单的方法可以防止在引用目录时(例如从测试中引用目录)或稍后应用权限时出现作系统权限错误。
邮件头
从 5.0 版开始,FileReadingMessageSource
(除了payload
作为轮询的File
) 将以下标头填充到出站Message
:
-
FileHeaders.FILENAME
:这File.getName()
要发送的文件。 可用于后续重命名或复制逻辑。 -
FileHeaders.ORIGINAL_FILE
:这File
对象本身。 通常,当我们丢失原始组件时,此标头会自动由框架组件(例如拆分器或转换器)填充File
对象。 但是,为了与任何其他自定义用例的一致性和便利性,此标头对于访问原始文件非常有用。 -
FileHeaders.RELATIVE_PATH
:引入了一个新标头,用于表示文件路径相对于扫描根目录的部分。 当需要在其他位置还原源目录层次结构时,此标头可能很有用。 为此,该DefaultFileNameGenerator
(参见“'生成文件名)可以配置为使用此标头。
目录扫描和轮询
这FileReadingMessageSource
不会立即为目录中的文件生成消息。
它使用内部队列来存储scanner
. 这scanEachPoll
选项用于确保在每次轮询时使用最新的输入目录内容刷新内部队列。
默认情况下 (scanEachPoll = false
)、FileReadingMessageSource
清空其队列,然后再扫描目录。
此默认行为对于减少目录中大量文件的扫描特别有用。
但是,在需要自定义排序的情况下,请务必考虑将此标志设置为true
.
文件的处理顺序可能与预期不同。
默认情况下,队列中的文件以其自然 (path
) 订单。
扫描添加的新文件(即使队列已有文件)也会插入到适当的位置以保持该自然顺序。
要自定义订单,请FileReadingMessageSource
可以接受Comparator<File>
作为构造函数参数。
它由内部 (PriorityBlockingQueue
)根据业务需求重新排序其内容。
因此,要按特定顺序处理文件,您应该提供一个比较器FileReadingMessageSource
而不是对自定义生成的列表进行排序DirectoryScanner
.
5.0 版推出RecursiveDirectoryScanner
以执行文件树访问。
实现基于Files.walk(Path start, int maxDepth, FileVisitOption… options)
功能性。
根目录 (DirectoryScanner.listFiles(File)
) 参数从结果中排除。
所有其他子目录包含和排除均基于目标FileListFilter
实现。
例如,SimplePatternFileListFilter
默认过滤掉目录。
看AbstractDirectoryAwareFileListFilter
及其实现以获取更多信息。
从 5.5 版本开始,FileInboundChannelAdapterSpec 的 Java DSL 有一个方便的recursive(boolean) 使用RecursiveDirectoryScanner 在目标中FileReadingMessageSource 而不是默认的。 |
命名空间支持
使用特定于文件的命名空间可以简化文件读取的配置。 为此,请使用以下模板:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>
在此命名空间中,您可以减少FileReadingMessageSource
并将其包装在入站通道适配器中,如下所示:
<int-file:inbound-channel-adapter id="filesIn1"
directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>
<int-file:inbound-channel-adapter id="filesIn2"
directory="file:${input.directory}"
filter="customFilterBean" />
<int-file:inbound-channel-adapter id="filesIn3"
directory="file:${input.directory}"
filename-pattern="test*" />
<int-file:inbound-channel-adapter id="filesIn4"
directory="file:${input.directory}"
filename-regex="test[0-9]+\.txt" />
第一个通道适配器示例依赖于默认的FileListFilter
实现:
-
IgnoreHiddenFileListFilter
(不处理隐藏文件) -
AcceptOnceFileListFilter
(防止重复)
因此,您也可以省略prevent-duplicates
和ignore-hidden
属性,因为它们是true
默认情况下。
Spring Integration 4.2 引入了 |
第二个通道适配器示例使用自定义过滤器,第三个使用自定义过滤器filename-pattern
属性添加AntPathMatcher
基于过滤器,第四个使用该过滤器的filename-regex
属性,将基于正则表达式模式的过滤器添加到FileReadingMessageSource
. 这filename-pattern
和filename-regex
每个属性都与常规filter
reference 属性。
但是,您可以使用filter
属性来引用CompositeFileListFilter
它结合了任意数量的过滤器,包括一个或多个基于模式的过滤器,以满足您的特定需求。
当多个进程从同一目录读取时,您可能需要锁定文件以防止它们被并发选取。
为此,您可以使用FileLocker
.
有一个java.nio
基于的实现可用,但也可以实现您自己的锁定方案。
这nio
储物柜可以按如下方式注入:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:nio-locker/>
</int-file:inbound-channel-adapter>
您可以按如下方式配置自定义储物柜:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
当文件入站适配器配置了储物柜时,它负责在允许接收文件之前获取锁。
它不承担解锁文件的责任。
如果您已经处理了文件并保持锁悬挂,则存在内存泄漏。
如果这是一个问题,您应该调用FileLocker.unlock(File file) 在适当的时间自己。 |
如果过滤和锁定文件还不够,您可能需要控制文件的完全列出方式。
要实现此类需求,您可以使用DirectoryScanner
.
此扫描程序可让您准确确定每次轮询中列出的文件。
这也是 Spring Integration 内部用于连接的接口FileListFilter
instances 和FileLocker
到FileReadingMessageSource
.
您可以注入自定义DirectoryScanner
进入<int-file:inbound-channel-adapter/>
在scanner
属性,如以下示例所示:
<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
scanner="customDirectoryScanner"/>
这样做使您可以完全自由地选择排序、列表和锁定策略。
同样重要的是要了解过滤器(包括patterns
,regex
,prevent-duplicates
等)和locker
实例实际上由scanner
.
在适配器上设置的任何这些属性随后都会注入到内部scanner
.
对于外部scanner
,则禁止在FileReadingMessageSource
.
必须在该自定义中指定(如果需要)DirectoryScanner
.
换句话说,如果您注入scanner
进入FileReadingMessageSource
,您应该提供filter
和locker
关于这一点scanner
,而不是在FileReadingMessageSource
.
默认情况下,DefaultDirectoryScanner 使用IgnoreHiddenFileListFilter 和AcceptOnceFileListFilter .
若要防止使用它们,您可以配置自己的过滤器(例如AcceptAllFileListFilter ),甚至将其设置为null . |
WatchServiceDirectoryScanner
这FileReadingMessageSource.WatchServiceDirectoryScanner
当将新文件添加到目录时依赖于文件系统事件。
在初始化期间,将注册目录以生成事件。
初始文件列表也是在初始化期间构建的。
在遍历目录树时,遇到的任何子目录也会被注册以生成事件。
在第一次轮询时,返回遍历目录的初始文件列表。
在后续轮询中,将返回来自新创建事件的文件。
如果添加了新的子目录,则其创建事件将用于遍历新子树以查找现有文件并注册找到的任何新子目录。
有一个问题WatchKey 当其内部事件queue 不会在目录修改事件发生时被程序清空。
如果超过队列大小,则StandardWatchEventKinds.OVERFLOW 发出以指示某些文件系统事件可能丢失。
在这种情况下,将完全重新扫描根目录。
为避免重复,请考虑使用适当的FileListFilter (例如AcceptOnceFileListFilter ) 或在处理完成后删除文件。 |
这WatchServiceDirectoryScanner
可以通过FileReadingMessageSource.use-watch-service
选项,它与scanner
选择。
内部FileReadingMessageSource.WatchServiceDirectoryScanner
实例为提供的directory
.
此外,现在WatchService
轮询逻辑可以跟踪StandardWatchEventKinds.ENTRY_MODIFY
和StandardWatchEventKinds.ENTRY_DELETE
.
如果您需要跟踪现有文件和新文件的修改,则应实现ENTRY_MODIFY
events 逻辑中的FileListFilter
.
否则,将以相同的方式处理这些事件中的文件。
这ResettableFileListFilter
实现会选择ENTRY_DELETE
事件。
因此,他们的文件是为remove()
操作。
启用此事件后,过滤器(例如AcceptOnceFileListFilter
删除该文件。
因此,如果出现同名文件,则该文件将通过筛选器并作为消息发送。
为此,该watch-events
属性 (FileReadingMessageSource.setWatchEvents(WatchEventType… watchEvents)
)已被引入。
(WatchEventType
是FileReadingMessageSource
.)
有了这样的选项,我们可以对新文件使用一个下游流逻辑,对修改后的文件使用其他逻辑。
以下示例展示了如何为同一目录中的创建和修改事件配置不同的逻辑:
值得一提的是,ENTRY_DELETE
事件涉及监视目录子目录的重命名作。
更具体地说,ENTRY_DELETE
事件,与上一个目录名称相关,位于ENTRY_CREATE
通知新(重命名)目录的事件。
在某些作系统(如 Windows)上,ENTRY_DELETE
必须注册事件来处理这种情况。
否则,在文件资源管理器中重命名监视的子目录可能会导致在该子目录中未检测到新文件。
<int-file:inbound-channel-adapter id="newFiles"
directory="${input.directory}"
use-watch-service="true"/>
<int-file:inbound-channel-adapter id="modifiedFiles"
directory="${input.directory}"
use-watch-service="true"
filter="acceptAllFilter"
watch-events="MODIFY"/> <!-- The default is CREATE. -->
限制内存消耗
您可以使用HeadDirectoryScanner
以限制内存中保留的文件数。
这在扫描大型目录时非常有用。
对于 XML 配置,这是通过设置queue-size
入站通道适配器上的属性。
在 4.2 版之前,此设置与任何其他过滤器的使用不兼容。任何其他过滤器(包括prevent-duplicates="true"
) 覆盖了用于限制大小的过滤器。
使用 通常,而不是使用 |
使用 Java 配置进行配置
以下 Spring Boot 应用程序显示了如何使用 Java 配置配置出站适配器的示例:
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel fileInputChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File(INBOUND_PATH));
source.setFilter(new SimplePatternFileListFilter("*.txt"));
return source;
}
@Bean
@Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
public FileToStringTransformer fileToStringTransformer() {
return new FileToStringTransformer();
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序显示了如何使用 Java DSL 配置出站适配器的示例:
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileReadingFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(new File(INBOUND_PATH))
.patternFilter("*.txt"),
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(Files.toStringTransformer())
.channel("processFileChannel")
.get();
}
}
'tailing'文件
另一个流行的用例是从文件的末尾(或尾部)获取“行”,并在添加新行时捕获新行。
提供了两种实现。
第一个,OSDelegatingFileTailingMessageProducer
,使用本机tail
命令(在具有命令的作系统上)。
这通常是这些平台上最有效的实现。
对于没有tail
命令,第二个实现,ApacheCommonsFileTailingMessageProducer
,使用 Apachecommons-io
Tailer
类。
在这两种情况下,文件系统事件(例如文件不可用和其他事件)都发布为ApplicationEvent
实例使用正常的 Spring 事件发布机制。
此类事件的示例包括:
[message=tail: cannot open '/tmp/somefile' for reading:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become inaccessible:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has appeared;
following end of new file, file=/tmp/somefile]
例如,在轮换文件时,可能会发生上述示例中显示的事件序列。
从 5.0 版开始,一个FileTailingIdleEvent
在文件中没有数据时发出idleEventInterval
.
以下示例显示了此类事件的外观:
[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
并非所有支持tail 命令提供这些状态消息。 |
从这些终结点发出的消息具有以下标头:
-
FileHeaders.ORIGINAL_FILE
:这File
对象 -
FileHeaders.FILENAME
:文件名 (File.getName()
)
在 5.0 之前的版本中,FileHeaders.FILENAME header 包含文件绝对路径的字符串表示。
现在,您可以通过调用getAbsolutePath() 在原始文件头上。 |
以下示例使用默认选项(“-F -n 0”,表示跟随当前末尾的文件名)创建本机适配器。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
task-executor="exec"
file="/tmp/foo"/>
以下示例创建一个带有“-F -n +0”选项的本机适配器(意味着遵循文件名,发出所有现有行)。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
native-options="-F -n +0"
task-executor="exec"
file-delay=10000
file="/tmp/foo"/>
如果tail
命令失败(在某些平台上,丢失的文件会导致tail
失败,即使有-F
指定),则每 10 秒重试一次命令。
默认情况下,本机适配器从标准输出中捕获内容,并将内容作为消息发送。
它们还捕获标准误差以引发事件。
从版本 4.3.6 开始,您可以通过将enable-status-reader
自false
,如以下示例所示:
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
enable-status-reader="false"
task-executor="exec"
file="/tmp/foo"/>
在以下示例中,IdleEventInterval
设置为5000
,这意味着,如果五秒钟内没有写任何行,FileTailingIdleEvent
每五秒触发一次:
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
idle-event-interval="5000"
task-executor="exec"
file="/tmp/somefile"/>
当您需要停止适配器时,这会很有用。
以下示例创建 Apachecommons-io
Tailer
适配器,每两秒检查一次文件中的新行,并每十秒检查是否存在丢失的文件:
<int-file:tail-inbound-channel-adapter id="apache"
channel="input"
task-executor="exec"
file="/tmp/bar"
delay="2000"
end="false" (1)
reopen="true" (2)
file-delay="10000"/>
1 | 该文件从头开始就尾随 (end="false" ) 而不是结束(这是默认值)。 |
2 | 每个块都会重新打开该文件(默认值是保持文件打开)。 |
指定delay ,end 或reopen attributes 强制使用 Apachecommons-io 适配器,并使native-options 属性不可用。 |
写入文件
要将消息写入文件系统,您可以使用FileWritingMessageHandler
.
此类可以处理以下有效负载类型:
-
File
-
String
-
字节数组
-
InputStream
(从 4.2 版本开始)
对于字符串有效负载,您可以配置编码和字符集。
为了简化作,您可以配置FileWritingMessageHandler
作为出站通道适配器或出站网关的一部分,使用 XML 命名空间。
从 4.3 版开始,您可以指定写入文件时要使用的缓冲区大小。
从 5.1 版开始,您可以提供BiConsumer<File, Message<?>>
newFileCallback
如果您使用FileExistsMode.APPEND
或FileExistsMode.APPEND_NO_FLUSH
并且必须创建一个新文件。
此回调接收新创建的文件和触发该文件的消息。
例如,此回调可用于编写消息标头中定义的 CSV 标头。
生成文件名
在最简单的形式中,FileWritingMessageHandler
只需要一个目标目录来写入文件。
要写入的文件的名称由处理程序的FileNameGenerator
.
默认实现查找键与定义为FileHeaders.FILENAME
.
或者,您可以指定要根据消息计算的表达式以生成文件名,例如headers['myCustomHeader'] + '.something'
.
表达式的计算结果必须为String
.
为方便起见,该DefaultFileNameGenerator
还提供setHeaderName
方法,允许您显式指定其值用作文件名的消息头。
设置完成后,DefaultFileNameGenerator
采用以下解析步骤来确定给定消息有效负载的文件名:
-
根据消息计算表达式,如果结果为非空
String
,将其用作文件名。 -
否则,如果有效负载是
java.io.File
,使用File
对象的文件名。 -
否则,请使用附加的消息 ID 。
msg
作为文件名。
使用 XML 命名空间支持时,文件出站通道适配器和文件出站网关都支持以下互斥配置属性:
-
filename-generator
(对FileNameGenerator
实施) -
filename-generator-expression
(计算结果为String
)
写入文件时,使用临时文件后缀(其默认值为.writing
).
在写入文件时,它会附加到文件名中。
要自定义后缀,您可以将temporary-file-suffix
文件出站通道适配器和文件出站网关上的属性。
使用APPEND 文件mode 这temporary-file-suffix 属性被忽略,因为数据直接附加到文件中。 |
从 4.2.5 版开始,生成的文件名(由于filename-generator
或filename-generator-expression
evaluation)可以表示子路径和目标文件名。
它用作File(File parent, String child)
如故。
但是,过去我们没有创建 (mkdirs()
) 目录,仅假设文件名。
当我们需要恢复文件系统树以匹配源目录时,这种方法很有用——例如,当解压缩存档并按原始顺序保存目标目录中的所有文件时。
指定输出目录
文件出站通道适配器和文件出站网关都提供了两个互斥的配置属性来指定输出目录:
-
directory
-
directory-expression
Spring Integration 2.2 引入了directory-expression 属性。 |
使用directory
属性
当您使用directory
属性,则输出目录设置为固定值,该值在FileWritingMessageHandler
已初始化。如果未指定此属性,则必须使用directory-expression
属性。
使用directory-expression
属性
如果您想获得完整的 SpEL 支持,您可以使用directory-expression
属性。 此属性接受针对正在处理的每条消息计算的 SpEL 表达式。因此,在动态指定输出文件目录时,您可以完全访问消息的有效负载及其标头。
SpEL 表达式必须解析为String
,java.io.File
或org.springframework.core.io.Resource
. (后者被计算为File
无论如何。此外,由此产生的String
或File
必须指向一个目录。如果未指定directory-expression
属性,则必须将directory
属性。
使用auto-create-directory
属性
默认情况下,如果目标目录不存在,则会自动创建相应的目标目录和任何不存在的父目录。要防止这种行为,您可以将auto-create-directory
属性设置为false
. 此属性适用于directory
和directory-expression
属性。
使用 现在,不再在初始化适配器时检查目标目录是否存在,而是对正在处理的每条消息执行此检查。 此外,如果 |
处理现有目标文件
当您写入文件并且目标文件已存在时,默认行为是覆盖该目标文件。您可以通过将mode
属性。存在以下选项:
-
REPLACE
(默认) -
REPLACE_IF_MODIFIED
-
APPEND
-
APPEND_NO_FLUSH
-
FAIL
-
IGNORE
Spring Integration 2.2 引入了mode 属性和APPEND ,FAIL 和IGNORE 选项。 |
REPLACE
-
如果目标文件已经存在,则将其覆盖。如果
mode
属性,这是写入文件时的默认行为。 REPLACE_IF_MODIFIED
-
如果目标文件已存在,则仅当上次修改的时间戳与源文件的时间戳不同时,才会覆盖该文件。 为
File
有效载荷, 有效载荷lastModified
时间与现有文件进行比较。对于其他有效负载,FileHeaders.SET_MODIFIED
(file_setModified
) 标头与现有文件进行比较。如果标头缺失或具有不是Number
,则始终替换该文件。 APPEND
-
此模式允许您将消息内容附加到现有文件,而不是每次都创建新文件。请注意,此属性与
temporary-file-suffix
属性,因为当它将内容追加到现有文件时,适配器不再使用临时文件。该文件在每条消息后关闭。 APPEND_NO_FLUSH
-
此选项具有与
APPEND
,但数据不会刷新,并且文件不会在每条消息后关闭。这可以提供显着的性能,但在发生故障时存在数据丢失的风险。 看使用时刷新文件APPEND_NO_FLUSH
了解更多信息。 FAIL
-
如果目标文件存在,则
MessageHandlingException
被抛出。 IGNORE
-
如果目标文件存在,则以静默方式忽略消息有效负载。
使用临时文件后缀时(默认值为.writing )、IGNORE 如果存在最终文件名或临时文件名,则选项适用。 |
使用时刷新文件APPEND_NO_FLUSH
这APPEND_NO_FLUSH
模式是在 4.3 版中添加的。使用它可以提高性能,因为文件不会在每条消息后关闭。但是,这可能会导致发生故障时数据丢失。
Spring Integration 提供了几种刷新策略来减轻这种数据丢失:
-
用
flushInterval
. 如果在此时间段内未写入文件,则会自动刷新该文件。这是近似值,可能高达1.33x
这一次(平均1.167x
). -
将包含正则表达式的消息发送到消息处理程序的
trigger
方法。 具有与模式匹配的绝对路径名的文件将被刷新。 -
为处理程序提供自定义
MessageFlushPredicate
实现来修改将消息发送到trigger
方法。 -
调用处理程序的
flushIfNeeded
方法通过传入自定义FileWritingMessageHandler.FlushPredicate
或FileWritingMessageHandler.MessageFlushPredicate
实现。
为每个打开的文件调用谓词。有关更多信息,请参阅这些接口的 Javadoc。请注意,从 V5.0 开始,谓词方法提供了另一个参数:如果是新的或以前关闭的,则当前文件首次写入的时间。
使用时flushInterval
,则间隔从上次写入开始。仅当文件在间隔内处于空闲状态时,才会刷新该文件。从版本 4.3.7 开始,附加属性 (flushWhenIdle
) 可以设置为false
,这意味着间隔从第一次写入先前刷新的(或新)文件开始。
文件时间戳
默认情况下,目标文件的lastModified
timestamp 是创建文件的时间(就地重命名保留当前时间戳除外)。
从 4.3 版开始,您现在可以配置preserve-timestamp
(或setPreserveTimestamp(true)
使用 Java 配置时)。
为File
有效负载,这会将时间戳从入站文件传输到出站文件(无论是否需要副本)。
对于其他有效负载,如果FileHeaders.SET_MODIFIED
标头 (file_setModified
) 存在,则用于设置目标文件的lastModified
timestamp,只要标头是Number
.
文件权限
从 V5.0 开始,将文件写入支持 Posix 权限的文件系统时,可以在出站通道适配器或网关上指定这些权限。
该属性是一个整数,通常以熟悉的八进制格式提供,例如0640
,这意味着所有者具有读/写权限,组具有只读权限,而其他人没有访问权限。
文件出站通道适配器
以下示例配置文件出站通道适配器:
<int-file:outbound-channel-adapter id="filesOut" directory="${input.directory.property}"/>
基于命名空间的配置还支持delete-source-files
属性。
如果设置为true
,则在写入目标后触发删除原始源文件。
该标志的默认值为false
.
以下示例演示如何将其设置为true
:
<int-file:outbound-channel-adapter id="filesOut"
directory="${output.directory}"
delete-source-files="true"/>
这delete-source-files 属性仅当入站消息具有File 有效负载或如果FileHeaders.ORIGINAL_FILE header 值包含源File 实例或String 表示原始文件路径。 |
从 4.2 版开始,FileWritingMessageHandler
支持append-new-line
选择。
如果设置为true
,则在写入消息后将向文件附加一行新行。
默认属性值为false
.
以下示例演示如何使用append-new-line
选择:
<int-file:outbound-channel-adapter id="newlineAdapter"
append-new-line="true"
directory="${output.directory}"/>
出站网关
如果要继续根据写入的文件处理消息,可以使用outbound-gateway
相反。
它的作用类似于outbound-channel-adapter
.
但是,在写入文件后,它还将其作为消息的有效负载发送到回复通道。
以下示例配置出站网关:
<int-file:outbound-gateway id="mover" request-channel="moveInput"
reply-channel="output"
directory="${output.directory}"
mode="REPLACE" delete-source-files="true"/>
如前所述,您还可以指定mode
属性,它定义了如何处理目标文件已存在的情况的行为。
有关更多详细信息,请参阅处理现有目标文件。
通常,当使用文件出站网关时,结果文件会作为回复通道上的消息负载返回。
这也适用于指定IGNORE
模式。
在这种情况下,将返回预先存在的目标文件。
如果请求消息的有效负载是文件,则您仍然可以通过消息头访问该原始文件。
参见FileHeaders.ORIGINAL_FILE。
“outbound-gateway”在您希望首先移动文件然后通过处理管道发送它的情况下效果很好。
在这种情况下,您可以连接文件命名空间的inbound-channel-adapter 元素添加到outbound-gateway ,然后连接该网关的reply-channel 到管道的开头。 |
如果您有更详细的要求或需要支持其他有效负载类型作为输入以转换为文件内容,则可以扩展FileWritingMessageHandler
,但更好的选择是依赖Transformer
.
使用 Java 配置进行配置
以下 Spring Boot 应用程序显示了如何使用 Java 配置配置入站适配器的示例:
@SpringBootApplication
@IntegrationComponentScan
public class FileWritingJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(FileWritingJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.writeToFile("foo.txt", new File(tmpDir.getRoot(), "fileWritingFlow"), "foo");
}
@Bean
@ServiceActivator(inputChannel = "writeToFileChannel")
public MessageHandler fileWritingMessageHandler() {
Expression directoryExpression = new SpelExpressionParser().parseExpression("headers.directory");
FileWritingMessageHandler handler = new FileWritingMessageHandler(directoryExpression);
handler.setFileExistsMode(FileExistsMode.APPEND);
return handler;
}
@MessagingGateway(defaultRequestChannel = "writeToFileChannel")
public interface MyGateway {
void writeToFile(@Header(FileHeaders.FILENAME) String fileName,
@Header(FileHeaders.FILENAME) File directory, String data);
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序显示了如何使用 Java DSL 配置入站适配器的示例:
@SpringBootApplication
public class FileWritingJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(FileWritingJavaApplication.class)
.web(false)
.run(args);
MessageChannel fileWritingInput = context.getBean("fileWritingInput", MessageChannel.class);
fileWritingInput.send(new GenericMessage<>("foo"));
}
@Bean
public IntegrationFlow fileWritingFlow() {
return IntegrationFlow.from("fileWritingInput")
.enrichHeaders(h -> h.header(FileHeaders.FILENAME, "foo.txt")
.header("directory", new File(tmpDir.getRoot(), "fileWritingFlow")))
.handle(Files.outboundGateway(m -> m.getHeaders().get("directory")))
.channel(MessageChannels.queue("fileWritingResultChannel"))
.get();
}
}
文件转换器
要将从文件系统读取的数据转换为对象,反之亦然,您需要做一些工作。
与FileReadingMessageSource
并且在较小程度上FileWritingMessageHandler
,您可能需要自己的机制来完成工作。
为此,您可以实现Transformer
接口。
或者,您可以扩展AbstractFilePayloadTransformer
对于入站消息。
Spring Integration 提供了一些明显的实现。
请参阅Javadoc 的Transformer
接口查看哪些 Spring Integration 类实现了它。
同样,您可以检查Javadoc 的AbstractFilePayloadTransformer
类查看哪些 Spring Integration 类扩展了它。
FileToByteArrayTransformer
延伸AbstractFilePayloadTransformer
并转换File
object 转换为byte[]
通过使用 Spring 的FileCopyUtils
.
使用一系列转换器通常比将所有转换放在一个类中要好。
在这种情况下,File
自byte[]
转换可能是合乎逻辑的第一步。
FileToStringTransformer
延伸AbstractFilePayloadTransformer
转换一个File
对String
.
如果不出意外的话,这对于调试很有用(考虑将其与窃听器一起使用)。
要配置特定于文件的转换器,您可以使用文件命名空间中的相应元素,如以下示例所示:
<int-file:file-to-bytes-transformer input-channel="input" output-channel="output"
delete-files="true"/>
<int-file:file-to-string-transformer input-channel="input" output-channel="output"
delete-files="true" charset="UTF-8"/>
这delete-files
选项向转换器发出信号,表明它应该在转换完成后删除入站文件。
这绝不能替代使用AcceptOnceFileListFilter
当FileReadingMessageSource
在多线程环境中使用(例如,当您通常使用 Spring Integration 时)。
文件拆分器
这FileSplitter
在 4.1.2 版中添加了,并且在 4.2 版中添加了其命名空间支持。
这FileSplitter
将文本文件拆分为单独的行,基于BufferedReader.readLine()
.
默认情况下,拆分器使用Iterator
在从文件中读取一行时一次发出一行。
设置iterator
属性设置为false
导致它在将所有行作为消息发出之前将其读取到内存中。
这方面的一个用例可能是,如果要在发送任何包含行的消息之前检测文件上的 I/O 错误。
但是,它仅适用于相对较短的文件。
入站有效负载可以是File
,String
(一个File
路径)、InputStream
或Reader
.
其他有效负载类型将保持不变。
以下列表显示了配置FileSplitter
:
@SpringBootApplication
public class FileSplitterApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileSplitterApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileSplitterFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(tmpDir.getRoot())
.filter(new ChainFileListFilter<File>()
.addFilter(new AcceptOnceFileListFilter<>())
.addFilter(new ExpressionFileListFilter<>(
new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
.split(Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true))
.channel(c -> c.queue("fileSplittingResultChannel"))
.get();
}
}
@Bean
fun fileSplitterFlow() =
integrationFlow(
Files.inboundAdapter(tmpDir.getRoot())
.filter(
ChainFileListFilter<File?>()
.addFilter(AcceptOnceFileListFilter())
.addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
)
) {
split(
Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true)
)
channel { queue("fileSplittingResultChannel") }
}
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
FileSplitter splitter = new FileSplitter(true, true);
splitter.setApplySequence(true);
splitter.setOutputChannel(outputChannel);
return splitter;
}
<int-file:splitter id="splitter" (1)
iterator="" (2)
markers="" (3)
markers-json="" (4)
apply-sequence="" (5)
requires-reply="" (6)
charset="" (7)
first-line-as-header="" (8)
input-channel="" (9)
output-channel="" (10)
send-timeout="" (11)
auto-startup="" (12)
order="" (13)
phase="" /> (14)
1 | 拆分器的 bean 名称。 |
2 | 设置为true (默认)使用迭代器或false 在发送行之前将文件加载到内存中。 |
3 | 设置为true 在文件数据之前和之后发出文件开始和文件结束标记消息。
标记是带有FileSplitter.FileMarker 有效负载(使用START 和END 值mark 属性)。
在过滤某些行的下游流中按顺序处理文件时,可以使用标记。
它们使下游处理能够知道文件何时已完全处理。
此外,一个file_marker 包含START 或END 添加到这些消息中。
这END 标记包括行数。
如果文件为空,则仅START 和END 标记会以0 作为lineCount .
默认值为false .
什么时候true ,apply-sequence 是false 默认情况下。
也可以看看markers-json (下一个属性)。 |
4 | 什么时候markers 为 true,则将此设置为true 要拥有FileMarker 对象转换为 JSON 字符串。
(使用SimpleJsonSerializer 下面)。 |
5 | 设置为false 禁用包含sequenceSize 和sequenceNumber 报头。
默认值为true 除非markers 是true .
什么时候true 和markers 是true ,标记包含在测序中。
什么时候true 和iterator 是true 这sequenceSize header 设置为0 ,因为尺寸未知。 |
6 | 设置为true 导致RequiresReplyException 如果文件中没有行,则引发。
默认值为false . |
7 | 设置将文本数据读入String 负载。
默认值是平台字符集。 |
8 | 要作为其余行发出的邮件中的标头携带的第一行的标头名称。 从 5.0 版本开始。 |
9 | 设置用于向拆分器发送消息的输入通道。 |
10 | 设置将消息发送到的输出通道。 |
11 | 设置发送超时。
仅适用于output-channel 可以阻止 — 例如 fullQueueChannel . |
12 | 设置为false 以禁用在刷新上下文时自动启动拆分器。
默认值为true . |
13 | 设置此端点的顺序,如果input-channel 是一个<publish-subscribe-channel/> . |
14 | 设置分路器的启动阶段(在以下情况下使用auto-startup 是true ). |
这FileSplitter
还拆分任何基于文本的InputStream
成行。
从版本 4.3 开始,当与 FTP 或 SFTP 流式入站通道适配器或使用stream
选项检索文件时,拆分器会在文件完全使用时自动关闭支持流的会话
有关这些设施的详细信息,请参阅 FTP 流式处理入站通道适配器和 SFTP 流式处理入站通道适配器以及 FTP 出站网关和 SFTP 出站网关。
使用 Java 配置时,可以使用其他构造函数,如以下示例所示:
public FileSplitter(boolean iterator, boolean markers, boolean markersJson)
什么时候markersJson
为 true,则标记表示为 JSON 字符串(使用SimpleJsonSerializer
).
5.0 版本引入了firstLineAsHeader
选项来指定内容的第一行是标题(例如 CSV 文件中的列名)。
传递给此属性的参数是标头名称,在该标头名称下,第一行作为其余行发出的消息中的标头。
此行不包含在序列标头中(如果applySequence
为 true),也不在lineCount
关联FileMarker.END
.
注意:从版本 5.5 开始,lineCount' 也包含在FileHeaders.LINE_COUNT
到FileMarker.END
message,因为FileMarker
可以序列化为 JSON。
如果文件仅包含标题行,则该文件被视为空,因此仅FileMarker
在拆分期间发出实例(如果启用了标记,否则不会发出任何消息)。
默认情况下(如果未设置标头名称),则第一行被视为数据,并成为第一个发出的消息的有效负载。
如果您需要有关从文件内容中提取标头的更复杂的逻辑(不是第一行,不是行的全部内容,不是一个特定标头等),请考虑在FileSplitter
.
请注意,已移动到标题的行可能会从正常内容进程的下游进行过滤。
幂等下游处理拆分文件
什么时候apply-sequence
为 true,则拆分器在SEQUENCE_NUMBER
标头(当markers
为 true,则标记计为行)。
行号可以与幂等接收器一起使用,以避免在重新启动后重新处理行。
例如:
@Bean
public ConcurrentMetadataStore store() {
return new ZookeeperMetadataStore();
}
@Bean
public MetadataStoreSelector selector() {
return new MetadataStoreSelector(
message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
.getAbsolutePath(),
message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
.toString(),
store())
.compareValues(
(oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(selector());
}
@Bean
public IntegrationFlow flow() {
...
.split(new FileSplitter())
...
.handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
...
}
文件聚合器
从 5.5 版开始,一个FileAggregator
被引入以涵盖FileSplitter
启用 START/END 标记时的用例。
为方便起见,该FileAggregator
实现所有三种序列详细信息策略:
-
这
HeaderAttributeCorrelationStrategy
使用FileHeaders.FILENAME
属性用于关联键计算。 当在FileSplitter
,它不会填充序列详细信息标头,因为 START/END 标记消息也包含在序列大小中。 这FileHeaders.FILENAME
仍会为发出的每一行填充,包括 START/END 标记消息。 -
这
FileMarkerReleaseStrategy
- 检查FileSplitter.FileMarker.Mark.END
消息,然后比较FileHeaders.LINE_COUNT
标头值,组大小减去2
-FileSplitter.FileMarker
实例。 它还实现了方便的GroupConditionProvider
联系方式conditionSupplier
函数要用于AbstractCorrelatingMessageHandler
. 有关详细信息,请参阅消息组条件。 -
这
FileAggregatingMessageGroupProcessor
只需删除FileSplitter.FileMarker
消息,并将其余消息收集到列表有效负载中进行生成。
以下列表显示了配置FileAggregator
:
@Bean
public IntegrationFlow fileSplitterAggregatorFlow(TaskExecutor taskExecutor) {
return f -> f
.split(Files.splitter()
.markers()
.firstLineAsHeader("firstLine"))
.channel(c -> c.executor(taskExecutor))
.filter(payload -> !(payload instanceof FileSplitter.FileMarker),
e -> e.discardChannel("aggregatorChannel"))
.<String, String>transform(String::toUpperCase)
.channel("aggregatorChannel")
.aggregate(new FileAggregator())
.channel(c -> c.queue("resultChannel"));
}
@Bean
fun fileSplitterAggregatorFlow(taskExecutor: TaskExecutor?) =
integrationFlow {
split(Files.splitter().markers().firstLineAsHeader("firstLine"))
channel { executor(taskExecutor) }
filter<Any>({ it !is FileMarker }) { discardChannel("aggregatorChannel") }
transform(String::toUpperCase)
channel("aggregatorChannel")
aggregate(FileAggregator())
channel { queue("resultChannel") }
}
@serviceActivator(inputChannel="toAggregateFile")
@Bean
public AggregatorFactoryBean fileAggregator() {
AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
aggregator.setProcessorBean(new FileAggregator());
aggregator.setOutputChannel(outputChannel);
return aggregator;
}
<int:chain input-channel="input" output-channel="output">
<int-file:splitter markers="true"/>
<int:aggregator>
<bean class="org.springframework.integration.file.aggregator.FileAggregator"/>
</int:aggregator>
</int:chain>
如果默认行为的FileAggregator
不满足目标逻辑,建议使用单个策略配置聚合器端点。
看FileAggregator
JavaDocs 了解更多信息。
远程持久文件列表过滤器
入站和流式入站远程文件通道适配器 (FTP
,SFTP
和其他技术)配置了相应的实现AbstractPersistentFileListFilter
默认情况下,配置了内存中的MetadataStore
.
要在集群中运行,可以使用共享的MetadataStore
(有关详细信息,请参阅元数据存储)。
这些过滤器用于防止多次获取同一文件(除非修改时间更改)。
从 5.2 版开始,在获取文件之前,会立即将文件添加到过滤器中(如果获取失败,则将其反转)。
如果发生灾难性故障(例如断电),当前正在获取的文件可能会保留在过滤器中,并且在重新启动应用程序时不会重新获取。
在这种情况下,您需要手动从MetadataStore . |
在以前的版本中,在获取任何文件之前都会过滤这些文件,这意味着在发生灾难性故障后,多个文件可能处于此状态。
为了促进这种新行为,在FileListFilter
.
boolean accept(F file);
boolean supportsSingleFileFiltering();
如果过滤器返回true
在supportsSingleFileFiltering
,它必须实现accept()
.
如果远程过滤器不支持单个文件过滤(例如AbstractMarkerFilePresentFileListFilter
),适配器将恢复到以前的行为。
如果使用了多个过滤器(使用CompositeFileListFilter
或ChainFileListFilter
),则所有委托筛选器都必须支持单个文件筛选,复合筛选器才能支持它。
持久文件列表过滤器现在具有布尔属性forRecursion
.
将此属性设置为true
,也设置alwaysAcceptDirectories
,这意味着出站网关上的递归作 (ls
和mget
) 现在每次都会遍历完整的目录树。
这是为了解决未检测到目录树深处更改的问题。
另外forRecursion=true
导致文件的完整路径用作元数据存储键;这解决了如果具有相同名称的文件多次出现在不同目录中,则过滤器无法正常工作的问题。
重要提示:这意味着对于顶级目录下的文件,将找不到持久元数据存储中的现有密钥。
因此,该属性是false
默认情况下;这可能会在将来的版本中更改。