对于最新的稳定版本,请使用 Spring Integration 6.3.4! |
对于最新的稳定版本,请使用 Spring Integration 6.3.4! |
网关隐藏了 Spring 集成提供的消息传递 API。 它允许你的应用程序的业务逻辑不知道 Spring 集成 API。 通过使用通用 Gateway,您的代码仅与一个简单的接互。
输入GatewayProxyFactoryBean
如前所述,不依赖于 Spring 集成 API (包括 gateway 类)就太好了。
出于这个原因, Spring 集成提供了 ,它为任何接口生成代理,并在内部调用如下所示的网关方法。
通过使用依赖关系注入,您可以向业务方法公开接口。GatewayProxyFactoryBean
下面的示例展示了一个可用于与 Spring 集成交互的接口:
public interface Cafe {
void placeOrder(Order order);
}
Gateway XML Namespace 支持
还提供了 Namespace 支持。 它允许您将接口配置为服务,如下例所示:
<int:gateway id="cafeService"
service-interface="org.cafeteria.Cafe"
default-request-channel="requestChannel"
default-reply-timeout="10000"
default-reply-channel="replyChannel"/>
定义了这个配置后,现在可以注入到其他 bean 中,并且在接口的代理实例上调用方法的代码不知道 Spring 集成 API。
请参阅 “Samples” 附录 以获取使用 元素的示例(在 Cafe 演示中)。cafeService
Cafe
gateway
上述配置中的默认值将应用于网关接口上的所有方法。 如果未指定回复超时,则调用线程将等待回复 30 秒。 请参阅没有响应到达时的网关行为。
可以覆盖单个方法的默认值。 请参阅带有注释和 XML 的网关配置。
设置默认回复通道
通常,您无需指定 ,因为 Gateway 会自动创建一个临时的匿名回复通道,并在其中侦听回复。
但是,在某些情况下,可能会提示您定义 (或使用适配器网关,例如 HTTP、JMS 等)。default-reply-channel
default-reply-channel
reply-channel
对于一些背景,我们简要讨论了 gateway 的一些内部工作原理。
网关创建一个临时的点对点回复通道。
它是匿名的,并添加到邮件报头中,名称为 。
当提供显式(使用远程适配器网关)时,你可以指向一个发布-订阅通道,之所以这样命名,是因为你可以向其添加多个订阅者。
在内部, Spring Integration 在临时和显式定义之间创建了一个桥梁。replyChannel
default-reply-channel
reply-channel
replyChannel
default-reply-channel
假设您希望您的回复不仅发送到网关,还发送到其他某个使用者。 在这种情况下,您需要两样东西:
-
您可以订阅的命名频道
-
该通道将成为 publish-subscribe-channel
网关使用的默认策略不能满足这些需求,因为添加到 Headers 的回复通道是匿名的和点对点的。
这意味着其他订户无法获得该消息的句柄,即使可以,该通道也具有点对点行为,因此只有一个订户可以获取该消息。
通过定义 a,您可以指向您选择的通道。
在本例中,这是一个 .
网关会创建一个桥,从它到存储在 header 中的临时匿名回复通道。default-reply-channel
publish-subscribe-channel
您可能还希望显式提供一个回复通道,以便通过侦听器(例如,wiretap)进行监视或审计。 要配置通道拦截器,您需要一个命名通道。
从版本 5.4 开始,当 gateway method return type 为 时,如果未明确提供此类 Headers,则框架会将 Headers 填充为 Bean 引用。
这允许丢弃来自下游流的任何可能的回复,从而满足单向网关协定。void replyChannel nullChannel |
从版本 5.4 开始,当 gateway method return type 为 时,如果未明确提供此类 Headers,则框架会将 Headers 填充为 Bean 引用。
这允许丢弃来自下游流的任何可能的回复,从而满足单向网关协定。void replyChannel nullChannel |
带有注释和 XML 的网关配置
请考虑以下示例,该示例通过添加 annotation 对前面的接口示例进行了扩展:Cafe
@Gateway
public interface Cafe {
@Gateway(requestChannel="orders")
void placeOrder(Order order);
}
该注释允许您添加解释为消息标头的值,如下例所示:@Header
public interface FileWriter {
@Gateway(requestChannel="filesOut")
void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);
}
如果您更喜欢使用 XML 方法来配置网关方法,则可以将元素添加到网关配置中,如下例所示:method
<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB"/>
<int:method name="echoViaDefault"/>
</int:gateway>
您还可以使用 XML 为每个方法调用提供单独的标头。
如果要设置的 Headers 本质上是静态的,并且您不想通过使用 annotation 将它们嵌入到网关的方法签名中,这可能很有用。
例如,在贷款经纪人示例中,我们希望根据发起的请求类型(单个报价或所有报价)来影响贷款报价的聚合方式。
通过评估调用的网关方法来确定请求的类型,尽管可能,但会违反关注点分离范式(该方法是一个 Java 工件)。
但是,在消息标头中表达您的意图(元信息)在消息传递架构中是很自然的。
以下示例演示如何为两种方法中的每一种方法添加不同的消息标头:@Header
<int:gateway id="loanBrokerGateway"
service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
<int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="BEST"/>
</int:method>
<int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="ALL"/>
</int:method>
</int:gateway>
在前面的示例中,根据网关的方法为 'RESPONSE_TYPE' 标头设置了不同的值。
例如,如果指定 in 和 in 注释,则注释值优先。requestChannel <int:method/> @Gateway |
如果在 XML 中指定了无参数网关,并且接口方法同时具有 a 和 annotation(元素中带有 a 或 a),则忽略该值。@Payload @Gateway payloadExpression payload-expression <int:method/> @Payload |
表达式和 “global” 标头
元素 support 作为 .
计算 SPEL 表达式以确定 header 的值。
从版本 5.2 开始,评估上下文的对象是 with and 访问器。
例如,如果您希望对简单方法名称进行路由,则可以使用以下表达式添加标头:。<header/>
expression
value
#root
MethodArgsHolder
getMethod()
getArgs()
method.name
这是不可序列化的。
如果稍后序列化消息,则表达式为 的标头将丢失。
因此,您可能希望使用 OR 在这些情况下。
该方法提供方法的表示形式,包括参数和返回类型。java.reflect.Method method method.name method.toString() toString() String |
从版本 3.0 开始,可以定义元素以向网关生成的所有消息添加 Headers,而不管调用的方法如何。
为方法定义的特定标头优先于默认标头。
在此处为方法定义的特定标头将覆盖服务接口中的任何注释。
但是,默认标头不会覆盖服务接口中的任何注释。<default-header/>
@Header
@Header
网关现在还支持 ,该 API 适用于所有方法(除非被覆盖)。default-payload-expression
例如,如果指定 in 和 in 注释,则注释值优先。requestChannel <int:method/> @Gateway |
如果在 XML 中指定了无参数网关,并且接口方法同时具有 a 和 annotation(元素中带有 a 或 a),则忽略该值。@Payload @Gateway payloadExpression payload-expression <int:method/> @Payload |
这是不可序列化的。
如果稍后序列化消息,则表达式为 的标头将丢失。
因此,您可能希望使用 OR 在这些情况下。
该方法提供方法的表示形式,包括参数和返回类型。java.reflect.Method method method.name method.toString() toString() String |
将方法参数映射到消息
使用上一节中的配置技术可以控制如何将方法参数映射到消息元素(有效负载和 Headers)。 如果未使用显式配置,则使用某些约定来执行映射。 在某些情况下,这些约定无法确定哪个参数是有效负载,哪些参数应该映射到 headers。 请考虑以下示例:
public String send1(Object thing1, Map thing2);
public String send2(Map thing1, Map thing2);
在第一种情况下,约定是将第一个参数映射到有效负载(只要它不是 a ),第二个参数的内容成为 headers。Map
在第二种情况下(或第一种情况,当 parameter 的参数为 a 时),框架无法确定哪个参数应该是有效负载。
因此,映射失败。
这通常可以使用 、 注释 或 注释 来解决。thing1
Map
payload-expression
@Payload
@Headers
或者(每当约定被打破时),您可以承担将方法调用映射到消息的全部责任。
为此,请实现 an 并使用 属性将其提供给 。
映射器映射 a ,这是一个包装实例的简单类,而 an 包含参数。
提供自定义映射器时,不允许在网关上使用属性和元素。
同样,任何元素上都不允许使用 attribute 和 elements。MethodArgsMessageMapper
<gateway/>
mapper
MethodArgsHolder
java.reflect.Method
Object[]
default-payload-expression
<default-header/>
payload-expression
<header/>
<method/>
映射方法参数
以下示例显示了如何将方法参数映射到消息,并显示了无效配置的一些示例:
public interface MyGateway {
void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);
void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);
void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);
void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added
void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);
@Payload("args[0] + args[1] + '!'")
void payloadAnnotationAtMethodLevel(String a, String b);
@Payload("@someBean.exclaim(args[0])")
void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);
void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);
void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); // (1)
// invalid
void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);
// invalid
void twoPayloads(@Payload String s1, @Payload String s2);
// invalid
void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);
// invalid
void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);
}
1 | 请注意,在此示例中,SPEL 变量 , 引用参数 — 在本例中为 的值。#this s |
XML 等效项看起来略有不同,因为 method 参数没有上下文。
但是,表达式可以通过使用根对象的属性来引用方法参数(有关更多信息,请参阅表达式和“全局”标头),如下例所示:#this
args
MethodArgsHolder
<int:gateway id="myGateway" service-interface="org.something.MyGateway">
<int:method name="send1" payload-expression="args[0] + 'thing2'"/>
<int:method name="send2" payload-expression="@someBean.sum(args[0])"/>
<int:method name="send3" payload-expression="method"/>
<int:method name="send4">
<int:header name="thing1" expression="args[2].toUpperCase()"/>
</int:method>
</int:gateway>
1 | 请注意,在此示例中,SPEL 变量 , 引用参数 — 在本例中为 的值。#this s |
@MessagingGateway
注解
从版本 4.0 开始,网关服务接口可以使用注释进行标记,而不需要定义 xml 元素进行配置。
以下一对示例比较了配置同一网关的两种方法:@MessagingGateway
<gateway />
<int:gateway id="myGateway" service-interface="org.something.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB">
<int:header name="thing1" value="thing2"/>
</int:method>
<int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
defaultHeaders = @GatewayHeader(name = "calledMethod",
expression="#gatewayMethod.name"))
public interface TestGateway {
@Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
String echo(String payload);
@Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
String echoUpperCase(String payload);
String echoViaDefault(String payload);
}
与 XML 版本类似,当 Spring 集成在组件扫描期间发现这些注释时,它会使用其消息传递基础结构创建实现。
要执行此扫描并在应用程序上下文中注册 ,请将注释添加到类中。
标准基础结构不处理接口。
因此,我们引入了自定义 logic 来查找接口上的 annotation 并为它们注册实例。
另请参阅 注释支持。proxy BeanDefinition @IntegrationComponentScan @Configuration @ComponentScan @IntegrationComponentScan @MessagingGateway GatewayProxyFactoryBean |
除了注释之外,您还可以使用注释标记服务接口,以避免创建 Bean(如果此类配置文件未处于活动状态)。@MessagingGateway
@Profile
从版本 6.0 开始,带有 the 的接口也可以用相应 configuration logic 的 Comments 来标记,就像任何 Spring 定义一样。@MessagingGateway
@Primary
@Component
从版本 6.0 开始,可以在标准 Spring 配置中使用接口。
这可以用作 or 手动 bean 定义的替代方法。@MessagingGateway
@Import
@IntegrationComponentScan
AnnotationGatewayProxyFactoryBean
该使用 since 版本进行元注释,并且该属性实质上是别名为 .
这样,网关代理的 bean 名称生成策略将与扫描和导入组件的标准 Spring 注释配置重新保持一致。
可以通过 an 或 as 属性全局覆盖 default。@MessagingGateway
@MessageEndpoint
6.0
name()
@Compnent.value()
AnnotationBeanNameGenerator
AnnotationConfigUtils.CONFIGURATION_BEAN_NAME_GENERATOR
@IntegrationComponentScan.nameGenerator()
如果您没有 XML 配置,那么至少一个类都需要注释。
有关更多信息,请参阅配置和@EnableIntegration 。@EnableIntegration @Configuration |
与 XML 版本类似,当 Spring 集成在组件扫描期间发现这些注释时,它会使用其消息传递基础结构创建实现。
要执行此扫描并在应用程序上下文中注册 ,请将注释添加到类中。
标准基础结构不处理接口。
因此,我们引入了自定义 logic 来查找接口上的 annotation 并为它们注册实例。
另请参阅 注释支持。proxy BeanDefinition @IntegrationComponentScan @Configuration @ComponentScan @IntegrationComponentScan @MessagingGateway GatewayProxyFactoryBean |
如果您没有 XML 配置,那么至少一个类都需要注释。
有关更多信息,请参阅配置和@EnableIntegration 。@EnableIntegration @Configuration |
调用无参数方法
在没有任何参数的 Gateway 接口上调用方法时,默认行为是从 a 接收 a 。Message
PollableChannel
但是,有时您可能希望触发无参数方法,以便可以与下游不需要用户提供的参数的其他组件进行交互,例如触发无参数 SQL 调用或存储过程。
要实现发送和接收语义,您必须提供有效负载。
要生成有效负载,接口上的方法参数不是必需的。
您可以在元素上使用 XML 中的注释或属性。
以下列表包括有效负载的几个示例:@Payload
payload-expression
method
-
文本字符串
-
#gatewayMethod.name
-
新的 java.util.Date()
-
@someBean.someMethod() 的返回值
以下示例演示如何使用注释:@Payload
public interface Cafe {
@Payload("new java.util.Date()")
List<Order> retrieveOpenOrders();
}
您也可以使用注释。@Gateway
public interface Cafe {
@Gateway(payloadExpression = "new java.util.Date()")
List<Order> retrieveOpenOrders();
}
如果两个 Comments 都存在(并且提供了 ),则 win。payloadExpression @Gateway |
另请参阅使用注释和 XML 的网关配置。
如果方法没有参数和返回值,但包含有效负载表达式,则将其视为仅发送操作。
如果两个 Comments 都存在(并且提供了 ),则 win。payloadExpression @Gateway |
调用方法default
网关代理的接口也可能具有方法,从版本 5.3 开始,框架将 a 注入到代理中,以便使用方法而不是代理来调用方法。
JDK 中的接口(例如 )仍可用于网关代理,但由于针对 JDK 类进行实例化的内部 Java 安全原因,无法调用其方法。
这些方法也可以在方法上或注释或 XML 组件上使用显式注释进行代理(丢失它们的实现逻辑,同时恢复以前的网关代理行为)。default
DefaultMethodInvokingMethodInterceptor
default
java.lang.invoke.MethodHandle
java.util.function.Function
default
MethodHandles.Lookup
@Gateway
proxyDefaultMethods
@MessagingGateway
<gateway>
错误处理
网关调用可能会导致错误。 默认情况下,在网关的方法调用时,下游发生的任何错误都会“按原样”重新引发。 例如,请考虑以下简单流程:
gateway -> service-activator
如果服务激活器调用的服务抛出 a(例如),框架会将其包装在 a 中,并将传递给服务激活器的消息附加到属性中。
因此,框架执行的任何日志记录都具有完整的失败上下文。
默认情况下,当网关捕获到异常时,将解包并抛出给调用方。
您可以在网关方法声明上配置子句,以匹配原因链中的特定异常类型。
例如,如果要捕获包含下游错误原因的所有消息收发信息的整体,则应使用类似于以下内容的网关方法:MyException
MessagingException
failedMessage
MyException
throws
MessagingException
public interface MyGateway {
void performProcess() throws MessagingException;
}
由于我们鼓励 POJO 编程,因此您可能不希望将调用方暴露给消息传递基础设施。
如果您的网关方法没有子句,则网关将遍历原因树,查找不是 .
如果未找到,框架将抛出 .
如果前面的讨论中有 和 your method 的原因,则网关会进一步解包该 API 并将其抛给调用方。throws
RuntimeException
MessagingException
MessagingException
MyException
SomeOtherException
throws SomeOtherException
当声明网关时 no ,将使用内部框架接口。service-interface
RequestReplyExchanger
请考虑以下示例:
public interface RequestReplyExchanger {
Message<?> exchange(Message<?> request) throws MessagingException;
}
在版本 5.0 之前,此方法没有子句,因此,异常被解包。
如果你使用这个接口,并且想要恢复以前的 unwrap 行为,请使用 custom instead 或访问 yourself.exchange
throws
service-interface
cause
MessagingException
但是,您可能希望记录错误而不是传播错误,或者您可能希望将异常视为有效回复(通过将其映射到符合调用方理解的某个“错误消息”协定的消息)。
为了实现这一点,网关通过包含对属性的支持来为专用于错误的消息通道提供支持。
在以下示例中,“transformer”从 中创建回复:error-channel
Message
Exception
<int:gateway id="sampleGateway"
default-request-channel="gatewayChannel"
service-interface="foo.bar.SimpleGateway"
error-channel="exceptionTransformationChannel"/>
<int:transformer input-channel="exceptionTransformationChannel"
ref="exceptionTransformer" method="createErrorResponse"/>
它可以是一个简单的 POJO,它知道如何创建预期的错误响应对象。
这将成为发送回给调用方的有效负载。
如有必要,您可以在这样的 “错误流” 中做很多更复杂的事情。
它可能涉及路由器(包括 Spring Integration 的)、过滤器等。
然而,大多数时候,一个简单的 “transformer” 就足够了。exceptionTransformer
ErrorMessageExceptionTypeRouter
或者,您可能希望仅记录异常(或将其异步发送到某个位置)。
如果您提供单向流,则不会将任何内容发送回调用方。
如果要完全禁止显示异常,则可以提供对全局的引用(本质上是一种方法)。
最后,如上所述,如果定义了 no,则异常将照常传播。nullChannel
/dev/null
error-channel
当您使用 Annotation(请参阅 @MessagingGateway的 Annotation 属性。@MessagingGateway
), you can use an `errorChannel
从版本 5.0 开始,当您使用具有返回类型(单向流)的网关方法时,引用(如果提供)将填充到每条已发送消息的标准标头中。
此功能允许基于标准配置(或 a )的下游异步流覆盖默认的全局异常发送行为。
以前,您必须手动指定带有注释或元素的标题。
对于具有异步流的方法,该属性被忽略。
相反,错误消息已发送到默认 。void
error-channel
errorChannel
ExecutorChannel
QueueChannel
errorChannel
errorChannel
@GatewayHeader
<header>
error-channel
void
errorChannel
通过简单的 POJI 网关公开消息传递系统是有好处的,但“隐藏”底层消息传递系统的现实确实是有代价的,因此您应该考虑某些事项。
我们希望我们的 Java 方法尽快返回,而不是在调用者等待它返回时无限期挂起(无论是 void、返回值还是引发的 Exception)。
当常规方法用作消息传递系统前面的代理时,我们必须考虑底层消息传递的潜在异步性质。
这意味着由网关启动的消息有可能被过滤器丢弃,并且永远不会到达负责生成回复的组件。
某些服务激活器方法可能会导致异常,因此不提供回复(因为我们不生成 null 消息)。
换句话说,多种情况都可能导致回复消息永远不会出现。
这在消息传递系统中是非常自然的。
但是,请考虑对网关方法的含义。
网关的方法输入参数被合并到消息中并发送到下游。
回复消息将转换为网关方法的返回值。
因此,您可能希望确保对于每个网关呼叫,始终有一条回复消息。
否则,如果设置为负值,则网关方法可能永远不会返回并无限期挂起。
处理这种情况的一种方法是使用异步网关(本节稍后将介绍)。
另一种处理方法是依赖默认值作为秒。
这样,网关的挂起时间不会超过指定的时间,如果超时已过,则返回 'null'。
最后,你可能要考虑在服务激活器上设置下游标志,例如 'requires-reply' 或过滤器上的 'throw-exceptions-on-rejection'。
本章的最后一节将更详细地讨论这些选项。reply-timeout reply-timeout 30 reply-timeout |
如果下游流返回 ,则其 (a ) 将被视为常规下游错误。
如果存在已配置,则会将其发送到错误流。
否则,有效负载将抛给网关的调用方。
同样,如果 上的错误流返回 an ,则会将其有效负载抛出给调用方。
这同样适用于具有有效负载的任何消息。
这在异步情况下非常有用,当您需要将 直接传播到调用方时。
为此,您可以返回 an (as the from some service) 或抛出它。
通常,即使使用异步流,框架也会负责将下游流引发的异常传播回网关。
TCP 客户端-服务器多路复用示例演示了将异常返回给调用方的两种技术。
它通过在 discard 流上使用 with(请参阅 聚合器和组超时)和回复来模拟等待线程的套接字 IO 错误。ErrorMessage payload Throwable error-channel error-channel ErrorMessage Throwable Exception Exception reply aggregator group-timeout MessagingTimeoutException |
通过简单的 POJI 网关公开消息传递系统是有好处的,但“隐藏”底层消息传递系统的现实确实是有代价的,因此您应该考虑某些事项。
我们希望我们的 Java 方法尽快返回,而不是在调用者等待它返回时无限期挂起(无论是 void、返回值还是引发的 Exception)。
当常规方法用作消息传递系统前面的代理时,我们必须考虑底层消息传递的潜在异步性质。
这意味着由网关启动的消息有可能被过滤器丢弃,并且永远不会到达负责生成回复的组件。
某些服务激活器方法可能会导致异常,因此不提供回复(因为我们不生成 null 消息)。
换句话说,多种情况都可能导致回复消息永远不会出现。
这在消息传递系统中是非常自然的。
但是,请考虑对网关方法的含义。
网关的方法输入参数被合并到消息中并发送到下游。
回复消息将转换为网关方法的返回值。
因此,您可能希望确保对于每个网关呼叫,始终有一条回复消息。
否则,如果设置为负值,则网关方法可能永远不会返回并无限期挂起。
处理这种情况的一种方法是使用异步网关(本节稍后将介绍)。
另一种处理方法是依赖默认值作为秒。
这样,网关的挂起时间不会超过指定的时间,如果超时已过,则返回 'null'。
最后,你可能要考虑在服务激活器上设置下游标志,例如 'requires-reply' 或过滤器上的 'throw-exceptions-on-rejection'。
本章的最后一节将更详细地讨论这些选项。reply-timeout reply-timeout 30 reply-timeout |
如果下游流返回 ,则其 (a ) 将被视为常规下游错误。
如果存在已配置,则会将其发送到错误流。
否则,有效负载将抛给网关的调用方。
同样,如果 上的错误流返回 an ,则会将其有效负载抛出给调用方。
这同样适用于具有有效负载的任何消息。
这在异步情况下非常有用,当您需要将 直接传播到调用方时。
为此,您可以返回 an (as the from some service) 或抛出它。
通常,即使使用异步流,框架也会负责将下游流引发的异常传播回网关。
TCP 客户端-服务器多路复用示例演示了将异常返回给调用方的两种技术。
它通过在 discard 流上使用 with(请参阅 聚合器和组超时)和回复来模拟等待线程的套接字 IO 错误。ErrorMessage payload Throwable error-channel error-channel ErrorMessage Throwable Exception Exception reply aggregator group-timeout MessagingTimeoutException |
网关超时
网关有两个超时属性:和 .
仅当通道可以阻塞(例如,已满的 bounded)时,请求超时才适用。
该值是网关等待回复或返回 的时间。
它默认为无穷大。requestTimeout
replyTimeout
QueueChannel
replyTimeout
null
超时可以设置为网关 ( 和 ) 或接口注释上所有方法的默认值。
单个方法可以覆盖这些默认值(在子元素中)或注解上。defaultRequestTimeout
defaultReplyTimeout
MessagingGateway
<method/>
@Gateway
从版本 5.0 开始,超时可以定义为表达式,如下例所示:
@Gateway(payloadExpression = "args[0]", requestChannel = "someChannel",
requestTimeoutExpression = "args[1]", replyTimeoutExpression = "args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);
评估上下文具有 (用于引用其他 bean),并且对象的 array 属性可用。
有关此根对象的更多信息,请参阅表达式和 “Global” Headers。
使用 XML 进行配置时,超时属性可以是长值或 SPEL 表达式,如下例所示:BeanResolver
@someBean
args
#root
<method name="someMethod" request-channel="someRequestChannel"
payload-expression="args[0]"
request-timeout="1000"
reply-timeout="args[1]">
</method>
异步网关
作为一种模式,消息传递网关提供了一种很好的方法来隐藏特定于消息传递的代码,同时仍然公开消息传递系统的全部功能。
如前所述,它提供了一种通过服务接口公开代理的便捷方法,使您可以基于 POJO 访问消息传递系统(基于您自己的域中的对象、原始语/字符串或其他对象)。
但是,当网关通过返回值的简单 POJO 方法公开时,这意味着对于每个请求消息(在调用方法时生成),必须有一个回复消息(在方法返回时生成)。
由于消息传递系统本质上是异步的,因此您可能无法始终保证 “对于每个请求,总会有一个回复” 的合同。Spring Integration 2.0 引入了对异步网关的支持,当您可能不知道是否需要回复或回复到达需要多长时间时,它提供了一种方便的方式来启动流。GatewayProxyFactoryBean
为了处理这些类型的场景, Spring 集成使用实例来支持异步网关。java.util.concurrent.Future
在 XML 配置中,没有任何变化,您仍然以与定义常规网关相同的方式定义异步网关,如下例所示:
<int:gateway id="mathService"
service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
default-request-channel="requestChannel"/>
但是,网关接口(服务接口)略有不同,如下所示:
public interface MathServiceGateway {
Future<Integer> multiplyByTwo(int i);
}
如前面的示例所示,网关方法的返回类型是 .
当看到网关方法的返回类型为 a 时,它立即使用 切换到异步模式。
这就是差异的程度。
对此类方法的调用始终立即返回实例。
然后,您可以按照自己的节奏与 交互以获取结果、取消等。
此外,与实例的任何其他用法一样,调用 可能会显示超时、执行异常等。
以下示例演示如何使用从异步网关返回的 a:Future
GatewayProxyFactoryBean
Future
AsyncTaskExecutor
Future
Future
Future
get()
Future
MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult = result.get(1000, TimeUnit.SECONDS);
有关更详细的示例,请参阅 Spring 集成示例中的 async-gateway 示例。
AsyncTaskExecutor
默认情况下,在为返回类型为 .
但是,元素配置中的属性允许您提供对 Spring 应用程序上下文中 available 的任何实现的引用。GatewayProxyFactoryBean
org.springframework.core.task.SimpleAsyncTaskExecutor
AsyncInvocationTask
Future
async-executor
<gateway/>
java.util.concurrent.Executor
(默认)同时支持 和 return 类型。
请参阅 CompletableFuture
。
即使有默认的执行程序,提供外部执行程序通常也很有用,这样你就可以在日志中识别其线程(当使用 XML 时,线程名称基于执行程序的 Bean 名称),如下例所示:SimpleAsyncTaskExecutor
Future
CompletableFuture
@Bean
public AsyncTaskExecutor exec() {
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
return simpleAsyncTaskExecutor;
}
@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
}
如果您希望返回不同的实施,则可以提供自定义执行程序或完全禁用执行程序,并从下游流返回回复消息有效负载。
要禁用执行程序,请在 (通过使用) 中将其设置为 。
使用 XML 配置网关时,请使用 .
使用注释进行配置时,请使用类似于以下内容的代码:Future
Future
null
GatewayProxyFactoryBean
setAsyncTaskExecutor(null)
async-executor=""
@MessagingGateway
@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
}
如果返回类型是特定的具体实现或配置的执行程序不支持的其他子接口,则流将在调用方的线程上运行,并且流必须在回复消息有效负载中返回所需的类型。Future |
CompletableFuture
从版本 4.2 开始,网关方法现在可以返回 .
返回此类型时有两种操作模式:CompletableFuture<?>
-
当提供了异步执行程序并且返回类型恰好是(不是子类)时,框架会在执行程序上运行任务,并立即将 a 返回给调用者。 用于创造未来。
CompletableFuture
CompletableFuture
CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)
-
当异步执行程序显式设置为 且返回类型为 或 返回类型 是 的子类 时,将在调用方的线程上调用流。 在这种情况下,下游流应返回适当类型的 a。
null
CompletableFuture
CompletableFuture
CompletableFuture
从 Spring Framework 开始,已弃用。
现在建议迁移到 提供类似处理功能的 。org.springframework.util.concurrent.ListenableFuture 6.0 CompletableFuture |
使用场景
在以下场景中,调用方线程立即返回 ,当下游流回复网关(带有对象)时,该响应完成。CompletableFuture<Invoice>
Invoice
CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="something.Service" default-request-channel="orders" />
在以下场景中,当下游流将其作为对网关的回复的负载提供时,调用方线程将返回 a。
当发票准备好时,必须完成其他一些流程。CompletableFuture<Invoice>
CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders"
async-executor="" />
在以下场景中,当下游流将其作为对网关的回复的负载提供时,调用方线程将返回 a。
当发票准备好时,必须完成其他一些流程。
如果启用了日志记录,则会发出一个日志条目,指示异步执行程序不能用于此方案。CompletableFuture<Invoice>
DEBUG
MyCompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders" />
CompletableFuture
实例可用于对回复执行其他操作,如下例所示:
CompletableFuture<String> process(String data);
...
CompletableFuture result = process("foo")
.thenApply(t -> t.toUpperCase());
...
String out = result.get(10, TimeUnit.SECONDS);
反应器Mono
从版本 5.0 开始,允许使用 Mono<T>
返回类型将 Project Reactor 与网关接口方法一起使用。
内部包裹在 .GatewayProxyFactoryBean
AsyncInvocationTask
Mono.fromCallable()
A 可用于稍后检索结果(类似于 a ),或者您可以通过在结果返回到网关时调用 your 来与调度程序一起使用它。Mono
Future<?>
Consumer
框架不会立即刷新 。
因此,底层消息流不会在网关方法返回之前启动(就像任务一样)。
流在订阅时启动。
或者,当 与整个 .
以下示例显示了如何使用 Project Reactor 创建网关:Mono Future<?> Executor Mono Mono subscribe() Flux |
@MessagingGateway
public interface TestGateway {
@Gateway(requestChannel = "multiplyChannel")
Mono<Integer> multiply(Integer value);
}
@ServiceActivator(inputChannel = "multiplyChannel")
public Integer multiply(Integer value) {
return value * 2;
}
其中,这样的网关可以用于处理 OF 数据的某些服务中:Flux
@Autowired
TestGateway testGateway;
public void hadnleFlux() {
Flux.just("1", "2", "3", "4", "5")
.map(Integer::parseInt)
.flatMap(this.testGateway::multiply)
.collectList()
.subscribe(System.out::println);
}
使用 Project Reactor 的另一个示例是一个简单的回调场景,如下例所示:
Mono<Invoice> mono = service.process(myOrder);
mono.subscribe(invoice -> handleInvoice(invoice));
调用线程将继续,并在流完成时被调用。handleInvoice()
如需了解详情,另请参阅 Kotlin 协程。
返回异步类型的下游流
如上面的 AsyncTaskExecutor
部分所述,如果你希望某个下游组件返回带有异步有效负载(、 、 和其他)的消息,你必须显式地将异步 executor 设置为 (或使用 XML 配置时)。
然后,在调用方线程上调用该流,稍后可以检索结果。Future
Mono
null
""
异步返回类型void
消息网关方法可以按如下方式声明:
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "sendAsyncChannel")
@Async
void sendAsync(String payload);
}
但是下游异常不会传播回调用方。
为了确保下游流调用和异常传播到调用方的异步行为,从版本 6.0 开始,框架提供了对 和 return 类型的支持。
该用例类似于前面描述的普通返回类型的发送和忘记行为,但不同的是,流执行是异步发生的,并且根据操作结果返回 (or ) 以 or 例外完成。Future<Void>
Mono<Void>
void
Future
Mono
null
send
如果 is exact downstream flow reply,则必须将网关的选项设置为 null(对于配置),并在生产者线程上执行该部分。
回复 1 取决于下游流配置。
这样,目标应用程序就可以正确生成回复。
用例已经超出了框架线程控制范围,因此设置为 null 没有意义。
作为 request-reply 网关操作的结果,必须将其配置为网关方法的返回类型。Future<Void> asyncExecutor AnnotationConstants.NULL @MessagingGateway send Future<Void> Mono asyncExecutor Mono<Void> Mono<?> |
如果返回类型是特定的具体实现或配置的执行程序不支持的其他子接口,则流将在调用方的线程上运行,并且流必须在回复消息有效负载中返回所需的类型。Future |
从 Spring Framework 开始,已弃用。
现在建议迁移到 提供类似处理功能的 。org.springframework.util.concurrent.ListenableFuture 6.0 CompletableFuture |
框架不会立即刷新 。
因此,底层消息流不会在网关方法返回之前启动(就像任务一样)。
流在订阅时启动。
或者,当 与整个 .
以下示例显示了如何使用 Project Reactor 创建网关:Mono Future<?> Executor Mono Mono subscribe() Flux |
如果 is exact downstream flow reply,则必须将网关的选项设置为 null(对于配置),并在生产者线程上执行该部分。
回复 1 取决于下游流配置。
这样,目标应用程序就可以正确生成回复。
用例已经超出了框架线程控制范围,因此设置为 null 没有意义。
作为 request-reply 网关操作的结果,必须将其配置为网关方法的返回类型。Future<Void> asyncExecutor AnnotationConstants.NULL @MessagingGateway send Future<Void> Mono asyncExecutor Mono<Void> Mono<?> |
未到达响应时的网关行为
如前所述,网关提供了一种通过 POJO 方法调用与消息传递系统交互的便捷方式。 但是,通常预期始终返回的典型方法调用(即使有 Exception)可能并不总是一对一地映射到消息交换(例如,回复消息可能未到达 — 相当于方法未返回)。
本节的其余部分介绍各种场景以及如何使网关的行为更具可预测性。
可以配置某些属性以使同步网关行为更具可预测性,但其中一些属性可能并不总是像您预期的那样工作。
其中之一是 (在方法级别或网关级别)。
我们检查该属性,以了解它在各种场景中如何能够和不能影响 synchronous gateway 的行为。
我们研究了单线程场景(下游的所有组件都通过直接通道连接)和多线程场景(例如,在下游的某个地方,你可能有一个打破单线程边界的 pollable 或 executor 通道)。reply-timeout
default-reply-timeout
reply-timeout
长时间运行的流程下游
- Sync Gateway,单线程
-
如果下游组件仍在运行(可能是由于无限循环或服务缓慢),则设置 a 不起作用,并且网关方法调用不会返回,直到下游服务退出(通过返回或抛出异常)。
reply-timeout
- Sync Gateway,多线程
-
如果下游组件仍在多线程消息流中运行(可能是由于无限循环或服务缓慢),则设置 () 的效果是允许网关方法调用在达到超时后返回,因为在回复通道上轮询,等待消息,直到超时到期。 但是,如果在生成实际回复之前已达到超时,则可能导致网关方法返回 'null'。 您应该了解,回复消息(如果生成)是在网关方法调用可能返回后发送到回复通道的,因此您必须了解这一点并在设计流程时牢记这一点。
reply-timeout
GatewayProxyFactoryBean
另请参阅属性 to throw a 而不是 return ,当发生超时时。errorOnTimeout
MessageTimeoutException
null
下游组件返回 'null'
- Sync Gateway — 单线程
-
如果组件下游返回 'null' 并且 已配置为负值,则网关方法调用将无限期挂起,除非已在可能返回 'null' 的下游组件(例如,服务激活器)上设置了该属性。 在这种情况下,将引发异常并将其传播到网关。
reply-timeout
requires-reply
- Sync Gateway — 多线程
-
该行为与前一种情况相同。
下游组件返回签名为“void”,而网关方法签名为非 void
- Sync Gateway — 单线程
-
如果下游组件返回 'void' 并且 已配置为负值,则网关方法调用将无限期挂起。
reply-timeout
- Sync Gateway — 多线程
-
该行为与前一种情况相同。
下游组件导致运行时异常
- Sync Gateway — 单线程
-
如果组件下游引发运行时异常,则异常将通过错误消息传播回网关并重新引发。
- Sync Gateway — 多线程
-
该行为与前一种情况相同。
您应该了解,默认情况下,它是无界的。
因此,如果您将 设置为 负值,则网关方法调用可能会无限期挂起。
因此,为了确保您分析了您的流程,并且如果其中一种情况发生的可能性很小,您应该将属性设置为 “'safe'” 值。
默认情况下是秒。
更好的是,您可以将下游组件的属性设置为 'true' 以确保及时响应,就像下游组件在内部返回 null 时引发异常而产生的响应。
但是,您还应该意识到,在某些情况下(请参阅第一个)没有帮助。
这意味着分析您的消息流并决定何时使用同步网关而不是异步网关也很重要。
如前所述,后一种情况是定义返回实例的网关方法的问题。
然后,您可以保证收到该返回值,并且您可以对调用结果进行更精细的控制。
此外,在处理路由器时,您应该记住,如果路由器无法解析特定通道,则将属性设置为 'true' 会导致路由器抛出异常。
同样,在处理 Filter 时,您可以设置该属性。
在这两种情况下,生成的流的行为类似于它包含具有 'requires-reply' 属性的服务激活器。
换句话说,它有助于确保网关方法调用的及时响应。reply-timeout reply-timeout reply-timeout 30 requires-reply reply-timeout Future resolution-required throw-exception-on-rejection |
您应该了解,计时器在线程返回到网关时启动,即当流完成或将消息传递给另一个线程时。 此时,调用线程开始等待回复。 如果流是完全同步的,则回复将立即可用。 对于异步流,线程将等待 this time。 |
从版本 6.2 开始,内部扩展的属性在 和 上公开。
此选项的含义与 Endpoint Summary 一章末尾解释的任何入站网关的含义完全相同。
换句话说,将此选项设置为 ,将导致从 send-and-receive 网关操作中抛出,而不是在接收超时用完时返回。errorOnTimeout
MethodInvocationGateway
MessagingGatewaySupport
@MessagingGateway
GatewayEndpointSpec
true
MessageTimeoutException
null
请参阅 Java DSL 一章中的 IntegrationFlow
as Gateway,了解通过 定义网关的选项。IntegrationFlow
您应该了解,默认情况下,它是无界的。
因此,如果您将 设置为 负值,则网关方法调用可能会无限期挂起。
因此,为了确保您分析了您的流程,并且如果其中一种情况发生的可能性很小,您应该将属性设置为 “'safe'” 值。
默认情况下是秒。
更好的是,您可以将下游组件的属性设置为 'true' 以确保及时响应,就像下游组件在内部返回 null 时引发异常而产生的响应。
但是,您还应该意识到,在某些情况下(请参阅第一个)没有帮助。
这意味着分析您的消息流并决定何时使用同步网关而不是异步网关也很重要。
如前所述,后一种情况是定义返回实例的网关方法的问题。
然后,您可以保证收到该返回值,并且您可以对调用结果进行更精细的控制。
此外,在处理路由器时,您应该记住,如果路由器无法解析特定通道,则将属性设置为 'true' 会导致路由器抛出异常。
同样,在处理 Filter 时,您可以设置该属性。
在这两种情况下,生成的流的行为类似于它包含具有 'requires-reply' 属性的服务激活器。
换句话说,它有助于确保网关方法调用的及时响应。reply-timeout reply-timeout reply-timeout 30 requires-reply reply-timeout Future resolution-required throw-exception-on-rejection |
您应该了解,计时器在线程返回到网关时启动,即当流完成或将消息传递给另一个线程时。 此时,调用线程开始等待回复。 如果流是完全同步的,则回复将立即可用。 对于异步流,线程将等待 this time。 |