This version is still in development and is not considered stable yet. For the latest stable version, please use spring-cloud-stream 4.1.4!
Post processing (after sending message)
Once a function is invoked, the framework sends its result to a target destination, which effectively completes the function invocation cycle.
From a business standpoint, however, such a cycle may not be fully complete until some additional tasks are performed after it.
While this could be accomplished with a simple combination of Consumer and StreamBridge, as described in this Stack Overflow post, since version 4.0.3 the framework provides a more idiomatic approach to solve this issue via PostProcessingFunction, provided by the Spring Cloud Function project.
The PostProcessingFunction is a special semi-marker function that contains one additional method, postProcess(Message<O>), designed to provide a place for implementing such post-processing tasks.
package org.springframework.cloud.function.context;
. . .
public interface PostProcessingFunction<I, O> extends Function<I, O> {

    default void postProcess(Message<O> result) {
    }
}
So, now you have two options.
Option 1: You can implement your function as a PostProcessingFunction and include the additional post-processing behavior by implementing its postProcess(Message<O>) method.
private static class Uppercase implements PostProcessingFunction<String, String> {

    @Override
    public String apply(String input) {
        return input.toUpperCase();
    }

    @Override
    public void postProcess(Message<String> result) {
        System.out.println("Function Uppercase has been successfully invoked and its result successfully sent to target destination");
    }
}
. . .
@Bean
public Function<String, String> uppercase() {
    return new Uppercase();
}
Option 2: If you already have an existing function and don't want to change its implementation, or you want to keep your function as a POJO, you can implement only the postProcess(Message<O>) method and compose this new post-processing function with your other function.
// A wildcard is not allowed as a type argument in an implements clause,
// so the input type is spelled out here.
private static class Logger implements PostProcessingFunction<String, String> {

    @Override
    public void postProcess(Message<String> result) {
        System.out.println("Function has been successfully invoked and its result successfully sent to target destination");
    }
}
. . .
@Bean
public Function<String, String> uppercase() {
    return v -> v.toUpperCase();
}

@Bean
public Function<String, String> logger() {
    return new Logger();
}
. . .
// and then have your function definition as such `uppercase|logger`
NOTE:
In the case of function composition, only the last instance of PostProcessingFunction (if present) will take effect. For example, let's say you have the following function definition, foo|bar|baz, and both foo and baz are instances of PostProcessingFunction. Only baz.postProcess(Message<O>) will be invoked.
If baz is not an instance of PostProcessingFunction, then no post-processing functionality will be performed.
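The rule in this note can be modeled in plain Java. The following is only a sketch of the documented behavior, not the framework's implementation: the PostProcessing interface, the stage and run helpers, and the event names are all hypothetical stand-ins, and the hook receives the bare payload rather than a Message so that no Spring dependency is needed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class LastPostProcessorSketch {

    // Hypothetical stand-in for PostProcessingFunction (bare payload, no Message).
    interface PostProcessing<T> extends Function<T, T> {
        @Override
        default T apply(T input) {
            return input; // pass the value through unchanged
        }

        default void postProcess(T result) {
        }
    }

    // Creates a named pass-through stage that records when its hook fires.
    static PostProcessing<String> stage(String name, List<String> events) {
        return new PostProcessing<String>() {
            @Override
            public void postProcess(String result) {
                events.add(name + ".postProcess");
            }
        };
    }

    // Simplified model: apply every stage in order, "send", then consult
    // only the last stage for a post-processing hook.
    static void run(List<Function<String, String>> pipeline, String input, List<String> events) {
        String value = input;
        for (Function<String, String> function : pipeline) {
            value = function.apply(value);
        }
        events.add("send");
        Function<String, String> last = pipeline.get(pipeline.size() - 1);
        if (last instanceof PostProcessing) {
            ((PostProcessing<String>) last).postProcess(value);
        }
    }

    // Models foo|bar|baz where foo and baz carry a hook: only baz's hook runs.
    public static List<String> lastIsPostProcessor() {
        List<String> events = new ArrayList<>();
        Function<String, String> bar = String::toUpperCase;
        List<Function<String, String>> pipeline =
                Arrays.asList(stage("foo", events), bar, stage("baz", events));
        run(pipeline, "hello", events);
        return events;
    }

    // Models foo|bar where only foo carries a hook: no hook runs at all.
    public static List<String> lastIsPlainFunction() {
        List<String> events = new ArrayList<>();
        Function<String, String> bar = String::toUpperCase;
        List<Function<String, String>> pipeline = Arrays.asList(stage("foo", events), bar);
        run(pipeline, "hello", events);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(lastIsPostProcessor()); // [send, baz.postProcess]
        System.out.println(lastIsPlainFunction()); // [send]
    }
}
```

In this model, foo's hook is never consulted because foo is not the last stage, which mirrors the foo|bar|baz example above.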
One may argue that you can easily do that via function composition by simply composing a post-processor as just another Function. That is indeed a possibility. However, the post-processing functionality in this case will be invoked right after the invocation of the previous function and before the message is sent to a target destination, that is, before the function invocation cycle is complete.
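The ordering difference can be illustrated with a small plain-Java model. This is a sketch under stated assumptions rather than the framework's code: PostProcessing and runCycle are hypothetical stand-ins for PostProcessingFunction and the invocation cycle, the "send" step merely records an event, and the hook takes the bare payload instead of a Message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class PostProcessingOrderSketch {

    // Hypothetical stand-in for PostProcessingFunction; it receives the bare
    // payload instead of a Message so the sketch needs no Spring dependency.
    interface PostProcessing<I, O> extends Function<I, O> {
        default void postProcess(O result) {
        }
    }

    // Simplified model of one invocation cycle: invoke, send, then post-process.
    static <I, O> void runCycle(Function<I, O> function, I input, List<String> events) {
        O result = function.apply(input);
        events.add("send:" + result); // stands in for sending to the destination
        if (function instanceof PostProcessing) {
            ((PostProcessing<I, O>) function).postProcess(result);
        }
    }

    // Post-processor composed as just another Function: it runs before the send.
    public static List<String> composedAsPlainFunction() {
        List<String> events = new ArrayList<>();
        Function<String, String> uppercase = String::toUpperCase;
        Function<String, String> audit = value -> {
            events.add("audit:" + value);
            return value;
        };
        runCycle(uppercase.andThen(audit), "hello", events);
        return events;
    }

    // Post-processing hook: it runs only after the send has been recorded.
    public static List<String> withPostProcessHook() {
        List<String> events = new ArrayList<>();
        PostProcessing<String, String> uppercase = new PostProcessing<String, String>() {
            @Override
            public String apply(String input) {
                return input.toUpperCase();
            }

            @Override
            public void postProcess(String result) {
                events.add("post:" + result);
            }
        };
        runCycle(uppercase, "hello", events);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(composedAsPlainFunction()); // [audit:HELLO, send:HELLO]
        System.out.println(withPostProcessHook());     // [send:HELLO, post:HELLO]
    }
}
```

In the first variant the audit step cannot know whether the send succeeded, because it has already finished by then. In the second, the hook runs only once the send step is done.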