通过提供 Spring for Apache Pulsar 项目的自动配置来支持 Apache Pulsarspring-doc.cn

Spring Boot 将在 classpath 上时自动配置和注册经典(命令式)Spring for Apache Pulsar 组件。 当 在 Classpath 上时,它将对响应式组件执行相同的操作。org.springframework.pulsar:spring-pulsarorg.springframework.pulsar:spring-pulsar-reactivespring-doc.cn

有 和 starters 分别用于方便地收集用于命令式和反应式使用的依赖项。spring-boot-starter-pulsarspring-boot-starter-pulsar-reactivespring-doc.cn

连接到 Pulsar

当您使用 Pulsar Starters时, Spring Boot 将自动配置并注册一个 bean。PulsarClientspring-doc.cn

默认情况下,应用程序会尝试连接到 位于 的本地 Pulsar 实例。 这可以通过将属性设置为其他值来调整。pulsar://localhost:6650spring.pulsar.client.service-urlspring-doc.cn

该值必须是有效的 Pulsar 协议 URL

您可以通过指定任何带前缀的应用程序属性来配置客户端。spring.pulsar.client.*spring-doc.cn

如果需要对配置进行更多控制,请考虑注册一个或多个 bean。PulsarClientBuilderCustomizerspring-doc.cn

认证

要连接到需要身份验证的 Pulsar 集群,你需要通过设置插件所需的 the 和任何参数来指定要使用的身份验证插件。 您可以将参数设置为参数名称到参数值的映射。 以下示例显示如何配置插件。pluginClassNameAuthenticationOAuth2spring-doc.cn

spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param.issuerUrl=https://auth.server.cloud/
spring.pulsar.client.authentication.param.privateKey=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
        param:
          issuerUrl: https://auth.server.cloud/
          privateKey: file:///Users/some-key.json
          audience: urn:sn:acme:dev:my-instance

您需要确保 下定义的名称与 auth 插件(通常是驼峰式大小写)所期望的名称完全匹配。 Spring Boot 不会尝试对这些条目进行任何类型的松散绑定。spring.pulsar.client.authentication.param.*spring-doc.cn

例如,如果要为 auth 插件配置颁发者 URL,则必须使用 。 如果使用其他形式(如 或 ),则设置将不会应用于插件。AuthenticationOAuth2spring.pulsar.client.authentication.param.issuerUrlissuerurlissuer-urlspring-doc.cn

这种缺乏松散的绑定还使得对身份验证参数使用环境变量成问题,因为在转换过程中会丢失区分大小写。 如果你对参数使用环境变量,则需要按照 Spring for Apache Pulsar 参考文档中的这些步骤进行操作,使其正常工作。spring-doc.cn

SSL认证

默认情况下,Pulsar 客户端以纯文本形式与 Pulsar 服务通信。 你可以按照 Spring for Apache Pulsar 参考文档中的这些步骤来启用 TLS 加密。spring-doc.cn

有关客户端和身份验证的完整详细信息,请参阅 Spring for Apache Pulsar 参考文档spring-doc.cn

该值必须是有效的 Pulsar 协议 URL

您需要确保 下定义的名称与 auth 插件(通常是驼峰式大小写)所期望的名称完全匹配。 Spring Boot 不会尝试对这些条目进行任何类型的松散绑定。spring.pulsar.client.authentication.param.*spring-doc.cn

例如,如果要为 auth 插件配置颁发者 URL,则必须使用 。 如果使用其他形式(如 或 ),则设置将不会应用于插件。AuthenticationOAuth2spring.pulsar.client.authentication.param.issuerUrlissuerurlissuer-urlspring-doc.cn

这种缺乏松散的绑定还使得对身份验证参数使用环境变量成问题,因为在转换过程中会丢失区分大小写。 如果你对参数使用环境变量,则需要按照 Spring for Apache Pulsar 参考文档中的这些步骤进行操作,使其正常工作。spring-doc.cn

