Apache Pulsar 通过为 Apache Pulsar 项目提供 Spring 的自动配置来支持。Spring中文文档

Spring Boot 将在类路径上自动配置和注册经典(命令式)Spring for Apache Pulsar 组件。 当位于类路径上时,它将对反应式组件执行相同的操作。org.springframework.pulsar:spring-pulsarorg.springframework.pulsar:spring-pulsar-reactiveSpring中文文档

有 和 “启动器”,分别用于方便地收集命令式和被动使用的依赖项。spring-boot-starter-pulsarspring-boot-starter-pulsar-reactiveSpring中文文档

连接到 Pulsar

当您使用 Pulsar 启动器时,Spring Boot 将自动配置并注册一个 bean。PulsarClientSpring中文文档

默认情况下,应用程序会尝试连接到位于 的本地 Pulsar 实例。 这可以通过将属性设置为不同的值来调整。pulsar://localhost:6650spring.pulsar.client.service-urlSpring中文文档

该值必须是有效的 Pulsar 协议 URL

您可以通过指定任何带前缀的应用程序属性来配置客户端。spring.pulsar.client.*Spring中文文档

如果您需要对配置进行更多控制,请考虑注册一个或多个 Bean。PulsarClientBuilderCustomizerSpring中文文档

认证

要连接到需要身份验证的 Pulsar 集群,您需要通过设置插件所需的 和 任何参数来指定要使用的身份验证插件。 您可以将参数设置为参数名称与参数值的映射。 以下示例演示如何配置插件。pluginClassNameAuthenticationOAuth2Spring中文文档

spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param.issuerUrl=https://auth.server.cloud/
spring.pulsar.client.authentication.param.privateKey=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
        param:
          issuerUrl: https://auth.server.cloud/
          privateKey: file:///Users/some-key.json
          audience: urn:sn:acme:dev:my-instance

您需要确保 下定义的名称与您的身份验证插件(通常是驼峰大小写)所期望的名称完全匹配。 Spring Boot 不会尝试对这些条目进行任何形式的宽松绑定。spring.pulsar.client.authentication.param.*Spring中文文档

例如,如果要配置身份验证插件的颁发者 URL,则必须使用 . 如果您使用其他形式,例如 或 ,则该设置将不会应用于插件。AuthenticationOAuth2spring.pulsar.client.authentication.param.issuerUrlissuerurlissuer-urlSpring中文文档

这种缺乏宽松的绑定也使得使用环境变量作为身份验证参数成为问题,因为在转换过程中会丢失区分大小写。 如果对参数使用环境变量,则需要按照 Spring for Apache Pulsar 参考文档中的这些步骤操作,才能使其正常工作。Spring中文文档

SSL证书

默认情况下,Pulsar 客户端以纯文本形式与 Pulsar 服务进行通信。 您可以按照 Spring for Apache Pulsar 参考文档中的以下步骤启用 TLS 加密。Spring中文文档

有关客户端和身份验证的完整详细信息,请参阅 Spring for Apache Pulsar 参考文档Spring中文文档

该值必须是有效的 Pulsar 协议 URL

您需要确保 下定义的名称与您的身份验证插件(通常是驼峰大小写)所期望的名称完全匹配。 Spring Boot 不会尝试对这些条目进行任何形式的宽松绑定。spring.pulsar.client.authentication.param.*Spring中文文档

例如,如果要配置身份验证插件的颁发者 URL,则必须使用 . 如果您使用其他形式,例如 或 ,则该设置将不会应用于插件。AuthenticationOAuth2spring.pulsar.client.authentication.param.issuerUrlissuerurlissuer-urlSpring中文文档

这种缺乏宽松的绑定也使得使用环境变量作为身份验证参数成为问题,因为在转换过程中会丢失区分大小写。 如果对参数使用环境变量,则需要按照 Spring for Apache Pulsar 参考文档中的这些步骤操作,才能使其正常工作。Spring中文文档

