Sub-elements
When this Gateway
is receiving messages from a
PollableChannel
, you must either provide
a global default Poller
or provide a Poller
sub-element to the
Job Launching Gateway
.
-
Java
-
XML
The following example shows how to provide a poller in Java:
@Bean
@ServiceActivator(inputChannel = "queueChannel", poller = @Poller(fixedRate="1000"))
public JobLaunchingGateway sampleJobLaunchingGateway() {
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());
jobLaunchingGateway.setOutputChannel(replyChannel());
return jobLaunchingGateway;
}
The following example shows how to provide a poller in XML:
<batch-int:job-launching-gateway request-channel="queueChannel"
reply-channel="replyChannel" job-launcher="jobLauncher">
<int:poller fixed-rate="1000">
</batch-int:job-launching-gateway>
Providing Feedback with Informational Messages
As Spring Batch jobs can run for long times, providing progress information is often critical. For example, stakeholders may want to be notified if some or all parts of a batch job have failed. Spring Batch provides support for this information being gathered through:
-
Active polling
-
Event-driven listeners
When starting a Spring Batch job asynchronously (for example, by using the Job Launching
Gateway), a JobExecution
instance is returned. Thus, you can use JobExecution.getJobId()
to continuously poll for status updates by retrieving updated instances of the
JobExecution
from the JobRepository
by using the JobExplorer
. However, this is
considered sub-optimal, and an event-driven approach is preferred.
Therefore, Spring Batch provides listeners, including the three most commonly used listeners:
-
StepListener
-
ChunkListener
-
JobExecutionListener
In the example shown in the following image, a Spring Batch job has been configured with a
StepExecutionListener
. Thus, Spring Integration receives and processes any step before
or after events. For example, you can inspect the received StepExecution
by using a
Router
. Based on the results of that inspection, various things can occur (such as
routing a message to a mail outbound channel adapter), so that an email notification can
be sent out based on some condition.
The following two-part example shows how a listener is configured to send a
message to a Gateway
for a StepExecution
events and log its output to a
logging-channel-adapter
.
First, create the notification integration beans.
-
Java
-
XML
The following example shows the how to create the notification integration beans in Java:
@Bean
@ServiceActivator(inputChannel = "stepExecutionsChannel")
public LoggingHandler loggingHandler() {
LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.WARN);
adapter.setLoggerName("TEST_LOGGER");
adapter.setLogExpressionString("headers.id + ': ' + payload");
return adapter;
}
@MessagingGateway(name = "notificationExecutionsListener", defaultRequestChannel = "stepExecutionsChannel")
public interface NotificationExecutionListener extends StepExecutionListener {}
You need to add the @IntegrationComponentScan annotation to your configuration.
|
The following example shows the how to create the notification integration beans in XML:
<int:channel id="stepExecutionsChannel"/>
<int:gateway id="notificationExecutionsListener"
service-interface="org.springframework.batch.core.StepExecutionListener"
default-request-channel="stepExecutionsChannel"/>
<int:logging-channel-adapter channel="stepExecutionsChannel"/>
Second, modify your job to add a step-level listener.
-
Java
-
XML
The following example shows the how to add a step-level listener in Java:
public Job importPaymentsJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder("importPayments", jobRepository)
.start(new StepBuilder("step1", jobRepository)
.chunk(200, transactionManager)
.listener(notificationExecutionsListener())
// ...
.build();
)
.build();
}
The following example shows the how to add a step-level listener in XML:
<job id="importPayments">
<step id="step1">
<tasklet ../>
<chunk ../>
<listeners>
<listener ref="notificationExecutionsListener"/>
</listeners>
</tasklet>
...
</step>
</job>
Asynchronous Processors
Asynchronous Processors help you scale the processing of items. In the asynchronous
processor use case, an AsyncItemProcessor
serves as a dispatcher, executing the logic of
the ItemProcessor
for an item on a new thread. Once the item completes, the Future
is
passed to the AsyncItemWriter
to be written.
Therefore, you can increase performance by using asynchronous item processing, basically
letting you implement fork-join scenarios. The AsyncItemWriter
gathers the results and
writes back the chunk as soon as all the results become available.
-
Java
-
XML
The following example shows how to configuration the AsyncItemProcessor
in Java:
@Bean
public AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) {
AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor();
asyncItemProcessor.setTaskExecutor(taskExecutor);
asyncItemProcessor.setDelegate(itemProcessor);
return asyncItemProcessor;
}
The following example shows how to configuration the AsyncItemProcessor
in XML:
<bean id="processor"
class="org.springframework.batch.integration.async.AsyncItemProcessor">
<property name="delegate">
<bean class="your.ItemProcessor"/>
</property>
<property name="taskExecutor">
<bean class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
</property>
</bean>
The delegate
property refers to your ItemProcessor
bean, and the taskExecutor
property refers to the TaskExecutor
of your choice.
-
Java
-
XML
The following example shows how to configure the AsyncItemWriter
in Java:
@Bean
public AsyncItemWriter writer(ItemWriter itemWriter) {
AsyncItemWriter asyncItemWriter = new AsyncItemWriter();
asyncItemWriter.setDelegate(itemWriter);
return asyncItemWriter;
}
The following example shows how to configure the AsyncItemWriter
in XML:
<bean id="itemWriter"
class="org.springframework.batch.integration.async.AsyncItemWriter">
<property name="delegate">
<bean id="itemWriter" class="your.ItemWriter"/>
</property>
</bean>
Again, the delegate
property is
actually a reference to your ItemWriter
bean.
Externalizing Batch Process Execution
The integration approaches discussed so far suggest use cases where Spring Integration wraps Spring Batch like an outer shell. However, Spring Batch can also use Spring Integration internally. By using this approach, Spring Batch users can delegate the processing of items or even chunks to outside processes. This lets you offload complex processing. Spring Batch Integration provides dedicated support for:
-
Remote Chunking
-
Remote Partitioning
Remote Chunking
The following image shows one way that remote chunking works when you use Spring Batch together with Spring Integration:
Taking things one step further, you can also externalize the
chunk processing by using the
ChunkMessageChannelItemWriter
(provided by Spring Batch Integration), which sends items out
and collects the result. Once sent, Spring Batch continues the
process of reading and grouping items, without waiting for the results.
Rather, it is the responsibility of the ChunkMessageChannelItemWriter
to gather the results and integrate them back into the Spring Batch process.
With Spring Integration, you have full
control over the concurrency of your processes (for instance, by
using a QueueChannel
instead of a
DirectChannel
). Furthermore, by relying on
Spring Integration’s rich collection of channel adapters (such as
JMS and AMQP), you can distribute chunks of a batch job to
external systems for processing.
-
Java
-
XML
A job with a step to be remotely chunked might have a configuration similar to the following in Java:
public Job chunkJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder("personJob", jobRepository)
.start(new StepBuilder("step1", jobRepository)
.<Person, Person>chunk(200, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.build())
.build();
}
A job with a step to be remotely chunked might have a configuration similar to the following in XML:
<job id="personJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="200"/>
</tasklet>
...
</step>
</job>
The ItemReader
reference points to the bean you want to use for reading data on the
manager. The ItemWriter
reference points to a special ItemWriter
(called
ChunkMessageChannelItemWriter
), as described earlier. The processor (if any) is left off
the manager configuration, as it is configured on the worker. You should check any
additional component properties, such as throttle limits and so on, when implementing
your use case.
-
Java
-
XML
The following Java configuration provides a basic manager setup:
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://localhost:61616");
return factory;
}
/*
* Configure outbound flow (requests going to workers)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(requests())
.handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
.get();
}
/*
* Configure inbound flow (replies coming from workers)
*/
@Bean
public QueueChannel replies() {
return new QueueChannel();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
.channel(replies())
.get();
}
/*
* Configure the ChunkMessageChannelItemWriter
*/
@Bean
public ItemWriter<Integer> itemWriter() {
MessagingTemplate messagingTemplate = new MessagingTemplate();
messagingTemplate.setDefaultChannel(requests());
messagingTemplate.setReceiveTimeout(2000);
ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter
= new ChunkMessageChannelItemWriter<>();
chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
chunkMessageChannelItemWriter.setReplyChannel(replies());
return chunkMessageChannelItemWriter;
}
The following XML configuration provides a basic manager setup:
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<int-jms:outbound-channel-adapter id="jmsRequests" destination-name="requests"/>
<bean id="messagingTemplate"
class="org.springframework.integration.core.MessagingTemplate">
<property name="defaultChannel" ref="requests"/>
<property name="receiveTimeout" value="2000"/>
</bean>
<bean id="itemWriter"
class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
scope="step">
<property name="messagingOperations" ref="messagingTemplate"/>
<property name="replyChannel" ref="replies"/>
</bean>
<int:channel id="replies">
<int:queue/>
</int:channel>
<int-jms:message-driven-channel-adapter id="jmsReplies"
destination-name="replies"
channel="replies"/>
The preceding configuration provides us with a number of beans. We
configure our messaging middleware by using ActiveMQ and the
inbound and outbound JMS adapters provided by Spring Integration. As
shown, our itemWriter
bean, which is
referenced by our job step, uses the
ChunkMessageChannelItemWriter
to write chunks over the
configured middleware.
Now we can move on to the worker configuration, as the following example shows:
-
Java
-
XML
The following example shows the worker configuration in Java:
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://localhost:61616");
return factory;
}
/*
* Configure inbound flow (requests coming from the manager)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
.channel(requests())
.get();
}
/*
* Configure outbound flow (replies going to the manager)
*/
@Bean
public DirectChannel replies() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(replies())
.handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
.get();
}
/*
* Configure the ChunkProcessorChunkHandler
*/
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
ChunkProcessor<Integer> chunkProcessor
= new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler
= new ChunkProcessorChunkHandler<>();
chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
return chunkProcessorChunkHandler;
}
The following example shows the worker configuration in XML:
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<int:channel id="requests"/>
<int:channel id="replies"/>
<int-jms:message-driven-channel-adapter id="incomingRequests"
destination-name="requests"
channel="requests"/>
<int-jms:outbound-channel-adapter id="outgoingReplies"
destination-name="replies"
channel="replies">
</int-jms:outbound-channel-adapter>
<int:service-activator id="serviceActivator"
input-channel="requests"
output-channel="replies"
ref="chunkProcessorChunkHandler"
method="handleChunk"/>
<bean id="chunkProcessorChunkHandler"
class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
<property name="chunkProcessor">
<bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
<property name="itemWriter">
<bean class="io.spring.sbi.PersonItemWriter"/>
</property>
<property name="itemProcessor">
<bean class="io.spring.sbi.PersonItemProcessor"/>
</property>
</bean>
</property>
</bean>
Most of these configuration items should look familiar from the
manager configuration. Workers do not need access to
the Spring Batch JobRepository
nor
to the actual job configuration file. The main bean of interest
is the chunkProcessorChunkHandler
. The
chunkProcessor
property of ChunkProcessorChunkHandler
takes a
configured SimpleChunkProcessor
, which is where you would provide a reference to your
ItemWriter
(and, optionally, your
ItemProcessor
) that will run on the worker
when it receives chunks from the manager.
For more information, see the section of the “Scalability” chapter on Remote Chunking.
Starting from version 4.1, Spring Batch Integration introduces the @EnableBatchIntegration
annotation that can be used to simplify a remote chunking setup. This annotation provides
two beans that you can autowire in your application context:
-
RemoteChunkingManagerStepBuilderFactory
: Configures the manager step -
RemoteChunkingWorkerBuilder
: Configures the remote worker integration flow
These APIs take care of configuring a number of components, as the following diagram shows:
On the manager side, the RemoteChunkingManagerStepBuilderFactory
lets you
configure a manager step by declaring:
-
The item reader to read items and send them to workers
-
The output channel ("Outgoing requests") to send requests to workers
-
The input channel ("Incoming replies") to receive replies from workers
You need not explicitly configure ChunkMessageChannelItemWriter
and the MessagingTemplate
.
(You can still explicitly configure them if find a reason to do so).
On the worker side, the RemoteChunkingWorkerBuilder
lets you configure a worker to:
-
Listen to requests sent by the manager on the input channel (“Incoming requests”)
-
Call the
handleChunk
method ofChunkProcessorChunkHandler
for each request with the configuredItemProcessor
andItemWriter
-
Send replies on the output channel (“Outgoing replies”) to the manager
You need not explicitly configure the SimpleChunkProcessor
and the ChunkProcessorChunkHandler
. (You can still explicitly configure them if you find
a reason to do so).
The following example shows how to use these APIs:
@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {
@Configuration
public static class ManagerConfiguration {
@Autowired
private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;
@Bean
public TaskletStep managerStep() {
return this.managerStepBuilderFactory.get("managerStep")
.chunk(100)
.reader(itemReader())
.outputChannel(requests()) // requests sent to workers
.inputChannel(replies()) // replies received from workers
.build();
}
// Middleware beans setup omitted
}
@Configuration
public static class WorkerConfiguration {
@Autowired
private RemoteChunkingWorkerBuilder workerBuilder;
@Bean
public IntegrationFlow workerFlow() {
return this.workerBuilder
.itemProcessor(itemProcessor())
.itemWriter(itemWriter())
.inputChannel(requests()) // requests received from the manager
.outputChannel(replies()) // replies sent to the manager
.build();
}
// Middleware beans setup omitted
}
}
You can find a complete example of a remote chunking job here.
Remote Partitioning
The following image shows a typical remote partitioning situation:
Remote Partitioning, on the other hand, is useful when it
is not the processing of items but rather the associated I/O that
causes the bottleneck. With remote partitioning, you can send work
to workers that execute complete Spring Batch
steps. Thus, each worker has its own ItemReader
, ItemProcessor
, and
ItemWriter
. For this purpose, Spring Batch
Integration provides the MessageChannelPartitionHandler
.
This implementation of the PartitionHandler
interface uses MessageChannel
instances to
send instructions to remote workers and receive their responses.
This provides a nice abstraction from the transports (such as JMS
and AMQP) being used to communicate with the remote workers.
The section of the “Scalability” chapter that addresses
remote partitioning provides an overview of the concepts and
components needed to configure remote partitioning and shows an
example of using the default
TaskExecutorPartitionHandler
to partition
in separate local threads of execution. For remote partitioning
to multiple JVMs, two additional components are required:
-
A remoting fabric or grid environment
-
A
PartitionHandler
implementation that supports the desired remoting fabric or grid environment
Similar to remote chunking, you can use JMS as the “remoting fabric”. In that case, use
a MessageChannelPartitionHandler
instance as the PartitionHandler
implementation,
as described earlier.
-
Java
-
XML
The following example assumes an existing partitioned job and focuses on the
MessageChannelPartitionHandler
and JMS configuration in Java:
/*
* Configuration of the manager side
*/
@Bean
public PartitionHandler partitionHandler() {
MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
partitionHandler.setStepName("step1");
partitionHandler.setGridSize(3);
partitionHandler.setReplyChannel(outboundReplies());
MessagingTemplate template = new MessagingTemplate();
template.setDefaultChannel(outboundRequests());
template.setReceiveTimeout(100000);
partitionHandler.setMessagingOperations(template);
return partitionHandler;
}
@Bean
public QueueChannel outboundReplies() {
return new QueueChannel();
}
@Bean
public DirectChannel outboundRequests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundJmsRequests() {
return IntegrationFlow.from("outboundRequests")
.handle(Jms.outboundGateway(connectionFactory())
.requestDestination("requestsQueue"))
.get();
}
@Bean
@ServiceActivator(inputChannel = "inboundStaging")
public AggregatorFactoryBean partitioningMessageHandler() throws Exception {
AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
aggregatorFactoryBean.setProcessorBean(partitionHandler());
aggregatorFactoryBean.setOutputChannel(outboundReplies());
// configure other propeties of the aggregatorFactoryBean
return aggregatorFactoryBean;
}
@Bean
public DirectChannel inboundStaging() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundJmsStaging() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
.configureListenerContainer(c -> c.subscriptionDurable(false))
.destination("stagingQueue"))
.channel(inboundStaging())
.get();
}
/*
* Configuration of the worker side
*/
@Bean
public StepExecutionRequestHandler stepExecutionRequestHandler() {
StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
stepExecutionRequestHandler.setJobExplorer(jobExplorer);
stepExecutionRequestHandler.setStepLocator(stepLocator());
return stepExecutionRequestHandler;
}
@Bean
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler serviceActivator() throws Exception {
return stepExecutionRequestHandler();
}
@Bean
public DirectChannel inboundRequests() {
return new DirectChannel();
}
public IntegrationFlow inboundJmsRequests() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
.configureListenerContainer(c -> c.subscriptionDurable(false))
.destination("requestsQueue"))
.channel(inboundRequests())
.get();
}
@Bean
public DirectChannel outboundStaging() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundJmsStaging() {
return IntegrationFlow.from("outboundStaging")
.handle(Jms.outboundGateway(connectionFactory())
.requestDestination("stagingQueue"))
.get();
}
The following example assumes an existing partitioned job and focuses on the
MessageChannelPartitionHandler
and JMS configuration in XML:
<bean id="partitionHandler"
class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
<property name="stepName" value="step1"/>
<property name="gridSize" value="3"/>
<property name="replyChannel" ref="outbound-replies"/>
<property name="messagingOperations">
<bean class="org.springframework.integration.core.MessagingTemplate">
<property name="defaultChannel" ref="outbound-requests"/>
<property name="receiveTimeout" value="100000"/>
</bean>
</property>
</bean>
<int:channel id="outbound-requests"/>
<int-jms:outbound-channel-adapter destination="requestsQueue"
channel="outbound-requests"/>
<int:channel id="inbound-requests"/>
<int-jms:message-driven-channel-adapter destination="requestsQueue"
channel="inbound-requests"/>
<bean id="stepExecutionRequestHandler"
class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
<property name="jobExplorer" ref="jobExplorer"/>
<property name="stepLocator" ref="stepLocator"/>
</bean>
<int:service-activator ref="stepExecutionRequestHandler" input-channel="inbound-requests"
output-channel="outbound-staging"/>
<int:channel id="outbound-staging"/>
<int-jms:outbound-channel-adapter destination="stagingQueue"
channel="outbound-staging"/>
<int:channel id="inbound-staging"/>
<int-jms:message-driven-channel-adapter destination="stagingQueue"
channel="inbound-staging"/>
<int:aggregator ref="partitionHandler" input-channel="inbound-staging"
output-channel="outbound-replies"/>
<int:channel id="outbound-replies">
<int:queue/>
</int:channel>
<bean id="stepLocator"
class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />
You must also ensure that the partition handler
attribute maps to the partitionHandler
bean.
-
Java
-
XML
The following example maps the partition handler
attribute to the partitionHandler
in
Java:
public Job personJob(JobRepository jobRepository) {
return new JobBuilder("personJob", jobRepository)
.start(new StepBuilder("step1.manager", jobRepository)
.partitioner("step1.worker", partitioner())
.partitionHandler(partitionHandler())
.build())
.build();
}
The following example maps the partition handler
attribute to the partitionHandler
in
XML:
<job id="personJob">
<step id="step1.manager">
<partition partitioner="partitioner" handler="partitionHandler"/>
...
</step>
</job>
You can find a complete example of a remote partitioning job here.
You can use the @EnableBatchIntegration
annotation to simplify a remote
partitioning setup. This annotation provides two beans that are useful for remote partitioning:
-
RemotePartitioningManagerStepBuilderFactory
: Configures the manager step -
RemotePartitioningWorkerStepBuilderFactory
: Configures the worker step
These APIs take care of configuring a number of components, as the following diagrams show:
On the manager side, the RemotePartitioningManagerStepBuilderFactory
lets you
configure a manager step by declaring:
-
The
Partitioner
used to partition data -
The output channel (“Outgoing requests”) on which to send requests to workers
-
The input channel (“Incoming replies”) on which to receive replies from workers (when configuring replies aggregation)
-
The poll interval and timeout parameters (when configuring job repository polling)
You need not explicitly configure The MessageChannelPartitionHandler
and the MessagingTemplate
.
(You can still explicitly configured them if you find a reason to do so).
On the worker side, the RemotePartitioningWorkerStepBuilderFactory
lets you configure a worker to:
-
Listen to requests sent by the manager on the input channel (“Incoming requests”)
-
Call the
handle
method ofStepExecutionRequestHandler
for each request -
Send replies on the output channel (“Outgoing replies”) to the manager
You need not explicitly configure the StepExecutionRequestHandler
.
(You can explicitly configure it if you find a reason to do so).
The following example shows how to use these APIs:
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningJobConfiguration {
@Configuration
public static class ManagerConfiguration {
@Autowired
private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;
@Bean
public Step managerStep() {
return this.managerStepBuilderFactory
.get("managerStep")
.partitioner("workerStep", partitioner())
.gridSize(10)
.outputChannel(outgoingRequestsToWorkers())
.inputChannel(incomingRepliesFromWorkers())
.build();
}
// Middleware beans setup omitted
}
@Configuration
public static class WorkerConfiguration {
@Autowired
private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;
@Bean
public Step workerStep() {
return this.workerStepBuilderFactory
.get("workerStep")
.inputChannel(incomingRequestsFromManager())
.outputChannel(outgoingRepliesToManager())
.chunk(100)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
// Middleware beans setup omitted
}
}