对于最新的稳定版本,请使用 Spring Integration 6.4.0! |
使用协议适配器
到目前为止显示的所有示例都说明了 DSL 如何通过使用 Spring Integration 编程模型来支持消息传递体系结构。 但是,我们还没有做任何真正的集成。 这样做需要通过 HTTP、JMS、AMQP、TCP、JDBC、FTP、SMTP 等访问远程资源,或者访问本地文件系统。 Spring 集成支持所有这些以及更多。 理想情况下,DSL 应该为所有这些提供一流的支持,但是实现所有这些并跟上 Spring Integration 中新适配器的添加是一项艰巨的任务。 因此,期望 DSL 不断赶上 Spring 集成。
因此,我们提供了高级 API 来无缝定义特定于协议的消息收发。
我们使用 Factory 和 Builder 模式以及 lambda 来执行此操作。
你可以将工厂类视为“名称空间工厂”,因为它们与来自具体协议特定的 Spring 集成模块的组件的 XML 名称空间起着相同的作用。
目前, Spring 集成 Java DSL 支持 、 和命名空间工厂。
以下示例演示如何使用其中的三个 (、 和 ):Amqp
Feed
Jms
Files
(S)Ftp
Http
JPA
MongoDb
TCP/UDP
Mail
WebFlux
Scripts
Amqp
Jms
Mail
@Bean
public IntegrationFlow amqpFlow() {
return IntegrationFlow.from(Amqp.inboundGateway(this.rabbitConnectionFactory, queue()))
.transform("hello "::concat)
.transform(String.class, String::toUpperCase)
.get();
}
@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
return IntegrationFlow.from("jmsOutboundGatewayChannel")
.handle(Jms.outboundGateway(this.jmsConnectionFactory)
.replyContainer(c ->
c.concurrentConsumers(3)
.sessionTransacted(true))
.requestDestination("jmsPipelineTest"))
.get();
}
@Bean
public IntegrationFlow sendMailFlow() {
return IntegrationFlow.from("sendMailChannel")
.handle(Mail.outboundAdapter("localhost")
.port(smtpPort)
.credentials("user", "pw")
.protocol("smtp")
.javaMailProperties(p -> p.put("mail.debug", "true")),
e -> e.id("sendMailEndpoint"))
.get();
}
前面的示例展示了如何使用“命名空间工厂”作为内联适配器声明。
但是,您可以在定义中使用它们,以使方法链更具可读性。@Bean
IntegrationFlow
在将精力投入到其他命名空间工厂之前,我们会征求社区对这些命名空间工厂的反馈。 我们也感谢对我们接下来应该支持的适配器和网关的优先级的任何意见。 |
您可以在本参考手册中特定于协议的章节中找到更多 Java DSL 示例。
所有其他协议通道适配器都可以配置为通用 bean 并连接到,如下例所示:IntegrationFlow
@Bean
public QueueChannelSpec wrongMessagesChannel() {
return MessageChannels
.queue()
.wireTap("wrongMessagesWireTapChannel");
}
@Bean
public IntegrationFlow xpathFlow(MessageChannel wrongMessagesChannel) {
return IntegrationFlow.from("inputChannel")
.filter(new StringValueTestXPathMessageSelector("namespace-uri(/*)", "my:namespace"),
e -> e.discardChannel(wrongMessagesChannel))
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(xpathRouter(wrongMessagesChannel))
.get();
}
@Bean
public AbstractMappingMessageRouter xpathRouter(MessageChannel wrongMessagesChannel) {
XPathRouter router = new XPathRouter("local-name(/*)");
router.setEvaluateAsString(true);
router.setResolutionRequired(false);
router.setDefaultOutputChannel(wrongMessagesChannel);
router.setChannelMapping("Tags", "splittingChannel");
router.setChannelMapping("Tag", "receivedChannel");
return router;
}