此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4spring-doc.cn

绑定属性

绑定属性使用 . 这表示正在配置的绑定的名称。spring.cloud.stream.bindings.<bindingName>.<property>=<value><bindingName>spring-doc.cn

例如,对于以下函数spring-doc.cn

@Bean
public Function<String, String> uppercase() {
	return v -> v.toUpperCase();
}

有两个名为 input 和 for output 的绑定。有关详细信息,请参阅 [Binding 和 Binding names]。uppercase-in-0uppercase-out-0spring-doc.cn

为避免重复,Spring Cloud Stream 支持以公共 binding 属性的格式设置所有绑定的值。spring.cloud.stream.default.<property>=<value>spring.cloud.stream.default.<producer|consumer>.<property>=<value>

当涉及到避免扩展绑定属性的重复时,应使用此格式 - 。spring.cloud.stream.<binder-type>.default.<producer|consumer>.<property>=<value>spring-doc.cn

通用绑定属性

这些属性通过org.springframework.cloud.stream.config.BindingPropertiesspring-doc.cn

以下绑定属性可用于输入和输出绑定,并且必须以 (为前缀,例如 )。spring.cloud.stream.bindings.<bindingName>.spring.cloud.stream.bindings.uppercase-in-0.destination=ticktockspring-doc.cn

可以使用前缀(例如 )设置默认值 。spring.cloud.stream.defaultspring.cloud.stream.default.contentType=application/jsonspring-doc.cn

目的地

绑定中间件上绑定的目标(例如,RabbitMQ 交换或 Kafka 主题)。 如果 binding 表示使用者绑定(输入),则可以将其绑定到多个目标,并且可以将目标名称指定为逗号分隔值。 否则,则改用实际的绑定名称。 无法覆盖此属性的默认值。Stringspring-doc.cn

绑定的 consumer 组。 仅适用于入站绑定。 请参见 Consumer Groupsspring-doc.cn

默认值:(表示匿名使用者)。nullspring-doc.cn

contentType

此绑定的内容类型。 看。Content Type Negotiationspring-doc.cn

违约:。application/jsonspring-doc.cn

粘结 剂

此绑定使用的 Binder。 有关详细信息,请参阅。Multiple Binders on the Classpathspring-doc.cn

Default:(如果存在,则使用默认 Binder)。nullspring-doc.cn

消费者属性

这些属性通过org.springframework.cloud.stream.binder.ConsumerPropertiesspring-doc.cn

以下绑定属性仅适用于输入绑定,并且必须以 (为前缀,例如 )。spring.cloud.stream.bindings.<bindingName>.consumer.spring.cloud.stream.bindings.input.consumer.concurrency=3spring-doc.cn

可以使用前缀(例如,)设置默认值。spring.cloud.stream.default.consumerspring.cloud.stream.default.consumer.headerMode=nonespring-doc.cn

自动启动

指示此使用者是否需要自动启动spring-doc.cn

违约:。truespring-doc.cn

并发

入站使用者的并发性。spring-doc.cn

违约:。1spring-doc.cn

分区

Consumer 是否从分区的 producer 接收数据。spring-doc.cn

违约:。falsespring-doc.cn

headerMode

设置为 时,将禁用输入时的标头解析。 仅对本机不支持消息标头且需要标头嵌入的消息中间件有效。 在不支持本机 Headers 的情况下使用来自非 Spring Cloud Stream 应用程序的数据时,此选项非常有用。 当设置为 时,它使用中间件的本机标头机制。 设置为 时,它会将标头嵌入到消息有效负载中。noneheadersembeddedHeadersspring-doc.cn

Default:取决于 Binder 实现。spring-doc.cn

最大尝试次数

如果处理失败,则为尝试处理消息的次数(包括第一次)。 设置为 to 以禁用重试。1spring-doc.cn

违约:。3spring-doc.cn

backOffInitialInterval

重试时的回退初始间隔。spring-doc.cn

违约:。1000spring-doc.cn

backOffMaxInterval

最大回退间隔。spring-doc.cn

违约:。10000spring-doc.cn

backOffMultiplier 的

回退乘数。spring-doc.cn

违约:。2.0spring-doc.cn

defaultRetryable

侦听器引发的未在 中列的异常是否可重试。retryableExceptionsspring-doc.cn

违约:。truespring-doc.cn

instanceCount

当设置为大于等于零的值时,它允许自定义此使用者的实例计数(如果不同于 )。 当设置为负值时,它默认为 。 有关更多信息,请参阅。spring.cloud.stream.instanceCountspring.cloud.stream.instanceCountInstance Index and Instance Countspring-doc.cn

违约:。-1spring-doc.cn

instanceIndex 实例索引

当设置为大于等于零的值时,它允许自定义此 Consumer 的实例索引(如果不同于 )。 当设置为负值时,它默认为 。 如果提供,则忽略。 有关更多信息,请参阅。spring.cloud.stream.instanceIndexspring.cloud.stream.instanceIndexinstanceIndexListInstance Index and Instance Countspring-doc.cn

违约:。-1spring-doc.cn

instanceIndexList 实例索引列表

与不支持本机分区的 Binders(例如 RabbitMQ)一起使用;允许应用程序实例使用多个分区。spring-doc.cn

默认值:空。spring-doc.cn

retryableExceptions

键中 Throwable 类名的映射,值中有一个布尔值。 指定将要或不会重试的异常 (和子类)。 另请参阅 。 例:。defaultRetriablespring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=falsespring-doc.cn

默认值:空。spring-doc.cn

useNative解码

