此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4! |
绑定属性
绑定属性使用 .
这表示正在配置的绑定的名称。spring.cloud.stream.bindings.<bindingName>.<property>=<value>
<bindingName>
例如,对于以下函数
@Bean
public Function<String, String> uppercase() {
return v -> v.toUpperCase();
}
有两个名为 input 和 for output 的绑定。有关详细信息,请参阅 [Binding 和 Binding names]。uppercase-in-0
uppercase-out-0
为避免重复,Spring Cloud Stream 支持以公共 binding 属性的格式设置所有绑定的值。spring.cloud.stream.default.<property>=<value> spring.cloud.stream.default.<producer|consumer>.<property>=<value> |
当涉及到避免扩展绑定属性的重复时,应使用此格式 - 。spring.cloud.stream.<binder-type>.default.<producer|consumer>.<property>=<value>
通用绑定属性
这些属性通过org.springframework.cloud.stream.config.BindingProperties
以下绑定属性可用于输入和输出绑定,并且必须以 (为前缀,例如 )。spring.cloud.stream.bindings.<bindingName>.
spring.cloud.stream.bindings.uppercase-in-0.destination=ticktock
可以使用前缀(例如 )设置默认值 。spring.cloud.stream.default
spring.cloud.stream.default.contentType=application/json
- 目的地
-
绑定中间件上绑定的目标(例如,RabbitMQ 交换或 Kafka 主题)。 如果 binding 表示使用者绑定(输入),则可以将其绑定到多个目标,并且可以将目标名称指定为逗号分隔值。 否则,则改用实际的绑定名称。 无法覆盖此属性的默认值。
String
- 群
-
绑定的 consumer 组。 仅适用于入站绑定。 请参见 Consumer Groups。
默认值:(表示匿名使用者)。
null
- contentType
-
此绑定的内容类型。 看。
Content Type Negotiation
违约:。
application/json
- 粘结 剂
-
此绑定使用的 Binder。 有关详细信息,请参阅。
Multiple Binders on the Classpath
Default:(如果存在,则使用默认 Binder)。
null
消费者属性
这些属性通过org.springframework.cloud.stream.binder.ConsumerProperties
以下绑定属性仅适用于输入绑定,并且必须以 (为前缀,例如 )。spring.cloud.stream.bindings.<bindingName>.consumer.
spring.cloud.stream.bindings.input.consumer.concurrency=3
可以使用前缀(例如,)设置默认值。spring.cloud.stream.default.consumer
spring.cloud.stream.default.consumer.headerMode=none
- 自动启动
-
指示此使用者是否需要自动启动
违约:。
true
- 并发
-
入站使用者的并发性。
违约:。
1
- 分区
-
Consumer 是否从分区的 producer 接收数据。
违约:。
false
- headerMode
-
设置为 时,将禁用输入时的标头解析。 仅对本机不支持消息标头且需要标头嵌入的消息中间件有效。 在不支持本机 Headers 的情况下使用来自非 Spring Cloud Stream 应用程序的数据时,此选项非常有用。 当设置为 时,它使用中间件的本机标头机制。 设置为 时,它会将标头嵌入到消息有效负载中。
none
headers
embeddedHeaders
Default:取决于 Binder 实现。
- 最大尝试次数
-
如果处理失败,则为尝试处理消息的次数(包括第一次)。 设置为 to 以禁用重试。
1
违约:。
3
- backOffInitialInterval
-
重试时的回退初始间隔。
违约:。
1000
- backOffMaxInterval
-
最大回退间隔。
违约:。
10000
- backOffMultiplier 的
-
回退乘数。
违约:。
2.0
- defaultRetryable
-
侦听器引发的未在 中列的异常是否可重试。
retryableExceptions
违约:。
true
- instanceCount
-
当设置为大于等于零的值时,它允许自定义此使用者的实例计数(如果不同于 )。 当设置为负值时,它默认为 。 有关更多信息,请参阅。
spring.cloud.stream.instanceCount
spring.cloud.stream.instanceCount
Instance Index and Instance Count
违约:。
-1
- instanceIndex 实例索引
-
当设置为大于等于零的值时,它允许自定义此 Consumer 的实例索引(如果不同于 )。 当设置为负值时,它默认为 。 如果提供,则忽略。 有关更多信息,请参阅。
spring.cloud.stream.instanceIndex
spring.cloud.stream.instanceIndex
instanceIndexList
Instance Index and Instance Count
违约:。
-1
- instanceIndexList 实例索引列表
-
与不支持本机分区的 Binders(例如 RabbitMQ)一起使用;允许应用程序实例使用多个分区。
默认值:空。
- retryableExceptions
-
键中 Throwable 类名的映射,值中有一个布尔值。 指定将要或不会重试的异常 (和子类)。 另请参阅 。 例:。
defaultRetriable
spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false
默认值:空。
- useNative解码
-
当设置为 时,入站消息由客户端库直接反序列化,必须相应地配置该库(例如,设置适当的 Kafka 生产者值 deserializer)。 使用此配置时,入站消息解组不基于 binding。 使用本机解码时,创建者有责任使用适当的编码器(例如,Kafka 生产者值序列化器)来序列化出站消息。 此外,当使用本机编码和解码时,该属性将被忽略,并且标头不会嵌入到消息中。 请参阅 producer 属性 。
true
contentType
headerMode=embeddedHeaders
useNativeEncoding
违约:。
false
- 多重
-
当设置为 true 时,底层 Binder 将在同一 input 绑定上本地多路复用目标。
违约:。
false
高级消费者配置
对于消息驱动的使用者的基础消息侦听器容器的高级配置,请将单个 Bean 添加到应用程序上下文中。
它将在应用上述属性后调用,并可用于设置其他属性。
同样,对于轮询的使用者,添加一个 Bean。ListenerContainerCustomizer
MessageSourceCustomizer
以下是 RabbitMQ Binder 的示例:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer> containerCustomizer() {
return (container, dest, group) -> container.setAdviceChain(advice1, advice2);
}
@Bean
public MessageSourceCustomizer<AmqpMessageSource> sourceCustomizer() {
return (source, dest, group) -> source.setPropertiesConverter(customPropertiesConverter);
}
生产者属性
这些属性通过org.springframework.cloud.stream.binder.ProducerProperties
以下绑定属性仅适用于输出绑定,并且必须以 (例如) 为前缀。spring.cloud.stream.bindings.<bindingName>.producer.
spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=headers.id
可以使用前缀(例如,)设置默认值。spring.cloud.stream.default.producer
spring.cloud.stream.default.producer.partitionKeyExpression=headers.id
- 自动启动
-
指示此使用者是否需要自动启动
违约:。
true
- partitionKeyExpression 的
-
一个 SPEL 表达式,用于确定如何对出站数据进行分区。 如果设置,则对此绑定上的出站数据进行分区。 必须设置为大于 1 的值才能生效。 看。
partitionCount
Partitioning
默认值:null。
- partitionKeyExtractorName
-
实现 .用于提取用于计算的 key 分区 ID(参见 'partitionSelector*')。与 'partitionKeyExpression' 互斥。
PartitionKeyExtractorStrategy
默认值:null。
- partitionSelectorName (分区选择器名称)
-
实现 .用于确定基于分区 ID 在分区键上(参见 'partitionKeyExtractor*')。与 'partitionSelectorExpression' 互斥。
PartitionSelectorStrategy
默认值:null。
- partitionSelectorExpression 的
-
用于自定义分区选择的 SPEL 表达式。 如果未设置,则选择分区作为 ,其中通过 .
hashCode(key) % partitionCount
key
partitionKeyExpression
违约:。
null
- partitionCount
-
数据的目标分区数 (如果启用了分区)。 如果生产者已分区,则必须设置为大于 1 的值。 在 Kafka 上,它被解释为提示。改用 this 和目标主题的分区计数中的较大者。
违约:。
1
- 必需组
-
一个以逗号分隔的组列表,创建者必须确保将消息传送到这些组,即使它们在创建消息后启动(例如,通过在 RabbitMQ 中预先创建持久队列)。
- headerMode
-
设置为 时,它将禁用输出上的标头嵌入。 它仅对本机不支持消息标头且需要嵌入标头的消息中间件有效。 在不支持本机标头的情况下为非 Spring Cloud Stream 应用程序生成数据时,此选项非常有用。 当设置为 时,它使用中间件的本机标头机制。 设置为 时,它会将标头嵌入到消息有效负载中。
none
headers
embeddedHeaders
Default:取决于 Binder 实现。
- useNativeEncoding
-
设置为 时,出站消息由客户端库直接序列化,必须相应地配置(例如,设置适当的 Kafka 生产者值序列化器)。 使用此配置时,出站消息封送处理不基于绑定。 使用本机编码时,使用者有责任使用适当的解码器(例如,Kafka 使用者值反序列化器)来反序列化入站消息。 此外,当使用本机编码和解码时,该属性将被忽略,并且标头不会嵌入到消息中。 请参阅 consumer 属性 。
true
contentType
headerMode=embeddedHeaders
useNativeDecoding
违约:。
false
- errorChannelEnabled
-
当设置为 true 时,如果 Binders 支持异步发送结果,则发送失败将发送到目标的错误通道。有关更多信息,请参阅错误处理。
默认值:false。
高级 Producer 配置
在某些情况下,Producer Properties 不足以在 Binder 中正确配置生产 MessageHandler,或者您可能更喜欢编程方法
在配置此类生成 MessageHandler 时。无论出于何种原因,spring-cloud-stream都可以完成它。ProducerMessageHandlerCustomizer
@FunctionalInterface
public interface ProducerMessageHandlerCustomizer<H extends MessageHandler> {
/**
* Configure a {@link MessageHandler} that is being created by the binder for the
* provided destination name.
* @param handler the {@link MessageHandler} from the binder.
* @param destinationName the bound destination name.
*/
void configure(H handler, String destinationName);
}
如您所见,它允许您访问实际的 produce 实例,您可以根据需要对其进行配置。
您需要做的就是提供此策略的实现并将其配置为 .MessageHandler
@Bean