Spring Batch 集成
Spring Batch 集成
Spring Batch 的许多用户可能会遇到以下要求 在 Spring Batch 的范围之外,但这可能是有效的,并且 使用 Spring Integration 简洁地实现。相反,Spring 集成用户可能会遇到 Spring Batch 需求,需要一种方法 有效地集成这两个框架。在这种情况下,几个 模式和用例出现,Spring Batch 集成 满足这些要求。
Spring Batch 和 Spring Integration 之间的界限并不总是 很清楚,但有两条建议可以 help:考虑粒度并应用常见模式。一些 本节将介绍这些常见模式。
将消息传递添加到批处理中可实现 运营,以及关键问题的分离和战略制定。 例如,一条消息可能会触发要执行的作业,然后 发送消息可以通过多种方式公开。或者,当 作业完成或失败,该事件可能会触发要发送的消息, 这些消息的使用者可能有操作方面的顾虑 与应用程序本身无关。消息传递可以 也可以嵌入到作业中(例如,读取或写入 通过通道处理)。远程分区和远程分块 提供在多个 worker 之间分配工作负载的方法。
本节涵盖以下关键概念:
命名空间支持
专用 XML 命名空间支持已添加到 Spring Batch Integration 版本 1.3 中。 旨在提供更简单的配置 经验。要使用命名空间,请添加以下内容 命名空间声明添加到 Spring XML 应用程序上下文中 文件:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
xsi:schemaLocation="
http://www.springframework.org/schema/batch-integration
https://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd">
...
</beans>
以下示例显示了一个为 Spring 配置的 Spring XML 应用程序上下文文件 批量集成:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
xsi:schemaLocation="
http://www.springframework.org/schema/batch-integration
https://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd
http://www.springframework.org/schema/batch
https://www.springframework.org/schema/batch/spring-batch.xsd
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd">
...
</beans>
将版本号附加到引用的 XSD 文件也是 允许。但是,由于无版本声明始终使用 latest 架构,我们通常不建议附加版本 number 设置为 XSD 名称。添加版本号 可能会在更新 Spring Batch 时产生问题 集成依赖项,因为它们可能需要更新的版本 的 XML 架构。
通过消息启动 Batch 作业
使用核心 Spring Batch API 启动批处理作业时,您可以 基本上有两个选项:
-
在命令行中,使用
CommandLineJobRunner
-
以编程方式,使用 或
JobOperator.start()
JobLauncher.run()
例如,您可能希望在 when incalling batch jobs by
使用 shell 脚本。或者,您可以直接使用 (例如,使用
Spring Batch 作为 Web 应用程序的一部分)。但是,那又如何呢
更复杂的用例?也许您需要轮询远程 (S)FTP
server 检索 Batch Job 或应用程序的数据
必须同时支持多个不同的数据源。为
例如,您不仅可以从 Web 接收数据文件,还可以从
FTP 和其他来源。也许输入文件的其他转换是
在调用 Spring Batch 之前需要。CommandLineJobRunner
JobOperator
因此,执行批处理作业会更强大
通过使用 Spring Integration 及其众多适配器。例如
您可以使用 File Inbound Channel Adapter 来
监视文件系统中的目录,并以
一旦输入文件到达。此外,您还可以创建 Spring
使用多个不同适配器的集成流
从多个来源摄取批处理作业的数据
同时仅使用配置。实现所有这些
Spring Integration 的场景很容易,因为它允许
解耦、事件驱动的 .JobLauncher
Spring Batch 集成提供了您可以
用于启动批处理作业。的输入由
Spring Integration 消息,该消息具有 类型为 的有效负载。这个类是 to be launch 和 that are 的包装器
启动 Batch 作业所必需的。JobLaunchingMessageHandler
JobLaunchingMessageHandler
JobLaunchRequest
Job
JobParameters
下图显示了典型的 Spring 集成 启动 Batch 作业所需的消息流。EIP (Enterprise Integration Patterns) 网站提供了消息传送图标及其描述的完整概述。
将文件转换为 JobLaunchRequest
以下示例将文件转换为 :JobLaunchRequest
package io.spring.sbi;
public class FileMessageToJobRequest {
private Job job;
private String fileParameterName;
public void setFileParameterName(String fileParameterName) {
this.fileParameterName = fileParameterName;
}
public void setJob(Job job) {
this.job = job;
}
@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
JobParametersBuilder jobParametersBuilder =
new JobParametersBuilder();
jobParametersBuilder.addString(fileParameterName,
message.getPayload().getAbsolutePath());
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
}
JobExecution 响应
执行批处理作业时,将返回一个实例。您可以使用这个
instance 来确定执行的状态。如果
a 可以创建
successful,则始终返回它,无论
或者实际执行不成功。JobExecution
JobExecution
如何返回实例的确切行为取决于提供的 .如果使用(单线程)实现,则仅在作业完成时返回响应。使用 时,将返回实例
马上。然后,您可以获取 of 实例
(使用 ) 并查询作业的更新状态
使用 .了解更多
信息,请参阅 查询存储库。JobExecution
TaskExecutor
synchronous
TaskExecutor
JobExecution
after
asynchronous
TaskExecutor
JobExecution
id
JobExecution
JobExecution.getJobId()
JobRepository
JobExplorer
Spring Batch 集成配置
考虑某人需要创建一个文件来侦听的情况
对于所提供目录中的 CSV 文件,请将它们交给转换器
(),通过任务启动网关启动任务,以及
使用 记录 的输出。inbound-channel-adapter
FileMessageToJobRequest
JobExecution
logging-channel-adapter
以下示例显示了如何在 XML 中配置该常见情况: .XML配置
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>
<int-file:inbound-channel-adapter id="filePoller"
channel="inboundFileChannel"
directory="file:/tmp/myfiles/"
filename-pattern="*.csv">
<int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>
<int:transformer input-channel="inboundFileChannel"
output-channel="outboundJobRequestChannel">
<bean class="io.spring.sbi.FileMessageToJobRequest">
<property name="job" ref="personJob"/>
<property name="fileParameterName" value="input.file.name"/>
</bean>
</int:transformer>
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel"/>
<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
以下示例显示了如何在 Java 中配置该常见情况:
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setFileParameterName("input.file.name");
fileMessageToJobRequest.setJob(personJob());
return fileMessageToJobRequest;
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);
return jobLaunchingGateway;
}
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles")).
filter(new SimplePatternFileListFilter("*.csv")),
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
transform(fileMessageToJobRequest()).
handle(jobLaunchingGateway).
log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
get();
}
ItemReader 配置示例
现在我们正在轮询文件并启动作业,我们需要配置我们的 Spring
批处理(例如),以使用在作业定义的位置找到的文件
参数命名为 “input.file.name”,如下面的 bean 配置所示:ItemReader
下面的 XML 示例显示了必要的 Bean 配置:
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
scope="step">
<property name="resource" value="file://#{jobParameters['input.file.name']}"/>
...
</bean>
下面的 Java 示例显示了必要的 Bean 配置:
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
flatFileItemReader.setResource(new FileSystemResource(resource));
...
return flatFileItemReader;
}
前面示例中的要点是将 的值注入 作为 Resource 属性值,并设置 bean
具有 STEP 范围。将 bean 设置为具有 step scope 可利用
后期绑定支持,允许访问变量。#{jobParameters['input.file.name']}
ItemReader
jobParameters
Job-Launching Gateway 的可用属性
任务启动网关具有以下属性,您可以设置这些属性来控制任务:
-
id
:标识底层 Spring Bean 定义,它是以下任一的实例:-
EventDrivenConsumer
-
PollingConsumer
(确切的实现取决于组件的 input channel 是 a 还是 a 。SubscribableChannel
PollableChannel
-
-
auto-startup
:布尔标志,指示终端节点应在 启动。默认值为 .true
-
request-channel
:此端点的输入。MessageChannel
-
reply-channel
:生成的有效负载将发送到该目标。MessageChannel
JobExecution
-
reply-timeout
:允许您指定此网关等待回复消息的时间(以毫秒为单位) 在 throw 之前成功发送到 reply 通道 异常。此属性仅在通道 可能会阻塞(例如,当使用有界队列通道时 目前已满)。另请记住,当发送到 时,会发生调用 在发件人的线程中。因此,发送失败 操作可能由下游的其他组件引起。 该属性映射到底层实例的 property。如果未指定,则属性 默认为 -1,则 这意味着,默认情况下,会无限期等待。DirectChannel
reply-timeout
sendTimeout
MessagingTemplate
Gateway
-
job-launcher
:自选。接受 自定义 Bean 引用。 如果未指定,则适配器 重新使用在 的 下注册的实例。如果没有默认实例 存在时,会引发异常。JobLauncher
id
jobLauncher
-
order
:指定此终端节点作为订阅者连接时的调用顺序 转换为 .SubscribableChannel
子元素
当接收来自 的消息时,您必须提供
全局默认值或为 .Gateway
PollableChannel
Poller
Poller
Job Launching Gateway
下面的示例展示了如何在 XML 中提供 Poller:
<batch-int:job-launching-gateway request-channel="queueChannel"
reply-channel="replyChannel" job-launcher="jobLauncher">
<int:poller fixed-rate="1000">
</batch-int:job-launching-gateway>
下面的示例展示了如何在 Java 中提供 Poller:
@Bean
@ServiceActivator(inputChannel = "queueChannel", poller = @Poller(fixedRate="1000"))
public JobLaunchingGateway sampleJobLaunchingGateway() {
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());
jobLaunchingGateway.setOutputChannel(replyChannel());
return jobLaunchingGateway;
}
通过信息性消息提供反馈
由于 Spring Batch 作业可以长时间运行,因此提供进度 信息通常至关重要。例如,利益相关者可能希望 以在批处理作业的部分或全部部分失败时收到通知。 Spring Batch 为正在收集的此信息提供支持 通过:
-
主动轮询
-
事件驱动的侦听器
异步启动 Spring Batch 作业时(例如,通过使用作业启动
Gateway),则返回一个实例。因此,您可以通过使用 从 中检索更新的实例来持续轮询状态更新。然而,这是
被视为次优,首选事件驱动方法。JobExecution
JobExecution.getJobId()
JobExecution
JobRepository
JobExplorer
因此,Spring Batch 提供了侦听器,包括最常用的三种 听众:
-
StepListener
-
ChunkListener
-
JobExecutionListener
在下图所示的示例中,Spring Batch 作业已配置了 .因此,Spring Integration 接收并处理之前的任何步骤
或事件发生后。例如,您可以使用 .根据该检查的结果,可能会发生各种情况(例如
将邮件路由到邮件出站通道适配器),以便电子邮件通知可以
根据某些条件发出。StepExecutionListener
StepExecution
Router
以下由两部分组成的示例显示了如何配置侦听器以发送
message 发送到 a 事件,并将其输出记录到 .Gateway
StepExecution
logging-channel-adapter
首先,创建通知集成 Bean。
以下示例显示了如何在 XML 中创建通知集成 Bean:
<int:channel id="stepExecutionsChannel"/>
<int:gateway id="notificationExecutionsListener"
service-interface="org.springframework.batch.core.StepExecutionListener"
default-request-channel="stepExecutionsChannel"/>
<int:logging-channel-adapter channel="stepExecutionsChannel"/>
以下示例显示了如何在 Java 中创建通知集成 Bean:
@Bean
@ServiceActivator(inputChannel = "stepExecutionsChannel")
public LoggingHandler loggingHandler() {
LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.WARN);
adapter.setLoggerName("TEST_LOGGER");
adapter.setLogExpressionString("headers.id + ': ' + payload");
return adapter;
}
@MessagingGateway(name = "notificationExecutionsListener", defaultRequestChannel = "stepExecutionsChannel")
public interface NotificationExecutionListener extends StepExecutionListener {}
您需要将注释添加到您的配置中。@IntegrationComponentScan |
其次,修改您的作业以添加步骤级侦听器。
以下示例演示如何在 XML 中添加步骤级侦听器:
<job id="importPayments">
<step id="step1">
<tasklet ../>
<chunk ../>
<listeners>
<listener ref="notificationExecutionsListener"/>
</listeners>
</tasklet>
...
</step>
</job>
以下示例显示了如何在 Java 中添加步骤级侦听器:
public Job importPaymentsJob(JobRepository jobRepository) {
return new JobBuilder("importPayments", jobRepository)
.start(stepBuilderFactory.get("step1")
.chunk(200)
.listener(notificationExecutionsListener())
...
)
}
异步处理器
异步处理器可帮助您扩展项目的处理。在异步
处理器用例中,它充当调度程序,执行
对于新线程上的项。一旦项目完成,就是
传递给 要写。AsyncItemProcessor
ItemProcessor
Future
AsynchItemWriter
因此,基本上,您可以通过使用异步项目处理来提高性能
允许您实现 fork-join 方案。收集结果和
一旦所有结果都可用,就会写回 chunk。AsyncItemWriter
以下示例显示了如何在 XML 中配置 :AsyncItemProcessor
<bean id="processor"
class="org.springframework.batch.integration.async.AsyncItemProcessor">
<property name="delegate">
<bean class="your.ItemProcessor"/>
</property>
<property name="taskExecutor">
<bean class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
</property>
</bean>
以下示例显示了如何在 XML 中配置 :AsyncItemProcessor
@Bean
public AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) {
AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor();
asyncItemProcessor.setTaskExecutor(taskExecutor);
asyncItemProcessor.setDelegate(itemProcessor);
return asyncItemProcessor;
}
property 引用您的 bean,property 引用您选择的 bean。delegate
ItemProcessor
taskExecutor
TaskExecutor
以下示例显示了如何在 XML 中配置:AsyncItemWriter
<bean id="itemWriter"
class="org.springframework.batch.integration.async.AsyncItemWriter">
<property name="delegate">
<bean id="itemWriter" class="your.ItemWriter"/>
</property>
</bean>
以下示例显示了如何在 Java 中配置 :AsyncItemWriter
@Bean
public AsyncItemWriter writer(ItemWriter itemWriter) {
AsyncItemWriter asyncItemWriter = new AsyncItemWriter();
asyncItemWriter.setDelegate(itemWriter);
return asyncItemWriter;
}
同样,该属性是
实际上是对 bean 的引用。delegate
ItemWriter
外部化批处理执行
到目前为止讨论的集成方法建议了用例 其中 Spring Integration 将 Spring Batch 包装成一个外壳。 但是,Spring Batch 也可以在内部使用 Spring Integration。 通过使用这种方法, Spring Batch 用户可以委托 将项目甚至块处理到外部进程。这 用于卸载复杂的处理。Spring Batch 集成 为以下产品提供专门支持:
-
远程分块
-
远程分区
远程分块
下图显示了使用 Spring Batch 时远程分块工作的一种方式 与 Spring 集成一起:
更进一步,您还可以将
使用(由 Spring Batch 集成提供)进行块处理,将项目发送出去
并收集结果。发送后, Spring Batch 会继续
读取和分组项目的过程,而无需等待结果。
相反,收集结果并将其集成回 Spring Batch 流程是 的责任。ChunkMessageChannelItemWriter
ChunkMessageChannelItemWriter
使用 Spring Integration,您可以获得完整的
控制进程的并发性(例如,通过
使用 a 而不是 a )。此外,通过依赖
Spring 集成丰富的通道适配器集合(例如
JMS 和 AMQP),您可以将批处理作业的块分发到
用于处理的外部系统。QueueChannel
DirectChannel
具有要远程分块的步骤的作业可能具有类似于 following 的 XML 格式:
<job id="personJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="200"/>
</tasklet>
...
</step>
</job>
具有要远程分块的步骤的作业可能具有类似于 在 Java 中遵循:
public Job chunkJob(JobRepository jobRepository) {
return new JobBuilder("personJob", jobRepository)
.start(stepBuilderFactory.get("step1")
.<Person, Person>chunk(200)
.reader(itemReader())
.writer(itemWriter())
.build())
.build();
}
该引用指向要用于读取
经理。该引用指向一个特殊的 (称为 ),如前所述。处理器(如果有)处于关闭状态
Manager 配置,因为它是在 worker 上配置的。您应该检查任何
其他组件属性,例如限制等,在实现
您的用例。ItemReader
ItemWriter
ItemWriter
ChunkMessageChannelItemWriter
以下 XML 配置提供了基本的 manager 设置:
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<int-jms:outbound-channel-adapter id="jmsRequests" destination-name="requests"/>
<bean id="messagingTemplate"
class="org.springframework.integration.core.MessagingTemplate">
<property name="defaultChannel" ref="requests"/>
<property name="receiveTimeout" value="2000"/>
</bean>
<bean id="itemWriter"
class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
scope="step">
<property name="messagingOperations" ref="messagingTemplate"/>
<property name="replyChannel" ref="replies"/>
</bean>
<int:channel id="replies">
<int:queue/>
</int:channel>
<int-jms:message-driven-channel-adapter id="jmsReplies"
destination-name="replies"
channel="replies"/>
以下 Java 配置提供了基本的 manager 设置:
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://localhost:61616");
return factory;
}
/*
* Configure outbound flow (requests going to workers)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(requests())
.handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
.get();
}
/*
* Configure inbound flow (replies coming from workers)
*/
@Bean
public QueueChannel replies() {
return new QueueChannel();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
.channel(replies())
.get();
}
/*
* Configure the ChunkMessageChannelItemWriter
*/
@Bean
public ItemWriter<Integer> itemWriter() {
MessagingTemplate messagingTemplate = new MessagingTemplate();
messagingTemplate.setDefaultChannel(requests());
messagingTemplate.setReceiveTimeout(2000);
ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter
= new ChunkMessageChannelItemWriter<>();
chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
chunkMessageChannelItemWriter.setReplyChannel(replies());
return chunkMessageChannelItemWriter;
}
前面的配置为我们提供了许多 bean。我们
使用 ActiveMQ 配置我们的消息中间件,并将
Spring 集成提供的入站和出站 JMS 适配器。如
显示了我们的 bean,它是
引用我们的作业步骤,使用 将块写入
配置的中间件。itemWriter
ChunkMessageChannelItemWriter
现在我们可以继续进行 worker 配置,如下例所示:
以下示例显示了 XML 中的 worker 配置:
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<int:channel id="requests"/>
<int:channel id="replies"/>
<int-jms:message-driven-channel-adapter id="incomingRequests"
destination-name="requests"
channel="requests"/>
<int-jms:outbound-channel-adapter id="outgoingReplies"
destination-name="replies"
channel="replies">
</int-jms:outbound-channel-adapter>
<int:service-activator id="serviceActivator"
input-channel="requests"
output-channel="replies"
ref="chunkProcessorChunkHandler"
method="handleChunk"/>
<bean id="chunkProcessorChunkHandler"
class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
<property name="chunkProcessor">
<bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
<property name="itemWriter">
<bean class="io.spring.sbi.PersonItemWriter"/>
</property>
<property name="itemProcessor">
<bean class="io.spring.sbi.PersonItemProcessor"/>
</property>
</bean>
</property>
</bean>
以下示例显示了 Java 中的 worker 配置:
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://localhost:61616");
return factory;
}
/*
* Configure inbound flow (requests coming from the manager)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
.channel(requests())
.get();
}
/*
* Configure outbound flow (replies going to the manager)
*/
@Bean
public DirectChannel replies() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(replies())
.handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
.get();
}
/*
* Configure the ChunkProcessorChunkHandler
*/
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
ChunkProcessor<Integer> chunkProcessor
= new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler
= new ChunkProcessorChunkHandler<>();
chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
return chunkProcessorChunkHandler;
}
这些配置项中的大多数从
Manager 配置。工作人员不需要访问
Spring Batch 或
添加到实际的作业配置文件中。感兴趣的主要 bean
是 .的属性采用
配置,这是您将提供对将在 worker 上运行的 (以及可选的 ) 的引用的位置
当它从 Manager 接收 chunk 时。JobRepository
chunkProcessorChunkHandler
chunkProcessor
ChunkProcessorChunkHandler
SimpleChunkProcessor
ItemWriter
ItemProcessor
有关更多信息,请参阅“可扩展性”一章中有关 Remote Chunking 的部分。
从版本 4.1 开始, Spring Batch Integration 引入了可用于简化远程分块设置的 Comments。此注释提供
两个 bean 可以在应用程序上下文中自动装配:@EnableBatchIntegration
-
RemoteChunkingManagerStepBuilderFactory
:配置管理器步骤 -
RemoteChunkingWorkerBuilder
:配置远程工作人员集成流程
这些 API 负责配置许多组件,如下图所示:
在 Manager 端,允许您
通过声明来配置 Manager 步骤:RemoteChunkingManagerStepBuilderFactory
-
用于读取项目并将其发送给工作人员的项目读取器
-
输出通道(“Outgoing requests”),用于向 worker 发送请求
-
用于接收来自 worker 的回复的输入通道(“Incoming replies”)
您无需显式配置和 .
(如果找到理由,您仍然可以显式配置它们)。ChunkMessageChannelItemWriter
MessagingTemplate
在 worker 端,允许您将 worker 配置为:RemoteChunkingWorkerBuilder
-
侦听 manager 在 input 通道上发送的请求(“Incoming requests”)
-
调用 for each request 的方法 使用配置的 和
handleChunk
ChunkProcessorChunkHandler
ItemProcessor
ItemWriter
-
将输出通道上的回复(“传出回复”)发送到管理器
您无需显式配置 和 .(如果您发现
这样做的理由)。SimpleChunkProcessor
ChunkProcessorChunkHandler
以下示例演示如何使用这些 API:
@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {
@Configuration
public static class ManagerConfiguration {
@Autowired
private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;
@Bean
public TaskletStep managerStep() {
return this.managerStepBuilderFactory.get("managerStep")
.chunk(100)
.reader(itemReader())
.outputChannel(requests()) // requests sent to workers
.inputChannel(replies()) // replies received from workers
.build();
}
// Middleware beans setup omitted
}
@Configuration
public static class WorkerConfiguration {
@Autowired
private RemoteChunkingWorkerBuilder workerBuilder;
@Bean
public IntegrationFlow workerFlow() {
return this.workerBuilder
.itemProcessor(itemProcessor())
.itemWriter(itemWriter())
.inputChannel(requests()) // requests received from the manager
.outputChannel(replies()) // replies sent to the manager
.build();
}
// Middleware beans setup omitted
}
}
你可以在这里找到远程分块作业的完整示例。
远程分区
下图显示了典型的远程分区情况:
另一方面,远程分区在以下情况下很有用
不是项目的处理,而是关联的 I/O
造成瓶颈。使用远程分区,您可以发送工作
到执行完整 Spring Batch 的工作程序
步骤。因此,每个 worker 都有自己的 、 、 和 。为此,Spring Batch
集成提供 .ItemReader
ItemProcessor
ItemWriter
MessageChannelPartitionHandler
该接口的此实现使用实例来
向远程工作人员发送说明并接收他们的响应。
这为传输(例如 JMS
和 AMQP)用于与远程工作人员通信。PartitionHandler
MessageChannel
“可扩展性”一章中介绍远程分区的部分概述了这些概念和
组件,并显示
使用 Default to partition 的示例
在单独的本地执行线程中。用于远程分区
对于多个 JVM,需要两个额外的组件:TaskExecutorPartitionHandler
-
远程处理结构或网格环境
-
支持所需 远程处理 Fabric 或 Grid 环境
PartitionHandler
与远程分块类似,您可以使用 JMS 作为“远程处理结构”。在这种情况下,请使用
实例作为实现,
如前所述。MessageChannelPartitionHandler
PartitionHandler
以下示例假定一个现有的分区作业,并重点介绍 XML 中的 和 JMS 配置:MessageChannelPartitionHandler
<bean id="partitionHandler"
class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
<property name="stepName" value="step1"/>
<property name="gridSize" value="3"/>
<property name="replyChannel" ref="outbound-replies"/>
<property name="messagingOperations">
<bean class="org.springframework.integration.core.MessagingTemplate">
<property name="defaultChannel" ref="outbound-requests"/>
<property name="receiveTimeout" value="100000"/>
</bean>
</property>
</bean>
<int:channel id="outbound-requests"/>
<int-jms:outbound-channel-adapter destination="requestsQueue"
channel="outbound-requests"/>
<int:channel id="inbound-requests"/>
<int-jms:message-driven-channel-adapter destination="requestsQueue"
channel="inbound-requests"/>
<bean id="stepExecutionRequestHandler"
class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
<property name="jobExplorer" ref="jobExplorer"/>
<property name="stepLocator" ref="stepLocator"/>
</bean>
<int:service-activator ref="stepExecutionRequestHandler" input-channel="inbound-requests"
output-channel="outbound-staging"/>
<int:channel id="outbound-staging"/>
<int-jms:outbound-channel-adapter destination="stagingQueue"
channel="outbound-staging"/>
<int:channel id="inbound-staging"/>
<int-jms:message-driven-channel-adapter destination="stagingQueue"
channel="inbound-staging"/>
<int:aggregator ref="partitionHandler" input-channel="inbound-staging"
output-channel="outbound-replies"/>
<int:channel id="outbound-replies">
<int:queue/>
</int:channel>
<bean id="stepLocator"
class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />
以下示例假定一个现有的分区作业,并重点介绍 Java 中的 和 JMS 配置:MessageChannelPartitionHandler
/*
* Configuration of the manager side
*/
@Bean
public PartitionHandler partitionHandler() {
MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
partitionHandler.setStepName("step1");
partitionHandler.setGridSize(3);
partitionHandler.setReplyChannel(outboundReplies());
MessagingTemplate template = new MessagingTemplate();
template.setDefaultChannel(outboundRequests());
template.setReceiveTimeout(100000);
partitionHandler.setMessagingOperations(template);
return partitionHandler;
}
@Bean
public QueueChannel outboundReplies() {
return new QueueChannel();
}
@Bean
public DirectChannel outboundRequests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundJmsRequests() {
return IntegrationFlow.from("outboundRequests")
.handle(Jms.outboundGateway(connectionFactory())
.requestDestination("requestsQueue"))
.get();
}
@Bean
@ServiceActivator(inputChannel = "inboundStaging")
public AggregatorFactoryBean partitioningMessageHandler() throws Exception {
AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
aggregatorFactoryBean.setProcessorBean(partitionHandler());
aggregatorFactoryBean.setOutputChannel(outboundReplies());
// configure other propeties of the aggregatorFactoryBean
return aggregatorFactoryBean;
}
@Bean
public DirectChannel inboundStaging() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundJmsStaging() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
.configureListenerContainer(c -> c.subscriptionDurable(false))
.destination("stagingQueue"))
.channel(inboundStaging())
.get();
}
/*
* Configuration of the worker side
*/
@Bean
public StepExecutionRequestHandler stepExecutionRequestHandler() {
StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
stepExecutionRequestHandler.setJobExplorer(jobExplorer);
stepExecutionRequestHandler.setStepLocator(stepLocator());
return stepExecutionRequestHandler;
}
@Bean
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler serviceActivator() throws Exception {
return stepExecutionRequestHandler();
}
@Bean
public DirectChannel inboundRequests() {
return new DirectChannel();
}
public IntegrationFlow inboundJmsRequests() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
.configureListenerContainer(c -> c.subscriptionDurable(false))
.destination("requestsQueue"))
.channel(inboundRequests())
.get();
}
@Bean
public DirectChannel outboundStaging() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundJmsStaging() {
return IntegrationFlow.from("outboundStaging")
.handle(Jms.outboundGateway(connectionFactory())
.requestDestination("stagingQueue"))
.get();
}
您还必须确保 partition 属性映射到 Bean。handler
partitionHandler
以下示例将 partition 属性映射到 in
XML:handler
partitionHandler
<job id="personJob">
<step id="step1.manager">
<partition partitioner="partitioner" handler="partitionHandler"/>
...
</step>
</job>
以下示例将 partition 属性映射到 in
Java:handler
partitionHandler
public Job personJob(JobRepository jobRepository) {
return new JobBuilder("personJob", jobRepository)
.start(stepBuilderFactory.get("step1.manager")
.partitioner("step1.worker", partitioner())
.partitionHandler(partitionHandler())
.build())
.build();
}
您可以在此处找到远程分区作业的完整示例。
您可以使用注释来简化远程
partitioning 设置。此 Comments 提供了两个对远程分区有用的 bean:@EnableBatchIntegration
-
RemotePartitioningManagerStepBuilderFactory
:配置管理器步骤 -
RemotePartitioningWorkerStepBuilderFactory
:配置 worker 步骤
这些 API 负责配置许多组件,如下图所示:
在 Manager 端,允许您
通过声明来配置 Manager 步骤:RemotePartitioningManagerStepBuilderFactory
-
用于对数据进行分区的
Partitioner
-
向 worker 发送请求的输出通道(“传出请求”)
-
接收来自 worker 的回复的输入通道(“传入回复”)(配置回复聚合时)
-
轮询间隔和超时参数(配置作业存储库轮询时)
您无需显式配置 和 .
(如果您找到理由,您仍然可以显式配置它们)。MessageChannelPartitionHandler
MessagingTemplate
在 worker 端,允许您将 worker 配置为:RemotePartitioningWorkerStepBuilderFactory
-
侦听 manager 在 input 通道上发送的请求(“Incoming requests”)
-
调用 for each request 的方法
handle
StepExecutionRequestHandler
-
将输出通道上的回复(“传出回复”)发送到管理器
您无需显式配置 .
(如果您找到这样做的理由,您可以显式配置它)。StepExecutionRequestHandler
以下示例演示如何使用这些 API:
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningJobConfiguration {
@Configuration
public static class ManagerConfiguration {
@Autowired
private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;
@Bean
public Step managerStep() {
return this.managerStepBuilderFactory
.get("managerStep")
.partitioner("workerStep", partitioner())
.gridSize(10)
.outputChannel(outgoingRequestsToWorkers())
.inputChannel(incomingRepliesFromWorkers())
.build();
}
// Middleware beans setup omitted
}
@Configuration
public static class WorkerConfiguration {
@Autowired
private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;
@Bean
public Step workerStep() {
return this.workerStepBuilderFactory
.get("workerStep")
.inputChannel(incomingRequestsFromManager())
.outputChannel(outgoingRepliesToManager())
.chunk(100)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
// Middleware beans setup omitted
}
}