当设置为 时,入站消息由客户端库直接反序列化,必须相应地配置该库(例如,设置适当的 Kafka 生产者值 deserializer)。 使用此配置时,入站消息解组不基于 binding。 使用本机解码时,创建者有责任使用适当的编码器(例如,Kafka 生产者值序列化器)来序列化出站消息。 此外,当使用本机编码和解码时,该属性将被忽略,并且标头不会嵌入到消息中。 请参阅 producer 属性 。truecontentTypeheaderMode=embeddedHeadersuseNativeEncodingspring-doc.cn

违约:。falsespring-doc.cn

多重

当设置为 true 时,底层 Binder 将在同一 input 绑定上本地多路复用目标。spring-doc.cn

违约:。falsespring-doc.cn

高级消费者配置

对于消息驱动的使用者的基础消息侦听器容器的高级配置,请将单个 Bean 添加到应用程序上下文中。 它将在应用上述属性后调用,并可用于设置其他属性。 同样,对于轮询的使用者,添加一个 Bean。ListenerContainerCustomizerMessageSourceCustomizerspring-doc.cn

以下是 RabbitMQ Binder 的示例:spring-doc.cn

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer> containerCustomizer() {
    return (container, dest, group) -> container.setAdviceChain(advice1, advice2);
}

@Bean
public MessageSourceCustomizer<AmqpMessageSource> sourceCustomizer() {
    return (source, dest, group) -> source.setPropertiesConverter(customPropertiesConverter);
}

生产者属性

这些属性通过org.springframework.cloud.stream.binder.ProducerPropertiesspring-doc.cn

以下绑定属性仅适用于输出绑定,并且必须以 (例如) 为前缀。spring.cloud.stream.bindings.<bindingName>.producer.spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=headers.idspring-doc.cn

可以使用前缀(例如,)设置默认值。spring.cloud.stream.default.producerspring.cloud.stream.default.producer.partitionKeyExpression=headers.idspring-doc.cn

自动启动

指示此使用者是否需要自动启动spring-doc.cn

违约:。truespring-doc.cn

partitionKeyExpression 的

一个 SPEL 表达式,用于确定如何对出站数据进行分区。 如果设置,则对此绑定上的出站数据进行分区。 必须设置为大于 1 的值才能生效。 看。partitionCountPartitioningspring-doc.cn

默认值:null。spring-doc.cn

partitionKeyExtractorName

实现 .用于提取用于计算的 key 分区 ID(参见 'partitionSelector*')。与 'partitionKeyExpression' 互斥。PartitionKeyExtractorStrategyspring-doc.cn

默认值:null。spring-doc.cn

partitionSelectorName (分区选择器名称)

实现 .用于确定基于分区 ID 在分区键上(参见 'partitionKeyExtractor*')。与 'partitionSelectorExpression' 互斥。PartitionSelectorStrategyspring-doc.cn

默认值:null。spring-doc.cn

partitionSelectorExpression 的

用于自定义分区选择的 SPEL 表达式。 如果未设置,则选择分区作为 ,其中通过 .hashCode(key) % partitionCountkeypartitionKeyExpressionspring-doc.cn

违约:。nullspring-doc.cn

partitionCount

数据的目标分区数 (如果启用了分区)。 如果生产者已分区,则必须设置为大于 1 的值。 在 Kafka 上,它被解释为提示。改用 this 和目标主题的分区计数中的较大者。spring-doc.cn

违约:。1spring-doc.cn

必需组

一个以逗号分隔的组列表,创建者必须确保将消息传送到这些组,即使它们在创建消息后启动(例如,通过在 RabbitMQ 中预先创建持久队列)。spring-doc.cn

headerMode

设置为 时,它将禁用输出上的标头嵌入。 它仅对本机不支持消息标头且需要嵌入标头的消息中间件有效。 在不支持本机标头的情况下为非 Spring Cloud Stream 应用程序生成数据时,此选项非常有用。 当设置为 时,它使用中间件的本机标头机制。 设置为 时,它会将标头嵌入到消息有效负载中。noneheadersembeddedHeadersspring-doc.cn

Default:取决于 Binder 实现。spring-doc.cn

useNativeEncoding

设置为 时,出站消息由客户端库直接序列化,必须相应地配置(例如,设置适当的 Kafka 生产者值序列化器)。 使用此配置时,出站消息封送处理不基于绑定。 使用本机编码时,使用者有责任使用适当的解码器(例如,Kafka 使用者值反序列化器)来反序列化入站消息。 此外,当使用本机编码和解码时,该属性将被忽略,并且标头不会嵌入到消息中。 请参阅 consumer 属性 。truecontentTypeheaderMode=embeddedHeadersuseNativeDecodingspring-doc.cn

违约:。falsespring-doc.cn

errorChannelEnabled

当设置为 true 时,如果 Binders 支持异步发送结果,则发送失败将发送到目标的错误通道。有关更多信息,请参阅错误处理。spring-doc.cn

默认值:false。spring-doc.cn

高级 Producer 配置

在某些情况下,Producer Properties 不足以在 Binder 中正确配置生产 MessageHandler,或者您可能更喜欢编程方法 在配置此类生成 MessageHandler 时。无论出于何种原因,spring-cloud-stream都可以完成它。ProducerMessageHandlerCustomizerspring-doc.cn

@FunctionalInterface
public interface ProducerMessageHandlerCustomizer<H extends MessageHandler> {

	/**
	 * Configure a {@link MessageHandler} that is being created by the binder for the
	 * provided destination name.
	 * @param handler the {@link MessageHandler} from the binder.
	 * @param destinationName the bound destination name.
	 */
	void configure(H handler, String destinationName);

}

如您所见,它允许您访问实际的 produce 实例,您可以根据需要对其进行配置。 您需要做的就是提供此策略的实现并将其配置为 .MessageHandler@Beanspring-doc.cn