The IntegrationFlow interface can be implemented directly and declared as a component for scanning, as the following example shows:

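import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlowDefinition;
import org.springframework.stereotype.Component;

@Component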
public class MyFlow implements IntegrationFlow {

    @Override
    public void configure(IntegrationFlowDefinition<?> f) {
        f.<String, String>transform(String::toUpperCase);
    }

}

It is picked up by the IntegrationFlowBeanPostProcessor and correctly parsed and registered in the application context.

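For convenience and to gain the benefits of a loosely coupled architecture, we provide the IntegrationFlowAdapter base class implementation. It requires a buildFlow() method implementation that produces an IntegrationFlowDefinition by using one of the from() methods, as the following example shows: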

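import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.springframework.integration.annotation.Aggregator;
import org.springframework.integration.annotation.Filter;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Splitter;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.dsl.IntegrationFlowAdapter;
import org.springframework.integration.dsl.IntegrationFlowDefinition;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.scheduling.TriggerContext;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

@Component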
public class MyFlowAdapter extends IntegrationFlowAdapter {

    private final AtomicBoolean invoked = new AtomicBoolean();

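    // One-shot trigger: fire immediately on the first call, then return null
    // so that no further polls are scheduled.
    public Instant nextExecutionTime(TriggerContext triggerContext) {
        return this.invoked.getAndSet(true) ? null : Instant.now();
    }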

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return fromSupplier(this::messageSource,
                e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
                .split(this)
                .transform(this)
                .aggregate(this)
                .enrichHeaders(Collections.singletonMap("thing1", "THING1"))
                .filter(this)
                .handle(this)
                .channel(c -> c.queue("myFlowAdapterOutput"));
    }

    public String messageSource() {
        return "T,H,I,N,G,2";
    }

    @Splitter
    public String[] split(String payload) {
        return StringUtils.commaDelimitedListToStringArray(payload);
    }

    @Transformer
    public String transform(String payload) {
        return payload.toLowerCase();
    }

    @Aggregator
    public String aggregate(List<String> payloads) {
        return payloads.stream().collect(Collectors.joining());
    }

    @Filter
    public boolean filter(@Header Optional<String> thing1) {
        return thing1.isPresent();
    }

    @ServiceActivator
    public String handle(String payload, @Header String thing1) {
        return payload + ":" + thing1;
    }

}
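
To make the data flow through MyFlowAdapter easier to follow, here is a minimal plain-Java sketch that traces the same steps (split, transform, aggregate, enrich headers, filter, handle) on the payload produced by messageSource(). It deliberately does not use Spring Integration. The class name MyFlowAdapterTrace and its run method are hypothetical, String.split stands in for StringUtils.commaDelimitedListToStringArray, and the per-message splitting and correlation that the real splitter and aggregator perform are collapsed into simple list operations.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Spring Integration: it only mimics the
// order of the steps declared in MyFlowAdapter.buildFlow().
final class MyFlowAdapterTrace {

    static Optional<String> run(String source) {
        // split: one item per comma-separated token
        List<String> items = Arrays.asList(source.split(","));

        // transform: lower-case each item
        List<String> lowered = items.stream()
                .map(String::toLowerCase)
                .collect(Collectors.toList());

        // aggregate: join the items back into a single payload
        String aggregated = String.join("", lowered);

        // enrichHeaders: add the "thing1" header
        Map<String, String> headers = Collections.singletonMap("thing1", "THING1");

        // filter: let the message through only if the header is present
        Optional<String> thing1 = Optional.ofNullable(headers.get("thing1"));
        if (!thing1.isPresent()) {
            return Optional.empty();
        }

        // handle: append the header value to the payload
        return Optional.of(aggregated + ":" + thing1.get());
    }

    public static void main(String[] args) {
        // prints "thing2:THING1"
        System.out.println(run("T,H,I,N,G,2").orElse("<dropped>"));
    }
}
```

In the real flow, the value corresponding to this result is what ends up as the message payload on the myFlowAdapterOutput queue channel.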