此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1Spring中文文档

此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1Spring中文文档

WebFlux Spring 集成模块 () 允许以响应方式执行 HTTP 请求和处理入站 HTTP 请求。spring-integration-webfluxSpring中文文档

您需要将此依赖项包含在项目中:Spring中文文档

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-webflux</artifactId>
    <version>6.3.2-SNAPSHOT</version>
</dependency>
compile "org.springframework.integration:spring-integration-webflux:6.3.2-SNAPSHOT"

在不基于 Servlet 的服务器配置的情况下,必须包含依赖关系。io.projectreactor.netty:reactor-nettySpring中文文档

WebFlux 支持由以下网关实现组成:和 . 该支持完全基于 Spring WebFluxProject Reactor 基础。 有关详细信息,请参阅 HTTP 支持,因为许多选项在响应式 HTTP 组件和常规 HTTP 组件之间共享。WebFluxInboundEndpointWebFluxRequestExecutingMessageHandlerSpring中文文档

WebFlux 命名空间支持

Spring Integration 提供了命名空间和相应的模式定义。 若要将其包含在配置中,请在应用程序上下文配置文件中添加以下命名空间声明:webfluxSpring中文文档

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/webflux
    https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
    ...
</beans>

WebFlux 入站组件

从 5.0 版开始,提供了 的实现。 这个组件类似于基于MVC的组件,它通过新提取的 . 它用于 Spring WebFlux 反应式环境(而不是 MVC)。 以下示例显示了 WebFlux 终结点的简单实现:WebFluxInboundEndpointWebHandlerHttpRequestHandlingEndpointSupportBaseHttpInboundEndpointSpring中文文档

@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
    return IntegrationFlow
        .from(WebFlux.inboundChannelAdapter("/reactivePost")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
            .statusCodeFunction(m -> HttpStatus.ACCEPTED))
        .channel(c -> c.queue("storeChannel"))
        .get();
}
@Bean
fun inboundChannelAdapterFlow() =
    integrationFlow(
        WebFlux.inboundChannelAdapter("/reactivePost")
            .apply {
                requestMapping { m -> m.methods(HttpMethod.POST) }
                requestPayloadType(ResolvableType.forClassWithGenerics(Flux::class.java, String::class.java))
                statusCodeFunction { m -> HttpStatus.ACCEPTED }
            })
    {
        channel { queue("storeChannel") }
    }
@Configuration
@EnableWebFlux
@EnableIntegration
public class ReactiveHttpConfiguration {

    @Bean
    public WebFluxInboundEndpoint simpleInboundEndpoint() {
        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setPathPatterns("/test");
        endpoint.setRequestMapping(requestMapping);
        endpoint.setRequestChannelName("serviceChannel");
        return endpoint;
    }

    @ServiceActivator(inputChannel = "serviceChannel")
    String service() {
        return "It works!";
    }

}
<int-webflux:inbound-gateway request-channel="requests" path="/sse">
    <int-webflux:request-mapping produces="text/event-stream"/>
</int-webflux:inbound-gateway>

该配置与(在示例之前提到的)类似,只是我们用于将 WebFlux 基础结构添加到我们的集成应用程序中。 此外,它还使用响应式 HTTP 服务器实现提供的基于按需的背压功能对下游流执行操作。HttpRequestHandlingEndpointSupport@EnableWebFluxWebFluxInboundEndpointsendAndReceiveSpring中文文档

回复部分也是非阻塞的,并且基于内部,它被平面映射到按需解决的回复。FutureReplyChannelMono

您可以使用自定义 、 甚至 . 后者提供了一种机制,您可以使用它作为任何反应式类型返回回复:Reactor、RxJava 等。 这样,我们就可以使用 Spring Integration 组件实现服务器发送事件场景,如以下示例所示:WebFluxInboundEndpointServerCodecConfigurerRequestedContentTypeResolverReactiveAdapterRegistryFluxObservableFlowableSpring中文文档

