Spring Batch 集成

Spring Batch 集成

Spring Batch 集成介绍

Spring Batch 的许多用户可能会遇到以下要求 在 Spring Batch 的范围之外,但这可能是有效的,并且 使用 Spring Integration 简洁地实现。相反,Spring 集成用户可能会遇到 Spring Batch 需求,需要一种方法 有效地集成这两个框架。在这种情况下,几个 模式和用例出现,Spring Batch 集成 满足这些要求。spring-doc.cn

Spring Batch 和 Spring Integration 之间的界限并不总是 很清楚,但有两条建议可以 help:考虑粒度,并应用常见模式。一些 这些常见模式在本参考手册中进行了描述 部分。spring-doc.cn

将消息传递添加到批处理中可实现 运营,以及关键问题的分离和战略制定。 例如,一条消息可能会触发要执行的作业,然后 消息的发送可以通过多种方式公开。或者,当 作业完成或失败,该事件可能会触发要发送的消息, 这些消息的使用者可能有操作方面的顾虑 与应用程序本身无关。消息传递可以 也可以嵌入到作业中(例如,读取或写入 通过通道处理)。远程分区和远程分块 提供在多个 worker 之间分配工作负载的方法。spring-doc.cn

本节涵盖以下关键概念:spring-doc.cn

命名空间支持

从 Spring Batch Integration 1.3 开始,专用的 XML 命名空间 添加了支持,目的是提供更简单的配置 经验。要激活命名空间,请添加以下内容 命名空间声明添加到 Spring XML 应用程序上下文中 文件:spring-doc.cn

<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
  xsi:schemaLocation="
    http://www.springframework.org/schema/batch-integration
    https://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd">

    ...

</beans>

一个完全配置的 Spring XML Application Context 文件 批量集成可能如下所示:spring-doc.cn

<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:batch="http://www.springframework.org/schema/batch"
  xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
  xsi:schemaLocation="
    http://www.springframework.org/schema/batch-integration
    https://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd
    http://www.springframework.org/schema/batch
    https://www.springframework.org/schema/batch/spring-batch.xsd
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd">

    ...

</beans>

将版本号附加到引用的 XSD 文件也是 allowed,但是,作为无版本的声明,始终使用 latest 架构,我们通常不建议附加版本 number 设置为 XSD 名称。添加版本号 可能会在更新 Spring Batch 时产生问题 集成依赖项,因为它们可能需要更新的版本 的 XML 架构。spring-doc.cn

通过消息启动 Batch 作业

使用核心 Spring Batch API 启动批处理作业时,您可以 基本上有 2 个选项:spring-doc.cn

  • 在命令行中,使用CommandLineJobRunnerspring-doc.cn

  • 以编程方式,使用 或JobOperator.start()JobLauncher.run()spring-doc.cn

例如,您可能希望在调用 Batch Jobs 时通过以下方式使用 使用 shell 脚本。或者,您可以直接使用 (例如,使用 Spring Batch 作为 Web 应用程序的一部分)。但是,那又如何呢 更复杂的用例?也许您需要轮询远程 (S)FTP server 检索 Batch Job 或应用程序的数据 必须同时支持多个不同的数据源。为 例如,您不仅可以从 Web 接收数据文件,还可以从 FTP 和其他来源。也许输入文件的其他转换是 在调用 Spring Batch 之前需要。CommandLineJobRunnerJobOperatorspring-doc.cn

因此,执行批处理作业会更强大 使用 Spring Integration 及其众多适配器。例如 您可以使用 File Inbound Channel Adapter 来 监视文件系统中的目录,并以 一旦输入文件到达。此外,您还可以创建 Spring 使用多个不同适配器的集成流 从多个来源摄取批处理作业的数据 同时使用 only configuration。实现所有这些 Spring Integration 的场景很容易,因为它允许 解耦、事件驱动的 .JobLauncherspring-doc.cn

Spring Batch 集成提供了您可以 用于启动批处理作业。的输入由 Spring Integration 消息,该消息具有 类型为 的有效负载。此类是需要启动的 以及启动 Batch 作业所需的包装器。JobLaunchingMessageHandlerJobLaunchingMessageHandlerJobLaunchRequestJobJobParametersspring-doc.cn

下图说明了典型的 Spring 集成 消息流以启动 Batch 作业。EIP (Enterprise Integration Patterns) 网站提供了消息传送图标及其描述的完整概述。spring-doc.cn