连接到 Pulsar Reactors

激活 Reactive 自动配置后, Spring Boot 将自动配置并注册一个 bean。ReactivePulsarClientspring-doc.cn

会适配前面描述的 . 因此,请按照上一节配置 使用的 。ReactivePulsarClientPulsarClientPulsarClientReactivePulsarClientspring-doc.cn

连接到 Pulsar Administration

Spring for Apache Pulsar 的客户端也是自动配置的。PulsarAdministrationspring-doc.cn

默认情况下,应用程序会尝试连接到 位于 的本地 Pulsar 实例。 这可以通过在表单中将属性设置为其他值来调整。http://localhost:8080spring.pulsar.admin.service-url(http|https)://<host>:<port>spring-doc.cn

如果需要对配置进行更多控制,请考虑注册一个或多个 bean。PulsarAdminBuilderCustomizerspring-doc.cn

认证

访问需要身份验证的 Pulsar 集群时,admin 客户端需要与常规 Pulsar 客户端相同的安全配置。 您可以通过将 .spring.pulsar.client.authenticationspring.pulsar.admin.authenticationspring-doc.cn

要在启动时创建主题,请添加 类型的 bean 。 如果主题已存在,则忽略该 Bean。PulsarTopic
要在启动时创建主题,请添加 类型的 bean 。 如果主题已存在,则忽略该 Bean。PulsarTopic

发送消息

Spring 的 是自动配置的,你可以使用它来发送消息,如以下示例所示:PulsarTemplatespring-doc.cn

import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final PulsarTemplate<String> pulsarTemplate;

	public MyBean(PulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void someMethod() {
		this.pulsarTemplate.send("someTopic", "Hello");
	}

}
import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {

	@Throws(PulsarClientException::class)
	fun someMethod() {
		pulsarTemplate.send("someTopic", "Hello")
	}

}

它依赖于 a 来创建底层的 Pulsar 生产者。 Spring Boot 自动配置还提供了这个 producer 工厂,默认情况下,它会缓存它创建的 producer。 您可以通过指定任何 和 前缀应用程序属性来配置创建者出厂设置和缓存设置。PulsarTemplatePulsarProducerFactoryspring.pulsar.producer.*spring.pulsar.producer.cache.*spring-doc.cn

如果需要对生产者工厂配置进行更多控制,请考虑注册一个或多个 bean。 这些定制器将应用于所有创建的生产者。 您也可以传入 when sending a message 以仅影响当前生产者。ProducerBuilderCustomizerProducerBuilderCustomizerspring-doc.cn

如果您需要对正在发送的消息进行更多控制,则可以在发送消息时传入 。TypedMessageBuilderCustomizerspring-doc.cn

响应式发送消息

激活 Reactive 自动配置后, Spring 的自动配置是自动配置的,你可以使用它来发送消息,如以下示例所示:ReactivePulsarTemplatespring-doc.cn

import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final ReactivePulsarTemplate<String> pulsarTemplate;

	public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void someMethod() {
		this.pulsarTemplate.send("someTopic", "Hello").subscribe();
	}

}
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: ReactivePulsarTemplate<String>) {

	fun someMethod() {
		pulsarTemplate.send("someTopic", "Hello").subscribe()
	}

}

这依赖于 a 来实际创建底层发件人。 Spring Boot 自动配置还提供了这个发送者工厂,默认情况下,它会缓存它创建的生产者。 您可以通过指定任何 和 前缀应用程序属性来配置发送程序出厂设置和缓存设置。ReactivePulsarTemplateReactivePulsarSenderFactoryspring.pulsar.producer.*spring.pulsar.producer.cache.*spring-doc.cn

如果您需要对发送方工厂配置进行更多控制,请考虑注册一个或多个 bean。 这些定制器将应用于所有创建的发件人。 你也可以传入 when sending a message 来只影响当前发件人。ReactiveMessageSenderBuilderCustomizerReactiveMessageSenderBuilderCustomizerspring-doc.cn