@Bean
public IntegrationFlow sseFlow() {
    return IntegrationFlow
            .from(WebFlux.inboundGateway("/sse")
                    .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
            .handle((p, h) -> Flux.just("foo", "bar", "baz"))
            .get();
}
@Bean
fun sseFlow() =
     integrationFlow(
            WebFlux.inboundGateway("/sse")
                       .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
            {
                 handle { (p, h) -> Flux.just("foo", "bar", "baz") }
            }
@Bean
public WebFluxInboundEndpoint webfluxInboundGateway() {
    WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
    RequestMapping requestMapping = new RequestMapping();
    requestMapping.setPathPatterns("/sse");
    requestMapping.setProduces(MediaType.TEXT_EVENT_STREAM_VALUE);
    endpoint.setRequestMapping(requestMapping);
    endpoint.setRequestChannelName("requests");
    return endpoint;
}
<int-webflux:inbound-channel-adapter id="reactiveFullConfig" channel="requests"
                               path="test1"
                               auto-startup="false"
                               phase="101"
                               request-payload-type="byte[]"
                               error-channel="errorChannel"
                               payload-expression="payload"
                               supported-methods="PUT"
                               status-code-expression="'202'"
                               header-mapper="headerMapper"
                               codec-configurer="codecConfigurer"
                               reactive-adapter-registry="reactiveAdapterRegistry"
                               requested-content-type-resolver="requestedContentTypeResolver">
            <int-webflux:request-mapping headers="foo"/>
            <int-webflux:cross-origin origin="foo" method="PUT"/>
            <int-webflux:header name="foo" expression="'foo'"/>
</int-webflux:inbound-channel-adapter>

有关更多可能的配置选项,请参阅请求映射支持跨域资源共享 (CORS) 支持Spring中文文档

当请求正文为空或返回时,请求参数 () 用于要处理的目标消息。payloadExpressionnullMultiValueMap<String, String>payloadSpring中文文档

有效负载验证

从版本 5.2 开始,可以使用 . 与 HTTP 支持中的 MVC 验证不同,它用于在执行回退和函数之前验证请求已转换为的元素。 框架无法假设在构建最终有效负载后对象的复杂程度。 如果需要限制最终有效负载(或其元素)的验证可见性,则验证应转到下游,而不是 WebFlux 端点。 有关详细信息,请参阅 Spring WebFlux 文档。 无效的有效负载将被拒绝,其中包含所有验证。 在 Spring Framework 参考手册中查看有关验证的更多信息。WebFluxInboundEndpointValidatorPublisherHttpMessageReaderpayloadExpressionPublisherPublisherIntegrationWebExchangeBindExceptionWebExchangeBindExceptionErrorsSpring中文文档

回复部分也是非阻塞的,并且基于内部,它被平面映射到按需解决的回复。FutureReplyChannelMono

WebFlux 出站组件

(从版本 5.0 开始)实现类似于 。 它使用 Spring Framework WebFlux 模块中的 WebFlux 模块。 要配置它,请定义类似于以下内容的 Bean:WebFluxRequestExecutingMessageHandlerHttpRequestExecutingMessageHandlerWebClientSpring中文文档

@Bean
public IntegrationFlow outboundReactive() {
    return f -> f
        .handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
                UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
                        .queryParams(m.getPayload())
                        .build()
                        .toUri())
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(String.class));
}
@Bean
fun outboundReactive() =
    integrationFlow {
        handle(
            WebFlux.outboundGateway<MultiValueMap<String, String>>({ m ->
                UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
                    .queryParams(m.getPayload())
                    .build()
                    .toUri()
            })
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(String::class.java)
        )
    }
