WebFlux 支持
WebFlux 支持
WebFlux Spring 集成模块 () 允许以反应方式执行 HTTP 请求和处理入站 HTTP 请求。spring-integration-webflux
您需要将此依赖项包含在您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-webflux</artifactId>
<version>6.1.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-webflux:6.1.9"
如果服务器配置不是基于 Servlet 的,则必须包含依赖项。io.projectreactor.netty:reactor-netty
WebFlux 支持包括以下网关实现:和 .
该支持完全基于 Spring WebFlux 和 Project Reactor 基础。
有关更多信息,请参阅 HTTP 支持,因为许多选项在反应式和常规 HTTP 组件之间共享。WebFluxInboundEndpoint
WebFluxRequestExecutingMessageHandler
WebFlux 命名空间支持
Spring 集成提供了一个名称空间和相应的模式定义。
要将其包含在您的配置中,请在您的应用程序上下文配置文件中添加以下命名空间声明:webflux
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/webflux
https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
...
</beans>
WebFlux 入站组件
从版本 5.0 开始,提供了 的实现。
此组件类似于基于 MVC 的组件,它通过新提取的 .
它用于 Spring WebFlux 反应式环境(而不是 MVC)。
以下示例显示了 WebFlux 端点的简单实现:WebFluxInboundEndpoint
WebHandler
HttpRequestHandlingEndpointSupport
BaseHttpInboundEndpoint
@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
return IntegrationFlow
.from(WebFlux.inboundChannelAdapter("/reactivePost")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
.statusCodeFunction(m -> HttpStatus.ACCEPTED))
.channel(c -> c.queue("storeChannel"))
.get();
}
@Bean
fun inboundChannelAdapterFlow() =
integrationFlow(
WebFlux.inboundChannelAdapter("/reactivePost")
.apply {
requestMapping { m -> m.methods(HttpMethod.POST) }
requestPayloadType(ResolvableType.forClassWithGenerics(Flux::class.java, String::class.java))
statusCodeFunction { m -> HttpStatus.ACCEPTED }
})
{
channel { queue("storeChannel") }
}
@Configuration
@EnableWebFlux
@EnableIntegration
public class ReactiveHttpConfiguration {
@Bean
public WebFluxInboundEndpoint simpleInboundEndpoint() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/test");
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("serviceChannel");
return endpoint;
}
@ServiceActivator(inputChannel = "serviceChannel")
String service() {
return "It works!";
}
}
<int-webflux:inbound-gateway request-channel="requests" path="/sse">
<int-webflux:request-mapping produces="text/event-stream"/>
</int-webflux:inbound-gateway>
该配置类似于(在示例之前提到),不同之处在于我们用于将 WebFlux 基础设施添加到我们的集成应用程序中。
此外,它还使用反应式 HTTP 服务器实现提供的背压、基于需求的功能对下游流执行操作。HttpRequestHandlingEndpointSupport
@EnableWebFlux
WebFluxInboundEndpoint
sendAndReceive
回复部分也是非阻塞的,并且基于 internal ,它被平面映射到按需解析的回复。FutureReplyChannel Mono |
您可以使用自定义、 a 甚至 进行配置 。
后者提供了一种机制,您可以使用该机制将回复作为任何反应类型返回: Reactor , RxJava , 等。
这样,我们就可以使用 Spring 集成组件实现 Server Sent Events 场景,如下例所示:WebFluxInboundEndpoint
ServerCodecConfigurer
RequestedContentTypeResolver
ReactiveAdapterRegistry
Flux
Observable
Flowable
@Bean
public IntegrationFlow sseFlow() {
return IntegrationFlow
.from(WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
.handle((p, h) -> Flux.just("foo", "bar", "baz"))
.get();
}
@Bean
fun sseFlow() =
integrationFlow(
WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
{
handle { (p, h) -> Flux.just("foo", "bar", "baz") }
}
@Bean
public WebFluxInboundEndpoint webfluxInboundGateway() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/sse");
requestMapping.setProduces(MediaType.TEXT_EVENT_STREAM_VALUE);
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("requests");
return endpoint;
}
<int-webflux:inbound-channel-adapter id="reactiveFullConfig" channel="requests"
path="test1"
auto-startup="false"
phase="101"
request-payload-type="byte[]"
error-channel="errorChannel"
payload-expression="payload"
supported-methods="PUT"
status-code-expression="'202'"
header-mapper="headerMapper"
codec-configurer="codecConfigurer"
reactive-adapter-registry="reactiveAdapterRegistry"
requested-content-type-resolver="requestedContentTypeResolver">
<int-webflux:request-mapping headers="foo"/>
<int-webflux:cross-origin origin="foo" method="PUT"/>
<int-webflux:header name="foo" expression="'foo'"/>
</int-webflux:inbound-channel-adapter>
有关更多可能的配置选项,请参阅请求映射支持和跨域资源共享 (CORS) 支持。
当请求体为空或返回时,请求参数 () 用于要处理的目标消息。payloadExpression
null
MultiValueMap<String, String>
payload
有效载荷验证
从版本 5.2 开始,可以使用 配置一个 .
与 HTTP 支持中的 MVC 验证不同,它用于在执行回退和函数之前验证请求已转换为 的元素。
框架无法假设在构建最终有效负载后对象的复杂程度。
如果需要限制确切的最终有效负载(或其元素)的验证可见性,则验证应进入下游而不是 WebFlux 端点。
请参阅 Spring WebFlux 文档中的更多信息。
无效的有效负载被拒绝,其中包含所有验证 。
在 Spring Framework 参考手册中有关验证的更多信息。WebFluxInboundEndpoint
Validator
Publisher
HttpMessageReader
payloadExpression
Publisher
Publisher
IntegrationWebExchangeBindException
WebExchangeBindException
Errors
WebFlux 出站组件
(从版本 5.0 开始)实现类似于 。
它使用来自 Spring Framework WebFlux 模块的模块。
要配置它,请定义类似于以下内容的 Bean:WebFluxRequestExecutingMessageHandler
HttpRequestExecutingMessageHandler
WebClient
@Bean
public IntegrationFlow outboundReactive() {
return f -> f
.handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri())
.httpMethod(HttpMethod.GET)
.expectedResponseType(String.class));
}
@Bean
fun outboundReactive() =
integrationFlow {
handle(
WebFlux.outboundGateway<MultiValueMap<String, String>>({ m ->
UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri()
})
.httpMethod(HttpMethod.GET)
.expectedResponseType(String::class.java)
)
}
@ServiceActivator(inputChannel = "reactiveHttpOutRequest")
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {
WebFluxRequestExecutingMessageHandler handler =
new WebFluxRequestExecutingMessageHandler("http://localhost:8080/foo", client);
handler.setHttpMethod(HttpMethod.POST);
handler.setExpectedResponseType(String.class);
return handler;
}
<int-webflux:outbound-gateway id="reactiveExample1"
request-channel="requests"
url="http://localhost/test"
http-method-expression="headers.httpMethod"
extract-request-payload="false"
expected-response-type-expression="payload"
charset="UTF-8"
reply-timeout="1234"
reply-channel="replies"/>
<int-webflux:outbound-channel-adapter id="reactiveExample2"
url="http://localhost/example"
http-method="GET"
channel="requests"
charset="UTF-8"
extract-payload="false"
expected-response-type="java.lang.String"
order="3"
auto-startup="false"/>
该操作返回一个 ,该 将映射(通过使用多个步骤)作为 的输出。
与 as an 一起,评估将推迟到进行下游订阅。
否则,它被视为一种模式,并且响应将适应 a 以用于来自 的异步应答。
输出消息的目标负载取决于配置。
的 or 标识响应正文元素转换的目标类型。
如果设置为 ,则响应正文将转换为 a 并为每个元素提供,并作为有效载荷向下游发送。
之后,你可以使用 splitter 以响应式方式迭代它。WebClient
exchange()
Mono<ClientResponse>
Mono.map()
AbstractIntegrationMessageBuilder
WebFluxRequestExecutingMessageHandler
ReactiveChannel
outputChannel
Mono<ClientResponse>
async
Mono
SettableListenableFuture
WebFluxRequestExecutingMessageHandler
WebFluxRequestExecutingMessageHandler
setExpectedResponseType(Class<?>)
setExpectedResponseTypeExpression(Expression)
replyPayloadToFlux
true
Flux
expectedResponseType
Flux
Flux
此外,可以将 a 注入到 the 而不是 和 属性中。
它可用于对 正文和 HTTP 标头转换进行低级访问和更多控制。
Spring 集成提供作为标识函数来生成(下游)整个和任何其他可能的自定义 logic。BodyExtractor<?, ClientHttpResponse>
WebFluxRequestExecutingMessageHandler
expectedResponseType
replyPayloadToFlux
ClientHttpResponse
ClientHttpResponseBodyExtractor
ClientHttpResponse
从版本 5.2 开始,支持反应式、 和 类型作为请求消息有效负载。
A 在内部用于填充到 .
当有效负载是 reactive 时,已配置 或 可用于确定发布者元素类型的类型。
表达式必须解析为 ,该表达式解析为 target 或 。WebFluxRequestExecutingMessageHandler
Publisher
Resource
MultiValueMap
BodyInserter
WebClient.RequestBodySpec
Publisher
publisherElementType
publisherElementTypeExpression
Class<?>
String
Class<?>
ParameterizedTypeReference
从版本 5.5 开始,它公开了一个标志(默认情况下是)以仅返回响应正文,或将整体作为回复消息负载返回,独立于提供的 或 。
如果 中不存在正文,则忽略此标志并返回整体。WebFluxRequestExecutingMessageHandler
extractResponseBody
true
ResponseEntity
expectedResponseType
replyPayloadToFlux
ResponseEntity
ResponseEntity
有关更多可能的配置选项,请参阅 HTTP 出站组件。
WebFlux 标头映射
由于 WebFlux 组件完全基于 HTTP 协议,因此 HTTP 标头映射没有区别。 请参阅 HTTP Header Mappings 了解用于映射 Headers 的更多可能选项和组件。
WebFlux 请求属性
从版本 6.0 开始,可以将 配置为通过 评估请求属性。
必须在 中评估此 SPEL 表达式。
然后,此类映射将传播到 HTTP 请求配置回调。
如果需要将键值对象形式的信息传递给请求,并且下游过滤器将可以访问这些属性以进行进一步处理,这将很有帮助。WebFluxRequestExecutingMessageHandler
setAttributeVariablesExpression()
Map
WebClient.RequestBodySpec.attributes(Consumer<Map<String, Object>> attributesConsumer)
Message