This version is still in development and is not considered stable yet. For the latest stable version, please use spring-cloud-stream 4.1.4!
Event Routing
Event Routing, in the context of Spring Cloud Stream, is the ability to either a) route events to a particular event subscriber or b) route events produced by an event subscriber to a particular destination. Here we’ll refer to it as route ‘TO’ and route ‘FROM’.
Routing TO Consumer
Routing can be achieved by relying on RoutingFunction, available in Spring Cloud Function 3.0. All you need to do is enable it via the --spring.cloud.stream.function.routing.enabled=true application property or provide the spring.cloud.function.routing-expression property.
Once enabled, RoutingFunction is bound to the input destination, receives all messages, and routes them to other functions based on the provided instruction.
For the purposes of binding, the name of the routing destination is functionRouter-in-0 (see RoutingFunction.FUNCTION_NAME and the binding naming convention in [Functional binding names]).
The instruction can be provided with individual messages as well as through application properties.
Here are a couple of samples:
Using message headers
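import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class SampleApplication {

    public static void main(String[] args) {
        // Enable RoutingFunction so that it is bound to functionRouter-in-0.
        SpringApplication.run(SampleApplication.class,
                "--spring.cloud.stream.function.routing.enabled=true");
    }

    @Bean
    public Consumer<String> even() {
        return value -> {
            System.out.println("EVEN: " + value);
        };
    }

    @Bean
    public Consumer<String> odd() {
        return value -> {
            System.out.println("ODD: " + value);
        };
    }
}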
When you send a message to the functionRouter-in-0 destination exposed by the binder (e.g., rabbit, kafka), the message is routed to the appropriate (‘even’ or ‘odd’) Consumer.
By default, RoutingFunction looks for a spring.cloud.function.definition or spring.cloud.function.routing-expression (for more dynamic scenarios with SpEL) header and, if one is found, treats its value as the routing instruction.
For example, setting the spring.cloud.function.routing-expression header to the value T(java.lang.System).currentTimeMillis() % 2 == 0 ? 'even' : 'odd' ends up semi-randomly routing requests to either the odd or the even function.
Also, for SpEL, the root object of the evaluation context is the Message, so you can evaluate individual headers (or the message itself) as well, for example ….routing-expression=headers['type']
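To make the header-based lookup described above more concrete, here is a minimal plain-Java sketch of the idea. It is not the actual RoutingFunction implementation: the HeaderRoutingSketch class, its registry map, and its register and route methods are hypothetical names introduced only for illustration, and headers are modeled as a simple Map rather than a Spring Message.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Illustrative stand-in only: this is NOT Spring's RoutingFunction.
final class HeaderRoutingSketch {

    // Header name taken from the text above.
    static final String DEFINITION_HEADER = "spring.cloud.function.definition";

    // Hypothetical registry that plays the role of the function catalog.
    private final Map<String, Consumer<String>> functions = new HashMap<>();

    void register(String name, Consumer<String> function) {
        functions.put(name, function);
    }

    // Reads the routing instruction from the headers and hands the payload
    // to the function registered under that name.
    void route(Map<String, String> headers, String payload) {
        String target = headers.get(DEFINITION_HEADER);
        if (target == null) {
            throw new IllegalStateException("No routing instruction in headers");
        }
        Consumer<String> function = functions.get(target);
        if (function == null) {
            throw new IllegalStateException("Unknown function: " + target);
        }
        function.accept(payload);
    }

    public static void main(String[] args) {
        HeaderRoutingSketch router = new HeaderRoutingSketch();
        router.register("even", value -> System.out.println("EVEN: " + value));
        router.register("odd", value -> System.out.println("ODD: " + value));
        // The header value names the function that should receive the payload.
        router.route(Map.of(DEFINITION_HEADER, "odd"), "hello");
    }
}
```

In a real application the binder delivers the message and the framework performs the lookup; the sketch only shows why a header value such as odd is enough to select the target function.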
Using application properties
The spring.cloud.function.routing-expression and/or spring.cloud.function.definition can be passed as application properties (e.g., spring.cloud.function.routing-expression=headers['type']).
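import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class RoutingStreamApplication {

    public static void main(String[] args) {
        // The routing expression is supplied once, as an application property.
        SpringApplication.run(RoutingStreamApplication.class,
                "--spring.cloud.function.routing-expression="
                + "T(java.lang.System).nanoTime() % 2 == 0 ? 'even' : 'odd'");
    }

    @Bean
    public Consumer<Integer> even() {
        return value -> System.out.println("EVEN: " + value);
    }

    @Bean
    public Consumer<Integer> odd() {
        return value -> System.out.println("ODD: " + value);
    }
}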
Passing instructions via application properties is especially important for reactive functions, given that a reactive function is invoked only once to pass the Publisher, so access to the individual items is limited.
Routing Function and output binding
RoutingFunction is a Function and as such is treated no differently than any other function. Well. . . almost.
When RoutingFunction routes to another Function, its output is sent to the output binding of the RoutingFunction, which is functionRouter-out-0, as expected. But what if RoutingFunction routes to a Consumer? In other words, the result of invoking the RoutingFunction may not produce anything to be sent to the output binding, thus making it unnecessary to even have one.
So, we do treat RoutingFunction a little bit differently when we create bindings. And even though it is transparent to you as a user (there is really nothing for you to do), being aware of some of the mechanics helps you understand its inner workings.
So, the rule is:
We never create an output binding for the RoutingFunction up front, only an input binding. So when you are routing to a Consumer, the RoutingFunction effectively becomes a Consumer by not having any output bindings. However, if RoutingFunction happens to route to another Function that produces output, the output binding for the RoutingFunction is created dynamically, at which point RoutingFunction acts as a regular Function with regard to bindings (having both input and output bindings).
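The rule above can be modeled in a few lines of plain Java. This is only a sketch of the described behavior, not the framework's binding code: LazyOutputBindingSketch and its methods are hypothetical, a routed-to Consumer is modeled as a function that returns null, and the output binding name functionRouter-out-0 is an assumption derived from the functional binding naming convention.

```java
import java.util.Optional;
import java.util.function.Function;

// Illustrative model only; the real binding logic lives inside Spring Cloud Stream.
final class LazyOutputBindingSketch {

    // Assumed name, following the <functionName>-out-<index> convention.
    static final String OUTPUT_BINDING = "functionRouter-out-0";

    // Starts out false: only the input binding exists initially.
    private boolean outputBound = false;

    // Invokes the routed-to target. A null result models a Consumer,
    // which produces nothing and therefore needs no output binding.
    Optional<String> invoke(Function<String, String> target, String payload) {
        String result = target.apply(payload);
        if (result == null) {
            return Optional.empty();
        }
        // First real output: the output binding is created on demand.
        outputBound = true;
        return Optional.of(OUTPUT_BINDING + " <- " + result);
    }

    boolean isOutputBound() {
        return outputBound;
    }

    public static void main(String[] args) {
        LazyOutputBindingSketch router = new LazyOutputBindingSketch();
        router.invoke(value -> null, "consumed");
        System.out.println("bound after consumer: " + router.isOutputBound());
        router.invoke(String::toUpperCase, "transformed");
        System.out.println("bound after function: " + router.isOutputBound());
    }
}
```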
Routing FROM Consumer
Aside from static destinations, Spring Cloud Stream lets applications send messages to dynamically bound destinations. This is useful, for example, when the target destination needs to be determined at runtime. Applications can do so in one of two ways.
spring.cloud.stream.sendto.destination
You can delegate to the framework to dynamically resolve the output destination by specifying the spring.cloud.stream.sendto.destination header set to the name of the destination to be resolved.
Consider the following example:
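import java.util.function.Function;

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Controller;

@SpringBootApplication
@Controller
public class SourceWithDynamicDestination {

    @Bean
    public Function<String, Message<String>> destinationAsPayload() {
        // The payload doubles as the name of the destination to send to.
        return value -> MessageBuilder.withPayload(value)
                .setHeader("spring.cloud.stream.sendto.destination", value)
                .build();
    }
}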
Albeit trivial, this example clearly shows that our output is a Message with the spring.cloud.stream.sendto.destination header set to the value of the input argument. The framework consults this header and attempts to create or discover a destination with that name and send the output to it.
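The "create or discover" step can be pictured with a small plain-Java sketch. It is an illustration under stated assumptions, not framework code: DynamicDestinationSketch, its send and received methods, and the in-memory map standing in for broker destinations are all hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative stand-in only; real destinations live in the message broker.
final class DynamicDestinationSketch {

    // Header name taken from the text above.
    static final String SENDTO_HEADER = "spring.cloud.stream.sendto.destination";

    // Destination name -> payloads delivered to it.
    private final Map<String, List<String>> destinations = new LinkedHashMap<>();

    // Resolves the destination named by the header, creating it on first use,
    // and delivers the payload to it.
    void send(Map<String, String> headers, String payload) {
        String name = headers.get(SENDTO_HEADER);
        if (name == null) {
            throw new IllegalStateException("No destination header present");
        }
        destinations.computeIfAbsent(name, key -> new ArrayList<>()).add(payload);
    }

    List<String> received(String name) {
        return destinations.getOrDefault(name, List.of());
    }

    public static void main(String[] args) {
        DynamicDestinationSketch sketch = new DynamicDestinationSketch();
        sketch.send(Map.of(SENDTO_HEADER, "orders"), "first");
        sketch.send(Map.of(SENDTO_HEADER, "orders"), "second");
        System.out.println(sketch.received("orders"));
    }
}
```

The first send to a given name creates the destination and later sends reuse it, which mirrors the discover-or-create behavior described above.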
If destination names are known in advance, you can configure the producer properties as with any other destination.
Alternatively, if you register a NewDestinationBindingCallback<> bean, it is invoked just before the binding is created.
The callback takes the generic type of the extended producer properties used by the binder.
It has one method:
void configure(String destinationName, MessageChannel channel, ProducerProperties producerProperties,
        T extendedProducerProperties);
The following example shows how to use the RabbitMQ binder:
@Bean
public NewDestinationBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
    return (name, channel, props, extended) -> {
        props.setRequiredGroups("bindThisQueue");
        extended.setQueueNameGroupOnly(true);
        extended.setAutoBindDlq(true);
        extended.setDeadLetterQueueName("myDLQ");
    };
}
If you need to support dynamic destinations with multiple binder types, use Object for the generic type and cast the extended argument as needed.
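The casting approach could look roughly like the following plain-Java sketch. Everything in it is a stand-in chosen for illustration: DestinationCallbackSketch is a local interface that mirrors only part of the callback shape (destination name and extended properties), and RabbitLikeProperties and KafkaLikeProperties are hypothetical placeholders, not the binders' real property classes.

```java
// Hypothetical placeholders for binder-specific extended producer properties.
final class RabbitLikeProperties {
    boolean autoBindDlq;
}

final class KafkaLikeProperties {
    boolean sync;
}

// Local interface mirroring part of the callback shape; not the Spring type.
interface DestinationCallbackSketch<T> {
    void configure(String destinationName, T extendedProducerProperties);
}

final class MultiBinderCallbackSketch {

    // Uses Object as the generic type and casts once the concrete type is known.
    static DestinationCallbackSketch<Object> callback() {
        return (name, extended) -> {
            if (extended instanceof RabbitLikeProperties) {
                ((RabbitLikeProperties) extended).autoBindDlq = true;
            } else if (extended instanceof KafkaLikeProperties) {
                ((KafkaLikeProperties) extended).sync = true;
            }
            // Any other binder type is left untouched.
        };
    }

    public static void main(String[] args) {
        RabbitLikeProperties rabbit = new RabbitLikeProperties();
        callback().configure("someDestination", rabbit);
        System.out.println("autoBindDlq = " + rabbit.autoBindDlq);
    }
}
```

In a real application the same instanceof checks would be applied to the extended argument of a NewDestinationBindingCallback<Object> bean.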
Also, see the [Using StreamBridge] section for yet another option (StreamBridge) that can be used in similar cases.