关于非阻塞 I/O (NIO)
使用 NIO (参见 IP 配置属性) 可避免将线程专用于从每个套接字读取。
对于少量的 sockets,你可能会发现不使用 NIO 以及异步切换(例如 to a )的性能与使用 NIO 一样好,甚至更好。using-nio
QueueChannel
在处理大量连接时,应考虑使用 NIO。 但是,使用 NIO 还有其他一些影响。 线程池(在 task executor 中)在所有套接字之间共享。 每个传入消息都被组装起来,并作为从该池中选择的线程上的单独工作单元发送到配置的通道。 到达同一套接字的两条 Sequential 消息可能由不同的线程处理。 这意味着消息发送到通道的顺序是不确定的。 不维护到达套接字的消息的严格顺序。
对于某些应用程序,这不是问题。
对其他人来说,这是一个问题。
如果需要严格的排序,请考虑设置并使用异步切换。using-nio
false
或者,您可以在入站终端节点的下游插入一个重新排序器,以将消息返回到正确的顺序。
如果在连接工厂上设置为 ,则到达 TCP 连接的消息将设置 and headers。
resequencer 使用这些 Headers 将消息返回到正确的 Sequence。apply-sequence
true
sequenceNumber
correlationId
从版本 5.1.4 开始,优先接受新连接而不是从现有连接中读取。
通常,除非您的新传入连接率非常高,否则这应该不会产生什么影响。
如果您希望恢复到以前给予读取优先权的行为,请将 property on 设置为 。multiAccept TcpNioServerConnectionFactory false |
池大小
不再使用 pool size 属性。
以前,当未指定 task-executor 时,它指定了默认线程池的大小。
它还用于设置服务器套接字上的连接积压。
不再需要第一个函数(请参阅下一段)。
第二个函数将替换为 attribute。backlog
以前,当在 NIO 中使用固定线程池任务执行程序(这是默认的)时,可能会出现死锁,并且处理将停止。 当缓冲区已满时,出现问题,从套接字读取的线程尝试向缓冲区添加更多数据,并且没有线程可用于在缓冲区中腾出空间。 这仅发生在非常小的泳池规模下,但在极端条件下也可能发生。 从 2.2 开始,两个更改消除了这个问题。 首先,默认任务执行程序是缓存的线程池执行程序。 其次,添加了死锁检测逻辑,以便在发生线程匮乏时,将引发异常,而不是死锁,从而释放死锁资源。
现在,默认任务执行程序是无限的,如果消息处理需要较长时间,则传入消息的速率较高时可能会出现内存不足情况。 如果您的应用程序表现出此类行为,则应使用具有适当池大小的池化任务执行程序,但请参阅下一节。 |
具有策略的线程池任务执行程序CALLER_RUNS
当您使用带有 ( when using the namespace) 且队列容量较小时,您应该记住一些重要的注意事项。CallerRunsPolicy
CALLER_RUNS
<task/>
如果您不使用固定线程池,则以下内容不适用。
使用 NIO 连接时,有三种不同的任务类型。 I/O 选择器处理在一个专用线程上执行(检测事件、接受新连接以及使用任务执行程序将 I/O 读取操作分派给其他线程)。 当 I/O 读取器线程(读取操作被调度到该线程)读取数据时,它会移交给另一个线程来组装传入的消息。 大型邮件可能需要多次读取才能完成。 这些 “assembler” 线程在等待数据时可能会阻塞。 当新的读取事件发生时,读取器会确定此套接字是否已经具有汇编器,如果没有,则运行新的汇编器。 组装过程完成后,汇编程序线程将返回到池中。
当池耗尽、拒绝策略正在使用并且任务队列已满时,这可能会导致死锁。
当池为空且队列中没有空间时,IO 选择器线程将接收事件并使用执行程序调度读取。
队列已满,因此 selector 线程本身会启动读取进程。
现在,它检测到此套接字没有汇编器,并在执行读取之前触发汇编器。
同样,队列已满,选择器线程成为汇编器。
汇编器现在被阻塞,等待读取数据,这永远不会发生。
连接工厂现在已死锁,因为 selector 线程无法处理新事件。CALLER_RUNS
OP_READ
为了避免这种死锁,我们必须避免 selector(或 reader)线程执行汇编任务。 我们希望为 IO 和汇编操作使用单独的池。
该框架提供了一个 ,它允许配置两个不同的执行程序:一个用于执行 IO 操作,另一个用于消息组装。
在此环境中,IO 线程永远不会成为汇编程序线程,并且不会发生死锁。CompositeExecutor
此外,应将任务执行程序配置为使用 ( when using )。
当 I/O 任务无法完成时,它会延迟一小段时间并不断重试,直到可以完成并分配一个汇编程序。
默认情况下,延迟为 100 毫秒,但您可以通过在连接工厂上设置属性来更改它(使用 XML 命名空间进行配置时)。AbortPolicy
ABORT
<task>
readDelay
read-delay
以下三个示例显示了如何配置复合执行程序:
@Bean
private CompositeExecutor compositeExecutor() {
ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor();
ioExec.setCorePoolSize(4);
ioExec.setMaxPoolSize(10);
ioExec.setQueueCapacity(0);
ioExec.setThreadNamePrefix("io-");
ioExec.setRejectedExecutionHandler(new AbortPolicy());
ioExec.initialize();
ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor();
assemblerExec.setCorePoolSize(4);
assemblerExec.setMaxPoolSize(10);
assemblerExec.setQueueCapacity(0);
assemblerExec.setThreadNamePrefix("assembler-");
assemblerExec.setRejectedExecutionHandler(new AbortPolicy());
assemblerExec.initialize();
return new CompositeExecutor(ioExec, assemblerExec);
}
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg ref="io"/>
<constructor-arg ref="assembler"/>
</bean>
<task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="io-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="8" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="assembler-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="10" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
</bean>