启动 Batch Job
图 1.启动 Batch Job
将文件转换为 JobLaunchRequest
package io.spring.sbi;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;

import java.io.File;

public class FileMessageToJobRequest {
    private Job job;
    private String fileParameterName;

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder =
            new JobParametersBuilder();

        jobParametersBuilder.addString(fileParameterName,
            message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}
回应JobExecution

执行批处理作业时,将返回一个实例。这 instance 可用于确定执行的状态。如果 a 可以创建 successful,则始终返回它,无论 或者实际执行不成功。JobExecutionJobExecutionspring-doc.cn

如何返回实例的确切行为取决于提供的 .如果使用(单线程)实现,则仅在作业完成时返回响应。使用 时,将返回实例 马上。然后,用户可以获取 of 实例 (使用 ) 并查询作业的更新状态 使用 .了解更多 信息,请查阅春 有关查询存储库的批处理参考文档。JobExecutionTaskExecutorsynchronousTaskExecutorJobExecutionafterasynchronousTaskExecutorJobExecutionidJobExecutionJobExecution.getJobId()JobRepositoryJobExplorerspring-doc.cn

Spring Batch 集成配置

考虑某人需要创建一个文件来侦听的情况 对于所提供目录中的 CSV 文件,请将它们交给转换器 (),通过 Job Launching Gateway 启动任务,然后 使用 记录 的输出。inbound-channel-adapterFileMessageToJobRequestJobExecutionlogging-channel-adapterspring-doc.cn

以下示例显示了如何在 XML 中配置该常见情况:spring-doc.cn

XML 配置
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>

<int-file:inbound-channel-adapter id="filePoller"
    channel="inboundFileChannel"
    directory="file:/tmp/myfiles/"
    filename-pattern="*.csv">
  <int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>

<int:transformer input-channel="inboundFileChannel"
    output-channel="outboundJobRequestChannel">
  <bean class="io.spring.sbi.FileMessageToJobRequest">
    <property name="job" ref="personJob"/>
    <property name="fileParameterName" value="input.file.name"/>
  </bean>
</int:transformer>

<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
    reply-channel="jobLaunchReplyChannel"/>

<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>

以下示例显示了如何在 Java 中配置该常见情况:spring-doc.cn

Java 配置
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
    fileMessageToJobRequest.setJob(personJob());
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
    simpleJobLauncher.setJobRepository(jobRepository);
    simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);

    return jobLaunchingGateway;
}

@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
    return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/myfiles")).
                    filter(new SimplePatternFileListFilter("*.csv")),
            c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
            transform(fileMessageToJobRequest()).
            handle(jobLaunchingGateway).
            log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
            get();
}
ItemReader 配置示例

现在我们正在轮询文件并启动作业,我们需要配置我们的 Spring 批处理(例如),以使用在作业定义的位置找到的文件 参数命名为 “input.file.name”,如下面的 bean 配置所示:ItemReaderspring-doc.cn

下面的 XML 示例显示了必要的 Bean 配置:spring-doc.cn

XML 配置
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
    scope="step">
  <property name="resource" value="file://#{jobParameters['input.file.name']}"/>
    ...
</bean>

下面的 Java 示例显示了必要的 Bean 配置:spring-doc.cn

Java 配置
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
    FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
    flatFileItemReader.setResource(new FileSystemResource(resource));
...
    return flatFileItemReader;
}

前面示例中的要点是将 的值注入 作为 Resource 属性值,并设置 bean 具有 Step 范围。将 bean 设置为具有 Step 范围可利用 后期绑定支持,允许访问变量。#{jobParameters['input.file.name']}ItemReaderjobParametersspring-doc.cn

Job-Launching Gateway 的可用属性

