Java DSL
Java DSL
Spring 集成 Java 配置和 DSL 提供了一组方便的构建器和一个 Fluent API,允许您从 Spring 类配置 Spring 集成消息流。@Configuration
(另请参阅 Kotlin DSL。
(另请参见 Groovy DSL。
用于 Spring Integration 的 Java DSL 本质上是 Spring Integration 的门面。
DSL 提供了一种简单的方法,通过将 Fluent 模式与 Spring Framework 和 Spring Integration 中的现有 Java 配置结合使用,将 Spring Integration 消息流嵌入到您的应用程序中。
我们还使用并支持 lambda(在 Java 8 中提供)来进一步简化 Java 配置。Builder
这家咖啡馆提供了使用 DSL 的一个很好的例子。
DSL 由 Fluent API 提供(请参阅 )。
这将生成组件,该组件应注册为 Spring bean(通过使用 Comments)。
构建器模式用于将任意复杂的结构表示为可以接受 lambda 作为参数的方法层次结构。IntegrationFlow
IntegrationFlowBuilder
IntegrationFlow
@Bean
唯一在 bean 中收集集成组件(实例、实例等),以便通过 .IntegrationFlowBuilder
MessageChannel
AbstractEndpoint
IntegrationFlow
IntegrationFlowBeanPostProcessor
Java DSL 直接使用 Spring 集成类,并绕过任何 XML 生成和解析。 然而,DSL 在 XML 之上提供的不仅仅是语法糖。 它最引人注目的功能之一是能够定义内联 lambda 来实现终端节点逻辑,无需外部类来实现自定义逻辑。 从某种意义上说,Spring 集成对 Spring 表达式语言 (SpEL) 和内联脚本的支持解决了这个问题,但 lambda 更简单、功能更强大。
下面的示例展示了如何使用 Java 配置进行 Spring 集成:
@Configuration
@EnableIntegration
public class MyConfiguration {
@Bean
public AtomicInteger integerSource() {
return new AtomicInteger();
}
@Bean
public IntegrationFlow myFlow() {
return IntegrationFlow.fromSupplier(integerSource()::getAndIncrement,
c -> c.poller(Pollers.fixedRate(100)))
.channel("inputChannel")
.filter((Integer p) -> p > 0)
.transform(Object::toString)
.channel(MessageChannels.queue())
.get();
}
}
前面的配置示例的结果是,它在启动后创建 Spring 集成端点和消息通道。
Java 配置可用于替换和增强 XML 配置。
您无需替换所有现有的 XML 配置即可使用 Java 配置。ApplicationContext
DSL 基础知识
该包包含前面提到的 API 和许多实现,这些实现也是构建器,并提供 Fluent API 来配置具体端点。
该基础设施为基于消息的应用程序(例如通道、终端节点、轮询程序和通道拦截器)提供通用的企业集成模式 (EIP)。org.springframework.integration.dsl
IntegrationFlowBuilder
IntegrationComponentSpec
IntegrationFlowBuilder
端点在 DSL 中表示为动词,以提高可读性。 以下列表包括常见的 DSL 方法名称和关联的 EIP Endpoint:
-
转换→
Transformer
-
筛选→
Filter
-
手柄 →
ServiceActivator
-
拆分→
Splitter
-
聚合→
Aggregator
-
路线 →
Router
-
桥接→
Bridge
从概念上讲,集成流程是通过将这些端点组合成一个或多个消息流来构建的。
请注意,EIP 并未正式定义术语“消息流”,但将其视为使用众所周知的消息传递模式的工作单元会很有用。
DSL 提供了一个组件来定义通道和它们之间的端点的组合,但现在只扮演配置角色,在应用程序上下文中填充真实的 bean,并且在运行时不使用。
但是,bean for 可以自动连接为 to control 和 for 整个 flow,它被委托给与此关联的所有 Spring Integration 组件。
以下示例使用 Fluent API 通过使用 EIP-methods 来定义 bean:IntegrationFlow
IntegrationFlow
IntegrationFlow
Lifecycle
start()
stop()
IntegrationFlow
IntegrationFlow
IntegrationFlow
IntegrationFlowBuilder
@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlow.from("input")
.<String, Integer>transform(Integer::parseInt)
.get();
}
该方法接受 lambda 作为终端节点参数,以对消息负载进行操作。
此方法的真正参数是实例。
因此,这里可以使用任何提供的转换器 (, , 和其他)。transform
GenericTransformer<S, T>
ObjectToJsonTransformer
FileToStringTransformer
在后台,可识别它的 和 终端节点,分别使用 和 。
考虑另一个例子:IntegrationFlowBuilder
MessageHandler
MessageTransformingHandler
ConsumerEndpointFactoryBean
@Bean
public IntegrationFlow myFlow() {
return IntegrationFlow.from("input")
.filter("World"::equals)
.transform("Hello "::concat)
.handle(System.out::println)
.get();
}
前面的示例组成了一系列 .
流程是 “'单向'”。
也就是说,它不提供回复消息,而只将有效负载打印到 STDOUT。
终端节点使用直接通道自动连接在一起。Filter → Transformer → Service Activator
Lambda 和参数
Message<?> 在 EIP 方法中使用 lambda 表达式时,“input” 参数通常是消息负载。
如果要访问整个消息,请使用将 a 作为第一个参数的重载方法之一。
例如,this 将不起作用:
这在运行时将失败,因为 lambda 不保留参数类型,并且框架将尝试将有效负载转换为 . 相反,请使用:
|
Bean 定义覆盖
Java DSL 可以为流定义中内联定义的对象注册 bean,也可以重用现有的注入的 bean。
如果为内联对象和现有 bean 定义定义了相同的 bean 名称,则会抛出 a,指示此类配置是错误的。
但是,在处理 bean 时,无法从集成流处理器中检测现有的 bean 定义,因为每次我们从 调用 bean 时,都会得到一个新实例。
这样,提供的实例就可以按原样使用,而无需任何 bean 注册,也不需要对现有 bean 定义进行任何可能的检查。
但是,如果此对象具有显式,并且此名称的 bean 定义在范围内,则为此对象调用。 |
消息通道
除了 with EIP 方法之外,Java DSL 还提供了一个 Fluent API 来配置实例。
为此,提供了 builder factory。
以下示例演示如何使用它:IntegrationFlowBuilder
MessageChannel
MessageChannels
@Bean
public MessageChannel priorityChannel() {
return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
.interceptor(wireTap())
.get();
}
同一个构建器工厂可以在 EIP 方法 from to wire endpoints 中使用,类似于在 XML 配置中连接 / 对。
默认情况下,端点与 Bean 名称基于以下模式的实例连接:。
此规则也适用于内联构建器工厂使用生成的未命名通道。
但是,所有方法都有一个变体,该变体知道 ,您可以使用它来设置实例的 bean 名称。
引用 和 可以用作 bean 方法调用。
以下示例显示了使用 EIP 方法的可能方法:MessageChannels
channel()
IntegrationFlowBuilder
input-channel
output-channel
DirectChannel
[IntegrationFlow.beanName].channel#[channelNameIndex]
MessageChannels
MessageChannels
channelId
MessageChannel
MessageChannel
beanName
channel()
@Bean
public MessageChannel queueChannel() {
return MessageChannels.queue().get();
}
@Bean
public MessageChannel publishSubscribe() {
return MessageChannels.publishSubscribe().get();
}
@Bean
public IntegrationFlow channelFlow() {
return IntegrationFlow.from("input")
.fixedSubscriberChannel()
.channel("queueChannel")
.channel(publishSubscribe())
.channel(MessageChannels.executor("executorChannel", this.taskExecutor))
.channel("output")
.get();
}
-
from("input")
表示 “'查找并使用 '输入' ID 的 ,或创建一个 ID'”。MessageChannel
-
fixedSubscriberChannel()
生成 的实例并使用名称 .FixedSubscriberChannel
channelFlow.channel#0
-
channel("queueChannel")
工作方式相同,但使用现有的 Bean。queueChannel
-
channel(publishSubscribe())
是 Bean 方法引用。 -
channel(MessageChannels.executor("executorChannel", this.taskExecutor))
是 公开 并将其注册为 的 。IntegrationFlowBuilder
IntegrationComponentSpec
ExecutorChannel
executorChannel
-
channel("output")
将 Bean 注册为其名称,只要不存在具有此名称的 Bean。DirectChannel
output
注意:前面的定义是有效的,它的所有通道都应用于具有实例的终端节点。IntegrationFlow
BridgeHandler
请注意,通过工厂从不同的实例使用相同的内联通道定义。
即使 DSL 解析器将不存在的对象注册为 bean,它也无法从不同的容器中确定相同的对象 ()。
以下示例是错误的:MessageChannels IntegrationFlow MessageChannel IntegrationFlow |
@Bean
public IntegrationFlow startFlow() {
return IntegrationFlow.from("input")
.transform(...)
.channel(MessageChannels.queue("queueChannel"))
.get();
}
@Bean
public IntegrationFlow endFlow() {
return IntegrationFlow.from(MessageChannels.queue("queueChannel"))
.handle(...)
.get();
}
该错误示例的结果是以下异常:
Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
there is already object [queueChannel] bound
at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)
要使其工作,您需要为该通道声明并从不同的实例中使用其 bean 方法。@Bean
IntegrationFlow
轮询器
Spring 集成还提供了一个 Fluent API,允许您配置实现。
您可以使用 builder Factory 配置常见的 bean 定义或通过 EIP 方法创建的 bean 定义,如下例所示:PollerMetadata
AbstractPollingEndpoint
Pollers
IntegrationFlowBuilder
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
return Pollers.fixedRate(500)
.errorChannel("myErrors");
}
有关更多信息,请参见 Javadoc 中的 Pollers
和 PollerSpec
。
如果使用 DSL 将 a 构造为 ,请不要在 Bean 定义中调用该方法。
这是从规范生成对象并初始化其所有属性的 a。PollerSpec @Bean get() PollerSpec FactoryBean PollerMetadata |
端点reactive()
从版本 5.5 开始,提供带有可选 customizer 的 configuration 属性。
此选项将目标端点配置为实例,独立于输入通道类型,该类型将转换为 via 。
运算符使用提供的函数来自定义 (, , 等) 来自 input 通道的反应流源。ConsumerEndpointSpec
reactive()
Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>
ReactiveStreamsConsumer
Flux
IntegrationReactiveUtils.messageChannelToFlux()
Flux.transform()
publishOn()
log()
doOnNext()
以下示例演示如何将发布线程从独立于最终订阅者和生成方的 input 通道更改为该 input 通道:DirectChannel
@Bean
public IntegrationFlow reactiveEndpointFlow() {
return IntegrationFlow
.from("inputChannel")
.<String, Integer>transform(Integer::parseInt,
e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())))
.get();
}
有关更多信息,请参阅 Reactive Streams Support 。
DSL 和端点配置
所有 EIP 方法都有一个变体,该变体应用 lambda 参数来为实例提供选项:、、 、 和其他实例。
它们中的每一个都有泛型参数,因此它允许您在上下文中配置终端节点,甚至配置终端节点,如下例所示:IntegrationFlowBuilder
AbstractEndpoint
SmartLifecycle
PollerMetadata
request-handler-advice-chain
MessageHandler
@Bean
public IntegrationFlow flow2() {
return IntegrationFlow.from(this.inputChannel)
.transform(new PayloadSerializingTransformer(),
c -> c.autoStartup(false).id("payloadSerializingTransformer"))
.transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice()))
.get();
}
此外,还提供了一种方法,允许您使用给定的 bean 名称而不是生成的 bean 名称注册端点 bean。EndpointSpec
id()
如果 the 被引用为 bean,那么如果该方法存在于 DSL 定义中,那么任何现有的配置都将被覆盖:MessageHandler
adviceChain
.advice()
@Bean
public TcpOutboundGateway tcpOut() {
TcpOutboundGateway gateway = new TcpOutboundGateway();
gateway.setConnectionFactory(cf());
gateway.setAdviceChain(Collections.singletonList(fooAdvice()));
return gateway;
}
@Bean
public IntegrationFlow clientTcpFlow() {
return f -> f
.handle(tcpOut(), e -> e.advice(testAdvice()))
.transform(Transformers.objectToString());
}
它们不会合并,在这种情况下只使用 bean。testAdvice()
变形金刚
DSL API 提供了一个方便、流畅的工厂,可用作 EIP 方法中的内联目标对象定义。
以下示例演示如何使用它:Transformers
.transform()
@Bean
public IntegrationFlow transformFlow() {
return IntegrationFlow.from("input")
.transform(Transformers.fromJson(MyPojo.class))
.transform(Transformers.serializer())
.get();
}
它避免了使用 setter 进行不方便的编码,并使流定义更加简单。
请注意,您可以使用 将目标实例声明为实例,并再次将它们从定义中用作 bean 方法。
尽管如此,如果内联对象尚未定义为 bean,则 DSL 解析器会处理这些对象的 bean 声明。Transformers
Transformer
@Bean
IntegrationFlow
有关更多信息和支持的工厂方法,请参阅 Javadoc 中的 Transformers。
另请参阅 Lambda 和 Message<?>
参数。
入站通道适配器
通常,消息流从入站通道适配器(例如 )开始。
适配器配置了 ,它要求 定期生成消息。
Java DSL 也允许从 .
为此,Fluent API 提供了一个重载方法。
您可以将 配置为 bean 并将其作为该方法的参数提供。
的第二个参数是一个 lambda,它允许您为 .
以下示例显示如何使用 Fluent API 和 lambda 创建:<int-jdbc:inbound-channel-adapter>
<poller>
MessageSource<?>
IntegrationFlow
MessageSource<?>
IntegrationFlow
IntegrationFlow.from(MessageSource<?> messageSource)
MessageSource<?>
IntegrationFlow.from()
Consumer<SourcePollingChannelAdapterSpec>
PollerMetadata
SmartLifecycle
SourcePollingChannelAdapter
IntegrationFlow
@Bean
public MessageSource<Object> jdbcMessageSource() {
return new JdbcPollingChannelAdapter(this.dataSource, "SELECT * FROM something");
}
@Bean
public IntegrationFlow pollingFlow() {
return IntegrationFlow.from(jdbcMessageSource(),
c -> c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1)))
.transform(Transformers.toJson())
.channel("furtherProcessChannel")
.get();
}
对于不需要直接构建对象的情况,您可以使用基于 .
的结果会自动包装在 a 中(如果它还不是 a )。Message
IntegrationFlow.fromSupplier()
java.util.function.Supplier
Supplier.get()
Message
Message
消息路由器
Spring 集成原生提供了专门的路由器类型,包括:
-
HeaderValueRouter
-
PayloadTypeRouter
-
ExceptionTypeRouter
-
RecipientListRouter
-
XPathRouter
与许多其他 DSL EIP 方法一样,该方法可以应用任何实现,或者为方便起见,可以应用作为 SPEL 表达式或 - 对。
此外,您还可以使用 lambda 进行配置,并将 lambda 用于 .
Fluent API 还提供 pairs 等选项,如下例所示:IntegrationFlowBuilder
route()
AbstractMessageRouter
String
ref
method
route()
Consumer<RouterSpec<MethodInvokingRouter>>
AbstractMappingMessageRouter
channelMapping(String key, String channelName)
@Bean
public IntegrationFlow routeFlowByLambda() {
return IntegrationFlow.from("routerInput")
.<Integer, Boolean>route(p -> p % 2 == 0,
m -> m.suffix("Channel")
.channelMapping(true, "even")
.channelMapping(false, "odd")
)
.get();
}
以下示例显示了一个简单的基于表达式的路由器:
@Bean
public IntegrationFlow routeFlowByExpression() {
return IntegrationFlow.from("routerInput")
.route("headers['destChannel']")
.get();
}
该方法采用 ,如下例所示:routeToRecipients()
Consumer<RecipientListRouterSpec>
@Bean
public IntegrationFlow recipientListFlow() {
return IntegrationFlow.from("recipientListInput")
.<String, String>transform(p -> p.replaceFirst("Payload", ""))
.routeToRecipients(r -> r
.recipient("thing1-channel", "'thing1' == payload")
.recipientMessageSelector("thing2-channel", m ->
m.getHeaders().containsKey("recipient")
&& (boolean) m.getHeaders().get("recipient"))
.recipientFlow("'thing1' == payload or 'thing2' == payload or 'thing3' == payload",
f -> f.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("recipientListSubFlow1Result")))
.recipientFlow((String p) -> p.startsWith("thing3"),
f -> f.transform("Hello "::concat)
.channel(c -> c.queue("recipientListSubFlow2Result")))
.recipientFlow(new FunctionExpression<Message<?>>(m ->
"thing3".equals(m.getPayload())),
f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
.defaultOutputToParentFlow())
.get();
}
的 定义允许您将路由器设置为网关,以继续处理主流中不匹配的消息。.defaultOutputToParentFlow()
.routeToRecipients()
defaultOutput
另请参阅 Lambda 和 Message<?>
参数。
分配器
创建拆分器,请使用 EIP 方法。
默认情况下,如果有效负载是 、 、 、 、 或 a reactive ,则该方法将每个项目输出为单个消息。
它接受 lambda、SPEL 表达式或任何实现。
或者,您也可以在不带参数的情况下使用它来提供 .
以下示例显示如何通过提供 lambda 来使用该方法:split()
Iterable
Iterator
Array
Stream
Publisher
split()
AbstractMessageSplitter
DefaultMessageSplitter
split()
@Bean
public IntegrationFlow splitFlow() {
return IntegrationFlow.from("splitInput")
.split(s -> s.applySequence(false).delimiters(","))
.channel(MessageChannels.executor(taskExecutor()))
.get();
}
前面的示例创建一个拆分器,该拆分器拆分包含逗号分隔的消息。String
另请参阅 Lambda 和 Message<?>
参数。
聚合器和 Resequencer
从概念上讲,An 与 .
它将一系列单独的消息聚合到一条消息中,并且必然更复杂。
默认情况下,聚合器返回一条消息,其中包含来自传入消息的有效负载集合。
相同的规则也适用于 .
以下示例显示了 splitter-aggregator 模式的规范示例:Aggregator
Splitter
Resequencer
@Bean
public IntegrationFlow splitAggregateFlow() {
return IntegrationFlow.from("splitAggregateInput")
.split()
.channel(MessageChannels.executor(this.taskExecutor()))
.resequence()
.aggregate()
.get();
}
该方法将列表拆分为单独的消息,并将它们发送到 .
该方法按消息标头中找到的序列详细信息对消息重新排序。
该方法收集这些消息。split()
ExecutorChannel
resequence()
aggregate()
但是,您可以通过指定发布策略和关联策略等来更改默认行为。 请考虑以下示例:
.aggregate(a ->
a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
.releaseStrategy(g -> g.size() > 10)
.messageStore(messageStore()))
前面的示例将具有标头的消息关联起来,并在至少累积 10 封邮件后释放这些消息。myCorrelationKey
为 EIP 方法提供了类似的 lambda 配置。resequence()
Service Activator 和方法.handle()
EIP 方法的目标是调用某些 POJO 上的任何实现或任何方法。
另一种选择是使用 lambda 表达式定义 “活动”。
因此,我们引入了一个通用的函数式接口。
它的方法需要两个参数:和(从版本 5.1 开始)。
有了这个,我们可以定义一个 flow,如下所示:.handle()
MessageHandler
GenericHandler<P>
handle
P payload
MessageHeaders headers
@Bean
public IntegrationFlow myFlow() {
return IntegrationFlow.from("flow3Input")
.<Integer>handle((p, h) -> p * 2)
.get();
}
前面的示例将它接收的任何整数加倍。
但是, Spring Integration 的一个主要目标是通过运行时类型从消息有效负载到消息处理程序的目标参数的转换。
由于 Java 不支持 lambda 类的泛型类型解析,因此我们为大多数 EIP 方法引入了一种解决方法,其中包含一个附加参数。
这样做会将硬转换工作委托给 Spring 的,它使用提供的和请求的消息来定位方法参数。
下面的示例显示了结果可能是什么样子的:loose coupling
payloadType
LambdaMessageProcessor
ConversionService
type
IntegrationFlow
@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlow.from("input")
.<byte[], String>transform(p - > new String(p, "UTF-8"))
.handle(Integer.class, (p, h) -> p * 2)
.get();
}
我们也可以在其中注册一些来删除额外的 :BytesToIntegerConverter
ConversionService
.transform()
@Bean
@IntegrationConverter
public BytesToIntegerConverter bytesToIntegerConverter() {
return new BytesToIntegerConverter();
}
@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlow.from("input")
.handle(Integer.class, (p, h) -> p * 2)
.get();
}
另请参阅 Lambda 和 Message<?>
参数。
操作员 gateway()
定义中的 operator 是一个特殊的服务激活器实现,通过其 input 通道调用其他端点或集成流并等待回复。
从技术上讲,它与定义中的嵌套组件起着相同的作用(参见 从 Chain 中调用 Chain),并允许流更清晰、更直接。
从逻辑上和业务角度来看,它是一个消息传递网关,允许在目标集成解决方案的不同部分之间分发和重用功能(请参阅 消息传递网关)。
此运算符具有多个用于不同目标的重载:gateway()
IntegrationFlow
<gateway>
<chain>
-
gateway(String requestChannel)
按其名称将消息发送到某个端点的 input 通道; -
gateway(MessageChannel requestChannel)
通过直接注入将消息发送到某个端点的 input 通道; -
gateway(IntegrationFlow flow)
将消息发送到提供的 的 input 通道。IntegrationFlow
所有这些都具有一个变体,其中包含第二个参数,用于配置 target 和相应的 。
此外,基于 -的方法允许调用现有 bean 或通过功能接口的就地 lambda 将流声明为子流,或者以方法清理器代码样式提取它:Consumer<GatewayEndpointSpec>
GatewayMessageHandler
AbstractEndpoint
IntegrationFlow
IntegrationFlow
IntegrationFlow
private
@Bean
IntegrationFlow someFlow() {
return IntegrationFlow
.from(...)
.gateway(subFlow())
.handle(...)
.get();
}
private static IntegrationFlow subFlow() {
return f -> f
.scatterGather(s -> s.recipientFlow(...),
g -> g.outputProcessor(MessageGroup::getOne))
}
如果下游流并不总是返回回复,则应将 设置为 0 以防止无限期挂起调用线程。
在这种情况将在该点结束,并释放线程以进行进一步的工作。requestTimeout |
运算符 log()
为方便起见,为了通过 Spring 集成流 () 记录消息旅程,提供了一个运算符。
在内部,它由 表示,其中 a 作为其订阅者。
它负责将传入消息记录到下一个终端节点或当前通道中。
以下示例演示如何使用:<logging-channel-adapter>
log()
WireTap
ChannelInterceptor
LoggingHandler
LoggingHandler
.filter(...)
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(...)
在前面的示例中,标头仅在 on 级别记录通过筛选条件且在路由之前的消息。id
ERROR
test.category
从版本 6.0 开始,此运算符在 flow 末尾的行为与其在 middle 中的用法一致。
换句话说,即使删除了运算符,流的行为也保持不变。
因此,如果预计不会在流结束时生成回复,则建议在最后一个 之后使用 。log()
nullChannel()
log()
运算符 intercept()
从版本 5.3 开始,该 operator 允许在 flow 中的当前位置注册一个或多个实例。
这是通过 API 创建 explicit 的替代方法。
以下示例使用 a 拒绝某些邮件,但存在异常:intercept()
ChannelInterceptor
MessageChannel
MessageChannel
MessageChannels
MessageSelectingInterceptor
.transform(...)
.intercept(new MessageSelectingInterceptor(m -> m.getPayload().isValid()))
.handle(...)
MessageChannelSpec.wireTap()
Spring 集成包括一个 Fluent API 构建器。
以下示例演示如何使用该方法记录 input:.wireTap()
MessageChannelSpec
wireTap
@Bean
public QueueChannelSpec myChannel() {
return MessageChannels.queue()
.wireTap("loggingFlow.input");
}
@Bean
public IntegrationFlow loggingFlow() {
return f -> f.log();
}
如果 是 的实例,则 、 或 运算符将应用于当前 。
否则,将中间节点注入到当前配置的终端节点的流程中。
在下面的示例中,拦截器被直接添加到 ,因为 implements :
|
当 current 未实现 时,隐式 和 被注入到 中,并且 被添加到这个新的 .
以下示例没有任何 channel 声明:MessageChannel
InterceptableChannel
DirectChannel
BridgeHandler
IntegrationFlow
WireTap
DirectChannel
.handle(...)
.log()
}
在前面的示例中(以及任何时候未声明通道时),将隐式注入到的当前位置,并用作当前配置的输出通道(来自前面描述的)。DirectChannel
IntegrationFlow
ServiceActivatingHandler
.handle()
使用消息流
IntegrationFlowBuilder
提供顶级 API 以生成连接到消息流的集成组件。
当您的集成可以通过单个流完成时(通常是这种情况),这很方便。
或者,可以通过实例联接实例。IntegrationFlow
MessageChannel
默认情况下,在 Spring 集成的说法中表现为“链”。
也就是说,终端节点由实例自动隐式连接。
消息流实际上并不是作为一个链构建的,这提供了更大的灵活性。
例如,如果您知道流中的任何组件名称(即,如果您明确定义它),则可以向流中的任何组件发送消息。
您还可以在流中引用外部定义的通道,以允许使用通道适配器(以启用远程传输协议、文件 I/O 等),而不是直接通道。
因此,DSL 不支持 Spring Integration 元素,因为在这种情况下它不会增加太多价值。MessageFlow
DirectChannel
inputChannel
chain
由于 Spring 集成 Java DSL 生成与任何其他配置选项相同的 bean 定义模型,并且基于现有的 Spring Framework 基础结构,因此它可以与 XML 定义一起使用,并与 Spring 集成消息传递注释配置连接。@Configuration
您还可以使用 lambda 定义直接实例。
以下示例显示了如何执行此操作:IntegrationFlow
@Bean
public IntegrationFlow lambdaFlow() {
return f -> f.filter("World"::equals)
.transform("Hello "::concat)
.handle(System.out::println);
}
此定义的结果是与隐式直接通道连接的同一组集成组件。
此处的唯一限制是此流从命名的直接渠道 - 启动。
此外,Lambda 流不能从 或 开始。lambdaFlow.input
MessageSource
MessageProducer
从版本 5.1 开始,这种类型被包装到代理中,以公开生命周期控制并提供对内部关联的 .IntegrationFlow
inputChannel
StandardIntegrationFlow
从版本 5.0.6 开始,为 中的组件生成的 bean 名称包括流 bean,后跟一个点 () 作为前缀。
例如,前面示例中的 for the 会导致 Bean 名称 。
(这是 to fit on the page 的缩写。
该端点的实现 Bean 的 Bean 名称为(从版本 5.1 开始),其中使用其组件类型,而不是类的完全限定名称。
当必须在流中生成 bean 名称时,相同的模式将应用于所有 s。
这些生成的 Bean 名称前面加上流 ID,用于解析日志或在某些分析工具中将组件分组在一起,以及避免在运行时同时注册集成流时出现争用情况。
有关更多信息,请参阅 动态和运行时集成流 。IntegrationFlow
.
ConsumerEndpointFactoryBean
.transform("Hello "::concat)
lambdaFlow.o.s.i.config.ConsumerEndpointFactoryBean#0
o.s.i
org.springframework.integration
Transformer
lambdaFlow.transformer#0
MethodInvokingTransformer
NamedComponent
FunctionExpression
我们引入了类(SpEL 接口的实现),让我们使用 lambda 和 .
当存在来自 Core Spring Integration 的隐式变体时,为 DSL 组件提供了一个选项以及一个选项。
以下示例演示如何使用函数表达式:FunctionExpression
Expression
generics
Function<T, R>
expression
Strategy
.enrich(e -> e.requestChannel("enrichChannel")
.requestPayload(Message::getPayload)
.propertyFunction("date", m -> new Date()))
还支持运行时类型转换,如 中所示。FunctionExpression
SpelExpression
子流支持
某些 and 组件提供了使用子流指定其逻辑或映射的功能。
最简单的示例是 ,如下例所示:if…else
publish-subscribe
.publishSubscribeChannel()
@Bean
public IntegrationFlow subscribersFlow() {
return flow -> flow
.publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
.subscribe(f -> f
.<Integer>handle((p, h) -> p / 2)
.channel(c -> c.queue("subscriber1Results")))
.subscribe(f -> f
.<Integer>handle((p, h) -> p * 2)
.channel(c -> c.queue("subscriber2Results"))))
.<Integer>handle((p, h) -> p * 3)
.channel(c -> c.queue("subscriber3Results"));
}
您可以使用单独的定义获得相同的结果,但我们希望您发现 logic composition 的 sub-flow 风格很有用。
我们发现它会导致更短(因此更具可读性)的代码。IntegrationFlow
@Bean
从版本 5.3 开始,提供了一个基于 的实现,用于在代理支持的消息通道上配置子流订阅者。
例如,我们现在可以将多个订阅者配置为 :BroadcastCapableChannel
publishSubscribeChannel()
Jms.publishSubscribeChannel()
@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel() {
return Jms.publishSubscribeChannel(jmsConnectionFactory())
.destination("pubsub")
.get();
}
@Bean
public IntegrationFlow pubSubFlow() {
return f -> f
.publishSubscribeChannel(jmsPublishSubscribeChannel(),
pubsub -> pubsub
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel1")))
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel2"))));
}
@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel(ConnectionFactory jmsConnectionFactory) {
return (BroadcastCapableChannel) Jms.publishSubscribeChannel(jmsConnectionFactory)
.destination("pubsub")
.get();
}
类似的子流组合提供了该方法。publish-subscribe
.routeToRecipients()
另一个示例是 using instead of on 方法。.discardFlow()
.discardChannel()
.filter()
值得特别关注。
请考虑以下示例:.route()
@Bean
public IntegrationFlow routeFlow() {
return f -> f
.<Integer, Boolean>route(p -> p % 2 == 0,
m -> m.channelMapping("true", "evenChannel")
.subFlowMapping("false", sf ->
sf.<Integer>handle((p, h) -> p * 3)))
.transform(Object::toString)
.channel(c -> c.queue("oddChannel"));
}
它继续像在常规映射中一样工作,但将该子流绑定到主流。
换句话说,任何路由器的子流在 之后都会返回到主流。.channelMapping()
Router
.subFlowMapping()
.route()
有时,您需要从 中引用 existing 。
以下示例显示了如何执行此操作:
Caused by: org.springframework.beans.factory.BeanCreationException: The 'currentComponent' (org.springframework.integration.router.MethodInvokingRouter@7965a51c) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow. 当您将子流配置为 lambda 时,框架会处理与子流的请求-回复交互,并且不需要网关。 |
子流可以嵌套到任何深度,但我们不建议这样做。 事实上,即使在路由器的情况下,在流中添加复杂的子流也会很快开始看起来像一盘意大利面,人类很难解析。
在 DSL 支持子流配置的情况下,当正在配置的组件通常需要通道,并且该子流以元素开头时,框架会在组件输出通道和流的输入通道之间隐式放置一个。
例如,在此定义中:
框架在内部创建一个 Bean,用于注入到 .
然后,它将子流包装到订阅的此隐式渠道的开头,并将 a 放在流中指定的之前。
当现有 bean 用作子流引用(而不是内联子流,例如 lambda)时,不需要这样的桥,因为框架可以解析流 bean 中的第一个通道。
对于内联子流,输入通道尚不可用。 |
使用协议适配器
到目前为止显示的所有示例都说明了 DSL 如何通过使用 Spring Integration 编程模型来支持消息传递体系结构。 但是,我们还没有做任何真正的集成。 这样做需要通过 HTTP、JMS、AMQP、TCP、JDBC、FTP、SMTP 等访问远程资源,或者访问本地文件系统。 Spring 集成支持所有这些以及更多。 理想情况下,DSL 应该为所有这些提供一流的支持,但是实现所有这些并跟上 Spring Integration 中新适配器的添加是一项艰巨的任务。 因此,期望 DSL 不断赶上 Spring 集成。
因此,我们提供了高级 API 来无缝定义特定于协议的消息收发。
我们使用 Factory 和 Builder 模式以及 lambda 来执行此操作。
你可以将工厂类视为“名称空间工厂”,因为它们与来自具体协议特定的 Spring 集成模块的组件的 XML 名称空间起着相同的作用。
目前, Spring 集成 Java DSL 支持 、 和命名空间工厂。
以下示例演示如何使用其中的三个 (、 和 ):Amqp
Feed
Jms
Files
(S)Ftp
Http
JPA
MongoDb
TCP/UDP
Mail
WebFlux
Scripts
Amqp
Jms
Mail
@Bean
public IntegrationFlow amqpFlow() {
return IntegrationFlow.from(Amqp.inboundGateway(this.rabbitConnectionFactory, queue()))
.transform("hello "::concat)
.transform(String.class, String::toUpperCase)
.get();
}
@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
return IntegrationFlow.from("jmsOutboundGatewayChannel")
.handle(Jms.outboundGateway(this.jmsConnectionFactory)
.replyContainer(c ->
c.concurrentConsumers(3)
.sessionTransacted(true))
.requestDestination("jmsPipelineTest"))
.get();
}
@Bean
public IntegrationFlow sendMailFlow() {
return IntegrationFlow.from("sendMailChannel")
.handle(Mail.outboundAdapter("localhost")
.port(smtpPort)
.credentials("user", "pw")
.protocol("smtp")
.javaMailProperties(p -> p.put("mail.debug", "true")),
e -> e.id("sendMailEndpoint"))
.get();
}
前面的示例展示了如何使用“命名空间工厂”作为内联适配器声明。
但是,您可以在定义中使用它们,以使方法链更具可读性。@Bean
IntegrationFlow
在将精力投入到其他命名空间工厂之前,我们会征求社区对这些命名空间工厂的反馈。 我们也感谢对我们接下来应该支持的适配器和网关的优先级的任何意见。 |
您可以在本参考手册中特定于协议的章节中找到更多 Java DSL 示例。
所有其他协议通道适配器都可以配置为通用 bean 并连接到,如下例所示:IntegrationFlow
@Bean
public QueueChannelSpec wrongMessagesChannel() {
return MessageChannels
.queue()
.wireTap("wrongMessagesWireTapChannel");
}
@Bean
public IntegrationFlow xpathFlow(MessageChannel wrongMessagesChannel) {
return IntegrationFlow.from("inputChannel")
.filter(new StringValueTestXPathMessageSelector("namespace-uri(/*)", "my:namespace"),
e -> e.discardChannel(wrongMessagesChannel))
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(xpathRouter(wrongMessagesChannel))
.get();
}
@Bean
public AbstractMappingMessageRouter xpathRouter(MessageChannel wrongMessagesChannel) {
XPathRouter router = new XPathRouter("local-name(/*)");
router.setEvaluateAsString(true);
router.setResolutionRequired(false);
router.setDefaultOutputChannel(wrongMessagesChannel);
router.setChannelMapping("Tags", "splittingChannel");
router.setChannelMapping("Tag", "receivedChannel");
return router;
}
IntegrationFlowAdapter
该接口可以直接实现并指定为扫描组件,如下例所示:IntegrationFlow
@Component
public class MyFlow implements IntegrationFlow {
@Override
public void configure(IntegrationFlowDefinition<?> f) {
f.<String, String>transform(String::toUpperCase);
}
}
它由 选取,并在应用程序上下文中正确解析和注册。IntegrationFlowBeanPostProcessor
为了方便并获得松散耦合架构的好处,我们提供了 Base Class 实现。
它需要一个方法实现来生成 by using one of methods,如下例所示:IntegrationFlowAdapter
buildFlow()
IntegrationFlowDefinition
from()
@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {
private final AtomicBoolean invoked = new AtomicBoolean();
public Date nextExecutionTime(TriggerContext triggerContext) {
return this.invoked.getAndSet(true) ? null : new Date();
}
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(this::messageSource,
e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
.split(this)
.transform(this)
.aggregate(a -> a.processor(this, null), null)
.enrichHeaders(Collections.singletonMap("thing1", "THING1"))
.filter(this)
.handle(this)
.channel(c -> c.queue("myFlowAdapterOutput"));
}
public String messageSource() {
return "T,H,I,N,G,2";
}
@Splitter
public String[] split(String payload) {
return StringUtils.commaDelimitedListToStringArray(payload);
}
@Transformer
public String transform(String payload) {
return payload.toLowerCase();
}
@Aggregator
public String aggregate(List<String> payloads) {
return payloads.stream().collect(Collectors.joining());
}
@Filter
public boolean filter(@Header Optional<String> thing1) {
return thing1.isPresent();
}
@ServiceActivator
public String handle(String payload, @Header String thing1) {
return payload + ":" + thing1;
}
}
动态和运行时集成流
IntegrationFlow
并且其所有依赖组件都可以在运行时注册。
在 5.0 版本之前,我们使用了 hook。
从 Spring Framework 开始,我们使用钩子进行编程注册。
下面的示例展示了如何以编程方式注册一个 Bean:BeanFactory.registerSingleton()
5.0
instanceSupplier
BeanDefinition
BeanDefinition beanDefinition =
BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
.getRawBeanDefinition();
((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);
请注意,在前面的示例中,钩子是方法的最后一个参数,在本例中由 lambda 提供。instanceSupplier
genericBeanDefinition
所有必要的 bean 初始化和生命周期都是自动完成的,就像使用标准上下文配置 bean 定义一样。
为了简化开发体验, Spring 集成引入了在运行时注册和管理实例,如下例所示:IntegrationFlowContext
IntegrationFlow
@Autowired
private AbstractServerConnectionFactory server1;
@Autowired
private IntegrationFlowContext flowContext;
...
@Test
public void testTcpGateways() {
TestingUtilities.waitListening(this.server1, null);
IntegrationFlow flow = f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client1"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}
当我们有多个配置选项并且必须创建类似流的多个实例时,这非常有用。
为此,我们可以迭代我们的选项并在循环中创建和注册实例。
另一种变体是当我们的数据源不是基于 Spring 的,因此我们必须动态创建它。
这样的示例是 Reactive Streams 事件源,如下例所示:IntegrationFlow
Flux<Message<?>> messageFlux =
Flux.just("1,2,3,4")
.map(v -> v.split(","))
.flatMapIterable(Arrays::asList)
.map(Integer::parseInt)
.map(GenericMessage<Integer>::new);
QueueChannel resultChannel = new QueueChannel();
IntegrationFlow integrationFlow =
IntegrationFlow.from(messageFlux)
.<Integer, Integer>transform(p -> p * 2)
.channel(resultChannel)
.get();
this.integrationFlowContext.registration(integrationFlow)
.register();
(作为 的结果)可用于为 to register 指定 bean 名称,以控制其 ,并注册非 Spring 集成 bean。
通常,这些额外的 bean 是连接工厂(AMQP、JMS、(S)FTP、TCP/UDP 等)、序列化器和反序列化器或任何其他必需的支持组件。IntegrationFlowRegistrationBuilder
IntegrationFlowContext.registration()
IntegrationFlow
autoStartup
当您不再需要动态注册的 bean 及其所有依赖 bean 时,可以使用回调来删除它们。
有关更多信息,请参见 IntegrationFlowContext
Javadoc。IntegrationFlowRegistration.destroy()
IntegrationFlow
从版本 5.0.6 开始,定义中所有生成的 Bean 名称都以流 ID 作为前缀。
我们建议始终指定显式流 ID。
否则,将在 中启动同步屏障,以生成 的 bean 名称并注册其 bean。
我们在这两个操作上进行同步,以避免在相同的生成的 bean 名称可能用于不同的实例时出现竞争条件。IntegrationFlow IntegrationFlowContext IntegrationFlow IntegrationFlow |
此外,从版本 5.0.6 开始,注册生成器 API 有一个新方法:.
如果您希望声明同一流的多个实例,并在流中的组件具有相同的 ID 时避免 bean 名称冲突,这将非常有用,如下例所示:useFlowIdAsPrefix()
private void registerFlows() {
IntegrationFlowRegistration flow1 =
this.flowContext.registration(buildFlow(1234))
.id("tcp1")
.useFlowIdAsPrefix()
.register();
IntegrationFlowRegistration flow2 =
this.flowContext.registration(buildFlow(1235))
.id("tcp2")
.useFlowIdAsPrefix()
.register();
}
private IntegrationFlow buildFlow(int port) {
return f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
}
在这种情况下,可以使用 name 为 的 bean 引用第一个流的消息处理程序。tcp1.client.handler
使用 时,需要属性。id useFlowIdAsPrefix() |
IntegrationFlow
作为网关
可以从提供组件的服务接口开始,如下例所示:IntegrationFlow
GatewayProxyFactoryBean
public interface ControlBusGateway {
void send(String command);
}
...
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlow.from(ControlBusGateway.class)
.controlBus()
.get();
}
接口方法的所有代理都随通道一起提供,用于将消息发送到 中的下一个集成组件。
您可以使用注释标记服务接口,并使用注释标记方法。
尽管如此,the 还是被 中的下一个组件的内部通道忽略并覆盖。
否则,使用 创建此类配置将没有意义。IntegrationFlow
@MessagingGateway
@Gateway
requestChannel
IntegrationFlow
IntegrationFlow
默认情况下, a 获取常规的 bean 名称,例如 .
您可以使用 attribute 或重载的 factory method 来更改该 ID。
此外,接口上标注中的所有属性都将应用于 target 。
当注释配置不适用时,该变体可用于为目标代理提供适当的选项。
此 DSL 方法从版本 5.2 开始可用。GatewayProxyFactoryBean
[FLOW_BEAN_NAME.gateway]
@MessagingGateway.name()
IntegrationFlow.from(Class<?> serviceInterface, Consumer<GatewayProxySpec> endpointConfigurer)
@MessagingGateway
GatewayProxyFactoryBean
Consumer<GatewayProxySpec>
使用 Java 8,您甚至可以创建具有接口的集成网关,如下例所示:java.util.function
@Bean
public IntegrationFlow errorRecovererFlow() {
return IntegrationFlow.from(Function.class, (gateway) -> gateway.beanName("errorRecovererFunction"))
.<Object>handle((p, h) -> {
throw new RuntimeException("intentional");
}, e -> e.advice(retryAdvice()))
.get();
}
可以按如下方式使用:errorRecovererFlow
@Autowired
@Qualifier("errorRecovererFunction")
private Function<String, String> errorRecovererFlowGateway;
DSL 扩展
从版本 5.3 开始,引入了一个,以允许使用自定义或组合的 EIP 运算符扩展现有的 Java DSL。
所需要的只是这个类的扩展,它提供可以在 bean 定义中使用的方法。
扩展类也可用于自定义配置;例如,可以在现有扩展中实施 missed 或 default 选项。
以下示例演示了复合自定义运算符和默认自定义扩展的用法 :IntegrationFlowExtension
IntegrationFlow
IntegrationComponentSpec
IntegrationComponentSpec
AggregatorSpec
outputProcessor
public class CustomIntegrationFlowDefinition
extends IntegrationFlowExtension<CustomIntegrationFlowDefinition> {
public CustomIntegrationFlowDefinition upperCaseAfterSplit() {
return split()
.transform("payload.toUpperCase()");
}
public CustomIntegrationFlowDefinition customAggregate(Consumer<CustomAggregatorSpec> aggregator) {
return register(new CustomAggregatorSpec(), aggregator);
}
}
public class CustomAggregatorSpec extends AggregatorSpec {
CustomAggregatorSpec() {
outputProcessor(group ->
group.getMessages()
.stream()
.map(Message::getPayload)
.map(String.class::cast)
.collect(Collectors.joining(", ")));
}
}
对于方法链流,这些扩展中的新 DSL 运算符必须返回扩展类。
这样,目标定义将与新的和现有的 DSL 运算符一起使用:IntegrationFlow
@Bean
public IntegrationFlow customFlowDefinition() {
return
new CustomIntegrationFlowDefinition()
.log()
.upperCaseAfterSplit()
.channel("innerChannel")
.customAggregate(customAggregatorSpec ->
customAggregatorSpec.expireGroupsUponCompletion(true))
.logAndReply();
}
集成流组合
由于 Spring Integration 中的抽象是一等公民,因此始终假定集成流的组合。
流中任何终端节点的输入通道都可用于从任何其他终端节点发送消息,而不仅仅是从将此通道作为输出的终端节点发送消息。
此外,使用 contract、Content Enricher 组件、复合端点(如 ,以及现在的 bean)(例如 ),在较短的、可重用的部分之间分配业务逻辑非常简单。
最终组合所需的只是有关 a 的 发送 或 接收 的知识。MessageChannel
@MessagingGateway
<chain>
IntegrationFlow
IntegrationFlowAdapter
MessageChannel
从 version 开始,为了从最终用户中抽象出更多内容并隐藏实现细节,引入了 factory 方法,以允许从现有流的输出中启动电流:5.5.4
MessageChannel
IntegrationFlow
from(IntegrationFlow)
IntegrationFlow
@Bean
IntegrationFlow templateSourceFlow() {
return IntegrationFlow.fromSupplier(() -> "test data")
.channel("sourceChannel")
.get();
}
@Bean
IntegrationFlow compositionMainFlow(IntegrationFlow templateSourceFlow) {
return IntegrationFlow.from(templateSourceFlow)
.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("compositionMainFlowResult"))
.get();
}
另一方面,它添加了一个终端运算符,用于在一些其他流的输入通道处继续电流:IntegrationFlowDefinition
to(IntegrationFlow)
@Bean
IntegrationFlow mainFlow(IntegrationFlow otherFlow) {
return f -> f
.<String, String>transform(String::toUpperCase)
.to(otherFlow);
}
@Bean
IntegrationFlow otherFlow() {
return f -> f
.<String, String>transform(p -> p + " from other flow")
.channel(c -> c.queue("otherFlowResultChannel"));
}
流中间的组合可以通过现有的 EIP 方法轻松实现。
通过这种方式,我们可以从更简单、可重用的逻辑块组合流,从而构建任何复杂的流。
例如,你可以添加一个 bean 库作为依赖项,只需将它们的配置类导入到最终项目中并针对你的定义进行自动装配就足够了。gateway(IntegrationFlow)
IntegrationFlow
IntegrationFlow