如果您需要对正在发送的消息进行更多控制,则可以在发送消息时传入 。MessageSpecBuilderCustomizerspring-doc.cn

接收消息

当 Apache Pulsar 基础设施存在时,任何 bean 都可以进行注释以创建侦听器端点。 以下组件在主题上创建侦听器终端节点:@PulsarListenersomeTopicspring-doc.cn

import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarListener(topics = "someTopic")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarListener(topics = ["someTopic"])
	fun processMessage(content: String?) {
		// ...
	}

}

Spring Boot 自动配置提供了所有必要的组件,例如它用来构建底层 Pulsar 消费者的消费者工厂。 您可以通过指定任何 和 前缀应用程序属性来配置这些组件。PulsarListenerPulsarListenerContainerFactoryspring.pulsar.listener.*spring.pulsar.consumer.*spring-doc.cn

如果需要对 Consumer Factory 配置进行更多控制,请考虑注册一个或多个 bean。 这些定制器将应用于工厂创建的所有使用者,因此也应用于所有实例。 您还可以通过设置注释的属性来自定义单个侦听器。ConsumerBuilderCustomizer@PulsarListenerconsumerCustomizer@PulsarListenerspring-doc.cn

响应式接收消息

当 Apache Pulsar 基础设施存在并激活 Reactive 自动配置时,任何 bean 都可以被注释以创建反应式侦听器端点。 以下组件在主题上创建反应式侦听器终端节点:@ReactivePulsarListenersomeTopicspring-doc.cn

import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@ReactivePulsarListener(topics = "someTopic")
	public Mono<Void> processMessage(String content) {
		// ...
		return Mono.empty();
	}

}
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono

@Component
class MyBean {

	@ReactivePulsarListener(topics = ["someTopic"])
	fun processMessage(content: String?): Mono<Void> {
		// ...
		return Mono.empty()
	}

}

Spring Boot 自动配置提供了所有必要的组件,例如它用来构建底层反应式 Pulsar 消费者的消费者工厂。 您可以通过指定任何 和 前缀应用程序属性来配置这些组件。ReactivePulsarListenerReactivePulsarListenerContainerFactoryspring.pulsar.listener.*spring.pulsar.consumer.*spring-doc.cn

如果需要对 Consumer Factory 配置进行更多控制,请考虑注册一个或多个 bean。 这些定制器将应用于工厂创建的所有使用者,因此也应用于所有实例。 您还可以通过设置注释的属性来自定义单个侦听器。ReactiveMessageConsumerBuilderCustomizer@ReactivePulsarListenerconsumerCustomizer@ReactivePulsarListenerspring-doc.cn

阅读消息

Pulsar 读取器接口使应用程序能够手动管理游标。 当您使用读取器连接到主题时,您需要指定读取器在连接到主题时从哪条消息开始读取。spring-doc.cn

当存在 Apache Pulsar 基础设施时,任何 bean 都可以被注释以使用读取器来消费消息。 以下组件创建一个读取器终端节点,该终端节点从主题的开头开始读取消息:@PulsarReadersomeTopicspring-doc.cn

import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarReader(topics = "someTopic", startMessageId = "earliest")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
	fun processMessage(content: String?) {
		// ...
	}

}

它依赖于 a 来创建底层的 Pulsar 读取器。 Spring Boot 自动配置提供了这个读取器工厂,可以通过设置任何带前缀的应用程序属性来自定义。@PulsarReaderPulsarReaderFactoryspring.pulsar.reader.*spring-doc.cn

如果需要对 reader 出厂配置进行更多控制,请考虑注册一个或多个 bean。 这些定制器应用于工厂创建的所有读取器,因此应用于所有实例。 您还可以通过设置注释的属性来自定义单个侦听器。ReaderBuilderCustomizer@PulsarReaderreaderCustomizer@PulsarReaderspring-doc.cn