任务启动网关具有以下属性,您可以设置这些属性来控制任务:spring-doc.cn

  • id:标识底层 Spring Bean 定义,它是以下任一的实例:spring-doc.cn

    • EventDrivenConsumerspring-doc.cn

    • PollingConsumer(确切的实现取决于组件的 input 通道是 a 还是 。SubscribableChannelPollableChannelspring-doc.cn

  • auto-startup:布尔标志,指示终端节点应在 启动。默认值为 truespring-doc.cn

  • request-channel:此端点的输入。MessageChannelspring-doc.cn

  • reply-channel:生成的有效负载将发送到该目标。MessageChannelJobExecutionspring-doc.cn

  • reply-timeout:允许您指定此网关等待回复消息的时间(以毫秒为单位) 在 throw 之前成功发送到 reply 通道 异常。此属性仅在通道 可能会阻塞(例如,当使用有界队列通道时 目前已满)。另请记住,当发送到 时,会发生调用 在发件人的线程中。因此,发送失败 操作可能由下游的其他组件引起。 该属性映射到底层实例的 property。如果未指定,则属性 默认为 <emphasis>-1</emphasis>, 这意味着,默认情况下,会无限期等待。DirectChannelreply-timeoutsendTimeoutMessagingTemplateGatewayspring-doc.cn

  • job-launcher:自选。接受 自定义 Bean 引用。 如果未指定适配器 重新使用在 的 下注册的实例。如果没有默认实例 存在时,会引发异常。JobLauncheridjobLauncherspring-doc.cn

  • order:指定此终端节点作为订阅者连接时的调用顺序 转换为 .SubscribableChannelspring-doc.cn

子元素

当接收来自 的消息时,您必须提供 全局默认值或为 .GatewayPollableChannelPollerPollerJob Launching Gatewayspring-doc.cn

下面的示例展示了如何在 XML 中提供 Poller:spring-doc.cn

XML 配置
<batch-int:job-launching-gateway request-channel="queueChannel"
    reply-channel="replyChannel" job-launcher="jobLauncher">
  <int:poller fixed-rate="1000">
</batch-int:job-launching-gateway>

下面的示例展示了如何在 Java 中提供 Poller:spring-doc.cn

Java 配置
@Bean
@ServiceActivator(inputChannel = "queueChannel", poller = @Poller(fixedRate="1000"))
public JobLaunchingGateway sampleJobLaunchingGateway() {
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());
    jobLaunchingGateway.setOutputChannel(replyChannel());
    return jobLaunchingGateway;
}

通过信息性消息提供反馈

由于 Spring Batch 作业可以长时间运行,因此提供进度 信息通常至关重要。例如,利益相关者可能希望 以在批处理作业的部分或全部部分失败时收到通知。 Spring Batch 为正在收集的此信息提供支持 通过:spring-doc.cn

异步启动 Spring Batch 作业时(例如,通过使用 ),将返回一个实例。因此,可以是 用于通过使用 从 检索更新的实例来持续轮询状态更新。然而,这是 被认为是次优的,应该首选事件驱动的方法。Job Launching GatewayJobExecutionJobExecution.getJobId()JobExecutionJobRepositoryJobExplorerspring-doc.cn

因此,Spring Batch 提供了侦听器,包括最常用的三种 听众:spring-doc.cn

在下图所示的示例中,Spring Batch 作业已配置了 .因此,Spring Integration 接收并处理之前的任何步骤 或事件发生后。例如,可以使用 .根据该检查的结果,可能会发生各种情况(例如 将邮件路由到邮件出站通道适配器),以便电子邮件通知可以 根据某些条件发出。StepExecutionListenerStepExecutionRouterspring-doc.cn

处理信息性消息
图 2.处理信息性消息

以下由两部分组成的示例显示了如何配置侦听器以发送 message 发送到 a 事件,并将其输出记录到 .GatewayStepExecutionlogging-channel-adapterspring-doc.cn

首先,创建通知集成 Bean。spring-doc.cn

以下示例显示了如何在 XML 中创建通知集成 Bean:spring-doc.cn

XML 配置
<int:channel id="stepExecutionsChannel"/>

<int:gateway id="notificationExecutionsListener"
    service-interface="org.springframework.batch.core.StepExecutionListener"
    default-request-channel="stepExecutionsChannel"/>

<int:logging-channel-adapter channel="stepExecutionsChannel"/>

以下示例显示了如何在 Java 中创建通知集成 Bean:spring-doc.cn

Java 配置
@Bean
@ServiceActivator(inputChannel = "stepExecutionsChannel")
public LoggingHandler loggingHandler() {
    LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.WARN);
    adapter.setLoggerName("TEST_LOGGER");
    adapter.setLogExpressionString("headers.id + ': ' + payload");
    return adapter;
}

