IntegrationFlowAdapter
The IntegrationFlow interface can be implemented directly and specified as a component for scanning, as the following example shows:
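import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlowDefinition;
import org.springframework.stereotype.Component;

@Component
public class MyFlow implements IntegrationFlow {

    @Override
    public void configure(IntegrationFlowDefinition<?> f) {
        // Upper-case every String payload that enters the flow
        f.<String, String>transform(String::toUpperCase);
    }

}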
The IntegrationFlowBeanPostProcessor picks up this component, parses it, and registers it in the application context.
For convenience and to gain the benefits of a loosely coupled architecture, we provide the IntegrationFlowAdapter base class implementation.
It requires a buildFlow() method implementation that produces an IntegrationFlowDefinition by using one of the from() methods, as the following example shows:
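import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.springframework.integration.annotation.Aggregator;
import org.springframework.integration.annotation.Filter;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Splitter;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.dsl.IntegrationFlowAdapter;
import org.springframework.integration.dsl.IntegrationFlowDefinition;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.scheduling.TriggerContext;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {

    private final AtomicBoolean invoked = new AtomicBoolean();

    // One-shot trigger: fires immediately on the first call, then returns null to stop polling
    public Instant nextExecutionTime(TriggerContext triggerContext) {
        return this.invoked.getAndSet(true) ? null : Instant.now();
    }

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return fromSupplier(this::messageSource,
                        e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
                .split(this)
                .transform(this)
                .aggregate(this)
                .enrichHeaders(Collections.singletonMap("thing1", "THING1"))
                .filter(this)
                .handle(this)
                .channel(c -> c.queue("myFlowAdapterOutput"));
    }

    public String messageSource() {
        return "T,H,I,N,G,2";
    }

    @Splitter
    public String[] split(String payload) {
        return StringUtils.commaDelimitedListToStringArray(payload);
    }

    @Transformer
    public String transform(String payload) {
        return payload.toLowerCase();
    }

    @Aggregator
    public String aggregate(List<String> payloads) {
        return payloads.stream().collect(Collectors.joining());
    }

    @Filter
    public boolean filter(@Header Optional<String> thing1) {
        return thing1.isPresent();
    }

    @ServiceActivator
    public String handle(String payload, @Header String thing1) {
        return payload + ":" + thing1;
    }

}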
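To make the result of the flow above easier to follow, the sketch below traces the same steps in plain Java, with no Spring classes involved. It is an illustration only, not how the framework executes the flow: the class name MyFlowAdapterTrace and its run() helper are hypothetical, String.split() stands in for StringUtils.commaDelimitedListToStringArray(), and the aggregation step assumes that all split parts are released together as one group.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical plain-Java trace of the MyFlowAdapter steps (no Spring involved).
public class MyFlowAdapterTrace {

    // Applies the flow's steps, in order, to a single source payload.
    // Returns Optional.empty() if the filter step would discard the message.
    static Optional<String> run(String source) {
        // split: one element per comma-delimited token
        List<String> parts = Arrays.asList(source.split(","));

        // transform: lower-case each split element
        List<String> lowered = parts.stream()
                .map(String::toLowerCase)
                .collect(Collectors.toList());

        // aggregate: join the elements back into one payload
        // (assumption: the whole group is released at once)
        String aggregated = String.join("", lowered);

        // enrichHeaders: add the "thing1" header
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("thing1", "THING1");

        // filter: pass only when the "thing1" header is present
        Optional<String> thing1 = Optional.ofNullable(headers.get("thing1"));
        if (!thing1.isPresent()) {
            return Optional.empty();
        }

        // handle: append the header value to the payload
        return Optional.of(aggregated + ":" + thing1.get());
    }

    public static void main(String[] args) {
        // prints "thing2:THING1"
        System.out.println(run("T,H,I,N,G,2").orElse("<discarded>"));
    }
}
```

Under these assumptions, the message that reaches the myFlowAdapterOutput queue channel would carry the payload thing2:THING1.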