Spring for Apache Pulsar provides basic support for Pulsar IO (connectors) and Pulsar Functions, which allow users to define stream processing pipelines made up of sources, processors, and sinks. The sources and sinks are modeled by Pulsar IO (connectors) and the processors are represented by Pulsar Functions.

Because connectors are just special functions, and for simplicity, we refer to sources, sinks and functions collectively as "Pulsar Functions".

Pre-requisites

Familiarity - the audience is expected to be somewhat familiar with Pulsar IO and Pulsar Functions. If that is not the case, it may be helpful to see their getting started guides.

Feature enabled - to use these features, the functions support in Apache Pulsar must be enabled and configured (it is disabled by default). The built-in connectors may also need to be installed on the Pulsar cluster.

See the Pulsar IO and Pulsar Functions docs for more details.

1. Pulsar Function Administration

The framework provides the PulsarFunctionAdministration component to manage Pulsar functions. When you use the Pulsar Spring Boot starter, you get the PulsarFunctionAdministration auto-configured.

By default, the application tries to connect to a local Pulsar instance at localhost:8080. Because it uses the already configured PulsarAdministration, the client options described in Pulsar Admin Client (including authentication) apply here as well. Additional configuration options are available with the spring.pulsar.function.* application properties.

2. Automatic Function Management

On application startup, the framework finds all PulsarFunction, PulsarSink, and PulsarSource beans in the application context. For each bean, the corresponding Pulsar function is either created or updated. The proper API is called based on function type, function config, and whether the function already exists.

The PulsarFunction, PulsarSink, and PulsarSource beans are simple wrappers around the Apache Pulsar config objects FunctionConfig, SinkConfig, and SourceConfig, respectively. Due to the large number of supported connectors (and their varied configurations) the framework does not attempt to create a configuration properties hierarchy to mirror the varied Apache Pulsar connectors. Instead, the burden is on the user to supply the full config object and then the framework handles the management (create/update) using the supplied config.

On application shutdown, all functions that were processed during application startup have their stop policy enforced and are either left alone, stopped, or deleted from the Pulsar server.
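The three shutdown outcomes can be pictured with a small, library-free sketch. The `StopPolicySketch` class, the `StopPolicy` enum, and its constant names are hypothetical stand-ins chosen for illustration; they are not taken from the framework's API, and the real framework calls the Pulsar admin API rather than returning strings.

```java
// Hypothetical stand-in types: the names below are illustrative only and are
// not taken from the Spring for Apache Pulsar API.
final class StopPolicySketch {

    // One constant per outcome described above: left alone, stopped, deleted.
    enum StopPolicy { NONE, STOP, DELETE }

    // Describes what would happen to one function when the application shuts down.
    static String onShutdown(String functionName, StopPolicy policy) {
        switch (policy) {
            case STOP:
                return "stop " + functionName;
            case DELETE:
                return "delete " + functionName;
            default:
                return "leave " + functionName + " as is";
        }
    }

    public static void main(String[] args) {
        for (StopPolicy policy : StopPolicy.values()) {
            System.out.println(policy + ": " + onShutdown("rabbit-test-source", policy));
        }
    }
}
```

Only functions that were processed at startup would be passed through such a step; functions created by other means are not touched.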

3. Limitations

3.1. No Magic Pulsar Functions

Pulsar functions and custom connectors are represented by custom application code (e.g. a java.util.function.Function). There is no magic support to automatically register the custom code. While this would be amazing, it has some technical challenges and has not yet been implemented. As such, it is up to the user to ensure the function (or custom connector) is available at the location specified in the function config. For example, if the function config has a jar value of ./some/path/MyFunction.jar then the function jar file must exist at the specified path.
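To make "custom application code" concrete, here is a minimal sketch of a function written against the plain java.util.function.Function interface. The class name `ExclamationFunction` and its behavior are hypothetical and serve only as an illustration; the compiled class still has to be packaged into a jar and placed at the location named in the function config, because nothing registers it automatically.

```java
import java.util.function.Function;

// Hypothetical example function: appends an exclamation mark to each message.
// In a real project this would typically be a public class in its own source
// file, packaged into the jar that the function config points to.
class ExclamationFunction implements Function<String, String> {

    @Override
    public String apply(String input) {
        return input + "!";
    }

    public static void main(String[] args) {
        System.out.println(new ExclamationFunction().apply("hello")); // prints "hello!"
    }
}
```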

3.2. Name Identifier

The name property from the function config is used as the identifier to determine if a function already exists in order to decide if an update or create operation is performed. As such, the name should not be modified if function updates are desired.
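The effect of using the name as the identifier can be sketched without any Pulsar dependency. The `CreateOrUpdateSketch` class below is a hypothetical in-memory stand-in for the server-side lookup; the real framework queries the Pulsar admin API instead of a map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the functions known to the Pulsar server.
final class CreateOrUpdateSketch {

    // Registered archives keyed by function name, the only identifier used.
    private final Map<String, String> registeredArchives = new HashMap<>();

    // Returns "update" when the name is already known, otherwise "create".
    String register(String name, String archive) {
        String operation = registeredArchives.containsKey(name) ? "update" : "create";
        registeredArchives.put(name, archive);
        return operation;
    }

    int registeredCount() {
        return registeredArchives.size();
    }

    public static void main(String[] args) {
        CreateOrUpdateSketch server = new CreateOrUpdateSketch();
        System.out.println(server.register("rabbit-test-source", "builtin://rabbitmq")); // create
        System.out.println(server.register("rabbit-test-source", "builtin://rabbitmq")); // update
        // A changed name is treated as a different function, so it is created anew.
        System.out.println(server.register("rabbit-source-renamed", "builtin://rabbitmq")); // create
    }
}
```

In this sketch, renaming leaves two entries behind, which is why the name should stay stable when an update is intended.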

4. Configuration

4.1. Pulsar Function Archive

Each Pulsar function is represented by an actual archive (e.g. a jar file). The path to the archive is specified via the archive property for sources and sinks, and the jar property for functions.

The following rules determine the "type" of path:

  • The path is a URL when it starts with (file|http|https|function|sink|source)://

  • The path is built-in when it starts with builtin:// (points to one of the provided out-of-the-box connectors)

  • The path is local otherwise.

The action that occurs during the create/update operation depends on the path "type" as follows:

  • When the path is a URL, the content is downloaded by the server.

  • When the path is built-in, the content is already available on the server.

  • When the path is local, the content is uploaded to the server.
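The rules above can be expressed as a small classification routine. This is a minimal sketch that only mirrors the listed prefixes; the `ArchivePathTypes` class, the `PathType` enum, and the sample paths (including the example.com URL) are hypothetical and are not part of the framework.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Spring for Apache Pulsar: it only mirrors
// the path rules listed above.
final class ArchivePathTypes {

    enum PathType { URL, BUILTIN, LOCAL }

    // Schemes that make a path a URL, per the first rule.
    private static final Pattern URL_PREFIX =
            Pattern.compile("^(file|http|https|function|sink|source)://.*");

    static PathType classify(String path) {
        if (path.startsWith("builtin://")) {
            return PathType.BUILTIN;
        }
        if (URL_PREFIX.matcher(path).matches()) {
            return PathType.URL;
        }
        // Anything else is treated as a local path.
        return PathType.LOCAL;
    }

    // What happens to the archive content during create/update.
    static String describeAction(PathType type) {
        switch (type) {
            case URL:
                return "downloaded by the server";
            case BUILTIN:
                return "already available on the server";
            default:
                return "uploaded to the server";
        }
    }

    public static void main(String[] args) {
        String[] samples = {
            "builtin://rabbitmq",
            "https://example.com/MyFunction.jar",
            "./some/path/MyFunction.jar"
        };
        for (String path : samples) {
            PathType type = classify(path);
            System.out.println(path + " -> " + type + " (" + describeAction(type) + ")");
        }
    }
}
```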

4.2. Built-in Sources and Sinks

Apache Pulsar provides many source and sink connectors out-of-the-box, also known as built-in connectors. To use a built-in connector, simply set the archive to builtin://<connector-type> (e.g. builtin://rabbitmq).

5. Custom Functions

The details on how to develop and package custom functions, including the requirements they must meet, can be found in the Pulsar docs.

Once the function is built and packaged, there are several ways to make it available for function registration.

5.1. file://

The jar file can be uploaded to the server and then referenced via file:// in the jar property of the function config.

5.2. local

The jar file can remain local and be referenced via the local path in the jar property of the function config.

5.3. http://

The jar file can be made available via an HTTP server and then referenced via http(s):// in the jar property of the function config.

5.4. function://

The jar file can be uploaded to the Pulsar package manager and then referenced via function:// in the jar property of the function config.

6. Examples

Here are some examples that show how to configure a PulsarSource bean which results in the PulsarFunctionAdministration auto-creating the backing Pulsar source connector.

PulsarSource using built-in Rabbit connector
@Bean
PulsarSource rabbitSource() {
    // Connector-specific settings passed to the built-in RabbitMQ source
    Map<String, Object> configs = new HashMap<>();
    configs.put("host", "my.rabbit.host");
    configs.put("port", 5672);
    configs.put("virtualHost", "/");
    configs.put("username", "guest");
    configs.put("password", "guest");
    configs.put("queueName", "test_rabbit");
    configs.put("connectionName", "test-connection");
    SourceConfig sourceConfig = SourceConfig.builder()
            .tenant("public")
            .namespace("default")
            .name("rabbit-test-source")
            // builtin:// points to a connector already available on the server
            .archive("builtin://rabbitmq")
            // Pulsar topic that receives the messages read from RabbitMQ
            .topicName("incoming_rabbit")
            .configs(configs).build();
    return new PulsarSource(sourceConfig, null);
}

This next example is the same as the previous, except that it uses the Spring Boot auto-configured RabbitProperties to ease the configuration burden. This of course requires the application to be using Spring Boot with Rabbit auto-configuration enabled.

PulsarSource using built-in Rabbit connector and Spring Boot RabbitProperties
@Bean
PulsarSource rabbitSourceWithBootProps(RabbitProperties props) {
    // Reuse the connection settings resolved by Spring Boot's RabbitProperties
    Map<String, Object> configs = new HashMap<>();
    configs.put("host", props.determineHost());
    configs.put("port", props.determinePort());
    configs.put("virtualHost", props.determineVirtualHost());
    configs.put("username", props.determineUsername());
    configs.put("password", props.determinePassword());
    configs.put("queueName", "test_rabbit");
    configs.put("connectionName", "test-connection");
    SourceConfig sourceConfig = SourceConfig.builder()
            .tenant("public")
            .namespace("default")
            .name("rabbit-test-source")
            .archive("builtin://rabbitmq")
            .topicName("incoming_rabbit")
            .configs(configs).build();
    return new PulsarSource(sourceConfig, null);
}

For a more elaborate example, see the Sample Stream Pipeline with Pulsar Functions sample app.