@MessagingGateway(name = "notificationExecutionsListener", defaultRequestChannel = "stepExecutionsChannel")
public interface NotificationExecutionListener extends StepExecutionListener {}
您需要将注释添加到您的配置中。@IntegrationComponentScan

其次,修改您的作业以添加步骤级侦听器。spring-doc.cn

以下示例演示如何在 XML 中添加步骤级侦听器:spring-doc.cn

XML 配置
<job id="importPayments">
    <step id="step1">
        <tasklet ../>
            <chunk ../>
            <listeners>
                <listener ref="notificationExecutionsListener"/>
            </listeners>
        </tasklet>
        ...
    </step>
</job>

以下示例显示了如何在 Java 中添加步骤级侦听器:spring-doc.cn

Java 配置
public Job importPaymentsJob() {
    return jobBuilderFactory.get("importPayments")
        .start(stepBuilderFactory.get("step1")
                .chunk(200)
                .listener(notificationExecutionsListener())
                ...
}

异步处理器

异步处理器可帮助您扩展项目的处理。在异步 处理器用例中,它充当调度程序,执行 对于新线程上的项。一旦项目完成,就是 传递给 要写。AsyncItemProcessorItemProcessorFutureAsynchItemWriterspring-doc.cn

因此,基本上,您可以通过使用异步项目处理来提高性能 允许您实现 fork-join 方案。收集结果和 一旦所有结果都可用,就会写回 chunk。AsyncItemWriterspring-doc.cn

以下示例显示了如何在 XML 中配置 :AsyncItemProcessorspring-doc.cn

XML 配置
<bean id="processor"
    class="org.springframework.batch.integration.async.AsyncItemProcessor">
  <property name="delegate">
    <bean class="your.ItemProcessor"/>
  </property>
  <property name="taskExecutor">
    <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
  </property>
</bean>

以下示例显示了如何在 XML 中配置 :AsyncItemProcessorspring-doc.cn

Java 配置
@Bean
public AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) {
    AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor();
    asyncItemProcessor.setTaskExecutor(taskExecutor);
    asyncItemProcessor.setDelegate(itemProcessor);
    return asyncItemProcessor;
}

property 引用您的 bean,property 引用您选择的 bean。delegateItemProcessortaskExecutorTaskExecutorspring-doc.cn

以下示例显示了如何在 XML 中配置:AsyncItemWriterspring-doc.cn

XML 配置
<bean id="itemWriter"
    class="org.springframework.batch.integration.async.AsyncItemWriter">
  <property name="delegate">
    <bean id="itemWriter" class="your.ItemWriter"/>
  </property>
</bean>

以下示例显示了如何在 Java 中配置 :AsyncItemWriterspring-doc.cn

Java 配置
@Bean
public AsyncItemWriter writer(ItemWriter itemWriter) {
    AsyncItemWriter asyncItemWriter = new AsyncItemWriter();
    asyncItemWriter.setDelegate(itemWriter);
    return asyncItemWriter;
}

同样,该属性是 实际上是对 bean 的引用。delegateItemWriterspring-doc.cn

外部化批处理执行

到目前为止讨论的集成方法建议了用例 其中 Spring Integration 将 Spring Batch 包装成一个 shell。 但是,Spring Batch 也可以在内部使用 Spring Integration。 使用这种方法, Spring Batch 用户可以委托 将项目甚至块处理到外部进程。这 允许您卸载复杂的处理。Spring Batch 集成 为以下产品提供专门支持:spring-doc.cn

远程分块
远程分块
图 3.远程分块

更进一步,您还可以将 使用(由 Spring Batch 集成提供)进行块处理,将项目发送出去 并收集结果。发送后, Spring Batch 会继续 读取和分组项目的过程,而无需等待结果。 相反,收集结果并将其集成回 Spring Batch 流程是 的责任。ChunkMessageChannelItemWriterChunkMessageChannelItemWriterspring-doc.cn

使用 Spring Integration,您可以获得完整的 控制进程的并发性(例如,通过 使用 a 而不是 a )。此外,通过依赖 Spring 集成丰富的通道适配器集合(例如 JMS 和 AMQP),您可以将 Batch 作业的块分发到 用于处理的外部系统。QueueChannelDirectChannelspring-doc.cn

具有要远程分块的步骤的作业可能具有类似于 following 的 XML 格式:spring-doc.cn