@ServiceActivator(inputChannel = "reactiveHttpOutRequest")
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {
    WebFluxRequestExecutingMessageHandler handler =
        new WebFluxRequestExecutingMessageHandler("http://localhost:8080/foo", client);
    handler.setHttpMethod(HttpMethod.POST);
    handler.setExpectedResponseType(String.class);
    return handler;
}
<int-webflux:outbound-gateway id="reactiveExample1"
    request-channel="requests"
    url="http://localhost/test"
    http-method-expression="headers.httpMethod"
    extract-request-payload="false"
    expected-response-type-expression="payload"
    charset="UTF-8"
    reply-timeout="1234"
    reply-channel="replies"/>

<int-webflux:outbound-channel-adapter id="reactiveExample2"
    url="http://localhost/example"
    http-method="GET"
    channel="requests"
    charset="UTF-8"
    extract-payload="false"
    expected-response-type="java.lang.String"
    order="3"
    auto-startup="false"/>

该操作返回 ,该 (通过使用多个步骤)映射到 an 作为 的输出。 与 as an 一起,评估将推迟到进行下游订阅。 否则,它被视为一种模式,并且响应将适应 a 以进行异步响应。 输出消息的目标有效负载取决于配置。 or 标识响应正文元素转换的目标类型。 如果设置为 ,则响应正文将转换为为每个元素提供的 a,并将其作为有效负载发送到下游。 之后,您可以使用拆分器以响应方式对其进行迭代。WebClientexchange()Mono<ClientResponse>Mono.map()AbstractIntegrationMessageBuilderWebFluxRequestExecutingMessageHandlerReactiveChanneloutputChannelMono<ClientResponse>asyncMonoSettableListenableFutureWebFluxRequestExecutingMessageHandlerWebFluxRequestExecutingMessageHandlersetExpectedResponseType(Class<?>)setExpectedResponseTypeExpression(Expression)replyPayloadToFluxtrueFluxexpectedResponseTypeFluxFluxSpring中文文档

此外,可以将 a 注入到 and 属性中。 它可用于对正文和 HTTP 标头转换的低级别访问和更多控制。 Spring Integration 提供标识函数来生成(下游)整体和任何其他可能的自定义逻辑。BodyExtractor<?, ClientHttpResponse>WebFluxRequestExecutingMessageHandlerexpectedResponseTypereplyPayloadToFluxClientHttpResponseClientHttpResponseBodyExtractorClientHttpResponseSpring中文文档

从版本 5.2 开始,支持响应式 、 和 类型作为请求消息有效负载。 在内部使用相应的 a 以填充到 . 当有效负载是反应式负载时,配置或可用于确定发布者的元素类型的类型。 表达式必须解析为 ,该解析为目标或 。WebFluxRequestExecutingMessageHandlerPublisherResourceMultiValueMapBodyInserterWebClient.RequestBodySpecPublisherpublisherElementTypepublisherElementTypeExpressionClass<?>StringClass<?>ParameterizedTypeReferenceSpring中文文档

从版本 5.5 开始,公开一个标志(默认情况下)以仅返回响应正文,或将整个作为回复消息有效负载返回,而与提供的 或 . 如果 中不存在正文,则忽略此标志并返回整体。WebFluxRequestExecutingMessageHandlerextractResponseBodytrueResponseEntityexpectedResponseTypereplyPayloadToFluxResponseEntityResponseEntitySpring中文文档

有关更多可能的配置选项,请参阅 HTTP 出站组件Spring中文文档

WebFlux 标头映射

由于 WebFlux 组件完全基于 HTTP 协议,因此 HTTP 标头映射没有区别。 请参阅 HTTP 标头映射,了解用于映射标头的更多可能选项和组件。Spring中文文档

WebFlux 请求属性

从版本 6.0 开始,可以配置为通过 评估请求属性。 此 SpEL 表达式必须在 中计算。 然后,此类映射将传播到 HTTP 请求配置回调。 如果需要将键值对象形式的信息传递到请求,并且下游筛选器将访问这些属性以进行进一步处理,这将很有帮助。WebFluxRequestExecutingMessageHandlersetAttributeVariablesExpression()MapWebClient.RequestBodySpec.attributes(Consumer<Map<String, Object>> attributesConsumer)MessageSpring中文文档