TCP 和 UDP 支持
TCP 和 UDP 支持
Spring 集成提供了通道适配器,用于通过 Internet 协议接收和发送消息。 提供 UDP (用户数据报协议) 和 TCP (传输控制协议) 适配器。 每个适配器都提供通过底层协议的单向通信。 此外, Spring 集成提供了简单的入站和出站 TCP 网关。 当需要双向通信时,会使用这些选项。
您需要将此依赖项包含在您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ip</artifactId>
<version>6.0.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-ip:6.0.9"
介绍
分别提供了两种风格的 UDP 入站和出站通道适配器:
-
UnicastSendingMessageHandler
将数据报数据包发送到单个目标。 -
UnicastReceivingChannelAdapter
接收传入的数据报数据包。 -
MulticastSendingMessageHandler
将 (广播) 数据报数据包发送到多播地址。 -
MulticastReceivingChannelAdapter
通过加入多播地址来接收传入的数据报数据包。
提供了 TCP 入站和出站通道适配器:
-
TcpSendingMessageHandler
通过 TCP 发送消息。 -
TcpReceivingChannelAdapter
通过 TCP 接收消息。
提供入站 TCP 网关。 它允许简单的请求-响应处理。 虽然网关可以支持任意数量的连接,但每个连接只能按顺序处理。 从套接字读取的线程在再次读取之前等待并发送响应。 如果连接出厂配置为一次性连接,则连接将在套接字超时后关闭。
提供出站 TCP 网关。 它允许简单的请求-响应处理。 如果为一次性连接配置了关联的连接工厂,则会立即为每个新请求创建一个新连接。 否则,如果连接正在使用中,则调用线程将阻塞连接,直到收到响应或发生超时或 I/O 错误。
TCP 和 UDP 入站通道适配器以及 TCP 入站网关支持该属性。
这提供了与进入GatewayProxyFactoryBean
中描述的相同的基本功能。error-channel
UDP 适配器
本节介绍如何配置和使用 UDP 适配器。
出站 UDP 适配器 (XML 配置)
以下示例配置 UDP 出站通道适配器:
<int-ip:udp-outbound-channel-adapter id="udpOut"
host="somehost"
port="11111"
multicast="false"
socket-customizer="udpCustomizer"
channel="exampleChannel"/>
设置为 时,还应在 host 属性中提供多播地址。multicast true |
UDP 是一种高效但不可靠的协议。
Spring 集成添加了两个属性来提高可靠性:和 。
设置为 时,适配器在消息数据前面有一个 length 字段(按网络字节顺序排列的四个字节)。
这使接收方能够验证收到的数据包的长度。
如果接收系统使用的缓冲区太短而无法包含数据包,则数据包可能会被截断。
标头提供了一种检测此情况的机制。check-length
acknowledge
check-length
true
length
从版本 4.3 开始,您可以将 设置为 ,在这种情况下,操作系统会选择端口。
可以通过在适配器启动后调用 来发现所选端口,并返回 。port
0
getPort()
isListening()
true
从版本 5.3.3 开始,您可以添加 bean 以在创建后对其进行修改(例如, call )。SocketCustomizer
DatagramSocket
setTrafficClass(0x10)
以下示例显示了一个出站通道适配器,该适配器向数据报数据包添加了长度检查:
<int-ip:udp-outbound-channel-adapter id="udpOut"
host="somehost"
port="11111"
multicast="false"
check-length="true"
channel="exampleChannel"/>
数据包的接收方还必须配置为预期长度在实际数据之前。
对于 Spring 集成 UDP 入站通道适配器,请设置其属性。check-length |
第二个可靠性改进允许使用应用程序级确认协议。 接收方必须在指定时间内向发送方发送确认。
以下示例显示了一个出站通道适配器,该适配器向数据报数据包添加长度检查并等待确认:
<int-ip:udp-outbound-channel-adapter id="udpOut"
host="somehost"
port="11111"
multicast="false"
check-length="true"
acknowledge="true"
ack-host="thishost"
ack-port="22222"
ack-timeout="10000"
channel="exampleChannel"/>
设置为 表示数据包的接收方可以解释添加到包含确认数据(主机和端口)的数据包的报头。
最有可能的是,接收者是 Spring 集成入站通道适配器。acknowledge true |
当 multicast 为 true 时,附加属性 () 指定必须在 .min-acks-for-success ack-timeout |
从版本 4.3 开始,您可以将 设置为 ,在这种情况下,操作系统会选择端口。ackPort
0
出站 UDP 适配器 (Java 配置)
以下示例演示如何使用 Java 配置出站 UDP 适配器:
@Bean
@ServiceActivator(inputChannel = "udpOut")
public UnicastSendingMessageHandler handler() {
return new UnicastSendingMessageHandler("localhost", 11111);
}
(或用于多播)。MulticastSendingChannelAdapter
出站 UDP 适配器 (Java DSL 配置)
以下示例说明如何使用 Java DSL 配置出站 UDP 适配器:
@Bean
public IntegrationFlow udpOutFlow() {
return f -> f.handle(Udp.outboundAdapter("localhost", 1234)
.configureSocket(socket -> socket.setTrafficClass(0x10)))
.get();
}
入站 UDP 适配器 (XML 配置)
以下示例说明如何配置基本的单播入站 udp 通道适配器。
<int-ip:udp-inbound-channel-adapter id="udpReceiver"
channel="udpOutChannel"
port="11111"
receive-buffer-size="500"
multicast="false"
socket-customizer="udpCustomizer"
check-length="true"/>
以下示例说明如何配置基本的多播入站 udp 通道适配器:
<int-ip:udp-inbound-channel-adapter id="udpReceiver"
channel="udpOutChannel"
port="11111"
receive-buffer-size="500"
multicast="true"
multicast-address="225.6.7.8"
check-length="true"/>
默认情况下,不对入站数据包执行反向 DNS 查找:在未配置 DNS 的环境中(例如 Docker 容器),这可能会导致连接延迟。
要将 IP 地址转换为主机名以用于邮件报头,可以通过将属性设置为 来覆盖默认行为。lookup-host
true
从版本 5.3.3 开始,您可以添加 bean 以在创建后对其进行修改。
它被调用用于接收套接字和为发送 ack 创建的任何套接字。SocketCustomizer
DatagramSocket
入站 UDP 适配器 (Java 配置)
以下示例显示如何使用 Java 配置入站 UDP 适配器:
@Bean
public UnicastReceivingChannelAdapter udpIn() {
UnicastReceivingChannelAdapter adapter = new UnicastReceivingChannelAdapter(11111);
adapter.setOutputChannelName("udpChannel");
return adapter;
}
以下示例显示了如何使用 Java DSL 配置入站 UDP 适配器:
入站 UDP 适配器 (Java DSL 配置)
@Bean
public IntegrationFlow udpIn() {
return IntegrationFlow.from(Udp.inboundAdapter(11111))
.channel("udpChannel")
.get();
}
服务器侦听事件
从版本 5.0.2 开始,当入站适配器启动并开始侦听时,会发出 a。
当适配器配置为侦听 port 时,这很有用,这意味着操作系统选择端口。
如果您需要在启动将连接到套接字的其他进程之前等待,也可以使用它代替 polling 。UdpServerListeningEvent
0
isListening()
高级出站配置
() 有 和 选项。<int-ip:udp-outbound-channel-adapter>
UnicastSendingMessageHandler
destination-expression
socket-expression
您可以使用 hardcoded - 对的运行时替代方案来确定传出数据报数据包的目标地址(使用评估上下文的根对象)。
表达式的计算结果必须为 URI 样式的 、 a (请参阅 RFC-2396) 或 a 。
您还可以将此表达式使用 inbound 标头。
在框架中,当我们在 中接收数据报并将其转换为消息时,它会填充此标头。
标头值正是传入数据报的结果。destination-expression
host
port
requestMessage
URI
String
SocketAddress
IpHeaders.PACKET_ADDRESS
DatagramPacketMessageMapper
UnicastReceivingChannelAdapter
DatagramPacket.getSocketAddress()
使用 ,出站通道适配器可以使用(例如)入站通道适配器套接字通过接收数据的同一端口发送数据报。
当我们的应用程序作为 UDP 服务器工作,而客户端在网络地址转换 (NAT) 后面运行时,它非常有用。
此表达式的计算结果必须为 。
该用作评估上下文的根对象。
不能将参数与 和 参数一起使用。
以下示例说明如何使用转换为大写并使用套接字的转换器配置 UDP 入站通道适配器:socket-expression
DatagramSocket
requestMessage
socket-expression
multicast
acknowledge
<int-ip:udp-inbound-channel-adapter id="inbound" port="0" channel="in" />
<int:channel id="in" />
<int:transformer expression="new String(payload).toUpperCase()"
input-channel="in" output-channel="out"/>
<int:channel id="out" />
<int-ip:udp-outbound-channel-adapter id="outbound"
socket-expression="@inbound.socket"
destination-expression="headers['ip_packetAddress']"
channel="out" />
以下示例显示了 Java DSL 的等效配置:
@Bean
public IntegrationFlow udpEchoUpcaseServer() {
return IntegrationFlow.from(Udp.inboundAdapter(11111).id("udpIn"))
.<byte[], String>transform(p -> new String(p).toUpperCase())
.handle(Udp.outboundAdapter("headers['ip_packetAddress']")
.socketExpression("@udpIn.socket"))
.get();
}
TCP 连接工厂
概述
对于 TCP,底层连接的配置是使用连接工厂提供的。 提供了两种类型的连接工厂:客户端连接工厂和服务器连接工厂。 客户端连接工厂建立传出连接。 服务器连接工厂侦听传入连接。
出站通道适配器使用客户端连接工厂,但您也可以提供对入站通道适配器的客户端连接工厂的引用。 该适配器接收在出站适配器创建的连接上收到的任何传入消息。
入站通道适配器或网关使用服务器连接工厂。 (事实上,没有一个,连接工厂就无法运行)。 您还可以提供对出站适配器的服务器连接工厂的引用。 然后,您可以使用该适配器向同一连接上的传入邮件发送回复。
仅当回复包含连接工厂插入到原始消息中的标头时,才会将回复消息路由到连接。ip_connectionId |
这是在入站和出站适配器之间共享连接工厂时执行的消息关联的范围。 这种共享允许通过 TCP 进行异步双向通信。 默认情况下,仅使用 TCP 传输负载信息。 因此,任何消息关联都必须由下游组件(如聚合器或其他终端节点)执行。 版本 3.0 中引入了对传输所选标头的支持。 有关更多信息,请参阅 TCP 消息关联。 |
您可以为每种类型的最多一个适配器提供对连接工厂的引用。
Spring Integration 提供了使用 和 的连接工厂。java.net.Socket
java.nio.channel.SocketChannel
以下示例显示了一个使用 connections 的简单 Server Connection Factory:java.net.Socket
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"/>
以下示例显示了一个使用 connections 的简单 Server Connection Factory:java.nio.channel.SocketChannel
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"
using-nio="true"/>
从 Spring Integration 版本 4.2 开始,如果将服务器配置为侦听随机端口(通过将 port 设置为 ),则可以使用 OS 获取 OS 选择的实际端口。
此外,还可以获取完整的 .
有关更多信息,请参见 TcpServerConnectionFactory 接口的 Javadoc。0 getPort() getServerSocketAddress() SocketAddress |
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="1234"
single-use="true"
so-timeout="10000"/>
以下示例显示了一个客户端连接工厂,该工厂使用 connections 并为每条消息创建一个新连接:java.net.Socket
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="1234"
single-use="true"
so-timeout="10000"
using-nio=true/>
从版本 5.2 开始,客户端连接工厂支持以秒为单位指定的属性,默认为 60。connectTimeout
消息划分(序列化器和反序列化器)
TCP 是一种流协议。 这意味着必须为通过 TCP 传输的数据提供一些结构,以便接收方可以将数据划分为离散的消息。 连接工厂配置为使用序列化器和反序列化器在消息有效负载和通过 TCP 发送的位之间进行转换。 这是通过分别为入站和出站消息提供反序列化器和序列化器来实现的。 Spring 集成提供了许多标准的序列化器和反序列化器。
ByteArrayCrlfSerializer
*将字节数组转换为字节流,后跟回车符和换行符 ()。
这是默认的序列化程序(和反序列化程序),例如,可以与 telnet 一起用作 client。\r\n
它将字节数组转换为字节流,后跟一个终止字符(默认值为 )。ByteArraySingleTerminatorSerializer
*0x00
它将字节数组转换为字节流,后跟单个换行符 ()。ByteArrayLfSerializer
*0x0a
它将字节数组转换为前面有 STX () 后跟 ETX () 的字节流。ByteArrayStxEtxSerializer
*0x02
0x03
它将字节数组转换为字节流,前面是按网络字节顺序(大端)排列的二进制长度。
这是一个高效的反序列化器,因为它不必解析每个字节来查找终止字符序列。
它还可用于包含二进制数据的有效负载。
前面的序列化程序仅支持有效负载中的文本。
长度标头的默认大小为 4 个字节(一个整数),允许消息最大为 (2^31 - 1) 字节。
但是,报头可以是单个字节(无符号),用于最大 255 字节的消息,也可以是无符号短(2 字节),用于最大 (2^16 - 1) 字节的消息。
如果你需要任何其他格式的 header,你可以子类化并为 and 方法提供实现。
绝对最大数据大小为 (2^31 - 1) 字节。
从版本 5.2 开始,除了有效负载之外,标头值还可以包括 标头的长度。
设置该属性以启用该机制(对于生产者和使用者,必须将其设置为相同)。ByteArrayLengthHeaderSerializer
length
ByteArrayLengthHeaderSerializer
readHeader
writeHeader
inclusive
, 将字节数组转换为字节流,并且不添加其他消息划分数据。
使用此序列化程序(和反序列化程序),消息的结尾由 Client 端按顺序关闭套接字来指示。
使用此序列化程序时,消息接收将挂起,直到客户端关闭套接字或发生超时。
超时不会导致消息。
当使用此序列化器并且 Client 端是 Spring Integration 应用程序时,Client 端必须使用配置了 .
这样做会导致适配器在发送消息后关闭套接字。
序列化程序本身不会关闭连接。
你应该只将此序列化器与通道适配器(而不是网关)使用的连接工厂一起使用,并且连接工厂应该由入站或出站适配器使用,但不能同时由两者使用。
另请参阅本节后面的 。
但是,从版本 5.2 开始,出站网关具有一个新属性;这允许使用 Raw serializers/deserializers,因为 EOF 被发送到服务器,同时保持连接打开以接收回复。ByteArrayRawSerializer
*single-use="true"
ByteArrayElasticRawDeserializer
closeStreamAfterSend
在 4.2.2 版本之前,当使用非阻塞 I/O (NIO) 时,此序列化器将超时(读取期间)视为文件结束,并且到目前为止读取的数据作为消息发出。
这是不可靠的,不应用于分隔消息。
现在,它将此类条件视为异常。
万一您以这种方式使用它,您可以通过将 constructor 参数设置为 来恢复以前的行为。treatTimeoutAsEndOfMessage true |
它们中的每一个都是 的子类,它同时实现 和 。
为了向后兼容,使用任何子类进行序列化的连接也接受首先转换为字节数组的 a。
这些序列化器和反序列化器中的每一个都将包含相应格式的 input 流转换为字节数组有效负载。AbstractByteArraySerializer
org.springframework.core.serializer.Serializer
org.springframework.core.serializer.Deserializer
AbstractByteArraySerializer
String
为了避免由于行为不良的 Client 端(不遵守已配置的序列化程序的协议)而导致的内存耗尽,这些序列化程序会施加最大消息大小。
如果传入消息超过此大小,则会引发异常。
默认最大消息大小为 2048 字节。
您可以通过设置属性来增加它。
如果使用默认序列化器或反序列化器并希望增加最大消息大小,则必须将最大消息大小声明为具有属性集的显式 Bean,并将连接工厂配置为使用该 Bean。maxMessageSize
maxMessageSize
本节前面标记的类使用中间缓冲区,并将解码后的数据复制到正确大小的最终缓冲区。
从版本 4.3 开始,你可以通过设置一个属性来配置这些缓冲区,以允许这些原始缓冲区被重用,而不是为每个消息分配和丢弃,这是默认行为。
将该属性设置为负值将创建一个没有边界的池。
如果池是有界的,您还可以设置属性 (以毫秒为单位),之后如果没有缓冲区可用,则会引发异常。
它默认为无穷大。
此类异常会导致套接字关闭。*poolSize
poolWaitTimeout
如果你想在自定义反序列化器中使用相同的机制,你可以扩展(而不是它的超类)和实现而不是.
缓冲区会自动返回到池中。 还提供了一种方便的 Utility 方法: 。AbstractPooledBufferByteArraySerializer
AbstractByteArraySerializer
doDeserialize()
deserialize()
AbstractPooledBufferByteArraySerializer
copyToSizedArray()
版本 5.0 添加了 .
这与上面的 deserializer 端类似,只是没有必要设置 .
在内部,它使用 a 让缓冲区根据需要增长。
客户端必须有序地关闭套接字以发出消息结束的信号。ByteArrayElasticRawDeserializer
ByteArrayRawSerializer
maxMessageSize
ByteArrayOutputStream
仅当 peer 受信任时,才应使用此反序列化器;由于内存不足情况,它容易受到 DoS 连接的影响。 |
它使用 Jackson 在 a 和 JSON 之间进行转换。
您可以将此序列化程序与 a 和 a 结合使用,以 JSON 格式传输选定的标头和负载。MapJsonSerializer
ObjectMapper
Map
MessageConvertingTcpMessageMapper
MapMessageConverter
Jackson 无法在流中划分消息。
因此,需要委托给另一个序列化程序或反序列化程序来处理消息划分。
默认情况下,使用 a,从而生成格式为 on the wire 的消息,但您可以将其配置为使用其他消息。
(下一个示例演示如何执行此操作。ObjectMapper MapJsonSerializer ByteArrayLfSerializer <json><LF> |
最终的标准序列化程序是 ,您可以使用它来转换具有 Java 序列化的可序列化对象。 用于包含可序列化对象的流的入站反序列化。org.springframework.core.serializer.DefaultSerializer
org.springframework.core.serializer.DefaultDeserializer
如果你不想使用默认的序列化器和反序列化器 (),则必须在连接工厂上设置 and 属性。
以下示例显示了如何执行此操作:ByteArrayCrLfSerializer
serializer
deserializer
<bean id="javaSerializer"
class="org.springframework.core.serializer.DefaultSerializer" />
<bean id="javaDeserializer"
class="org.springframework.core.serializer.DefaultDeserializer" />
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"
deserializer="javaDeserializer"
serializer="javaSerializer"/>
一个服务器连接工厂,它使用连接并在网络上使用 Java 序列化。java.net.Socket
有关连接工厂上可用属性的完整详细信息,请参阅本节末尾的参考。
默认情况下,不对入站数据包执行反向 DNS 查找:在未配置 DNS 的环境中(例如 Docker 容器),这可能会导致连接延迟。
要将 IP 地址转换为主机名以用于邮件报头,可以通过将属性设置为 来覆盖默认行为。lookup-host
true
您还可以修改 sockets 和 socket factories 的属性。 有关更多信息,请参阅 SSL/TLS 支持。 如前所述,如果使用 SSL,则进行此类修改是可能的。 |
自定义序列化器和反序列化器
如果您的数据不是标准反序列化器之一支持的格式,则可以实现自己的格式;您还可以实现自定义序列化程序。
要实现自定义序列化器和反序列化器对,请实现 and 接口。org.springframework.core.serializer.Deserializer
org.springframework.core.serializer.Serializer
当反序列化器检测到消息之间关闭的输入流时,它必须抛出一个 ;这是向框架发出的信号,表明关闭是 “正常 ”的。
如果在解码消息时关闭了流,则应改为引发其他异常。SoftEndOfStreamException
从版本 5.2 开始,现在是 a 而不是扩展 。SoftEndOfStreamException
RuntimeException
IOException
TCP 缓存客户端连接工厂
如前所述,TCP 套接字可以是“一次性”(一个请求或响应)或共享的。 共享套接字在大容量环境中与出站网关一起性能不佳,因为套接字一次只能处理一个请求或响应。
为了提高性能,您可以使用协作通道适配器而不是网关,但这需要应用程序级别的消息关联。 有关更多信息,请参阅 TCP 消息关联。
Spring Integration 2.2 引入了一个缓存客户端连接工厂,它使用共享套接字池,允许网关使用共享连接池处理多个并发请求。
TCP 故障转移客户端连接工厂
您可以配置支持故障转移到一个或多个其他服务器的 TCP 连接工厂。 发送消息时,工厂会遍历其所有已配置的工厂,直到可以发送消息或找不到连接。 最初,使用已配置列表中的第一个 factory。 如果连接随后失败,则下一个工厂将成为当前工厂。 以下示例说明如何配置故障转移客户端连接工厂:
<bean id="failCF" class="o.s.i.ip.tcp.connection.FailoverClientConnectionFactory">
<constructor-arg>
<list>
<ref bean="clientFactory1"/>
<ref bean="clientFactory2"/>
</list>
</constructor-arg>
</bean>
使用故障转移连接工厂时,工厂本身与其配置为使用的工厂列表之间的属性必须一致。singleUse |
当与共享连接 () 一起使用时,连接工厂有两个与故障回复相关的属性:singleUse=false
-
refreshSharedInterval
-
closeOnRefresh
根据上述配置,考虑以下场景:
假设 cannot establish a connection,但可以。
当该方法在 the 通过后调用时,我们将再次尝试使用 ;如果成功,将关闭 到 的连接。
如果为 is ,则“旧”连接将保持打开状态,如果第一个工厂再次失败,将来可能会重新使用。clientFactory1
clientFactory2
failCF
getConnection()
refreshSharedInterval
clientFactory1
clientFactory2
closeOnRefresh
false
设置为仅在该时间到期后尝试重新连接第一个工厂;如果您只想在当前连接失败时故障回复到第一个工厂,请将其设置为 (default)。refreshSharedInterval
Long.MAX_VALUE
设置为在刷新后关闭 “旧” 连接,实际会创建新连接。closeOnRefresh
如果任何委托工厂是 a,则这些属性不适用,因为连接缓存是在那里处理的;在这种情况下,将始终查阅 Connection Factory 列表以获取 CONNECTION。CachingClientConnectionFactory |
从版本 5.3 开始,这些默认为,因此工厂仅在当前连接失败时尝试故障恢复。
要恢复到以前版本的默认行为,请将它们设置为 和 。Long.MAX_VALUE
true
0
false
另请参阅 测试连接。
TCP 线程关联连接工厂
Spring 集成版本 5.0 引入了这个连接工厂。
它将连接绑定到调用线程,并且每次该线程发送消息时都会重复使用相同的连接。
这种情况一直持续到连接关闭(由服务器或网络)或直到线程调用该方法。
连接本身由另一个 Client 端工厂实现提供,该实现必须配置为提供非共享(一次性)连接,以便每个线程都获得一个连接。releaseConnection()
以下示例说明如何配置 TCP 线程关联连接工厂:
@Bean
public TcpNetClientConnectionFactory cf() {
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory("localhost",
Integer.parseInt(System.getProperty(PORT)));
cf.setSingleUse(true);
return cf;
}
@Bean
public ThreadAffinityClientConnectionFactory tacf() {
return new ThreadAffinityClientConnectionFactory(cf());
}
@Bean
@ServiceActivator(inputChannel = "out")
public TcpOutboundGateway outGate() {
TcpOutboundGateway outGate = new TcpOutboundGateway();
outGate.setConnectionFactory(tacf());
outGate.setReplyChannelName("toString");
return outGate;
}
测试连接
在某些情况下,在首次打开连接时发送某种运行状况检查请求可能很有用。 一种情况可能是在使用 TCP 故障转移客户端连接工厂时,这样,如果所选服务器允许打开连接,但报告连接不正常,我们可以进行故障转移。
为了支持此功能,请将 a 添加到 Client 端连接工厂。connectionTest
/**
* Set a {@link Predicate} that will be invoked to test a new connection; return true
* to accept the connection, false the reject.
* @param connectionTest the predicate.
* @since 5.3
*/
public void setConnectionTest(@Nullable Predicate<TcpConnectionSupport> connectionTest) {
this.connectionTest = connectionTest;
}
要测试连接,请将临时侦听器附加到测试中的连接。 如果测试失败,则连接将关闭并引发异常。 当与 TCP 故障转移客户端连接工厂一起使用时,这会触发尝试下一个服务器。
只有来自服务器的第一个回复才会发送到测试侦听器。 |
在以下示例中,如果服务器在我们发送 .PONG
PING
Message<String> ping = new GenericMessage<>("PING");
byte[] pong = "PONG".getBytes();
clientFactory.setConnectionTest(conn -> {
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean result = new AtomicBoolean();
conn.registerTestListener(msg -> {
if (Arrays.equals(pong, (byte[]) msg.getPayload())) {
result.set(true);
}
latch.countDown();
return false;
});
conn.send(ping);
try {
latch.await(10, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return result.get();
});
TCP 连接拦截器
您可以使用对 .
您可以使用侦听器向连接添加行为,例如协商、安全性和其他选项。
框架目前没有提供拦截器,但有关示例,请参阅源存储库中的 InterceptedSharedConnectionTests
。TcpConnectionInterceptorFactoryChain
测试用例中使用的 The used 工作原理如下:HelloWorldInterceptor
拦截器首先使用 Client 端连接工厂进行配置。 当第一条消息通过被拦截的连接发送时,拦截器通过该连接发送 'Hello' 并期望接收 'world!'。 发生这种情况时,协商完成并发送原始消息。 此外,使用同一连接的消息无需任何其他协商即可发送。
当配置了服务器连接工厂时,拦截器要求第一条消息是 'Hello',如果是,则返回 'world!'。 否则,它将引发导致连接关闭的异常。
所有方法都被拦截。
拦截器实例由拦截器工厂为每个连接创建。
如果拦截器是有状态的,则工厂应为每个连接创建一个新实例。
如果没有状态,则同一个拦截器可以包装每个连接。
拦截器工厂被添加到拦截器工厂链的配置中,你可以通过设置属性将其提供给连接工厂。
拦截器必须扩展 .
工厂必须实现接口。 具有直通方法。
通过扩展这个类,你只需要实现你想要拦截的那些方法。TcpConnection
interceptor-factory
TcpConnectionInterceptorSupport
TcpConnectionInterceptorFactory
TcpConnectionInterceptorSupport
下面的示例展示了如何配置连接拦截器工厂链:
<bean id="helloWorldInterceptorFactory"
class="o.s.i.ip.tcp.connection.TcpConnectionInterceptorFactoryChain">
<property name="interceptors">
<array>
<bean class="o.s.i.ip.tcp.connection.HelloWorldInterceptorFactory"/>
</array>
</property>
</bean>
<int-ip:tcp-connection-factory id="server"
type="server"
port="12345"
using-nio="true"
single-use="true"
interceptor-factory-chain="helloWorldInterceptorFactory"/>
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="12345"
single-use="true"
so-timeout="100000"
using-nio="true"
interceptor-factory-chain="helloWorldInterceptorFactory"/>
TCP 连接事件
从版本 3.0 开始,对实例的更改由实例报告。 是 的子类,因此可以由 中定义的任何 — 例如,事件入站通道适配器接收。TcpConnection
TcpConnectionEvent
TcpConnectionEvent
ApplicationEvent
ApplicationListener
ApplicationContext
TcpConnectionEvents
具有以下属性:
-
connectionId
:连接标识符,您可以在消息标头中使用该标识符将数据发送到连接。 -
connectionFactoryName
:连接所属的连接工厂的 Bean 名称。 -
throwable
:(仅适用于事件)。Throwable
TcpConnectionExceptionEvent
-
source
:这。 例如,您可以使用它来确定远程 IP 地址 (cast required)。TcpConnection
getHostAddress()
此外,从 4.0 版本开始,TCP Connection Factories 中讨论的标准反序列化器现在在解码数据流时遇到问题时会发出实例。
这些事件包含异常、正在构建的缓冲区,以及在发生异常的点进入缓冲区的偏移量(如果可用)。
应用程序可以使用正常或(参见接收 Spring 应用程序事件)来捕获这些事件,从而允许分析问题。TcpDeserializationExceptionEvent
ApplicationListener
ApplicationEventListeningMessageProducer
从版本 4.0.7 和 4.1.3 开始,每当服务器套接字上发生意外异常时(例如,当服务器套接字正在使用时),都会发布实例。
这些事件引用了连接工厂和原因。TcpConnectionServerExceptionEvent
BindException
从版本 4.2 开始,每当端点(入站网关或协作出站通道适配器)收到由于 Headers 无效而无法路由到连接的消息时,都会发布实例。
出站网关也会在收到延迟回复(发送方线程已超时)时发布此事件。
该事件包含连接 ID 以及属性中的异常,其中包含失败的消息。TcpConnectionFailedCorrelationEvent
ip_connectionId
cause
从版本 4.3 开始,当启动服务器连接工厂时,会发出 a。
当工厂配置为侦听 port 时,这很有用,这意味着操作系统选择端口。
如果您需要在启动连接到套接字的其他进程之前等待,也可以使用它代替 polling 。TcpConnectionServerListeningEvent
0
isListening()
为了避免延迟侦听线程接受连接,该事件将在单独的线程上发布。 |
从版本 4.3.2 开始,每当无法创建 Client 端连接时,都会发出 a。
事件的来源是连接工厂,您可以使用它来确定无法建立连接的主机和端口。TcpConnectionFailedEvent
TCP 适配器
提供了使用前面提到的连接工厂的 TCP 入站和出站通道适配器。
这些适配器具有两个相关属性:和 .
该属性指示要使用哪个连接工厂来管理适配器的连接。
该属性指定消息到达出站适配器的通道以及入站适配器放置消息的通道。
虽然入站和出站适配器可以共享一个连接工厂,但服务器连接工厂始终由入站适配器“拥有”。
客户端连接工厂始终由出站适配器“拥有”。
每种类型只有一个适配器可以获得对连接工厂的引用。
以下示例说明如何定义客户端和服务器 TCP 连接工厂:connection-factory
channel
connection-factory
channel
<bean id="javaSerializer"
class="org.springframework.core.serializer.DefaultSerializer"/>
<bean id="javaDeserializer"
class="org.springframework.core.serializer.DefaultDeserializer"/>
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"
deserializer="javaDeserializer"
serializer="javaSerializer"
using-nio="true"
single-use="true"/>
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="#{server.port}"
single-use="true"
so-timeout="10000"
deserializer="javaDeserializer"
serializer="javaSerializer"/>
<int:channel id="input" />
<int:channel id="replies">
<int:queue/>
</int:channel>
<int-ip:tcp-outbound-channel-adapter id="outboundClient"
channel="input"
connection-factory="client"/>
<int-ip:tcp-inbound-channel-adapter id="inboundClient"
channel="replies"
connection-factory="client"/>
<int-ip:tcp-inbound-channel-adapter id="inboundServer"
channel="loop"
connection-factory="server"/>
<int-ip:tcp-outbound-channel-adapter id="outboundServer"
channel="loop"
connection-factory="server"/>
<int:channel id="loop"/>
在前面的配置中,到达通道的消息通过连接工厂创建的连接进行序列化,在服务器上接收,并放置在通道上。
由于 是 的输入通道,因此消息通过同一连接环回,由 接收并存入通道中。
Java 序列化用于网络。input
client
loop
loop
outboundServer
inboundClient
replies
通常,入站适配器使用连接工厂,该工厂侦听传入的连接请求。
在某些情况下,您可能希望反向建立连接,以便入站适配器连接到外部服务器,然后等待该连接上的入站消息。type="server"
入站适配器上的设置支持此拓扑。
在这种情况下,连接工厂必须是 type 并且必须设置为 。client-mode="true"
client
single-use
false
两个附加属性支持此机制。
它指定(以毫秒为单位)框架在连接失败后尝试重新连接的频率。
提供 a 来计划连接尝试并测试连接是否仍处于活动状态。retry-interval
scheduler
TaskScheduler
如果您不提供调度程序,则使用框架的默认taskScheduler Bean。
对于出站适配器,通常在发送第一条消息时建立连接。
在出站适配器上会导致在启动适配器时建立连接。
默认情况下,适配器会自动启动。
同样,连接工厂必须是类型并且具有 。
A 和 也受支持。
如果连接失败,则调度程序或在发送下一条消息时重新建立连接。client-mode="true"
client
single-use="false"
retry-interval
scheduler
对于入站和出站,如果适配器已启动,则可以通过发送命令来强制适配器建立连接: 。
然后,您可以使用 检查当前状态。<control-bus />
@adapter_id.retryConnection()
@adapter_id.isClientModeConnected()
TCP 网关
入站 TCP 网关和出站 TCP 网关分别使用服务器和客户端连接工厂。
每个连接一次可以处理一个请求或响应。TcpInboundGateway
TcpOutboundGateway
入站网关在构造包含传入负载的消息并将其发送到 后,等待响应,并通过将响应消息写入连接来发送响应消息中的负载。requestChannel
对于入站网关,您必须保留或填充标头,因为它用于将消息与连接相关联。
源自网关的消息会自动设置标头。
如果回复是作为新消息构建的,则需要设置 header。
可以从传入消息中捕获标头值。ip_connectionId |
与入站适配器一样,入站网关通常使用连接工厂,该工厂侦听传入的连接请求。
在某些情况下,您可能希望反向建立连接,以便入站网关连接到外部服务器,然后等待并回复该连接上的入站邮件。type="server"
在入站网关上使用支持此拓扑。
在这种情况下,连接工厂必须是 type 并且必须设置为 。client-mode="true"
client
single-use
false
两个附加属性支持此机制。
它指定 (以毫秒为单位) 框架在连接失败后尝试重新连接的频率。
提供 a 来计划连接尝试并测试连接是否仍处于活动状态。retry-interval
scheduler
TaskScheduler
如果网关已启动,则可以通过发送命令来强制网关建立连接,并使用 检查当前状态。<control-bus/>
@adapter_id.retryConnection()
@adapter_id.isClientModeConnected()
出站网关在通过连接发送消息后,等待响应,构建响应消息,并将其放在回复通道上。 通过连接进行的通信是单线程的。 一次只能处理一条消息。 如果另一个线程在收到当前响应之前尝试发送消息,它将阻塞,直到任何先前的请求完成(或超时)。 但是,如果将客户端连接工厂配置为一次性连接,则每个新请求都会获得自己的连接并立即得到处理。 以下示例配置入站 TCP 网关:
<int-ip:tcp-inbound-gateway id="inGateway"
request-channel="tcpChannel"
reply-channel="replyChannel"
connection-factory="cfServer"
reply-timeout="10000"/>
如果使用配置了默认序列化器或反序列化器的连接工厂,则消息是分隔数据,并且网关可以由简单的客户端(如 telnet)使用。\r\n
以下示例显示了出站 TCP 网关:
<int-ip:tcp-outbound-gateway id="outGateway"
request-channel="tcpChannel"
reply-channel="replyChannel"
connection-factory="cfClient"
request-timeout="10000"
remote-timeout="10000"/> <!-- or e.g. remote-timeout-expression="headers['timeout']" -->
目前,出站网关 不可用。client-mode
从版本 5.2 开始,可以使用属性 .
如果连接工厂配置为(每个请求/回复的新连接),则网关将关闭输出流;这会向服务器发出 EOF 信号。
如果服务器使用 EOF 来确定消息的结尾,而不是流中的某个分隔符,但保持连接打开以接收回复,则这非常有用。closeStreamAfterSend
single-use
通常,调用线程将在网关中阻塞,等待回复(或超时)。
从版本 5.3 开始,您可以在网关上设置该属性,并释放发送线程以执行其他工作。
回复(或错误)将在接收线程上发送。
这仅适用于使用 时,它在使用 NIO 时被忽略,因为存在争用条件,即在收到回复后发生的套接字错误可以在回复之前传递给网关。async
TcpNetClientConnectionFactory
使用共享连接 () 时,当另一个请求正在处理时,新请求将被阻止,直到收到当前回复。
如果您希望在长期连接池上支持并发请求,请考虑使用 。singleUse=false CachingClientConnectionFactory |
从版本 5.4 开始,可以使用 .
未经请求的入站消息以及延迟回复(客户端超时)将发送到此通道。
要在服务器端支持此功能,您现在可以向连接工厂注册多个 s。
网关和通道适配器会自动注册自身。
从服务器发送未经请求的邮件时,必须将 appropriate 添加到发送的邮件中。unsolicitedMessageChannel
TcpSender
IpHeaders.CONNECTION_ID
TCP 消息关联
IP 端点的一个目标是提供与 Spring Integration 应用程序以外的系统的通信。 因此,默认情况下,仅发送和接收消息负载。 从 3.0 开始,您可以使用 JSON、Java 序列化或自定义序列化程序和反序列化程序来传输标头。 有关更多信息,请参阅 传输标头 。 框架(使用网关时除外)或服务器端的协作通道适配器不提供消息关联。在本文档的后面,我们将讨论应用程序可用的各种关联技术。 在大多数情况下,这需要消息的特定应用程序级关联,即使消息负载包含一些自然关联数据(例如订单号)也是如此。
网关
网关会自动关联消息。 但是,您应该对相对较少的应用程序使用出站网关。 当您将连接工厂配置为对所有消息对使用单个共享连接 ('single-use=“false”') 时,一次只能处理一条消息。 新消息必须等待,直到收到对上一条消息的回复。 当为每个新消息配置连接工厂以使用新连接 ('single-use=“true”') 时,此限制不适用。 虽然此设置可以提供比共享连接环境更高的吞吐量,但它会带来为每个消息对打开和关闭新连接的开销。
因此,对于高容量消息,请考虑使用一对协作的通道适配器。 但是,为此,您需要提供协作逻辑。
Spring Integration 2.2 中引入的另一种解决方案是使用 ,它允许使用共享连接池。CachingClientConnectionFactory
协作出站和入站通道适配器
为了实现高容量吞吐量(如前所述,避免使用网关的陷阱),您可以配置一对协作的出站和入站通道适配器。 您还可以使用协作适配器(服务器端或客户端)进行完全异步的通信(而不是使用请求-答复语义)。 在服务器端,消息关联由适配器自动处理,因为入站适配器添加了一个 Headers,允许出站适配器在发送回复消息时确定要使用的连接。
在服务器端,您必须填充标头,因为它用于将消息关联到连接。
源自入站适配器的消息会自动设置 Header。
如果您希望构造要发送的其他消息,则需要设置 header。
您可以从传入消息中获取标头值。ip_connectionId |
在客户端,应用程序必须根据需要提供自己的关联逻辑。 您可以通过多种方式执行此操作。
如果消息有效负载具有一些自然关联数据(例如交易 ID 或订单号),并且您不需要保留原始出站消息中的任何信息(例如回复通道标头),则关联很简单,并且在任何情况下都会在应用程序级别完成。
如果消息负载具有一些自然关联数据(例如事务 ID 或订单号),但您需要保留原始出站消息中的一些信息(例如回复通道标头),则可以保留原始出站消息的副本(可能通过使用发布-订阅通道)并使用聚合器重新组合必要的数据。
对于前两种情况中的任何一种,如果有效负载没有自然关联数据,则可以在出站通道适配器的上游提供一个转换器,以使用此类数据增强有效负载。 此类转换器可以将原始有效负载转换为包含原始有效负载和消息标头的某些子集的新对象。 当然,来自 Headers 的 Live Objects(例如回复通道)不能包含在转换后的有效负载中。
如果选择这样的策略,则需要确保连接工厂具有适当的序列化器-反序列化器对来处理此类有效负载(例如和 ,它们使用 java 序列化,或自定义序列化器和反序列化器)。
TCP Connection Factories 中提到的选项(包括默认选项)不支持此类有效负载,除非转换后的有效负载是 或 。DefaultSerializer
DefaultDeserializer
ByteArray*Serializer
ByteArrayCrLfSerializer
String
byte[]
在 2.2 发行版之前,当协作通道适配器使用客户端连接工厂时,该属性默认为默认的回复超时(10 秒)。
这意味着,如果入站适配器在这段时间内没有收到任何数据,则套接字将关闭。 此默认行为在真正的异步环境中不适用,因此它现在默认为无限超时。
您可以通过将 Client 端连接工厂上的属性设置为 10000 毫秒来恢复以前的默认行为。 |
从版本 5.4 开始,多个出站通道适配器和一个可以共享同一个连接工厂。
这允许应用程序同时支持请求/回复和任意服务器→客户端消息收发。
有关更多信息,请参阅 TCP 网关。TcpInboundChannelAdapter
传输标头
TCP 是一种流协议。 并在流中划分消息。
在 3.0 之前,只能通过 TCP 传输消息负载 ( 或 )。
从 3.0 开始,您可以传输选定的标头以及有效负载。
但是,“活动”对象(如 header)不能序列化。Serializers
Deserializers
String
byte[]
replyChannel
通过 TCP 发送标头信息需要一些额外的配置。
第一步是提供 a ,该 使用 该属性。
此映射器委托给任何实现,以将消息与某个对象相互转换,这些对象可由配置的 和 .ConnectionFactory
MessageConvertingTcpMessageMapper
mapper
MessageConverter
serializer
deserializer
Spring 集成提供了一个 ,它允许指定添加到对象的 Headers 列表以及有效负载。
生成的 Map 有两个条目: 和 .
该条目本身是 a 并包含所选的标头。MapMessageConverter
Map
payload
headers
headers
Map
第二步是提供一个 serializer 和一个 deserializer,它们可以在 a 和一些 wire 格式之间进行转换。
这可以是自定义的 或 ,如果对等系统不是 Spring 集成应用程序,您通常需要它。Map
Serializer
Deserializer
Spring 集成提供了一个将 a 转换为 JSON 和从 JSON 转换的方法。
它使用 Spring Integration 。
如果需要,您可以提供自定义。
默认情况下,序列化器在对象之间插入换行符 () 字符。
有关更多信息,请参阅 Javadoc。MapJsonSerializer
Map
JsonObjectMapper
JsonObjectMapper
0x0a
它使用 Classpath 上的任何版本。JsonObjectMapper Jackson |
您还可以使用 和 来使用 的标准 Java 序列化 。Map
DefaultSerializer
DefaultDeserializer
以下示例显示了使用 JSON 传输 、 和 标头的连接工厂的配置:correlationId
sequenceNumber
sequenceSize
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="12345"
mapper="mapper"
serializer="jsonSerializer"
deserializer="jsonSerializer"/>
<bean id="mapper"
class="o.sf.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
<constructor-arg name="messageConverter">
<bean class="o.sf.integration.support.converter.MapMessageConverter">
<property name="headerNames">
<list>
<value>correlationId</value>
<value>sequenceNumber</value>
<value>sequenceSize</value>
</list>
</property>
</bean>
</constructor-arg>
</bean>
<bean id="jsonSerializer" class="o.sf.integration.ip.tcp.serializer.MapJsonSerializer" />
使用上述配置发送的有效负载为 'something' 的消息将出现在网络上,如下所示:
{"headers":{"correlationId":"things","sequenceSize":5,"sequenceNumber":1},"payload":"something"}
关于非阻塞 I/O (NIO)
使用 NIO (参见 IP 配置属性) 可避免将线程专用于从每个套接字读取。
对于少量的 sockets,你可能会发现不使用 NIO 以及异步切换(例如 to a )的性能与使用 NIO 一样好,甚至更好。using-nio
QueueChannel
在处理大量连接时,应考虑使用 NIO。 但是,使用 NIO 还有其他一些影响。 线程池(在 task executor 中)在所有套接字之间共享。 每个传入消息都被组装起来,并作为从该池中选择的线程上的单独工作单元发送到配置的通道。 到达同一套接字的两条 Sequential 消息可能由不同的线程处理。 这意味着消息发送到通道的顺序是不确定的。 不维护到达套接字的消息的严格顺序。
对于某些应用程序,这不是问题。
对其他人来说,这是一个问题。
如果需要严格的排序,请考虑设置并使用异步切换。using-nio
false
或者,您可以在入站终端节点的下游插入一个重新排序器,以将消息返回到正确的顺序。
如果在连接工厂上设置为 ,则到达 TCP 连接的消息将设置 and headers。
resequencer 使用这些 Headers 将消息返回到正确的 Sequence。apply-sequence
true
sequenceNumber
correlationId
从版本 5.1.4 开始,优先接受新连接而不是从现有连接中读取。
通常,除非您的新传入连接率非常高,否则这应该不会产生什么影响。
如果您希望恢复到以前给予读取优先权的行为,请将 property on 设置为 。multiAccept TcpNioServerConnectionFactory false |
池大小
不再使用 pool size 属性。
以前,当未指定 task-executor 时,它指定了默认线程池的大小。
它还用于设置服务器套接字上的连接积压。
不再需要第一个函数(请参阅下一段)。
第二个函数将替换为 attribute。backlog
以前,当在 NIO 中使用固定线程池任务执行程序(这是默认的)时,可能会出现死锁,并且处理将停止。 当缓冲区已满时,出现问题,从套接字读取的线程尝试向缓冲区添加更多数据,并且没有线程可用于在缓冲区中腾出空间。 这仅发生在非常小的泳池规模下,但在极端条件下也可能发生。 从 2.2 开始,两个更改消除了这个问题。 首先,默认任务执行程序是缓存的线程池执行程序。 其次,添加了死锁检测逻辑,以便在发生线程匮乏时,将引发异常,而不是死锁,从而释放死锁资源。
现在,默认任务执行程序是无限的,如果消息处理需要较长时间,则传入消息的速率较高时可能会出现内存不足情况。 如果您的应用程序表现出此类行为,则应使用具有适当池大小的池化任务执行程序,但请参阅下一节。 |
具有策略的线程池任务执行程序CALLER_RUNS
当您使用带有 ( when using the namespace) 且队列容量较小时,您应该记住一些重要的注意事项。CallerRunsPolicy
CALLER_RUNS
<task/>
如果您不使用固定线程池,则以下内容不适用。
使用 NIO 连接时,有三种不同的任务类型。 I/O 选择器处理在一个专用线程上执行(检测事件、接受新连接以及使用任务执行程序将 I/O 读取操作分派给其他线程)。 当 I/O 读取器线程(读取操作被调度到该线程)读取数据时,它会移交给另一个线程来组装传入的消息。 大型邮件可能需要多次读取才能完成。 这些 “assembler” 线程在等待数据时可能会阻塞。 当新的读取事件发生时,读取器会确定此套接字是否已经具有汇编器,如果没有,则运行新的汇编器。 组装过程完成后,汇编程序线程将返回到池中。
当池耗尽、拒绝策略正在使用并且任务队列已满时,这可能会导致死锁。
当池为空且队列中没有空间时,IO 选择器线程将接收事件并使用执行程序调度读取。
队列已满,因此 selector 线程本身会启动读取进程。
现在,它检测到此套接字没有汇编器,并在执行读取之前触发汇编器。
同样,队列已满,选择器线程成为汇编器。
汇编器现在被阻塞,等待读取数据,这永远不会发生。
连接工厂现在已死锁,因为 selector 线程无法处理新事件。CALLER_RUNS
OP_READ
为了避免这种死锁,我们必须避免 selector(或 reader)线程执行汇编任务。 我们希望为 IO 和汇编操作使用单独的池。
该框架提供了一个 ,它允许配置两个不同的执行程序:一个用于执行 IO 操作,另一个用于消息组装。
在此环境中,IO 线程永远不会成为汇编程序线程,并且不会发生死锁。CompositeExecutor
此外,应将任务执行程序配置为使用 ( when using )。
当 I/O 任务无法完成时,它会延迟一小段时间并不断重试,直到可以完成并分配一个汇编程序。
默认情况下,延迟为 100 毫秒,但您可以通过在连接工厂上设置属性来更改它(使用 XML 命名空间进行配置时)。AbortPolicy
ABORT
<task>
readDelay
read-delay
以下三个示例显示了如何配置复合执行程序:
@Bean
private CompositeExecutor compositeExecutor() {
ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor();
ioExec.setCorePoolSize(4);
ioExec.setMaxPoolSize(10);
ioExec.setQueueCapacity(0);
ioExec.setThreadNamePrefix("io-");
ioExec.setRejectedExecutionHandler(new AbortPolicy());
ioExec.initialize();
ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor();
assemblerExec.setCorePoolSize(4);
assemblerExec.setMaxPoolSize(10);
assemblerExec.setQueueCapacity(0);
assemblerExec.setThreadNamePrefix("assembler-");
assemblerExec.setRejectedExecutionHandler(new AbortPolicy());
assemblerExec.initialize();
return new CompositeExecutor(ioExec, assemblerExec);
}
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg ref="io"/>
<constructor-arg ref="assembler"/>
</bean>
<task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="io-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="8" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="assembler-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="10" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
</bean>
SSL/TLS 支持
支持安全套接字层/传输层安全性。
使用蔚来时,使用 JDK 5+ 功能处理建立连接后的握手。
不使用 NIO 时,使用 standard 和 objects 创建连接。
提供了许多策略接口以允许进行重大自定义。
这些接口的默认实现提供了开始使用安全通信的最简单方法。SSLEngine
SSLSocketFactory
SSLServerSocketFactory
开始
无论您是否使用 NIO,都需要在连接工厂上配置该属性。
此属性引用 <bean/> 定义,该定义描述所需密钥库的位置和密码。ssl-context-support
SSL/TLS 对等体每个都需要两个密钥库:
-
包含私有密钥和公钥对的密钥库,用于标识对等节点
-
一个信任库,其中包含受信任的对等节点的公钥。 请参阅 JDK 随附的实用程序的文档。 基本步骤是
keytool
-
创建新的密钥对并将其存储在密钥库中。
-
导出公钥。
-
将公钥导入到对等节点的信任库中。
-
对另一个对等体重复上述步骤。
-
在测试用例中,在两个 Peer 节点上使用相同的密钥存储是很常见的,但在 production 中应避免这种情况。 |
建立密钥存储之后,下一步是向 Bean 指示它们的位置,并向连接工厂提供对该 Bean 的引用。TcpSSLContextSupport
以下示例配置 SSL 连接:
<bean id="sslContextSupport"
class="o.sf.integration.ip.tcp.connection.support.DefaultTcpSSLContextSupport">
<constructor-arg value="client.ks"/>
<constructor-arg value="client.truststore.ks"/>
<constructor-arg value="secret"/>
<constructor-arg value="secret"/>
</bean>
<ip:tcp-connection-factory id="clientFactory"
type="client"
host="localhost"
port="1234"
ssl-context-support="sslContextSupport" />
该类还有一个可选属性,可以是 or(默认值)。DefaultTcpSSLContextSupport
protocol
SSL
TLS
密钥库文件名(前两个构造函数参数)使用 Spring 抽象。
默认情况下,文件位于 Classpath 上,但您可以使用前缀覆盖它(以查找文件系统上的文件)。Resource
file:
从版本 4.3.6 开始,当您使用 NIO 时,您可以在连接工厂上指定 (以秒为单位)。
此超时(默认值为 30 秒)在 SSL 握手期间等待数据时使用。
如果超过超时,则停止进程并关闭套接字。ssl-handshake-timeout
房东验证
从版本 5.0.8 开始,您可以配置是否启用主机验证。 从版本 5.1 开始,它默认处于启用状态;禁用它的机制取决于您是否使用 NIO。
主机验证用于确保您连接到的服务器与证书中的信息匹配,即使证书受信任也是如此。
使用蔚来时,例如配置 。DefaultTcpNioSSLConnectionSupport
@Bean
public DefaultTcpNioSSLConnectionSupport connectionSupport() {
DefaultTcpSSLContextSupport sslContextSupport = new DefaultTcpSSLContextSupport("test.ks",
"test.truststore.ks", "secret", "secret");
sslContextSupport.setProtocol("SSL");
DefaultTcpNioSSLConnectionSupport tcpNioConnectionSupport =
new DefaultTcpNioSSLConnectionSupport(sslContextSupport, false);
return tcpNioConnectionSupport;
}
第二个 constructor 参数禁用主机验证。
然后将 bean 注入到 NIO 连接工厂中。connectionSupport
不使用 NIO 时,配置位于 :TcpSocketSupport
connectionFactory.setTcpSocketSupport(new DefaultTcpSocketSupport(false));
同样,constructor 参数禁用主机验证。
高级技术
本节介绍在某些情况下可能会有用的高级技术。
策略接口
在许多情况下,前面描述的配置就是通过 TCP/IP 启用安全通信所需的全部内容。 但是, Spring 集成提供了许多策略接口,以允许自定义和修改套接字工厂和套接字:
-
TcpSSLContextSupport
-
TcpSocketFactorySupport
-
TcpSocketSupport
-
TcpNetConnectionSupport
-
TcpNioConnectionSupport
策略界面TcpSSLContextSupport
下面的清单显示了 strategy 界面:TcpSSLContextSupport
public interface TcpSSLContextSupport {
SSLContext getSSLContext() throws Exception;
}
接口的实现负责创建一个对象。
框架提供的实现是前面描述的 。
如果需要不同的行为,请实现此接口,并为连接工厂提供对类实现的 bean 的引用。TcpSSLContextSupport
SSLContext
DefaultTcpSSLContextSupport
策略界面TcpSocketFactorySupport
下面的清单显示了 strategy 接口的定义:TcpSocketFactorySupport
public interface TcpSocketFactorySupport {
ServerSocketFactory getServerSocketFactory();
SocketFactory getSocketFactory();
}
此接口的实现负责获取对 和 的引用。
提供了两种实现。
第一个用于非 SSL 套接字(未定义属性时)。
这将使用 JDK 的默认工厂。
第二个实现是 。
默认情况下,在定义属性时使用此选项。
它使用该 bean 创建的来创建套接字工厂。ServerSocketFactory
SocketFactory
DefaultTcpNetSocketFactorySupport
ssl-context-support
DefaultTcpNetSSLSocketFactorySupport
ssl-context-support
SSLContext
此接口仅在 is 时适用。
NIO 不使用 socket 工厂。using-nio false |
策略界面TcpSocketSupport
下面的清单显示了 strategy 接口的定义:TcpSocketSupport
public interface TcpSocketSupport {
void postProcessServerSocket(ServerSocket serverSocket);
void postProcessSocket(Socket socket);
}
此接口的实现可以在创建套接字之后以及应用所有配置的属性之后但在使用套接字之前修改套接字。
无论您是否使用 NIO,这都适用。
例如,您可以使用此接口的实现来修改 SSL 套接字上支持的密码套件,也可以添加在 SSL 握手完成后收到通知的侦听器。
框架提供的唯一实现是 ,它不会以任何方式修改套接字。DefaultTcpSocketSupport
要提供您自己的 or 实现,请分别通过设置 和 属性为连接工厂提供对自定义类型的 bean 的引用。TcpSocketFactorySupport
TcpSocketSupport
socket-factory-support
socket-support
策略界面TcpNetConnectionSupport
下面的清单显示了 strategy 接口的定义:TcpNetConnectionSupport
public interface TcpNetConnectionSupport {
TcpNetConnection createNewConnection(Socket socket,
boolean server, boolean lookupHost,
ApplicationEventPublisher applicationEventPublisher,
String connectionFactoryName) throws Exception;
}
调用此接口以创建 type (或其 subclasses) 的对象。
框架提供了一个 implementation (),默认情况下,它创建简单的对象。
它有两个属性: 和 .
启用后推后,实现将返回一个子类,该子类将连接的 .
与默认值一致,缓冲区大小默认为 1。
这允许反序列化器将 “unread” (push back) 字节放入流中。
下面的简单示例显示了如何在委托 deserializer 中使用它,该 deserializer “peeks” 第一个字节以确定要调用哪个 deserializer:TcpNetConnection
DefaultTcpNetConnectionSupport
TcpNetConnection
pushbackCapable
pushbackBufferSize
InputStream
PushbackInputStream
PushbackInputStream
public class CompositeDeserializer implements Deserializer<byte[]> {
private final ByteArrayStxEtxSerializer stxEtx = new ByteArrayStxEtxSerializer();
private final ByteArrayCrLfSerializer crlf = new ByteArrayCrLfSerializer();
@Override
public byte[] deserialize(InputStream inputStream) throws IOException {
PushbackInputStream pbis = (PushbackInputStream) inputStream;
int first = pbis.read();
if (first < 0) {
throw new SoftEndOfStreamException();
}
pbis.unread(first);
if (first == ByteArrayStxEtxSerializer.STX) {
this.receivedStxEtx = true;
return this.stxEtx.deserialize(pbis);
}
else {
this.receivedCrLf = true;
return this.crlf.deserialize(pbis);
}
}
}
策略界面TcpNioConnectionSupport
下面的清单显示了 strategy 接口的定义:TcpNioConnectionSupport
public interface TcpNioConnectionSupport {
TcpNioConnection createNewConnection(SocketChannel socketChannel,
boolean server, boolean lookupHost,
ApplicationEventPublisher applicationEventPublisher,
String connectionFactoryName) throws Exception;
}
调用此接口以创建对象(或子类中的对象)。
Spring 集成提供两种实现:和 。
使用哪一个取决于是否正在使用 SSL。
一个常见的用例是 subclass 和 override 。
请参阅 SSL 客户端身份验证示例。
与 一样,这些实现也支持回推。TcpNioConnection
DefaultTcpNioSSLConnectionSupport
DefaultTcpNioConnectionSupport
DefaultTcpNioSSLConnectionSupport
postProcessSSLEngine
DefaultTcpNetConnectionSupport
示例:启用 SSL 客户端身份验证
要在使用 SSL 时启用客户端证书身份验证,该技术取决于您是否使用 NIO。
当您不 NIO 时,请提供自定义实现以对服务器套接字进行后处理:TcpSocketSupport
serverFactory.setTcpSocketSupport(new DefaultTcpSocketSupport() {
@Override
public void postProcessServerSocket(ServerSocket serverSocket) {
((SSLServerSocket) serverSocket).setNeedClientAuth(true);
}
});
(使用 XML 配置时,通过设置属性来提供对 Bean 的引用)。socket-support
当您使用 NIO 时,请提供自定义实现来对 进行后处理,如下例所示:TcpNioSslConnectionSupport
SSLEngine
@Bean
public DefaultTcpNioSSLConnectionSupport tcpNioConnectionSupport() {
return new DefaultTcpNioSSLConnectionSupport(serverSslContextSupport) {
@Override
protected void postProcessSSLEngine(SSLEngine sslEngine) {
sslEngine.setNeedClientAuth(true);
}
}
}
@Bean
public TcpNioServerConnectionFactory server() {
...
serverFactory.setTcpNioConnectionSupport(tcpNioConnectionSupport());
...
}
(当您使用 XML 配置时,从版本 4.3.7 开始,通过设置属性来提供对 bean 的引用)。nio-connection-support
IP 配置属性
下表描述了您可以设置以配置 IP 连接的属性:
属性名称 | 客户? | 服务器? | 允许的值 | 属性描述 |
---|---|---|---|---|
|
Y |
Y |
客户端, 服务器 |
确定连接工厂是客户机还是服务器。 |
|
Y |
N |
目标的主机名或 IP 地址。 |
|
|
Y |
Y |
端口。 |
|
|
Y |
Y |
用于序列化有效负载的实现。
默认为 |
|
|
Y |
Y |
用于反序列化有效负载的实现。
默认为 |
|
|
Y |
Y |
|
连接是否使用 NIO。
有关更多信息,请参阅软件包。
请参见关于非阻塞 I/O (NIO)。
违约:。 |
|
Y |
N |
|
使用 NIO 时,连接是否使用直接缓冲区。
有关更多信息,请参阅文档。
如果为 ,则必须为 。 |
|
Y |
Y |
|
使用 NIO 时,可能需要对消息进行重新排序。
当此属性设置为 时,标头将添加到收到的消息中。
请参见关于非阻塞 I/O (NIO)。
违约:。 |
|
Y |
Y |
默认为 (infinity),但具有 .
在这种情况下,它默认为默认回复超时(10 秒)。 |
|
|
Y |
Y |
看。 |
|
|
Y |
Y |
看。 |
|
|
Y |
Y |
|
看。 |
|
Y |
Y |
设置为与提供的值一起使用。
看。 |
|
|
Y |
Y |
|
看。 |
|
Y |
Y |
看。 |
|
|
N |
Y |
在多宿主系统上,指定套接字绑定到的接口的 IP 地址。 |
|
|
Y |
Y |
指定要用于套接字处理的特定执行程序。
如果未提供,则使用内部缓存线程执行程序。
在某些需要使用特定任务执行程序的平台上需要,例如 . |
|
|
Y |
Y |
|
指定一个连接是否可用于多条消息。
如果 ,则每条消息都使用新连接。 |
|
N |
N |
此属性不再使用。
为了向后兼容,它设置了 backlog,但您应该使用它来指定服务器工厂中的连接 backlog。 |
|
|
N |
Y |
设置服务器工厂的连接积压。 |
|
|
Y |
Y |
|
指定是否对 IP 地址进行反向查找,以转换为主机名,以便在邮件报头中使用。
如果为 false,则改用 IP 地址。
违约:。 |
|
Y |
Y |
请参阅 TCP 连接侦听器。 |
|
|
Y |
Y |
||
|
Y |
Y |
||
|
Y |
Y |
请参阅 SSL/TLS 支持。 |
|
|
Y |
Y |
请参见高级技术。 |
|
|
Y |
Y |
长> 0 |
由于线程不足,在上一次尝试失败后重试读取之前的延迟(以毫秒为单位)。
默认值:100。
仅当 is 时适用。 |
下表描述了您可以设置以配置 UDP 入站通道适配器的属性:
属性名称 | 允许的值 | 属性描述 |
---|---|---|
|
适配器侦听的端口。 |
|
|
|
UDP 适配器是否使用多播。 |
|
当 multicast 为 true 时,适配器加入的多播地址。 |
|
|
指定可以同时处理的数据包数。 它仅在未配置 task-executor 时适用。 默认值:5。 |
|
task-executor |
指定要用于套接字处理的特定执行程序。
如果未提供,则使用内部池化执行程序。
在某些需要使用特定任务执行程序(如 .
有关线程要求,请参阅 pool-size。 |
|
|
用于接收 的缓冲区的大小。
通常设置为最大传输单元 (MTU) 大小。
如果使用的缓冲区小于发送的数据包的大小,则可能会发生截断。
您可以使用属性 .. |
|
|
|
UDP 适配器是否需要收到的数据包中的数据长度字段。 用于检测数据包截断。 |
|
有关更多信息,请参阅 中的方法。 |
|
|
用于 UDP 确认数据包。
有关更多信息,请参阅 中的 setSendBufferSize() 方法。 |
|
|
有关更多信息,请参阅。 |
|
|
在多宿主系统上,指定套接字绑定到的接口的 IP 地址。 |
|
|
如果下游组件引发异常,则包含异常和 failed 消息的消息将发送到此通道。 |
|
|
|
指定是否对 IP 地址进行反向查找,以转换为主机名,以便在邮件报头中使用。
如果 ,则改用 IP 地址。
违约:。 |
下表描述了您可以设置以配置 UDP 出站通道适配器的属性:
属性名称 | 允许的值 | 属性描述 |
---|---|---|
|
目标的主机名或 IP 地址。 对于多播 udp 适配器,多播地址。 |
|
|
目标上的端口。 |
|
|
|
UDP 适配器是否使用多播。 |
|
|
UDP 适配器是否需要来自目标的确认。
启用后,它需要设置以下四个属性: 、 、 和 。 |
|
When is 表示应将确认发送到的主机或 IP 地址。
通常是当前主机,但可能不同 — 例如,当使用网络地址转换 (NAT) 时。 |
|
|
When is ,指示应将确认发送到的端口。
适配器侦听此端口的确认。 |
|
|
When is 表示适配器等待确认的时间(以毫秒为单位)。
如果未及时收到确认,则适配器将引发异常。 |
|
|
默认值为 1。 对于多播适配器,您可以将其设置为更大的值,这需要来自多个目标的确认。 |
|
|
|
UDP 适配器是否在发送到目标的数据包中包含数据长度字段。 |
|
对于多播适配器,指定 的生存时间属性。
控制多播的范围。
有关更多信息,请参阅 Java API 文档。 |
|
|
有关更多信息,请参阅 setSoTimeout() 方法。 |
|
|
有关更多信息,请参阅 中的方法。 |
|
|
用于 UDP 确认数据包。
有关更多信息,请参阅 中的方法。 |
|
本地地址 |
在多宿主系统上,对于 UDP 适配器,指定套接字要发送到的接口的 IP 地址,以接收回复消息。 对于多播适配器,它还确定多播数据包通过哪个接口发送。 |
|
|
指定要用于确认处理的特定执行程序。
如果未提供,则使用内部单线程执行程序。
在某些需要使用特定任务执行程序的平台上需要,例如 .
一个线程专用于处理确认(如果选项为 true)。 |
|
|
SPEL 表达式 |
要评估的 SPEL 表达式,以确定将哪个表达式用作传出 UDP 数据包的目标地址。 |
|
SPEL 表达式 |
要评估的 SPEL 表达式,用于确定哪个数据报套接字用于发送传出的 UDP 数据包。 |
下表描述了您可以设置以配置 TCP 入站通道适配器的属性:
属性名称 | 允许的值 | 属性描述 |
---|---|---|
|
入站消息发送到的通道。 |
|
|
如果连接工厂的类型为 ,则该工厂由此适配器“拥有”。
如果它的 type 为 ,则它由出站通道适配器“拥有”,并且此适配器在出站适配器创建的连接上接收任何传入消息。 |
|
|
如果下游组件引发异常,则包含异常和失败消息的消息将发送到此通道。 |
|
|
|
When 时,入站适配器充当客户端,用于建立连接,然后在该连接上接收传入消息。
违约:。
另请参阅 和 。
连接工厂的类型必须为 ,并且已设置为 。 |
|
当 in 中时,指定在连接尝试之间或连接失败后等待的毫秒数。
默认值:60000(60 秒)。 |
|
|
|
指定用于管理连接的 。
如果未指定,则默认为全局 Spring 集成 Bean,其默认池大小为 10。
请参阅配置 Task Scheduler。 |
下表描述了您可以设置以配置 TCP 出站通道适配器的属性:
属性名称 | 允许的值 | 属性描述 |
---|---|---|
|
出站消息到达的通道。 |
|
|
如果连接工厂的类型为 ,则该工厂由此适配器“拥有”。
如果它的 type 为 ,则它由入站通道适配器“拥有”,并且此适配器尝试将消息与接收原始入站消息的连接相关联。 |
|
|
|
When 时,出站适配器会在启动后立即尝试建立连接。
When 时,在发送第一条消息时建立连接。
违约:。
另请参阅 和 。
连接工厂的类型必须为 ,并且已设置为 。 |
|
当 in 中时,指定在连接尝试之间或连接失败后等待的毫秒数。
默认值:60000(60 秒)。 |
|
|
|
指定用于管理连接的 。
如果未指定,则默认为全局 Spring 集成 Bean,其默认池大小为 10。
请参阅配置 Task Scheduler。 |
下表介绍了您可以设置以配置 TCP 入站网关的属性:
属性名称 | 允许的值 | 属性描述 |
---|---|---|
|
连接工厂的类型必须为 server。 |
|
|
传入消息发送到的通道。 |
|
|
回复消息可能到达的通道。 通常,回复到达添加到入站邮件头的临时回复通道。 |
|
|
网关等待回复的时间(以毫秒为单位)。 默认值:1000(1 秒)。 |
|
|
如果下游组件引发异常,则包含异常和失败消息的消息将发送到此通道。
然后,网关将作为响应返回来自该流的任何回复。 |
|
|
|
When 时,入站网关充当客户端,用于建立连接,然后接收(和回复)该连接上的传入邮件。
默认值:false。
另请参阅 和 。
连接工厂的类型必须为 ,并且已设置为 。 |
|
当 in 中时,指定在连接尝试之间或连接失败后等待的毫秒数。
默认值:60000(60 秒)。 |
|
|
|
指定用于管理连接的 。
如果未指定,则默认为全局 Spring 集成 Bean,其默认池大小为 10。
请参阅配置 Task Scheduler。 |
下表描述了您可以设置以配置 TCP 出站网关的属性:
属性名称 | 允许的值 | 属性描述 |
---|---|---|
|
连接工厂的类型必须为 。 |
|
|
传出消息到达的通道。 |
|
|
自选。 将回复消息发送到的通道。 |
|
|
网关等待远程系统回复的时间(以毫秒为单位)。
与 互斥。
默认值:10000(10 秒)。
注意:在 4.2 之前的版本中,此值默认为 (如果已设置)。 |
|
|
一个 SPEL 表达式,根据消息进行评估,以确定网关等待远程系统回复的时间(以毫秒为单位)。
与 互斥。 |
|
|
如果未使用一次性连接工厂,则为网关等待访问共享连接的时间(以毫秒为单位)。 |
|
|
网关在向 reply-channel 发送回复时等待的时间(以毫秒为单位)。 仅在 reply-channel 可能阻塞时适用(例如当前已满的有界 QueueChannel)。 |
|
|
发送后释放发送线程;回复(或错误)将在接收线程上发送。 |
|
|
向其发送未经请求的消息和延迟回复的通道。 |
IP 消息报头
此模块使用以下实例:MessageHeader
标头名称 | IpHeaders 常量 | 描述 |
---|---|---|
|
|
从中接收 TCP 消息或 UDP 数据包的主机名。
如果为 ,则包含 IP 地址。 |
|
|
从中接收 TCP 消息或 UDP 数据包的 IP 地址。 |
|
|
UDP 数据包的远程端口。 |
ip_localInetAddress |
|
套接字连接到的本地(自版本 4.2.5 起)。 |
|
|
UDP 应用程序级确认发送到的远程 IP 地址。 该框架在数据包中包含确认信息。 |
|
|
UDP 应用程序级确认的相关 ID。 该框架在数据包中包含确认信息。 |
|
|
TCP 连接的远程端口。 |
|
|
TCP 连接的唯一标识符。 由框架为入站消息设置。 发送到服务器端入站通道适配器或回复入站网关时,此标头是必需的,以便终端节点可以确定将消息发送到的连接。 |
|
|
仅供参考。 使用缓存或故障转移客户端连接工厂时,它包含实际的基础连接 ID。 |
|
|
入站邮件的可选内容类型
在此表后面进行了描述。
请注意,与其他 Headers 常量不同,此常量位于 class 中,而不是 class 中。 |
对于入站消息, , , , 和 默认映射 。
如果将 mapper 的 property 设置为 ,则 mapper 将设置 Headers (,默认情况下)。
您可以通过设置属性来更改默认值。
您可以通过子类化和覆盖方法来添加其他 Headers。
例如,当您使用 SSL 时,您可以通过从对象获取会话对象来添加 的属性,该对象作为方法的参数提供。ip_hostname
ip_address
ip_tcp_remotePort
ip_connectionId
TcpHeaderMapper
addContentTypeHeader
true
contentType
application/octet-stream;charset="UTF-8"
contentType
TcpHeaderMapper
supplyCustomHeaders
SSLSession
TcpConnection
supplyCustomHeaders
对于出站消息,有效负载将转换为使用默认 () 字符集。
设置该属性以更改默认值。String
byte[]
UTF-8
charset
在自定义映射器属性或子类化时,将映射器声明为 Bean,并使用该属性为连接工厂提供实例。mapper
基于注释的配置
示例存储库中的以下示例显示了使用注释而不是 XML 时可用的一些配置选项:
@EnableIntegration (1)
@IntegrationComponentScan (2)
@Configuration
public static class Config {
@Value(${some.port})
private int port;
@MessagingGateway(defaultRequestChannel="toTcp") (3)
public interface Gateway {
String viaTcp(String in);
}
@Bean
@ServiceActivator(inputChannel="toTcp") (4)
public MessageHandler tcpOutGate(AbstractClientConnectionFactory connectionFactory) {
TcpOutboundGateway gate = new TcpOutboundGateway();
gate.setConnectionFactory(connectionFactory);
gate.setOutputChannelName("resultToString");
return gate;
}
@Bean (5)
public TcpInboundGateway tcpInGate(AbstractServerConnectionFactory connectionFactory) {
TcpInboundGateway inGate = new TcpInboundGateway();
inGate.setConnectionFactory(connectionFactory);
inGate.setRequestChannel(fromTcp());
return inGate;
}
@Bean
public MessageChannel fromTcp() {
return new DirectChannel();
}
@MessageEndpoint
public static class Echo { (6)
@Transformer(inputChannel="fromTcp", outputChannel="toEcho")
public String convert(byte[] bytes) {
return new String(bytes);
}
@ServiceActivator(inputChannel="toEcho")
public String upCase(String in) {
return in.toUpperCase();
}
@Transformer(inputChannel="resultToString")
public String convertResult(byte[] bytes) {
return new String(bytes);
}
}
@Bean
public AbstractClientConnectionFactory clientCF() { (7)
return new TcpNetClientConnectionFactory("localhost", this.port);
}
@Bean
public AbstractServerConnectionFactory serverCF() { (8)
return new TcpNetServerConnectionFactory(this.port);
}
}
1 | 标准 Spring 集成注释,为集成应用程序启用基础结构。 |
2 | 搜索接口。@MessagingGateway |
3 | 流客户端的入口点。
调用应用程序可以用于此 bean 并调用其方法。@Autowired Gateway |
4 | 出站终端节点由 a 和包装它的使用者组成。
在此方案中,根据通道类型配置终结点。MessageHandler @ServiceActivator |
5 | 入站端点(在 TCP/UDP 模块中)都是消息驱动的,因此只需要声明为简单实例。@Bean |
6 | 此类提供了许多 POJO 方法,用于此示例流程(a 和服务器端和 a 客户端)。@Transformer @ServiceActivator @Transformer |
7 | 客户端连接工厂。 |
8 | 服务器端连接工厂。 |
将 Java DSL 用于 TCP 组件
对 TCP 组件的 DSL 支持包括适配器和网关的规范、用于创建连接工厂 Bean 的具有工厂方法的类,以及用于创建序列化器和反序列化器的具有工厂方法的类。
有关更多信息,请参阅他们的 javadocs。Tcp
TcpCodecs
以下是使用 DSL 通过 DSL 配置流的一些示例。
@Bean
public IntegrationFlow server() {
return IntegrationFlow.from(Tcp.inboundAdapter(Tcp.netServer(1234)
.deserializer(TcpCodecs.lengthHeader1())
.backlog(30))
.errorChannel("tcpIn.errorChannel")
.id("tcpIn"))
.transform(Transformers.objectToString())
.channel("tcpInbound")
.get();
}
@Bean
public IntegrationFlow client() {
return f -> f.handle(Tcp.outboundAdapter(Tcp.nioClient("localhost", 1234)
.serializer(TcpCodecs.lengthHeader1())));
}
@Bean
public IntegrationFlow server() {
return IntegrationFlow.from(Tcp.inboundGateway(Tcp.netServer(1234)
.deserializer(TcpCodecs.lengthHeader1())
.serializer(TcpCodecs.lengthHeader1())
.backlog(30))
.errorChannel("tcpIn.errorChannel")
.id("tcpIn"))
.transform(Transformers.objectToString())
.channel("tcpInbound")
.get();
}
@Bean
public IntegrationFlow client() {
return f -> f.handle(Tcp.outboundGateway(Tcp.nioClient("localhost", 1234)
.deserializer(TcpCodecs.lengthHeader1())
.serializer(TcpCodecs.lengthHeader1())));
}