XML 配置
<job id="personJob">
  <step id="step1">
    <tasklet>
      <chunk reader="itemReader" writer="itemWriter" commit-interval="200"/>
    </tasklet>
    ...
  </step>
</job>

具有要远程分块的步骤的作业可能具有类似于 在 Java 中遵循:spring-doc.cn

Java 配置
public Job chunkJob() {
     return jobBuilderFactory.get("personJob")
             .start(stepBuilderFactory.get("step1")
                     .<Person, Person>chunk(200)
                     .reader(itemReader())
                     .writer(itemWriter())
                     .build())
             .build();
 }

该引用指向要用于读取 经理。该引用指向一个特殊的 (称为 ),如上所述。处理器(如果有)处于关闭状态 Manager 配置,因为它是在 worker 上配置的。您应该检查任何 其他组件属性,例如限制等,在实现 您的用例。ItemReaderItemWriterItemWriterChunkMessageChannelItemWriterspring-doc.cn

以下 XML 配置提供了基本的 manager 设置:spring-doc.cn

XML 配置
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<int-jms:outbound-channel-adapter id="jmsRequests" destination-name="requests"/>

<bean id="messagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate">
  <property name="defaultChannel" ref="requests"/>
  <property name="receiveTimeout" value="2000"/>
</bean>

<bean id="itemWriter"
    class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
    scope="step">
  <property name="messagingOperations" ref="messagingTemplate"/>
  <property name="replyChannel" ref="replies"/>
</bean>

<int:channel id="replies">
  <int:queue/>
</int:channel>

<int-jms:message-driven-channel-adapter id="jmsReplies"
    destination-name="replies"
    channel="replies"/>

以下 Java 配置提供了基本的 manager 设置:spring-doc.cn

Java 配置
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("tcp://localhost:61616");
    return factory;
}

/*
 * Configure outbound flow (requests going to workers)
 */
@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(requests())
            .handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
            .get();
}

/*
 * Configure inbound flow (replies coming from workers)
 */
@Bean
public QueueChannel replies() {
    return new QueueChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
            .channel(replies())
            .get();
}

/*
 * Configure the ChunkMessageChannelItemWriter
 */
@Bean
public ItemWriter<Integer> itemWriter() {
    MessagingTemplate messagingTemplate = new MessagingTemplate();
    messagingTemplate.setDefaultChannel(requests());
    messagingTemplate.setReceiveTimeout(2000);
    ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter
            = new ChunkMessageChannelItemWriter<>();
    chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
    chunkMessageChannelItemWriter.setReplyChannel(replies());
    return chunkMessageChannelItemWriter;
}

前面的配置为我们提供了许多 bean。我们 使用 ActiveMQ 配置我们的消息中间件,并将 Spring Integration 提供的入站/出站 JMS 适配器。如 显示了我们的 bean,它是 引用我们的 Job 步骤,使用 for writing chunks over 配置的中间件。itemWriterChunkMessageChannelItemWriterspring-doc.cn

现在我们可以继续进行 worker 配置,如以下示例所示:spring-doc.cn

以下示例显示了 XML 中的 worker 配置:spring-doc.cn

XML 配置
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<int:channel id="requests"/>
<int:channel id="replies"/>

<int-jms:message-driven-channel-adapter id="incomingRequests"
    destination-name="requests"
    channel="requests"/>

<int-jms:outbound-channel-adapter id="outgoingReplies"
    destination-name="replies"
    channel="replies">
</int-jms:outbound-channel-adapter>

<int:service-activator id="serviceActivator"
    input-channel="requests"
    output-channel="replies"
    ref="chunkProcessorChunkHandler"
    method="handleChunk"/>

<bean id="chunkProcessorChunkHandler"
    class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
  <property name="chunkProcessor">
    <bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
      <property name="itemWriter">
        <bean class="io.spring.sbi.PersonItemWriter"/>
      </property>
      <property name="itemProcessor">
        <bean class="io.spring.sbi.PersonItemProcessor"/>
      </property>
    </bean>
  </property>
</bean>

以下示例显示了 Java 中的 worker 配置:spring-doc.cn

Java 配置
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("tcp://localhost:61616");
    return factory;
}

/*
 * Configure inbound flow (requests coming from the manager)
 */
@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
            .channel(requests())
            .get();
}

/*
 * Configure outbound flow (replies going to the manager)
 */
