对于最新的稳定版本,请使用 Spring Integration 6.3.1Spring中文文档

对于最新的稳定版本,请使用 Spring Integration 6.3.1Spring中文文档

概述

对于 TCP,基础连接的配置是使用连接工厂提供的。 提供两种类型的连接工厂:客户端连接工厂和服务器连接工厂。 客户端连接工厂建立传出连接。 服务器连接工厂侦听传入连接。Spring中文文档

出站通道适配器使用客户端连接工厂,但您也可以向入站通道适配器提供对客户端连接工厂的引用。 该适配器接收在出站适配器创建的连接上接收的任何传入消息。Spring中文文档

入站通道适配器或网关使用服务器连接工厂。 (事实上,没有连接工厂就无法运行)。 还可以提供对出站适配器的服务器连接工厂的引用。 然后,您可以使用该适配器发送对同一连接上的传入邮件的答复。Spring中文文档

仅当回复包含连接工厂插入到原始消息中的标头时,回复消息才会路由到连接。ip_connectionId
这是在入站适配器和出站适配器之间共享连接工厂时执行的消息关联程度。 这种共享允许通过 TCP 进行异步双向通信。 默认情况下,仅使用 TCP 传输有效负载信息。 因此,任何消息关联都必须由下游组件(如聚合器或其他端点)执行。 版本 3.0 中引入了对传输选定标头的支持。 有关详细信息,请参阅 TCP 消息关联

您可以为每种类型的最多一个适配器提供对连接工厂的引用。Spring中文文档

Spring Integration 提供了使用 和 的连接工厂。java.net.Socketjava.nio.channel.SocketChannelSpring中文文档

以下示例演示了使用连接的简单服务器连接工厂:java.net.SocketSpring中文文档

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"/>

以下示例演示了使用连接的简单服务器连接工厂:java.nio.channel.SocketChannelSpring中文文档

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    using-nio="true"/>
从 Spring Integration 版本 4.2 开始,如果服务器配置为侦听随机端口(通过将端口设置为 ),则可以使用 获取操作系统选择的实际端口。 此外,还可以让您获得完整的. 有关更多信息,请参见 TcpServerConnectionFactory 接口的 Javadoc0getPort()getServerSocketAddress()SocketAddress
<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="1234"
    single-use="true"
    so-timeout="10000"/>

下面的示例显示了一个客户端连接工厂,该工厂使用连接并为每条消息创建一个新连接:java.net.SocketSpring中文文档

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="1234"
    single-use="true"
    so-timeout="10000"
    using-nio=true/>

从 V5.2 开始,客户端连接工厂支持属性 ,以秒为单位指定,默认值为 60。connectTimeoutSpring中文文档

仅当回复包含连接工厂插入到原始消息中的标头时,回复消息才会路由到连接。ip_connectionId
这是在入站适配器和出站适配器之间共享连接工厂时执行的消息关联程度。 这种共享允许通过 TCP 进行异步双向通信。 默认情况下,仅使用 TCP 传输有效负载信息。 因此,任何消息关联都必须由下游组件(如聚合器或其他端点)执行。 版本 3.0 中引入了对传输选定标头的支持。 有关详细信息,请参阅 TCP 消息关联
从 Spring Integration 版本 4.2 开始,如果服务器配置为侦听随机端口(通过将端口设置为 ),则可以使用 获取操作系统选择的实际端口。 此外,还可以让您获得完整的. 有关更多信息,请参见 TcpServerConnectionFactory 接口的 Javadoc0getPort()getServerSocketAddress()SocketAddress

消息分界(序列化程序和反序列化程序)

TCP 是一种流协议。 这意味着必须为通过TCP传输的数据提供一些结构,以便接收方可以将数据划分为离散的消息。 连接工厂配置为使用序列化程序和反序列化程序在消息有效负载和通过 TCP 发送的位之间进行转换。 这是通过分别为入站和出站邮件提供反序列化程序和序列化程序来实现的。 Spring Integration 提供了许多标准序列化程序和反序列化程序。Spring中文文档

ByteArrayCrlfSerializer*将字节数组转换为字节流,后跟回车符和换行符 ()。 这是默认的序列化程序(和反序列化程序),可以(例如)与 telnet 一起使用作为客户端。\r\nSpring中文文档

将字节数组转换为字节流,后跟单个终止字符(默认值为 )。ByteArraySingleTerminatorSerializer*0x00Spring中文文档