被动地连接到 Pulsar

当反应式自动配置被激活时,Spring Boot 将自动配置并注册一个 bean。ReactivePulsarClientSpring中文文档

将调整前面描述的 . 因此,请按照上一节配置 .ReactivePulsarClientPulsarClientPulsarClientReactivePulsarClientSpring中文文档

连接到 Pulsar 管理

Apache Pulsar 客户端的 Spring 也是自动配置的。PulsarAdministrationSpring中文文档

默认情况下,应用程序会尝试连接到位于 的本地 Pulsar 实例。 这可以通过在 形式 中将属性设置为不同的值来调整。http://localhost:8080spring.pulsar.admin.service-url(http|https)://<host>:<port>Spring中文文档

如果您需要对配置进行更多控制,请考虑注册一个或多个 Bean。PulsarAdminBuilderCustomizerSpring中文文档

认证

当访问需要认证的 Pulsar 集群时,管理客户端需要与常规 Pulsar 客户端相同的安全配置。 您可以通过将 替换为 来使用上述身份验证配置spring.pulsar.client.authenticationspring.pulsar.admin.authenticationSpring中文文档

要在启动时创建主题,请添加类型为 的 Bean 。 如果该主题已存在,则忽略该 bean。PulsarTopic
要在启动时创建主题,请添加类型为 的 Bean 。 如果该主题已存在,则忽略该 bean。PulsarTopic

发送消息

Spring's 是自动配置的,您可以使用它来发送消息,如以下示例所示:PulsarTemplateSpring中文文档

import org.apache.pulsar.client.api.PulsarClientException;

import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final PulsarTemplate<String> pulsarTemplate;

	public MyBean(PulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void someMethod() throws PulsarClientException {
		this.pulsarTemplate.send("someTopic", "Hello");
	}

}
import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {

	@Throws(PulsarClientException::class)
	fun someMethod() {
		pulsarTemplate.send("someTopic", "Hello")
	}

}

它依赖于 a 来创建底层的 Pulsar 生产者。 Spring Boot 自动配置还提供了此生产者工厂,默认情况下,它缓存它创建的生产者。 您可以通过指定任何带有前缀的应用程序属性来配置生产者工厂和缓存设置。PulsarTemplatePulsarProducerFactoryspring.pulsar.producer.*spring.pulsar.producer.cache.*Spring中文文档

如果您需要对生产者工厂配置进行更多控制,请考虑注册一个或多个 Bean。 这些定制器将应用于所有创建的生产者。 您还可以在发送消息时传入 仅影响当前生产者。ProducerBuilderCustomizerProducerBuilderCustomizerSpring中文文档

如果您需要对正在发送的消息进行更多控制,则可以在发送消息时传入。TypedMessageBuilderCustomizerSpring中文文档

被动发送消息

当 Reactive 自动配置被激活时,Spring 的会自动配置,你可以用它来发送消息,如以下示例所示:ReactivePulsarTemplateSpring中文文档

import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final ReactivePulsarTemplate<String> pulsarTemplate;

	public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void someMethod() {
		this.pulsarTemplate.send("someTopic", "Hello").subscribe();
	}

}
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: ReactivePulsarTemplate<String>) {

	fun someMethod() {
		pulsarTemplate.send("someTopic", "Hello").subscribe()
	}

}

它依赖于 a 来实际创建基础发送方。 Spring Boot 自动配置还提供了此发送方工厂,默认情况下,该工厂会缓存它创建的生产者。 您可以通过指定任何带前缀的应用程序属性来配置发件人工厂和缓存设置。ReactivePulsarTemplateReactivePulsarSenderFactoryspring.pulsar.producer.*spring.pulsar.producer.cache.*Spring中文文档