@Bean
public DirectChannel replies() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(replies())
            .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
            .get();
}

/*
 * Configure the ChunkProcessorChunkHandler
 */
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
    ChunkProcessor<Integer> chunkProcessor
            = new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
    ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler
            = new ChunkProcessorChunkHandler<>();
    chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
    return chunkProcessorChunkHandler;
}

这些配置项中的大多数从 Manager 配置。工作人员不需要访问 Spring Batch 或 添加到实际的作业配置文件中。感兴趣的主要 bean 是 .的属性采用 配置,这是您将提供对将在 worker 上运行的 (以及可选的 ) 的引用的位置 当它从 Manager 接收 chunk 时。JobRepositorychunkProcessorChunkHandlerchunkProcessorChunkProcessorChunkHandlerSimpleChunkProcessorItemWriterItemProcessorspring-doc.cn

有关更多信息,请参阅“可扩展性”一章中有关 Remote Chunking 的部分。spring-doc.cn

从版本 4.1 开始, Spring Batch Integration 引入了可用于简化远程分块设置的 Comments。此注释提供 两个可以在应用程序上下文中自动连接的 bean:@EnableBatchIntegrationspring-doc.cn

  • RemoteChunkingManagerStepBuilderFactory:用于配置管理器步骤spring-doc.cn

  • RemoteChunkingWorkerBuilder:用于配置 Remote Worker 集成流程spring-doc.cn

这些 API 负责配置许多组件,如下图所示:spring-doc.cn

远程分块配置
图 4.远程分块配置

在 Manager 端,允许您 通过声明来配置 Manager 步骤:RemoteChunkingManagerStepBuilderFactoryspring-doc.cn

  • 项目读取器,用于读取项目并将其发送给工作人员spring-doc.cn

  • 输出通道(“Outgoing requests”)向 worker 发送请求spring-doc.cn

  • 输入通道(“Incoming replies”)用于接收来自 worker 的回复spring-doc.cn

A 和 不需要显式配置 (如果需要,仍可以显式配置这些内容)。ChunkMessageChannelItemWriterMessagingTemplatespring-doc.cn

在 worker 端,允许您将 worker 配置为:RemoteChunkingWorkerBuilderspring-doc.cn

  • 侦听 Manager 在 input 通道上发送的请求(“Incoming requests”)spring-doc.cn

  • 调用 for each request 的方法 使用配置的 和handleChunkChunkProcessorChunkHandlerItemProcessorItemWriterspring-doc.cn

  • 将输出通道上的回复(“传出回复”)发送到管理器spring-doc.cn

无需显式配置 和 (如果需要,可以显式配置这些)。SimpleChunkProcessorChunkProcessorChunkHandlerspring-doc.cn

以下示例演示如何使用这些 API:spring-doc.cn

@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {

    @Configuration
    public static class ManagerConfiguration {

        @Autowired
        private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;

        @Bean
        public TaskletStep managerStep() {
            return this.managerStepBuilderFactory.get("managerStep")
                       .chunk(100)
                       .reader(itemReader())
                       .outputChannel(requests()) // requests sent to workers
                       .inputChannel(replies())   // replies received from workers
                       .build();
        }

        // Middleware beans setup omitted

    }

    @Configuration
    public static class WorkerConfiguration {

        @Autowired
        private RemoteChunkingWorkerBuilder workerBuilder;

        @Bean
        public IntegrationFlow workerFlow() {
            return this.workerBuilder
                       .itemProcessor(itemProcessor())
                       .itemWriter(itemWriter())
                       .inputChannel(requests()) // requests received from the manager
                       .outputChannel(replies()) // replies sent to the manager
                       .build();
        }

        // Middleware beans setup omitted

    }

}

你可以在这里找到远程分块作业的完整示例。spring-doc.cn

远程分区
远程分区
图 5.远程分区

另一方面,远程分区在以下情况下很有用 不是项目的处理,而是关联的 I/O 造成瓶颈。使用 Remote Partitioning,工作可以 被分配给执行完整 Spring Batch 的 worker 步骤。因此,每个 worker 都有自己的 、 、 和 。为此,Spring Batch 集成提供 .ItemReaderItemProcessorItemWriterMessageChannelPartitionHandlerspring-doc.cn

该接口的此实现使用实例来 向远程工作人员发送说明并接收他们的响应。 这为传输(例如 JMS 和 AMQP)用于与远程工作人员通信。PartitionHandlerMessageChannelspring-doc.cn