将字节数组转换为字节流,后跟一个换行符 ()。ByteArrayLfSerializer*0x0aSpring中文文档

将字节数组转换为字节流,其前跟 STX () 后跟 ETX ()。ByteArrayStxEtxSerializer*0x020x03Spring中文文档

将字节数组转换为字节流,其前面是网络字节顺序(大字节序)的二进制长度。 这是一个有效的反序列化程序,因为它不必解析每个字节来查找终止字符序列。 它还可用于包含二进制数据的有效负载。 前面的序列化程序仅支持有效负载中的文本。 长度标头的默认大小为 4 个字节(一个 Integer),允许最多 (2^31 - 1) 个字节的消息。 但是,对于最大 255 字节的消息,标头可以是单个字节(无符号),对于最大 (2^16 - 1) 字节的消息,标头可以是无符号短(2 字节)。 如果需要标头的任何其他格式,可以对 and 方法进行子类化并提供实现。 绝对最大数据大小为 (2^31 - 1) 字节。 从版本 5.2 开始,除了有效负载之外,标头值还可以包括标头的长度。 设置属性以启用该机制(必须为生产者和使用者设置相同的属性)。ByteArrayLengthHeaderSerializerlengthByteArrayLengthHeaderSerializerreadHeaderwriteHeaderinclusiveSpring中文文档

这 ,将字节数组转换为字节流,并且不添加额外的消息分界数据。 使用此序列化程序(和反序列化程序)时,客户端以有序的方式关闭套接字来指示消息的结束。 使用此序列化程序时,消息接收将挂起,直到客户端关闭套接字或发生超时。 超时不会导致消息。 当使用此序列化程序并且客户端是 Spring Integration 应用程序时,客户端必须使用配置了 的连接工厂。 这样做会导致适配器在发送消息后关闭套接字。 序列化程序本身不会关闭连接。 应仅将此序列化程序用于通道适配器(而不是网关)使用的连接工厂,并且应由入站或出站适配器使用连接工厂,但不能同时使用两者。 另请参阅本节后面的 。 但是,从版本 5.2 开始,出站网关具有新属性;这允许使用原始序列化程序/反序列化程序,因为 EOF 向服务器发出信号,同时保持连接处于打开状态以接收回复。ByteArrayRawSerializer*single-use="true"ByteArrayElasticRawDeserializercloseStreamAfterSendSpring中文文档

在版本 4.2.2 之前,当使用非阻塞 I/O (NIO) 时,此序列化程序将超时(读取期间)视为文件结束,并且到目前为止读取的数据作为消息发出。 这是不可靠的,不应用于分隔消息。 它现在将此类条件视为例外。 万一您以这种方式使用它,您可以通过将构造函数参数设置为 来恢复以前的行为。treatTimeoutAsEndOfMessagetrue

它们中的每一个都是 的子类,它实现了 和 。 为了向后兼容,使用 for 序列化的任何子类的连接也接受首先转换为字节数组的 a。 这些序列化程序和反序列化程序中的每一个都将包含相应格式的输入流转换为字节数组有效负载。AbstractByteArraySerializerorg.springframework.core.serializer.Serializerorg.springframework.core.serializer.DeserializerAbstractByteArraySerializerStringSpring中文文档

为了避免由于行为不端的客户端(不遵守所配置序列化程序的协议)而导致内存耗尽,这些序列化程序会强制执行最大消息大小。 如果传入消息超过此大小,则会引发异常。 默认最大消息大小为 2048 字节。 您可以通过设置属性来增加它。 如果使用缺省序列化程序或反序列化程序,并希望增加最大消息大小,那么必须使用该属性将最大消息大小声明为显式 Bean,并将连接工厂配置为使用该 Bean。maxMessageSizemaxMessageSizeSpring中文文档

本节前面标记的类使用中间缓冲区,并将解码后的数据复制到正确大小的最终缓冲区。 从版本 4.3 开始,可以通过设置一个属性来配置这些缓冲区,以允许重用这些原始缓冲区,而不是为每条消息分配和丢弃这些缓冲区,这是默认行为。 将属性设置为负值将创建一个没有边界的池。 如果池是有界的,则还可以设置该属性(以毫秒为单位),之后如果没有可用的缓冲区,则会引发异常。 它默认为无穷大。 此类异常会导致套接字关闭。*poolSizepoolWaitTimeoutSpring中文文档