如果您需要对发送方工厂配置进行更多控制,请考虑注册一个或多个 Bean。 这些定制器将应用于所有创建的发件人。 您还可以在发送消息时传入 仅影响当前发件人。ReactiveMessageSenderBuilderCustomizerReactiveMessageSenderBuilderCustomizerSpring中文文档

如果您需要对正在发送的消息进行更多控制,则可以在发送消息时传入。MessageSpecBuilderCustomizerSpring中文文档

接收消息

当 Apache Pulsar 基础设施存在时,任何 Bean 都可以被注释以创建侦听器端点。 以下组件在主题上创建侦听器终结点:@PulsarListenersomeTopicSpring中文文档

import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarListener(topics = "someTopic")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarListener(topics = ["someTopic"])
	fun processMessage(content: String?) {
		// ...
	}

}

Spring Boot 自动配置提供了 所需的所有组件,例如它用来构建底层 Pulsar 消费者的 和 消费者工厂。 您可以通过指定任何带有前缀的应用程序属性来配置这些组件。PulsarListenerPulsarListenerContainerFactoryspring.pulsar.listener.*spring.pulsar.consumer.*Spring中文文档

如果您需要对使用者工厂配置进行更多控制,请考虑注册一个或多个 Bean。 这些定制器适用于工厂创建的所有使用者,因此也适用于所有实例。 您还可以通过设置注释的属性来自定义单个侦听器。ConsumerBuilderCustomizer@PulsarListenerconsumerCustomizer@PulsarListenerSpring中文文档

被动接收消息

当 Apache Pulsar 基础设施存在并激活了反应式自动配置时,可以对任何 Bean 进行注释以创建响应式侦听器端点。 以下组件在主题上创建反应式侦听器终结点:@ReactivePulsarListenersomeTopicSpring中文文档

import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@ReactivePulsarListener(topics = "someTopic")
	public Mono<Void> processMessage(String content) {
		// ...
		return Mono.empty();
	}

}
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono

@Component
class MyBean {

	@ReactivePulsarListener(topics = ["someTopic"])
	fun processMessage(content: String?): Mono<Void> {
		// ...
		return Mono.empty()
	}

}

Spring Boot 自动配置提供了 所需的所有组件,例如它用来构建底层反应式 Pulsar 消费者的 和 消费者工厂。 您可以通过指定任何 和 spring.pulsar.consumer. 前缀的应用程序属性来配置这些组件。ReactivePulsarListenerReactivePulsarListenerContainerFactoryspring.pulsar.listener.Spring中文文档

如果您需要对使用者工厂配置进行更多控制,请考虑注册一个或多个 Bean。 这些定制器适用于工厂创建的所有使用者,因此也适用于所有实例。 您还可以通过设置注释的属性来自定义单个侦听器。ReactiveMessageConsumerBuilderCustomizer@ReactivePulsarListenerconsumerCustomizer@ReactivePulsarListenerSpring中文文档

阅读消息

Pulsar 读取器界面使应用程序能够手动管理光标。 使用阅读器连接到主题时,需要指定阅读器在连接到主题时从哪条消息开始阅读。Spring中文文档

当 Apache Pulsar 基础设施存在时,任何 Bean 都可以使用读取器进行注释以使用消息。 以下组件创建一个读取器终结点,该终结点从主题的开头开始读取消息:@PulsarReadersomeTopicSpring中文文档

import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarReader(topics = "someTopic", startMessageId = "earliest")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
	fun processMessage(content: String?) {
		// ...
	}

}

它依赖于 a 来创建底层的 Pulsar 读取器。 Spring Boot 自动配置提供了此读取器工厂,可以通过设置任何前缀应用程序属性来自定义。@PulsarReaderPulsarReaderFactoryspring.pulsar.reader.*Spring中文文档

如果您需要对读卡器工厂配置进行更多控制,请考虑注册一个或多个 Bean。 这些定制器适用于工厂创建的所有读取器,因此也适用于所有实例。 您还可以通过设置注释的属性来自定义单个侦听器。ReaderBuilderCustomizer@PulsarReaderreaderCustomizer@PulsarReaderSpring中文文档