“可扩展性”一章中介绍远程分区的部分概述了这些概念和 组件,并显示 使用 Default to partition 的示例 在单独的本地执行线程中。用于远程分区 对于多个 JVM,需要两个额外的组件:TaskExecutorPartitionHandlerspring-doc.cn

与远程分块类似,JMS 可以用作“远程处理结构”。在这种情况下,请使用 实例作为实现, 如前所述。MessageChannelPartitionHandlerPartitionHandlerspring-doc.cn

以下示例假定一个现有的分区作业,并重点介绍 XML 中的 和 JMS 配置:MessageChannelPartitionHandlerspring-doc.cn

XML 配置
<bean id="partitionHandler"
   class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
  <property name="stepName" value="step1"/>
  <property name="gridSize" value="3"/>
  <property name="replyChannel" ref="outbound-replies"/>
  <property name="messagingOperations">
    <bean class="org.springframework.integration.core.MessagingTemplate">
      <property name="defaultChannel" ref="outbound-requests"/>
      <property name="receiveTimeout" value="100000"/>
    </bean>
  </property>
</bean>

<int:channel id="outbound-requests"/>
<int-jms:outbound-channel-adapter destination="requestsQueue"
    channel="outbound-requests"/>

<int:channel id="inbound-requests"/>
<int-jms:message-driven-channel-adapter destination="requestsQueue"
    channel="inbound-requests"/>

<bean id="stepExecutionRequestHandler"
    class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
  <property name="jobExplorer" ref="jobExplorer"/>
  <property name="stepLocator" ref="stepLocator"/>
</bean>

<int:service-activator ref="stepExecutionRequestHandler" input-channel="inbound-requests"
    output-channel="outbound-staging"/>

<int:channel id="outbound-staging"/>
<int-jms:outbound-channel-adapter destination="stagingQueue"
    channel="outbound-staging"/>

<int:channel id="inbound-staging"/>
<int-jms:message-driven-channel-adapter destination="stagingQueue"
    channel="inbound-staging"/>

<int:aggregator ref="partitionHandler" input-channel="inbound-staging"
    output-channel="outbound-replies"/>

<int:channel id="outbound-replies">
  <int:queue/>
</int:channel>

<bean id="stepLocator"
    class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />

以下示例假定一个现有的分区作业,并重点介绍 Java 中的 和 JMS 配置:MessageChannelPartitionHandlerspring-doc.cn

Java 配置
/*
 * Configuration of the manager side
 */
@Bean
public PartitionHandler partitionHandler() {
    MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
    partitionHandler.setStepName("step1");
    partitionHandler.setGridSize(3);
    partitionHandler.setReplyChannel(outboundReplies());
    MessagingTemplate template = new MessagingTemplate();
    template.setDefaultChannel(outboundRequests());
    template.setReceiveTimeout(100000);
    partitionHandler.setMessagingOperations(template);
    return partitionHandler;
}

@Bean
public QueueChannel outboundReplies() {
    return new QueueChannel();
}

@Bean
public DirectChannel outboundRequests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundJmsRequests() {
    return IntegrationFlows.from("outboundRequests")
            .handle(Jms.outboundGateway(connectionFactory())
                    .requestDestination("requestsQueue"))
            .get();
}

@Bean
@ServiceActivator(inputChannel = "inboundStaging")
public AggregatorFactoryBean partitioningMessageHandler() throws Exception {
    AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
    aggregatorFactoryBean.setProcessorBean(partitionHandler());
    aggregatorFactoryBean.setOutputChannel(outboundReplies());
    // configure other propeties of the aggregatorFactoryBean
    return aggregatorFactoryBean;
}

@Bean
public DirectChannel inboundStaging() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundJmsStaging() {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory())
                    .configureListenerContainer(c -> c.subscriptionDurable(false))
                    .destination("stagingQueue"))
            .channel(inboundStaging())
            .get();
}

/*
 * Configuration of the worker side
 */
@Bean
public StepExecutionRequestHandler stepExecutionRequestHandler() {
    StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
    stepExecutionRequestHandler.setJobExplorer(jobExplorer);
    stepExecutionRequestHandler.setStepLocator(stepLocator());
    return stepExecutionRequestHandler;
}

