对于最新的稳定版本,请使用 Spring Integration 6.4.3! |
消息路由器
Spring 集成原生提供了专门的路由器类型,包括:
-
HeaderValueRouter
-
PayloadTypeRouter
-
ExceptionTypeRouter
-
RecipientListRouter
-
XPathRouter
与许多其他 DSL 一样IntegrationFlowBuilder
EIP 方法、route()
method 可以应用任何AbstractMessageRouter
implementation 或者为方便起见,使用String
作为 SPEL 表达式或ref
-method
双。
此外,您还可以配置route()
替换为 lambda,并将 lambda 用于Consumer<RouterSpec<MethodInvokingRouter>>
.
Fluent API 还提供AbstractMappingMessageRouter
选项,例如channelMapping(String key, String channelName)
对,如下例所示:
@Bean
public IntegrationFlow routeFlowByLambda() {
return IntegrationFlow.from("routerInput")
.<Integer, Boolean>route(p -> p % 2 == 0,
m -> m.suffix("Channel")
.channelMapping(true, "even")
.channelMapping(false, "odd")
)
.get();
}
以下示例显示了一个简单的基于表达式的路由器:
@Bean
public IntegrationFlow routeFlowByExpression() {
return IntegrationFlow.from("routerInput")
.route("headers['destChannel']")
.get();
}
这routeToRecipients()
method 采用Consumer<RecipientListRouterSpec>
,如下例所示:
@Bean
public IntegrationFlow recipientListFlow() {
return IntegrationFlow.from("recipientListInput")
.<String, String>transform(p -> p.replaceFirst("Payload", ""))
.routeToRecipients(r -> r
.recipient("thing1-channel", "'thing1' == payload")
.recipientMessageSelector("thing2-channel", m ->
m.getHeaders().containsKey("recipient")
&& (boolean) m.getHeaders().get("recipient"))
.recipientFlow("'thing1' == payload or 'thing2' == payload or 'thing3' == payload",
f -> f.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("recipientListSubFlow1Result")))
.recipientFlow((String p) -> p.startsWith("thing3"),
f -> f.transform("Hello "::concat)
.channel(c -> c.queue("recipientListSubFlow2Result")))
.recipientFlow(new FunctionExpression<Message<?>>(m ->
"thing3".equals(m.getPayload())),
f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
.defaultOutputToParentFlow())
.get();
}
这.defaultOutputToParentFlow()
的.routeToRecipients()
definition 允许您设置路由器的defaultOutput
作为网关,以继续处理 MAIN FLOW 中不匹配的消息。
另请参阅lambda 和Message<?>
参数.