被动阅读消息

当 Apache Pulsar 基础设施存在并且激活了反应式自动配置时,将提供 Spring 的,您可以使用它来创建一个阅读器,以便以响应式方式读取消息。 以下组件使用提供的工厂创建读取器,并从主题中读取 5 分钟前的一条消息:ReactivePulsarReaderFactorysomeTopicSpring中文文档

import java.time.Instant;
import java.util.List;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.StartAtSpec;
import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final ReactivePulsarReaderFactory<String> pulsarReaderFactory;

	public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) {
		this.pulsarReaderFactory = pulsarReaderFactory;
	}

	public void someMethod() {
		ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder
			.topic("someTopic")
			.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)));
		Mono<Message<String>> message = this.pulsarReaderFactory
			.createReader(Schema.STRING, List.of(readerBuilderCustomizer))
			.readOne();
		// ...
	}

}
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder
import org.apache.pulsar.reactive.client.api.StartAtSpec
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory
import org.springframework.stereotype.Component
import java.time.Instant

@Component
class MyBean(private val pulsarReaderFactory: ReactivePulsarReaderFactory<String>) {

	fun someMethod() {
		val readerBuilderCustomizer = ReactiveMessageReaderBuilderCustomizer {
			readerBuilder: ReactiveMessageReaderBuilder<String> ->
				readerBuilder
					.topic("someTopic")
					.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)))
		}
		val message = pulsarReaderFactory
				.createReader(Schema.STRING, listOf(readerBuilderCustomizer))
				.readOne()
		// ...
	}

}

Spring Boot 自动配置提供了此读取器工厂,可以通过设置任何前缀应用程序属性来自定义。spring.pulsar.reader.*Spring中文文档

如果需要对读取器出厂配置进行更多控制,请考虑在使用出厂创建读取器时传入一个或多个实例。ReactiveMessageReaderBuilderCustomizerSpring中文文档

如果您需要对读卡器工厂配置进行更多控制,请考虑注册一个或多个 Bean。 这些定制器将应用于所有创建的读取器。 您还可以在创建读取器时传递一个或多个,以仅将自定义项应用于创建的读取器。ReactiveMessageReaderBuilderCustomizerReactiveMessageReaderBuilderCustomizerSpring中文文档

有关上述任何组件的更多详细信息以及发现其他可用功能,请参阅 Spring for Apache Pulsar 参考文档
有关上述任何组件的更多详细信息以及发现其他可用功能,请参阅 Spring for Apache Pulsar 参考文档

交易支持

Spring for Apache Pulsar 在使用 和 时支持事务。PulsarTemplate@PulsarListenerSpring中文文档

使用反应式变体时,当前不支持事务。

将属性设置为:spring.pulsar.transaction.enabledtrueSpring中文文档

的属性可用于微调何时应将事务用于侦听器。transactional@PulsarListenerSpring中文文档

为了更好地控制 Spring for Apache Pulsar 事务功能,您应该定义自己的和/或 bean。 如果默认的自动配置不合适,也可以定义 Bean。PulsarTemplateConcurrentPulsarListenerContainerFactoryPulsarAwareTransactionManagerPulsarTransactionManagerSpring中文文档

使用反应式变体时,当前不支持事务。

Pulsar 的其他属性

自动配置支持的属性显示在附录的“集成属性”部分。 请注意,在大多数情况下,这些属性(连字符或 camelCase)直接映射到 Apache Pulsar 配置属性。 有关详细信息,请参阅 Apache Pulsar 文档。Spring中文文档

只有 Pulsar 支持的属性子集可以直接通过该类获得。 如果希望使用不直接支持的其他属性来调整自动配置的组件,则可以使用上述每个组件支持的定制器。PulsarPropertiesSpring中文文档