它是在版本 4.1.2 中添加的,其命名空间支持是在版本 4.2 中添加的。
根据 将文本文件拆分为单独的行。
默认情况下,拆分器使用 an 在从文件中读取行时一次发出一行。
将属性设置为 to 会导致它在将所有行作为消息发出之前将它们读入内存。
一个用例可能是,您希望在发送任何包含行的消息之前检测文件上的 I/O 错误。
但是,它仅适用于相对较短的文件。FileSplitter
FileSplitter
BufferedReader.readLine()
Iterator
iterator
false
入站负载可以是 、 (路径)、 或 。
其他负载类型保持不变。File
String
File
InputStream
Reader
以下清单显示了配置 :FileSplitter
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@SpringBootApplication
public class FileSplitterApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileSplitterApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileSplitterFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(tmpDir.getRoot())
.filter(new ChainFileListFilter<File>()
.addFilter(new AcceptOnceFileListFilter<>())
.addFilter(new ExpressionFileListFilter<>(
new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
.split(Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true))
.channel(c -> c.queue("fileSplittingResultChannel"))
.get();
}
}
@Bean
fun fileSplitterFlow() =
integrationFlow(
Files.inboundAdapter(tmpDir.getRoot())
.filter(
ChainFileListFilter<File?>()
.addFilter(AcceptOnceFileListFilter())
.addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
)
) {
split(
Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true)
)
channel { queue("fileSplittingResultChannel") }
}
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
FileSplitter splitter = new FileSplitter(true, true);
splitter.setApplySequence(true);
splitter.setOutputChannel(outputChannel);
return splitter;
}
<int-file:splitter id="splitter" (1)
iterator="" (2)
markers="" (3)
markers-json="" (4)
apply-sequence="" (5)
requires-reply="" (6)
charset="" (7)
first-line-as-header="" (8)
input-channel="" (9)
output-channel="" (10)
send-timeout="" (11)
auto-startup="" (12)
order="" (13)
phase="" /> (14)
1 | splitter 的 bean 名称。 |
2 | 设置为(默认值)以使用迭代器或在发送行之前将文件加载到内存中。true false |
3 | 设置为 to 可在文件数据之前和之后发出文件开始和文件结束标记消息。
标记是具有有效负载(属性中包含 and 值)的消息。
在筛选某些行的下游流中按顺序处理文件时,可以使用标记。
它们使下游处理能够知道文件何时被完全处理。
此外,包含这些消息或添加到这些消息的标头。
该标记包括行数。
如果文件为空,则仅发出 和 标记,并将其作为 .
默认值为 .
默认情况下,, 是 。
另请参阅 (next 属性)。true FileSplitter.FileMarker START END mark file_marker START END END START END 0 lineCount false true apply-sequence false markers-json |
4 | 当为 true 时,将此项设置为将对象转换为 JSON 字符串。
(使用 under)。markers true FileMarker SimpleJsonSerializer |
5 | 设置为 可禁用在邮件中包含 和 标头。
默认值为 ,除非为 。
当 和 is 时,标记包含在排序中。
当 和 is 时,标头设置为 ,因为大小未知。false sequenceSize sequenceNumber true markers true true markers true true iterator true sequenceSize 0 |
6 | 设置为 to 会导致在文件中没有行时引发 a。
默认值为 .true RequiresReplyException false |
7 | 设置将文本数据读取到有效负载时要使用的字符集名称。
默认值为 platform charset。String |
8 | 在为其余行发出的消息中作为标题携带的第一行的标题名称。 从 5.0 版本开始。 |
9 | 设置用于将消息发送到拆分器的输入通道。 |
10 | 设置将消息发送到的输出通道。 |
11 | 设置发送超时。
仅当 can 阻止时适用 — 例如完整的 .output-channel QueueChannel |
12 | 设置为 to 以禁用在刷新上下文时自动启动拆分器。
默认值为 .false true |
13 | 如果 是 ,则设置此端点的顺序。input-channel <publish-subscribe-channel/> |
14 | 设置分流器的启动阶段(在 is 时使用)。auto-startup true |
还会将任何基于文本的内容拆分为多行。
从版本 4.3 开始,当与 FTP 或 SFTP 流入站通道适配器或者使用该选项检索文件的 FTP 或 SFTP 出站网关结合使用时,拆分器会在文件完全使用时自动关闭支持该流的会话
有关这些工具的更多信息,请参阅 FTP Streaming Inbound Channel Adapter 和 SFTP Streaming Inbound Channel Adapter 以及 FTP Outbound Gateway 和 SFTP Outbound Gateway。FileSplitter
InputStream
stream
使用 Java 配置时,可以使用其他构造函数,如下例所示:
public FileSplitter(boolean iterator, boolean markers, boolean markersJson)
当为 true 时,标记表示为 JSON 字符串(使用 a )。markersJson
SimpleJsonSerializer
版本 5.0 引入了一个选项,用于指定内容的第一行是标题(例如 CSV 文件中的列名称)。
传递给此属性的参数是标头名称,在该名称下,第一行作为其余行发出的消息中的标头。
此行不包含在序列标题中(如果为 true),也不包含在与 关联的 中。
注意:从版本 5.5 开始,lineCount' 也作为消息的 into 标头包含在内,因为可以序列化为 JSON。
如果文件仅包含 header 行,则该文件将被视为空,因此,在拆分期间仅发出实例(如果启用了标记 — 否则,不会发出任何消息)。
默认情况下(如果未设置标头名称),第一行被视为 data 并成为第一个发出的消息的有效负载。firstLineAsHeader
applySequence
lineCount
FileMarker.END
FileHeaders.LINE_COUNT
FileMarker.END
FileMarker
FileMarker
如果您需要有关从文件内容中提取标头的更复杂的逻辑(不是第一行、不是行的整个内容、不是一个特定的标头等),请考虑在 .
请注意,已移动到标题的行可能会从正常内容流程的下游进行筛选。FileSplitter
1 | splitter 的 bean 名称。 |
2 | 设置为(默认值)以使用迭代器或在发送行之前将文件加载到内存中。true false |
3 | 设置为 to 可在文件数据之前和之后发出文件开始和文件结束标记消息。
标记是具有有效负载(属性中包含 and 值)的消息。
在筛选某些行的下游流中按顺序处理文件时,可以使用标记。
它们使下游处理能够知道文件何时被完全处理。
此外,包含这些消息或添加到这些消息的标头。
该标记包括行数。
如果文件为空,则仅发出 和 标记,并将其作为 .
默认值为 .
默认情况下,, 是 。
另请参阅 (next 属性)。true FileSplitter.FileMarker START END mark file_marker START END END START END 0 lineCount false true apply-sequence false markers-json |
4 | 当为 true 时,将此项设置为将对象转换为 JSON 字符串。
(使用 under)。markers true FileMarker SimpleJsonSerializer |
5 | 设置为 可禁用在邮件中包含 和 标头。
默认值为 ,除非为 。
当 和 is 时,标记包含在排序中。
当 和 is 时,标头设置为 ,因为大小未知。false sequenceSize sequenceNumber true markers true true markers true true iterator true sequenceSize 0 |
6 | 设置为 to 会导致在文件中没有行时引发 a。
默认值为 .true RequiresReplyException false |
7 | 设置将文本数据读取到有效负载时要使用的字符集名称。
默认值为 platform charset。String |
8 | 在为其余行发出的消息中作为标题携带的第一行的标题名称。 从 5.0 版本开始。 |
9 | 设置用于将消息发送到拆分器的输入通道。 |
10 | 设置将消息发送到的输出通道。 |
11 | 设置发送超时。
仅当 can 阻止时适用 — 例如完整的 .output-channel QueueChannel |
12 | 设置为 to 以禁用在刷新上下文时自动启动拆分器。
默认值为 .false true |
13 | 如果 是 ,则设置此端点的顺序。input-channel <publish-subscribe-channel/> |
14 | 设置分流器的启动阶段(在 is 时使用)。auto-startup true |
幂等下游处理拆分文件
当 为 true 时,拆分器会在标题中添加行号(当为 true 时,标记计为行)。
该行号可与幂等接收器一起使用,以避免在重新启动后重新处理行。apply-sequence
SEQUENCE_NUMBER
markers
例如:
@Bean
public ConcurrentMetadataStore store() {
return new ZookeeperMetadataStore();
}
@Bean
public MetadataStoreSelector selector() {
return new MetadataStoreSelector(
message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
.getAbsolutePath(),
message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
.toString(),
store())
.compareValues(
(oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(selector());
}
@Bean
public IntegrationFlow flow() {
...
.split(new FileSplitter())
...
.handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
...
}