使用 NIO (参见 IP 配置属性) 可避免将线程专用于从每个套接字读取。 对于少量的 sockets,你可能会发现不使用 NIO 以及异步切换(例如 to a )的性能与使用 NIO 一样好,甚至更好。using-nioQueueChannelspring-doc.cn

在处理大量连接时,应考虑使用 NIO。 但是,使用 NIO 还有其他一些影响。 线程池(在 task executor 中)在所有套接字之间共享。 每个传入消息都被组装起来,并作为从该池中选择的线程上的单独工作单元发送到配置的通道。 到达同一套接字的两条 Sequential 消息可能由不同的线程处理。 这意味着消息发送到通道的顺序是不确定的。 不维护到达套接字的消息的严格顺序。spring-doc.cn

对于某些应用程序,这不是问题。 对其他人来说,这是一个问题。 如果需要严格的排序,请考虑设置并使用异步切换。using-niofalsespring-doc.cn

或者,您可以在入站终端节点的下游插入一个重新排序器,以将消息返回到正确的顺序。 如果在连接工厂上设置为 ,则到达 TCP 连接的消息将设置 and headers。 resequencer 使用这些 Headers 将消息返回到正确的 Sequence。apply-sequencetruesequenceNumbercorrelationIdspring-doc.cn

从版本 5.1.4 开始,优先接受新连接而不是从现有连接中读取。 通常,除非您的新传入连接率非常高,否则这应该不会产生什么影响。 如果您希望恢复到以前给予读取优先权的行为,请将 property on 设置为 。multiAcceptTcpNioServerConnectionFactoryfalse
从版本 5.1.4 开始,优先接受新连接而不是从现有连接中读取。 通常,除非您的新传入连接率非常高,否则这应该不会产生什么影响。 如果您希望恢复到以前给予读取优先权的行为,请将 property on 设置为 。multiAcceptTcpNioServerConnectionFactoryfalse

池大小

不再使用 pool size 属性。 以前,当未指定 task-executor 时,它指定了默认线程池的大小。 它还用于设置服务器套接字上的连接积压。 不再需要第一个函数(请参阅下一段)。 第二个函数将替换为 attribute。backlogspring-doc.cn

以前,当在 NIO 中使用固定线程池任务执行程序(这是默认的)时,可能会出现死锁,并且处理将停止。 当缓冲区已满时,出现问题,从套接字读取的线程尝试向缓冲区添加更多数据,并且没有线程可用于在缓冲区中腾出空间。 这仅发生在非常小的泳池规模下,但在极端条件下也可能发生。 从 2.2 开始,两个更改消除了这个问题。 首先,默认任务执行程序是缓存的线程池执行程序。 其次,添加了死锁检测逻辑,以便在发生线程匮乏时,将引发异常,而不是死锁,从而释放死锁资源。spring-doc.cn

现在,默认任务执行程序是无限的,如果消息处理需要较长时间,则传入消息的速率较高时可能会出现内存不足情况。 如果您的应用程序表现出此类行为,则应使用具有适当池大小的池化任务执行程序,但请参阅下一节
现在,默认任务执行程序是无限的,如果消息处理需要较长时间,则传入消息的速率较高时可能会出现内存不足情况。 如果您的应用程序表现出此类行为,则应使用具有适当池大小的池化任务执行程序,但请参阅下一节

具有策略的线程池任务执行程序CALLER_RUNS

当您使用带有 ( when using the namespace) 且队列容量较小时,您应该记住一些重要的注意事项。CallerRunsPolicyCALLER_RUNS<task/>spring-doc.cn

如果您不使用固定线程池,则以下内容不适用。spring-doc.cn

使用 NIO 连接时,有三种不同的任务类型。 I/O 选择器处理在一个专用线程上执行(检测事件、接受新连接以及使用任务执行程序将 I/O 读取操作分派给其他线程)。 当 I/O 读取器线程(读取操作被调度到该线程)读取数据时,它会移交给另一个线程来组装传入的消息。 大型邮件可能需要多次读取才能完成。 这些 “assembler” 线程在等待数据时可能会阻塞。 当新的读取事件发生时,读取器会确定此套接字是否已经具有汇编器,如果没有,则运行新的汇编器。 组装过程完成后,汇编程序线程将返回到池中。spring-doc.cn

当池耗尽、拒绝策略正在使用并且任务队列已满时,这可能会导致死锁。 当池为空且队列中没有空间时,IO 选择器线程将接收事件并使用执行程序调度读取。 队列已满,因此 selector 线程本身会启动读取进程。 现在,它检测到此套接字没有汇编器,并在执行读取之前触发汇编器。 同样,队列已满,选择器线程成为汇编器。 汇编器现在被阻塞,等待读取数据,这永远不会发生。 连接工厂现在已死锁,因为 selector 线程无法处理新事件。CALLER_RUNSOP_READspring-doc.cn

为了避免这种死锁,我们必须避免 selector(或 reader)线程执行汇编任务。 我们希望为 IO 和汇编操作使用单独的池。spring-doc.cn

该框架提供了一个 ,它允许配置两个不同的执行程序:一个用于执行 IO 操作,另一个用于消息组装。 在此环境中,IO 线程永远不会成为汇编程序线程,并且不会发生死锁。CompositeExecutorspring-doc.cn

此外,应将任务执行程序配置为使用 ( when using )。 当 I/O 任务无法完成时,它会延迟一小段时间并不断重试,直到可以完成并分配一个汇编程序。 默认情况下,延迟为 100 毫秒,但您可以通过在连接工厂上设置属性来更改它(使用 XML 命名空间进行配置时)。AbortPolicyABORT<task>readDelayread-delayspring-doc.cn

以下三个示例显示了如何配置复合执行程序:spring-doc.cn

@Bean
private CompositeExecutor compositeExecutor() {
    ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor();
    ioExec.setCorePoolSize(4);
    ioExec.setMaxPoolSize(10);
    ioExec.setQueueCapacity(0);
    ioExec.setThreadNamePrefix("io-");
    ioExec.setRejectedExecutionHandler(new AbortPolicy());
    ioExec.initialize();
    ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor();
    assemblerExec.setCorePoolSize(4);
    assemblerExec.setMaxPoolSize(10);
    assemblerExec.setQueueCapacity(0);
    assemblerExec.setThreadNamePrefix("assembler-");
    assemblerExec.setRejectedExecutionHandler(new AbortPolicy());
    assemblerExec.initialize();
    return new CompositeExecutor(ioExec, assemblerExec);
}
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
    <constructor-arg ref="io"/>
    <constructor-arg ref="assembler"/>
</bean>

<task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
    <constructor-arg>
        <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="threadNamePrefix" value="io-" />
            <property name="corePoolSize" value="4" />
            <property name="maxPoolSize" value="8" />
            <property name="queueCapacity" value="0" />
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
            </property>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="threadNamePrefix" value="assembler-" />
            <property name="corePoolSize" value="4" />
            <property name="maxPoolSize" value="10" />
            <property name="queueCapacity" value="0" />
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
            </property>
        </bean>
    </constructor-arg>
</bean>