响应式读取消息

当 Apache Pulsar 基础设施存在并激活 Reactive 自动配置时,会提供 Spring 的,你可以使用它来创建一个 reader,以便以 reactive 方式读取消息。 以下组件使用提供的工厂创建一个读取器,并从主题中读取 5 分钟前的一条消息:ReactivePulsarReaderFactorysomeTopicspring-doc.cn

import java.time.Instant;
import java.util.List;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.StartAtSpec;
import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final ReactivePulsarReaderFactory<String> pulsarReaderFactory;

	public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) {
		this.pulsarReaderFactory = pulsarReaderFactory;
	}

	public void someMethod() {
		ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder
			.topic("someTopic")
			.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)));
		Mono<Message<String>> message = this.pulsarReaderFactory
			.createReader(Schema.STRING, List.of(readerBuilderCustomizer))
			.readOne();
		// ...
	}

}
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder
import org.apache.pulsar.reactive.client.api.StartAtSpec
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory
import org.springframework.stereotype.Component
import java.time.Instant

@Component
class MyBean(private val pulsarReaderFactory: ReactivePulsarReaderFactory<String>) {

	fun someMethod() {
		val readerBuilderCustomizer = ReactiveMessageReaderBuilderCustomizer {
			readerBuilder: ReactiveMessageReaderBuilder<String> ->
				readerBuilder
					.topic("someTopic")
					.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)))
		}
		val message = pulsarReaderFactory
				.createReader(Schema.STRING, listOf(readerBuilderCustomizer))
				.readOne()
		// ...
	}

}

Spring Boot 自动配置提供了这个读取器工厂,可以通过设置任何带前缀的应用程序属性来自定义。spring.pulsar.reader.*spring-doc.cn

如果您需要对 reader 出厂配置进行更多控制,请考虑在使用工厂创建 reader 时传入一个或多个实例。ReactiveMessageReaderBuilderCustomizerspring-doc.cn

如果需要对 reader 出厂配置进行更多控制,请考虑注册一个或多个 bean。 这些定制器将应用于所有创建的读取器。 您还可以在创建读取器时传递一个或多个,以仅将自定义项应用于创建的读取器。ReactiveMessageReaderBuilderCustomizerReactiveMessageReaderBuilderCustomizerspring-doc.cn

有关上述任何组件的更多详细信息以及发现其他可用功能,请参阅 Spring for Apache Pulsar 参考文档
有关上述任何组件的更多详细信息以及发现其他可用功能,请参阅 Spring for Apache Pulsar 参考文档

事务支持

Spring for Apache Pulsar 在使用 和 时支持事务。PulsarTemplate@PulsarListenerspring-doc.cn

使用 reactive 变体时,当前不支持 Transactions。

将 property 设置为 将:spring.pulsar.transaction.enabledtruespring-doc.cn

的属性 of 可用于微调何时应将事务与侦听器一起使用。transactional@PulsarListenerspring-doc.cn

为了更好地控制 Spring for Apache Pulsar 事务功能,您应该定义自己的 and/或 bean。 如果默认的 auto-configured 不合适,您还可以定义一个 bean。PulsarTemplateConcurrentPulsarListenerContainerFactoryPulsarAwareTransactionManagerPulsarTransactionManagerspring-doc.cn

使用 reactive 变体时,当前不支持 Transactions。

其他 Pulsar 属性

自动配置支持的属性显示在附录的 Integration Properties 部分中。 请注意,在大多数情况下,这些属性(带 hyphened 或 camelCase)直接映射到 Apache Pulsar 配置属性。 有关详细信息,请参阅 Apache Pulsar 文档。spring-doc.cn

只有 Pulsar 支持的 property 子集可以直接通过 class 获得。 如果您希望使用不直接支持的其他属性来优化自动配置的组件,则可以使用上述每个组件支持的定制器。PulsarPropertiesspring-doc.cn