此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4spring-doc.cn

事件路由

在 Spring Cloud Stream 的上下文中,事件路由是 a) 将事件路由到特定事件订阅者b) 将事件订阅者生成的事件路由到特定目标的能力。 在这里,我们将其称为路由 'TO' 和路由 'FROM'。spring-doc.cn

路由到使用者

路由可以依靠 Spring Cloud Function 3.0 中的 available 来实现。您需要做的就是通过 application property 或 provide property 启用它。 启用后,将绑定到输入目标 接收所有消息,并根据提供的指令将它们路由到其他函数。RoutingFunction--spring.cloud.stream.function.routing.enabled=truespring.cloud.function.routing-expressionRoutingFunctionspring-doc.cn

为了绑定,路由目标的名称为 (请参阅 RoutingFunction.FUNCTION_NAME 和 binding naming convention [Functional binding names])。functionRouter-in-0

指令可以通过单个消息以及应用程序属性来提供。spring-doc.cn

以下是几个示例:spring-doc.cn

使用消息标头

@SpringBootApplication
public class SampleApplication {

	public static void main(String[] args) {
		SpringApplication.run(SampleApplication.class,
                       "--spring.cloud.stream.function.routing.enabled=true");
	}

	@Bean
	public Consumer<String> even() {
		return value -> {
			System.out.println("EVEN: " + value);
		};
	}

	@Bean
	public Consumer<String> odd() {
		return value -> {
			System.out.println("ODD: " + value);
		};
    }
}

通过向 Binder 公开的目标(即 rabbit、kafka)发送消息, 此类消息将被路由到适当的 ('even' 或 'odd') Consumer 。functionRouter-in-0spring-doc.cn

默认情况下,将查找 or (对于 SPEL 的更动态场景) header 的 Header,如果找到,它的值将被视为路由指令。RoutingFunctionspring.cloud.function.definitionspring.cloud.function.routing-expressionspring-doc.cn

例如 将 header 设置为 value 最终会半随机地将请求路由到 OR 函数。 此外,对于 SPEL ,评估上下文的根对象是这样你也可以对单个 Headers(或消息)进行评估spring.cloud.function.routing-expressionT(java.lang.System).currentTimeMillis() % 2 == 0 ? 'even' : 'odd'oddevenMessage…​.routing-expression=headers['type']spring-doc.cn

使用应用程序属性

and/or 可以作为应用程序属性传递(例如,.spring.cloud.function.routing-expressionspring.cloud.function.definitionspring.cloud.function.routing-expression=headers['type']spring-doc.cn

@SpringBootApplication
public class RoutingStreamApplication {

  public static void main(String[] args) {
      SpringApplication.run(RoutingStreamApplication.class,
	  "--spring.cloud.function.routing-expression="
	  + "T(java.lang.System).nanoTime() % 2 == 0 ? 'even' : 'odd'");
  }
  @Bean
  public Consumer<Integer> even() {
    return value -> System.out.println("EVEN: " + value);
  }

  @Bean
  public Consumer<Integer> odd() {
    return value -> System.out.println("ODD: " + value);
  }
}
通过应用程序属性传递指令对于响应式函数尤其重要,因为响应式 函数仅调用一次以传递 Publisher,因此对各个项目的访问是有限的。

路由功能和输出绑定

RoutingFunction是 a,因此其处理方式与任何其他函数没有什么不同。井。。。几乎。Functionspring-doc.cn

当路由到另一个 时,其输出将发送到 which 正如预期的那样。但是,如果路由到 ?换句话说,调用 可能不会产生任何要发送到 output 绑定的内容,因此甚至需要一个。 因此,我们在创建 Binding 时确实会有所不同。即使它对作为用户的您是透明的 (你真的没什么可做的),了解一些机制会帮助你理解它的内部运作。RoutingFunctionFunctionRoutingFunctionfunctionRouter-in-0RoutingFunctionConsumerRoutingFunctionRoutingFunctionspring-doc.cn

所以,规则是; 我们从不为 , 仅为 input 创建输出绑定。因此,当您路由到 时,实际上 变为 a 时,没有任何输出绑定。但是,如果碰巧路由到另一个生成 output 的 output 绑定将动态创建,此时将充当 Bindings 的常规 (同时具有 Input 和 Output Bindings)。RoutingFunctionConsumerRoutingFunctionConsumerRoutingFunctionFunctionRoutingFunctionRoutingFunctionFunctionspring-doc.cn

路由 FROM Consumer

除了静态目标之外,Spring Cloud Stream 还允许应用程序将消息发送到动态绑定的目标。 例如,当需要在运行时确定目标时,这非常有用。 应用程序可以通过以下两种方式之一来实现此目的。spring-doc.cn

spring.cloud.stream.sendto.destination

您还可以委托给框架,通过指定标头来动态解析输出目标 设置为 要解析的目标的名称。spring.cloud.stream.sendto.destinationspring-doc.cn

请考虑以下示例:spring-doc.cn

@SpringBootApplication
@Controller
public class SourceWithDynamicDestination {

    @Bean
	public Function<String, Message<String>> destinationAsPayload() {
		return value -> {
			return MessageBuilder.withPayload(value)
				.setHeader("spring.cloud.stream.sendto.destination", value).build();};
	}
}

尽管在此示例中您可以清楚地看到,但我们的输出是一个带有 header 的 Message 设置为 He input 参数的值。框架将查阅此标头,并尝试创建或发现 具有该名称的目标,并将输出发送到该目标。spring.cloud.stream.sendto.destinationspring-doc.cn

如果事先知道目标名称,则可以像配置任何其他目标一样配置创建者属性。 或者,如果注册一个 Bean,则会在创建绑定之前调用它。 回调采用 Binder 使用的扩展生产者属性的泛型类型。 它有一个方法:NewDestinationBindingCallback<>spring-doc.cn

void configure(String destinationName, MessageChannel channel, ProducerProperties producerProperties,
        T extendedProducerProperties);

以下示例显示了如何使用 RabbitMQ Binder:spring-doc.cn

@Bean
public NewDestinationBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
    return (name, channel, props, extended) -> {
        props.setRequiredGroups("bindThisQueue");
        extended.setQueueNameGroupOnly(true);
        extended.setAutoBindDlq(true);
        extended.setDeadLetterQueueName("myDLQ");
    };
}
如果需要支持具有多个 Binder 类型的动态目标,请使用 for the generic type 并根据需要强制转换参数。Objectextended

此外,请参阅 [使用 StreamBridge] 部分,了解如何将另一个选项 (StreamBridge) 用于类似情况。spring-doc.cn