如果您希望在自定义反序列化程序中使用相同的机制,您可以扩展(而不是其超类)并实现而不是 。 缓冲区将自动返回到池中。 还提供了一种方便的实用方法:.AbstractPooledBufferByteArraySerializerAbstractByteArraySerializerdoDeserialize()deserialize()AbstractPooledBufferByteArraySerializercopyToSizedArray()Spring中文文档

版本 5.0 添加了 . 这与上面的解串器侧类似,只是没有必要设置 . 在内部,它使用一个让缓冲区根据需要增长。 客户端必须有序地关闭套接字,以发出消息结束的信号。ByteArrayElasticRawDeserializerByteArrayRawSerializermaxMessageSizeByteArrayOutputStreamSpring中文文档

仅当对等体受信任时,才应使用此解串器;由于内存不足的情况,它容易受到 DoS 连接的影响。

使用 Jackson 在 a 和 JSON 之间进行转换。 可以将此序列化程序与 a 和 a 结合使用,以 JSON 格式传输选定的标头和有效负载。MapJsonSerializerObjectMapperMapMessageConvertingTcpMessageMapperMapMessageConverterSpring中文文档

Jackson 无法在流中划分消息。 因此,需要委托给另一个序列化器或反序列化器来处理消息分隔。 默认情况下,使用 a,生成格式为 on the wire 的消息,但您可以将其配置为改用其他消息。 (下一个示例演示如何执行此操作。ObjectMapperMapJsonSerializerByteArrayLfSerializer<json><LF>

最终的标准序列化程序是 ,您可以使用它来转换具有 Java 序列化的可序列化对象。 用于对包含可序列化对象的流进行入站反序列化。org.springframework.core.serializer.DefaultSerializerorg.springframework.core.serializer.DefaultDeserializerSpring中文文档

如果不希望使用默认的序列化程序和反序列化程序 (),则必须在连接工厂上设置 and 属性。 以下示例演示如何执行此操作:ByteArrayCrLfSerializerserializerdeserializerSpring中文文档

<bean id="javaSerializer"
      class="org.springframework.core.serializer.DefaultSerializer" />
<bean id="javaDeserializer"
      class="org.springframework.core.serializer.DefaultDeserializer" />

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    deserializer="javaDeserializer"
    serializer="javaSerializer"/>

使用连接并在网络上使用 Java 序列化的服务器连接工厂。java.net.SocketSpring中文文档

有关连接工厂上可用属性的完整详细信息,请参阅本节末尾的参考Spring中文文档

默认情况下,不会对入站数据包执行反向 DNS 查找:在未配置 DNS 的环境(例如 Docker 容器)中,这可能会导致连接延迟。 若要将 IP 地址转换为主机名以用于邮件头,可以通过将属性设置为 来覆盖默认行为。lookup-hosttrueSpring中文文档

您还可以修改套接字和套接字工厂的属性。 有关详细信息,请参阅 SSL/TLS 支持。 如前所述,如果使用SSL,则可以进行此类修改。
在版本 4.2.2 之前,当使用非阻塞 I/O (NIO) 时,此序列化程序将超时(读取期间)视为文件结束,并且到目前为止读取的数据作为消息发出。 这是不可靠的,不应用于分隔消息。 它现在将此类条件视为例外。 万一您以这种方式使用它,您可以通过将构造函数参数设置为 来恢复以前的行为。treatTimeoutAsEndOfMessagetrue
仅当对等体受信任时,才应使用此解串器;由于内存不足的情况,它容易受到 DoS 连接的影响。
Jackson 无法在流中划分消息。 因此,需要委托给另一个序列化器或反序列化器来处理消息分隔。 默认情况下,使用 a,生成格式为 on the wire 的消息,但您可以将其配置为改用其他消息。 (下一个示例演示如何执行此操作。ObjectMapperMapJsonSerializerByteArrayLfSerializer<json><LF>
您还可以修改套接字和套接字工厂的属性。 有关详细信息,请参阅 SSL/TLS 支持。 如前所述,如果使用SSL,则可以进行此类修改。

自定义序列化程序和反序列化程序

如果您的数据不是某个标准反序列化程序支持的格式,则可以实现自己的格式;还可以实现自定义序列化程序。Spring中文文档

若要实现自定义序列化程序和反序列化程序对,请实现 and 接口。org.springframework.core.serializer.Deserializerorg.springframework.core.serializer.SerializerSpring中文文档

当解串器检测到消息之间的封闭输入流时,它必须抛出一个 ;这是向框架发出的信号,表明收盘价为“正常”。 如果在解码消息时关闭了流,则应抛出其他一些异常。SoftEndOfStreamExceptionSpring中文文档

从版本 5.2 开始,现在是 而不是扩展 .SoftEndOfStreamExceptionRuntimeExceptionIOExceptionSpring中文文档

TCP 缓存客户端连接工厂

如前所述,TCP 套接字可以是“一次性”(一个请求或响应)或共享的。 在大容量环境中,共享套接字在出站网关中表现不佳,因为套接字一次只能处理一个请求或响应。Spring中文文档

若要提高性能,可以使用协作通道适配器而不是网关,但这需要应用程序级消息关联。 有关详细信息,请参阅 TCP 消息关联Spring中文文档

Spring Integration 2.2 引入了一个缓存客户端连接工厂,它使用共享套接字池,允许网关使用共享连接池处理多个并发请求。Spring中文文档

TCP 故障转移客户端连接工厂

您可以配置支持故障转移到一个或多个其他服务器的 TCP 连接工厂。 发送消息时,工厂会循环访问其所有配置的工厂,直到可以发送消息或找不到连接。 最初,使用配置列表中的第一个工厂。 如果连接随后失败,则下一个工厂将成为当前工厂。 以下示例演示如何配置故障转移客户端连接工厂:Spring中文文档

<bean id="failCF" class="o.s.i.ip.tcp.connection.FailoverClientConnectionFactory">
    <constructor-arg>
        <list>
            <ref bean="clientFactory1"/>
            <ref bean="clientFactory2"/>
        </list>
    </constructor-arg>
</bean>
使用故障转移连接工厂时,该属性必须在工厂本身与其配置为使用的工厂列表之间保持一致。singleUse

当与共享连接一起使用时,连接工厂具有两个与故障回复相关的属性 ():singleUse=falseSpring中文文档

根据上述配置,请考虑以下方案: 假设无法建立连接,但可以。 当该方法在经过后被调用时,我们将再次尝试使用 ;如果成功,将关闭与的连接。 如果是,“旧”连接将保持打开状态,如果第一个工厂再次发生故障,将来可能会重复使用。clientFactory1clientFactory2failCFgetConnection()refreshSharedIntervalclientFactory1clientFactory2closeOnRefreshfalseSpring中文文档

设置为仅在该时间到期后尝试与第一家工厂重新连接;如果只想在当前连接失败时故障回复到第一个出厂,请将其设置为(默认)。refreshSharedIntervalLong.MAX_VALUESpring中文文档

设置为在刷新后关闭“旧”连接实际上会创建一个新连接。closeOnRefreshSpring中文文档

如果任何委托工厂是 a,则这些属性不适用,因为连接缓存是在那里处理的;在这种情况下,将始终查阅连接工厂列表以获取连接。CachingClientConnectionFactory

从版本 5.3 开始,这些默认为 和 因此,出厂时仅在当前连接失败时尝试故障回复。 若要恢复到以前版本的默认行为,请将其设置为 和 。Long.MAX_VALUEtrue0falseSpring中文文档

使用故障转移连接工厂时,该属性必须在工厂本身与其配置为使用的工厂列表之间保持一致。singleUse
如果任何委托工厂是 a,则这些属性不适用,因为连接缓存是在那里处理的;在这种情况下,将始终查阅连接工厂列表以获取连接。CachingClientConnectionFactory

TCP 线程关联连接工厂

Spring Integration 5.0 版引入了此连接工厂。 它将连接绑定到调用线程,并且每次该线程发送消息时都会重复使用相同的连接。 这将一直持续到连接关闭(由服务器或网络关闭)或直到线程调用该方法。 连接本身由另一个客户端工厂实现提供,该实现必须配置为提供非共享(一次性)连接,以便每个线程都获得连接。releaseConnection()Spring中文文档

以下示例演示如何配置 TCP 线程关联连接工厂:Spring中文文档

@Bean
public TcpNetClientConnectionFactory cf() {
    TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory("localhost",
            Integer.parseInt(System.getProperty(PORT)));
    cf.setSingleUse(true);
    return cf;
}

@Bean
public ThreadAffinityClientConnectionFactory tacf() {
    return new ThreadAffinityClientConnectionFactory(cf());
}

@Bean
@ServiceActivator(inputChannel = "out")
public TcpOutboundGateway outGate() {
    TcpOutboundGateway outGate = new TcpOutboundGateway();
    outGate.setConnectionFactory(tacf());
    outGate.setReplyChannelName("toString");
    return outGate;
}