@Bean
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler serviceActivator() throws Exception {
    return stepExecutionRequestHandler();
}

@Bean
public DirectChannel inboundRequests() {
    return new DirectChannel();
}

public IntegrationFlow inboundJmsRequests() {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory())
                    .configureListenerContainer(c -> c.subscriptionDurable(false))
                    .destination("requestsQueue"))
            .channel(inboundRequests())
            .get();
}

@Bean
public DirectChannel outboundStaging() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundJmsStaging() {
    return IntegrationFlows.from("outboundStaging")
            .handle(Jms.outboundGateway(connectionFactory())
                    .requestDestination("stagingQueue"))
            .get();
}

您还必须确保 partition 属性映射到 Bean。handlerpartitionHandlerspring-doc.cn

以下示例将 partition 属性映射到 in XML:handlerpartitionHandlerspring-doc.cn

XML 配置
<job id="personJob">
  <step id="step1.manager">
    <partition partitioner="partitioner" handler="partitionHandler"/>
    ...
  </step>
</job>

以下示例将 partition 属性映射到 in Java:handlerpartitionHandlerspring-doc.cn

Java 配置
	public Job personJob() {
		return jobBuilderFactory.get("personJob")
				.start(stepBuilderFactory.get("step1.manager")
						.partitioner("step1.worker", partitioner())
						.partitionHandler(partitionHandler())
						.build())
				.build();
	}

您可以在此处找到远程分区作业的完整示例。spring-doc.cn

可用于简化远程 partitioning 设置。此 Comments 提供了两个对远程分区有用的 bean:@EnableBatchIntegrationspring-doc.cn

  • RemotePartitioningManagerStepBuilderFactory:用于配置管理器步骤spring-doc.cn

  • RemotePartitioningWorkerStepBuilderFactory:用于配置 worker 步骤spring-doc.cn

这些 API 负责配置许多组件,如下图所示:spring-doc.cn

远程分区配置 (使用作业存储库轮询)
图 6.远程分区配置 (使用作业存储库轮询)
远程分区配置(带回复聚合)
图 7.远程分区配置(带回复聚合)

在经理方面,允许您 通过声明来配置 Manager 步骤:RemotePartitioningManagerStepBuilderFactoryspring-doc.cn

  • 用于对数据进行分区Partitionerspring-doc.cn

  • 输出通道(“Outgoing requests”)向 worker 发送请求spring-doc.cn

  • 输入通道(“传入回复”),用于接收来自 worker 的回复(配置回复聚合时)spring-doc.cn

  • Poll interval 和 timeout 参数(配置作业存储库轮询时)spring-doc.cn

和 不需要显式配置 (如果需要,仍可以显式配置这些内容)。MessageChannelPartitionHandlerMessagingTemplatespring-doc.cn

在 worker 端,允许您将 worker 配置为:RemotePartitioningWorkerStepBuilderFactoryspring-doc.cn

  • 侦听 Manager 在 input 通道上发送的请求(“Incoming requests”)spring-doc.cn

  • 调用 for each request 的方法handleStepExecutionRequestHandlerspring-doc.cn

  • 将输出通道上的回复(“传出回复”)发送到管理器spring-doc.cn

无需显式配置 (如果需要,可以显式配置)。StepExecutionRequestHandlerspring-doc.cn

以下示例演示如何使用这些 API:spring-doc.cn

@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningJobConfiguration {

    @Configuration
    public static class ManagerConfiguration {

        @Autowired
        private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;

        @Bean
        public Step managerStep() {
                 return this.managerStepBuilderFactory
                    .get("managerStep")
                    .partitioner("workerStep", partitioner())
                    .gridSize(10)
                    .outputChannel(outgoingRequestsToWorkers())
                    .inputChannel(incomingRepliesFromWorkers())
                    .build();
        }

        // Middleware beans setup omitted

    }

    @Configuration
    public static class WorkerConfiguration {

        @Autowired
        private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;

        @Bean
        public Step workerStep() {
                 return this.workerStepBuilderFactory
                    .get("workerStep")
                    .inputChannel(incomingRequestsFromManager())
                    .outputChannel(outgoingRepliesToManager())
                    .chunk(100)
                    .reader(itemReader())
                    .processor(itemProcessor())
                    .writer(itemWriter())
                    .build();
        }

        // Middleware beans setup omitted

    }

}