在 Spring Cloud Stream 的上下文中,事件路由是 a) 将事件路由到特定事件订阅者或 b) 将事件订阅者生成的事件路由到特定目标的功能。 在这里,我们将其称为路由“TO”和路由“FROM”。
路由到消费者
路由可以依靠 Spring Cloud Function 3.0 中的 available。您需要做的就是通过应用程序属性或提供属性来启用它。
启用后将绑定到输入目标
接收所有消息,并根据提供的指令将它们路由到其他功能。RoutingFunction
--spring.cloud.stream.function.routing.enabled=true
spring.cloud.function.routing-expression
RoutingFunction
出于绑定的目的,路由目标的名称是(请参阅RoutingFunction.FUNCTION_NAME和绑定命名约定 [功能绑定名称])。functionRouter-in-0 |
指令可以与单个消息以及应用程序属性一起提供。
下面是几个示例:
使用邮件头
@SpringBootApplication
public class SampleApplication {
public static void main(String[] args) {
SpringApplication.run(SampleApplication.class,
"--spring.cloud.stream.function.routing.enabled=true");
}
@Bean
public Consumer<String> even() {
return value -> {
System.out.println("EVEN: " + value);
};
}
@Bean
public Consumer<String> odd() {
return value -> {
System.out.println("ODD: " + value);
};
}
}
通过向活页夹暴露的目的地(即 rabbit、kafka)发送消息,
此类消息将被路由到相应的(“偶数”或“奇数”)消费者。functionRouter-in-0
默认情况下,将查找 或 (对于使用 SpEL 的更多动态方案)
header,如果找到,则其值将被视为路由指令。RoutingFunction
spring.cloud.function.definition
spring.cloud.function.routing-expression
例如
将标头设置为值最终将半随机地将请求路由到或函数。
此外,对于 SpEL,评估上下文的根对象是,您也可以对单个标头(或消息)进行评估spring.cloud.function.routing-expression
T(java.lang.System).currentTimeMillis() % 2 == 0 ? 'even' : 'odd'
odd
even
Message
….routing-expression=headers['type']
使用应用程序属性
和/或可以作为应用程序属性传递(例如,.spring.cloud.function.routing-expression
spring.cloud.function.definition
spring.cloud.function.routing-expression=headers['type']
@SpringBootApplication
public class RoutingStreamApplication {
public static void main(String[] args) {
SpringApplication.run(RoutingStreamApplication.class,
"--spring.cloud.function.routing-expression="
+ "T(java.lang.System).nanoTime() % 2 == 0 ? 'even' : 'odd'");
}
@Bean
public Consumer<Integer> even() {
return value -> System.out.println("EVEN: " + value);
}
@Bean
public Consumer<Integer> odd() {
return value -> System.out.println("ODD: " + value);
}
}
通过应用程序属性传递指令对于响应式函数尤为重要,因为响应式函数 函数仅调用一次即可传递发布服务器,因此对单个项目的访问受到限制。 |
路由函数和输出绑定
RoutingFunction
是一个,因此与任何其他功能没有什么不同。井。。。几乎。Function
当路由到另一个时,它的输出被发送到
正如预期的那样。但是,如果路由到 ?换言之,调用的结果
的可能不产生任何要发送到输出绑定的内容,因此甚至有必要有一个。
因此,我们在创建绑定时确实会有所不同。即使它对作为用户的您来说是透明的
(你真的无事可做),了解一些机制将有助于你理解它的内部运作。RoutingFunction
Function
RoutingFunction
functionRouter-in-0
RoutingFunction
Consumer
RoutingFunction
RoutingFunction
所以,规则是;
我们从不为 ,只为 输入创建输出绑定。因此,当您路由 到 时,有效地
通过没有任何输出绑定而变为 a。但是,如果碰巧路由到另一个产生
输出,将动态创建的输出绑定,此时将充当绑定(同时具有输入和输出绑定)的常规。RoutingFunction
Consumer
RoutingFunction
Consumer
RoutingFunction
Function
RoutingFunction
RoutingFunction
Function
出于绑定的目的,路由目标的名称是(请参阅RoutingFunction.FUNCTION_NAME和绑定命名约定 [功能绑定名称])。functionRouter-in-0 |
通过应用程序属性传递指令对于响应式函数尤为重要,因为响应式函数 函数仅调用一次即可传递发布服务器,因此对单个项目的访问受到限制。 |
路由 FROM 使用者
除了静态目标之外,Spring Cloud Stream 还允许应用程序将消息发送到动态绑定的目标。 例如,当需要在运行时确定目标目标时,这很有用。 应用程序可以通过以下两种方式之一执行此操作。
spring.cloud.stream.sendto.destination
您还可以委托给框架,通过指定标头来动态解析输出目标
设置为要解析的目标的名称。spring.cloud.stream.sendto.destination
请看以下示例:
@SpringBootApplication
@Controller
public class SourceWithDynamicDestination {
@Bean
public Function<String, Message<String>> destinationAsPayload() {
return value -> {
return MessageBuilder.withPayload(value)
.setHeader("spring.cloud.stream.sendto.destination", value).build();};
}
}
尽管在此示例中可以清楚地看到微不足道,但我们的输出是带有标头的消息
设置为 he input 参数的值。该框架将参考此标头,并尝试创建或发现
具有该名称的目标,并向其发送输出。spring.cloud.stream.sendto.destination
如果预先知道目标名称,则可以像配置任何其他目标一样配置生产者属性。
或者,如果注册一个 Bean,则会在创建绑定之前调用它。
回调采用绑定器使用的扩展生产者属性的泛型类型。
它有一种方法:NewDestinationBindingCallback<>
void configure(String destinationName, MessageChannel channel, ProducerProperties producerProperties,
T extendedProducerProperties);
以下示例演示如何使用 RabbitMQ 绑定程序:
@Bean
public NewDestinationBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
return (name, channel, props, extended) -> {
props.setRequiredGroups("bindThisQueue");
extended.setQueueNameGroupOnly(true);
extended.setAutoBindDlq(true);
extended.setDeadLetterQueueName("myDLQ");
};
}
如果需要支持具有多个活页夹类型的动态目标,请使用泛型类型并根据需要强制转换参数。Object extended |
另外,请参阅 [使用 StreamBridge] 部分,了解如何将另一个选项 (StreamBridge) 用于类似情况。
如果需要支持具有多个活页夹类型的动态目标,请使用泛型类型并根据需要强制转换参数。Object extended |