Spring for Apache Pulsar provides basic support for Pulsar IO (connectors) and Pulsar Functions, which allow users to define stream processing pipelines made up of sources, processors, and sinks.
The sources and sinks are modeled by Pulsar IO (connectors) and the processors are represented by Pulsar Functions.
Because connectors are just special functions, and for simplicity, we refer to sources, sinks, and functions collectively as "Pulsar Functions".
1. Pulsar Function Administration
The framework provides the PulsarFunctionAdministration component to manage Pulsar functions.
When you use the Pulsar Spring Boot starter, you get the PulsarFunctionAdministration auto-configured.
By default, the application tries to connect to a local Pulsar instance at localhost:8080.
However, because it leverages the already configured PulsarAdministration, the connection can be changed through the admin client settings; see Pulsar Admin Client for available client options (including authentication).
Additional configuration options are available with the spring.pulsar.function.* application properties.
2. Automatic Function Management
On application startup, the framework finds all PulsarFunction, PulsarSink, and PulsarSource beans in the application context.
For each bean, the corresponding Pulsar function is either created or updated.
The proper API is called based on function type, function config, and whether the function already exists.
The PulsarFunction, PulsarSink, and PulsarSource beans are simple wrappers around the Apache Pulsar config objects FunctionConfig, SinkConfig, and SourceConfig, respectively.
Due to the large number of supported connectors (and their varied configurations), the framework does not attempt to create a configuration properties hierarchy to mirror the varied Apache Pulsar connectors.
Instead, the burden is on the user to supply the full config object, and the framework then handles the management (create/update) using the supplied config.
On application shutdown, all functions that were processed during application startup have their stop policy enforced and are either left alone, stopped, or deleted from the Pulsar server.
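The startup and shutdown behavior described above can be pictured with a small in-memory simulation. This is only a sketch: the class, enum, and method names below are hypothetical and are not the framework's API, and the "server" is just a pair of sets. It shows the create-versus-update decision at startup and the three possible outcomes at shutdown.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical policies mirroring the three shutdown outcomes: left alone, stopped, deleted. */
enum StopPolicySketch { NONE, STOP, DELETE }

/** In-memory stand-in for the Pulsar server; purely illustrative. */
final class FunctionLifecycleSketch {

    private final Set<String> registered = new LinkedHashSet<>();
    private final Set<String> running = new LinkedHashSet<>();
    private final Set<String> processedAtStartup = new LinkedHashSet<>();

    /** Startup: create the function if it is absent, otherwise update it. Returns the action taken. */
    String createOrUpdate(String name) {
        String action = registered.add(name) ? "create" : "update";
        running.add(name);
        processedAtStartup.add(name);
        return action;
    }

    /** Shutdown: apply the policy to every function that was processed at startup. */
    void shutdown(StopPolicySketch policy) {
        for (String name : processedAtStartup) {
            switch (policy) {
                case STOP:
                    running.remove(name);
                    break;
                case DELETE:
                    running.remove(name);
                    registered.remove(name);
                    break;
                default:
                    // NONE: leave the function untouched
                    break;
            }
        }
        processedAtStartup.clear();
    }

    boolean isRegistered(String name) {
        return registered.contains(name);
    }

    boolean isRunning(String name) {
        return running.contains(name);
    }
}
```

In this sketch, a function that was stopped at shutdown is still registered, so the next startup takes the update path rather than the create path.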
3. Limitations
3.1. No Magic Pulsar Functions
Pulsar functions and custom connectors are represented by custom application code (e.g. a java.util.function.Function).
There is no magic support to automatically register the custom code.
While this would be amazing, it has some technical challenges and has not yet been implemented.
As such, it is up to the user to ensure the function (or custom connector) is available at the location specified in the function config.
For example, if the function config has a jar value of ./some/path/MyFunction.jar, then the function jar file must exist at the specified path.
4. Configuration
4.1. Pulsar Function Archive
Each Pulsar function is represented by an actual archive (e.g. a jar file).
The path to the archive is specified via the archive property for sources and sinks, and the jar property for functions.
The following rules determine the "type" of path:
- The path is a URL when it starts with (file|http|https|function|sink|source)://
- The path is built-in when it starts with builtin:// (points to one of the provided out-of-the-box connectors)
- The path is local otherwise.
The action that occurs during the create/update operation is dependent on path "type" as follows:
- When the path is a URL, the content is downloaded by the server
- When the path is built-in, the content is already available on the server
- When the path is local, the content is uploaded to the server
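The path rules above can be sketched as a small classifier. The `ArchivePaths` class and `ArchivePathType` enum are hypothetical names introduced for illustration only; they are not part of Spring for Apache Pulsar, and the framework's own check may differ in detail.

```java
import java.util.regex.Pattern;

/** Hypothetical path types matching the three cases listed in the text. */
enum ArchivePathType { URL, BUILT_IN, LOCAL }

/** Illustrative helper; not part of the framework. */
final class ArchivePaths {

    // Schemes that the text lists as marking a URL path.
    private static final Pattern URL_PREFIX =
            Pattern.compile("(file|http|https|function|sink|source)://");

    private static final String BUILT_IN_PREFIX = "builtin://";

    private ArchivePaths() {
    }

    static ArchivePathType classify(String path) {
        if (path.startsWith(BUILT_IN_PREFIX)) {
            return ArchivePathType.BUILT_IN; // content already available on the server
        }
        // lookingAt() anchors the match at the start of the path
        if (URL_PREFIX.matcher(path).lookingAt()) {
            return ArchivePathType.URL; // content downloaded by the server
        }
        return ArchivePathType.LOCAL; // content uploaded to the server
    }
}
```

With this sketch, `builtin://rabbitmq` classifies as built-in, `file:///opt/functions/MyFunction.jar` as a URL, and `./some/path/MyFunction.jar` as local.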
5. Custom functions
The details on how to develop and package custom functions can be found in the Pulsar docs. However, at a high level, the requirements are as follows:
- Code uses Java 8
- Code implements either java.util.function.Function or org.apache.pulsar.functions.api.Function
- Packaged as an uber jar
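As a minimal sketch of the first option in the requirements above, a function can be a plain java.util.function.Function with no Pulsar dependency at all. The class name `ExclamationFunction` is hypothetical; in a real project the class would normally be public, live in its own file, and be packaged into the uber jar.

```java
import java.util.function.Function;

/** Hypothetical example function: appends an exclamation mark to each input message. */
class ExclamationFunction implements Function<String, String> {

    @Override
    public String apply(String input) {
        return input + "!";
    }
}
```

Because the class only depends on the JDK, it can be unit tested by calling `apply` directly before it is ever deployed to a Pulsar server.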
Once the function is built and packaged, there are several ways to make it available for function registration.
5.1. file://
The jar file can be uploaded to the server and then referenced via file:// in the jar property of the function config.
5.2. local
The jar file can remain local and then be referenced via the local path in the jar property of the function config.
6. Examples
Here are some examples that show how to configure a PulsarSource bean, which results in the PulsarFunctionAdministration auto-creating the backing Pulsar source connector.
@Bean
PulsarSource rabbitSource() {
    // Connector-specific settings passed through to the RabbitMQ source
    Map<String, Object> configs = new HashMap<>();
    configs.put("host", "my.rabbit.host");
    configs.put("port", 5672);
    configs.put("virtualHost", "/");
    configs.put("username", "guest");
    configs.put("password", "guest");
    configs.put("queueName", "test_rabbit");
    configs.put("connectionName", "test-connection");
    // builtin:// points at a connector already available on the server
    SourceConfig sourceConfig = SourceConfig.builder()
            .tenant("public")
            .namespace("default")
            .name("rabbit-test-source")
            .archive("builtin://rabbitmq")
            .topicName("incoming_rabbit")
            .configs(configs).build();
    return new PulsarSource(sourceConfig, null);
}
This next example is the same as the previous, except that it uses the Spring Boot auto-configured RabbitProperties to ease the configuration burden. This, of course, requires the application to be using Spring Boot with Rabbit auto-configuration enabled.
@Bean
PulsarSource rabbitSourceWithBootProps(RabbitProperties props) {
    // Connection settings come from the auto-configured RabbitProperties
    Map<String, Object> configs = new HashMap<>();
    configs.put("host", props.determineHost());
    configs.put("port", props.determinePort());
    configs.put("virtualHost", props.determineVirtualHost());
    configs.put("username", props.determineUsername());
    configs.put("password", props.determinePassword());
    configs.put("queueName", "test_rabbit");
    configs.put("connectionName", "test-connection");
    // builtin:// points at a connector already available on the server
    SourceConfig sourceConfig = SourceConfig.builder()
            .tenant("public")
            .namespace("default")
            .name("rabbit-test-source")
            .archive("builtin://rabbitmq")
            .topicName("incoming_rabbit")
            .configs(configs).build();
    return new PulsarSource(sourceConfig, null);
}
For a more elaborate example, see the Sample Stream Pipeline with Pulsar Functions sample app.