For the latest stable version, please use Spring Integration 6.3.4!

Starting with version 5.3, IntegrationFlowExtension lets you extend the existing Java DSL with custom or composed EIP operators. All that is needed is a subclass that provides methods which can be used in IntegrationFlow bean definitions. The extension class can also be used for custom IntegrationComponentSpec configuration; for example, missing or default options can be implemented in an extension of an existing IntegrationComponentSpec. The sample below demonstrates a composite custom operator and the use of an AggregatorSpec extension that sets a default custom outputProcessor:

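import java.util.function.Consumer;

import org.springframework.integration.dsl.IntegrationFlowExtension;

public class CustomIntegrationFlowDefinition
        extends IntegrationFlowExtension<CustomIntegrationFlowDefinition> {

    // Composite operator: split the message, then upper-case each payload.
    public CustomIntegrationFlowDefinition upperCaseAfterSplit() {
        return split()
                .transform("payload.toUpperCase()");
    }

    // Registers the custom aggregator spec in the flow and lets the caller configure it further.
    public CustomIntegrationFlowDefinition customAggregate(Consumer<CustomAggregatorSpec> aggregator) {
        return register(new CustomAggregatorSpec(), aggregator);
    }

}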

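import java.util.stream.Collectors;

import org.springframework.integration.dsl.AggregatorSpec;
import org.springframework.messaging.Message;

public class CustomAggregatorSpec extends AggregatorSpec {

    CustomAggregatorSpec() {
        // Default output processor: join the String payloads of the group with ", ".
        outputProcessor(group ->
                group.getMessages()
                        .stream()
                        .map(Message::getPayload)
                        .map(String.class::cast)
                        .collect(Collectors.joining(", ")));
    }

}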

For a method-chain flow, each new DSL operator in these extensions must return the extension class. That way, the target IntegrationFlow definition works with both new and existing DSL operators:

@Bean
public IntegrationFlow customFlowDefinition() {
    return
            new CustomIntegrationFlowDefinition()
                    .log()
                    .upperCaseAfterSplit()
                    .channel("innerChannel")
                    .customAggregate(customAggregatorSpec ->
                            customAggregatorSpec.expireGroupsUponCompletion(true))
                    .logAndReply();
}
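
One way to picture why every operator has to return the extension class is the self-typed fluent builder pattern. The sketch below is plain Java and does not use any Spring Integration classes: FlowBase, CustomFlow, self() and steps() are hypothetical names chosen for illustration, and the "operators" only record their names rather than building real endpoints. It is meant as a minimal model of the chaining rule, not as a description of how the framework is implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of a self-typed fluent API. All names are hypothetical;
// nothing here is a Spring Integration type.
class SelfTypedFlowSketch {

    // Stand-in for a DSL base class. B is the concrete subclass type.
    abstract static class FlowBase<B extends FlowBase<B>> {

        private final List<String> steps = new ArrayList<>();

        // Built-in operators return B (not FlowBase), so operators declared
        // on the subclass remain reachable later in the chain.
        @SuppressWarnings("unchecked")
        protected final B self() {
            return (B) this;
        }

        public B log() {
            steps.add("log");
            return self();
        }

        public B split() {
            steps.add("split");
            return self();
        }

        public B transform(String expression) {
            steps.add("transform(" + expression + ")");
            return self();
        }

        public B channel(String name) {
            steps.add("channel(" + name + ")");
            return self();
        }

        // Returns a copy of the recorded operator names, in call order.
        public List<String> steps() {
            return new ArrayList<>(steps);
        }
    }

    // Stand-in for an extension class with one composite operator.
    static final class CustomFlow extends FlowBase<CustomFlow> {

        // Returns CustomFlow, so the chain can continue with built-in
        // or custom operators alike.
        public CustomFlow upperCaseAfterSplit() {
            return split().transform("payload.toUpperCase()");
        }
    }

    public static void main(String[] args) {
        List<String> steps = new CustomFlow()
                .log()                    // built-in operator, still typed as CustomFlow
                .upperCaseAfterSplit()    // custom operator is reachable after log()
                .channel("innerChannel")  // built-in operator after a custom one
                .steps();
        // prints [log, split, transform(payload.toUpperCase()), channel(innerChannel)]
        System.out.println(steps);
    }
}
```

If log() in this sketch returned FlowBase instead of B, the call to upperCaseAfterSplit() right after it would not compile, which is the situation the rule above avoids.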