对于最新的稳定版本,请使用 Spring Integration 6.4.0! |
TCP 消息关联
网关
网关会自动关联消息。 但是,您应该对相对较少的应用程序使用出站网关。 当您将连接工厂配置为对所有消息对使用单个共享连接 ('single-use=“false”') 时,一次只能处理一条消息。 新消息必须等待,直到收到对上一条消息的回复。 当为每个新消息配置连接工厂以使用新连接 ('single-use=“true”') 时,此限制不适用。 虽然此设置可以提供比共享连接环境更高的吞吐量,但它会带来为每个消息对打开和关闭新连接的开销。
因此,对于高容量消息,请考虑使用一对协作的通道适配器。 但是,为此,您需要提供协作逻辑。
Spring Integration 2.2 中引入的另一种解决方案是使用 ,它允许使用共享连接池。CachingClientConnectionFactory
协作出站和入站通道适配器
为了实现高容量吞吐量(如前所述,避免使用网关的陷阱),您可以配置一对协作的出站和入站通道适配器。 您还可以使用协作适配器(服务器端或客户端)进行完全异步的通信(而不是使用请求-答复语义)。 在服务器端,消息关联由适配器自动处理,因为入站适配器添加了一个 Headers,允许出站适配器在发送回复消息时确定要使用的连接。
在服务器端,您必须填充标头,因为它用于将消息关联到连接。
源自入站适配器的消息会自动设置 Header。
如果您希望构造要发送的其他消息,则需要设置 header。
您可以从传入消息中获取标头值。ip_connectionId |
在客户端,应用程序必须根据需要提供自己的关联逻辑。 您可以通过多种方式执行此操作。
如果消息有效负载具有一些自然关联数据(例如交易 ID 或订单号),并且您不需要保留原始出站消息中的任何信息(例如回复通道标头),则关联很简单,并且在任何情况下都会在应用程序级别完成。
如果消息负载具有一些自然关联数据(例如事务 ID 或订单号),但您需要保留原始出站消息中的一些信息(例如回复通道标头),则可以保留原始出站消息的副本(可能通过使用发布-订阅通道)并使用聚合器重新组合必要的数据。
对于前两种情况中的任何一种,如果有效负载没有自然关联数据,则可以在出站通道适配器的上游提供一个转换器,以使用此类数据增强有效负载。 此类转换器可以将原始有效负载转换为包含原始有效负载和消息标头的某些子集的新对象。 当然,来自 Headers 的 Live Objects(例如回复通道)不能包含在转换后的有效负载中。
如果选择这样的策略,则需要确保连接工厂具有适当的序列化器-反序列化器对来处理此类有效负载(例如和 ,它们使用 java 序列化,或自定义序列化器和反序列化器)。
TCP Connection Factories 中提到的选项(包括默认选项)不支持此类有效负载,除非转换后的有效负载是 或 。DefaultSerializer
DefaultDeserializer
ByteArray*Serializer
ByteArrayCrLfSerializer
String
byte[]
在 2.2 发行版之前,当协作通道适配器使用客户端连接工厂时,该属性默认为默认的回复超时(10 秒)。
这意味着,如果入站适配器在这段时间内没有收到任何数据,则套接字将关闭。 此默认行为在真正的异步环境中不适用,因此它现在默认为无限超时。
您可以通过将 Client 端连接工厂上的属性设置为 10000 毫秒来恢复以前的默认行为。 |
从版本 5.4 开始,多个出站通道适配器和一个可以共享同一个连接工厂。
这允许应用程序同时支持请求/回复和任意服务器→客户端消息收发。
有关更多信息,请参阅 TCP 网关。TcpInboundChannelAdapter
传输标头
TCP 是一种流协议。 并在流中划分消息。
在 3.0 之前,只能通过 TCP 传输消息负载 ( 或 )。
从 3.0 开始,您可以传输选定的标头以及有效负载。
但是,“活动”对象(如 header)不能序列化。Serializers
Deserializers
String
byte[]
replyChannel
通过 TCP 发送标头信息需要一些额外的配置。
第一步是提供 a ,该 使用 该属性。
此映射器委托给任何实现,以将消息与某个对象相互转换,这些对象可由配置的 和 .ConnectionFactory
MessageConvertingTcpMessageMapper
mapper
MessageConverter
serializer
deserializer
Spring 集成提供了一个 ,它允许指定添加到对象的 Headers 列表以及有效负载。
生成的 Map 有两个条目: 和 .
该条目本身是 a 并包含所选的标头。MapMessageConverter
Map
payload
headers
headers
Map
第二步是提供一个 serializer 和一个 deserializer,它们可以在 a 和一些 wire 格式之间进行转换。
这可以是自定义的 或 ,如果对等系统不是 Spring 集成应用程序,您通常需要它。Map
Serializer
Deserializer
Spring 集成提供了一个将 a 转换为 JSON 和从 JSON 转换的方法。
它使用 Spring Integration 。
如果需要,您可以提供自定义。
默认情况下,序列化器在对象之间插入换行符 () 字符。
有关更多信息,请参阅 Javadoc。MapJsonSerializer
Map
JsonObjectMapper
JsonObjectMapper
0x0a
它使用 Classpath 上的任何版本。JsonObjectMapper Jackson |
您还可以使用 和 来使用 的标准 Java 序列化 。Map
DefaultSerializer
DefaultDeserializer
以下示例显示了使用 JSON 传输 、 和 标头的连接工厂的配置:correlationId
sequenceNumber
sequenceSize
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="12345"
mapper="mapper"
serializer="jsonSerializer"
deserializer="jsonSerializer"/>
<bean id="mapper"
class="o.sf.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
<constructor-arg name="messageConverter">
<bean class="o.sf.integration.support.converter.MapMessageConverter">
<property name="headerNames">
<list>
<value>correlationId</value>
<value>sequenceNumber</value>
<value>sequenceSize</value>
</list>
</property>
</bean>
</constructor-arg>
</bean>
<bean id="jsonSerializer" class="o.sf.integration.ip.tcp.serializer.MapJsonSerializer" />
使用上述配置发送的有效负载为 'something' 的消息将出现在网络上,如下所示:
{"headers":{"correlationId":"things","sequenceSize":5,"sequenceNumber":1},"payload":"something"}