此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.4.0spring-doc.cadn.net.cn

入站通道适配器:轮询多个服务器和目录

从版本 5.0.7 开始,RotatingServerAdvice可用;当配置为 Poller Advice 时,入站适配器可以轮询多个服务器和目录。 配置建议并将其正常添加到 Poller 的建议链中。 一个DelegatingSessionFactory用于选择服务器,有关更多信息,请参阅 Delegating Session Factory。 建议配置由一个RotationPolicy.KeyDirectory对象。spring-doc.cadn.net.cn

@Bean
public RotatingServerAdvice advice() {
    List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
    keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
    return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}

此建议将轮询目录foo在服务器上one直到没有新文件存在,然后移动到目录bar,然后是目录baz在服务器上two等。spring-doc.cadn.net.cn

可以使用fairconstructor arg 的 Constructor Arg 中:spring-doc.cadn.net.cn

公平
@Bean
public RotatingServerAdvice advice() {
    ...
    return new RotatingServerAdvice(delegatingSf(), keyDirectories, true);
}

在这种情况下,无论上一个 poll 是否返回文件,通知都将移动到下一个服务器/目录。spring-doc.cadn.net.cn

或者,您也可以提供自己的RotationPolicy要根据需要重新配置消息源:spring-doc.cadn.net.cn

政策
public interface RotationPolicy {

    void beforeReceive(MessageSource<?> source);

    void afterReceive(boolean messageReceived, MessageSource<?> source);

}
习惯
@Bean
public RotatingServerAdvice advice() {
    return new RotatingServerAdvice(myRotationPolicy());
}

local-filename-generator-expression属性 (localFilenameGeneratorExpression)现在可以包含#remoteDirectory变量。 这允许将从不同目录中检索到的文件下载到本地的类似目录:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow flow() {
    return IntegrationFlow.from(Sftp.inboundAdapter(sf())
                    .filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
                    .localDirectory(new File(tmpDir))
                    .localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
                    .remoteDirectory("."),
                e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
            .channel(MessageChannels.queue("files"))
            .get();
}
不要配置TaskExecutor在轮询器上有关更多信息,请参见Conditional Pollers for Message Sources

APP信息