This version is still in development and is not considered stable yet. For the latest stable version, please use spring-cloud-stream 4.1.4! |
Binding Properties
Binding properties are supplied by using the format of spring.cloud.stream.bindings.<bindingName>.<property>=<value>
.
The <bindingName>
represents the name of the binding being configured.
For example, for the following function
@Bean
public Function<String, String> uppercase() {
return v -> v.toUpperCase();
}
there are two bindings named uppercase-in-0
for input and uppercase-out-0
for output. See [Binding and Binding names] for more details.
To avoid repetition, Spring Cloud Stream supports setting values for all bindings, in the format of spring.cloud.stream.default.<property>=<value>
and spring.cloud.stream.default.<producer|consumer>.<property>=<value> for common binding properties.
|
When it comes to avoiding repetitions for extended binding properties, this format should be used - spring.cloud.stream.<binder-type>.default.<producer|consumer>.<property>=<value>
.
Common Binding Properties
These properties are exposed via org.springframework.cloud.stream.config.BindingProperties
The following binding properties are available for both input and output bindings and must be prefixed with spring.cloud.stream.bindings.<bindingName>.
(for example, spring.cloud.stream.bindings.uppercase-in-0.destination=ticktock
).
Default values can be set by using the spring.cloud.stream.default
prefix (for example spring.cloud.stream.default.contentType=application/json
).
- destination
-
The target destination of a binding on the bound middleware (for example, the RabbitMQ exchange or Kafka topic). If binding represents a consumer binding (input), it could be bound to multiple destinations, and the destination names can be specified as comma-separated
String
values. If not, the actual binding name is used instead. The default value of this property cannot be overridden. - group
-
The consumer group of the binding. Applies only to inbound bindings. See Consumer Groups.
Default:
null
(indicating an anonymous consumer). - contentType
-
The content type of this binding. See
Content Type Negotiation
.Default:
application/json
. - binder
-
The binder used by this binding. See
Multiple Binders on the Classpath
for details.Default:
null
(the default binder is used, if it exists).
Consumer Properties
These properties are exposed via org.springframework.cloud.stream.binder.ConsumerProperties
The following binding properties are available for input bindings only and must be prefixed with spring.cloud.stream.bindings.<bindingName>.consumer.
(for example, spring.cloud.stream.bindings.input.consumer.concurrency=3
).
Default values can be set by using the spring.cloud.stream.default.consumer
prefix (for example, spring.cloud.stream.default.consumer.headerMode=none
).
- autoStartup
-
Signals if this consumer needs to be started automatically
Default:
true
. - concurrency
-
The concurrency of the inbound consumer.
Default:
1
. - partitioned
-
Whether the consumer receives data from a partitioned producer.
Default:
false
. - headerMode
-
When set to
none
, disables header parsing on input. Effective only for messaging middleware that does not support message headers natively and requires header embedding. This option is useful when consuming data from non-Spring Cloud Stream applications when native headers are not supported. When set toheaders
, it uses the middleware’s native header mechanism. When set toembeddedHeaders
, it embeds headers into the message payload.Default: depends on the binder implementation.
- maxAttempts
-
If processing fails, the number of attempts to process the message (including the first). Set to
1
to disable retry.Default:
3
. - backOffInitialInterval
-
The backoff initial interval on retry.
Default:
1000
. - backOffMaxInterval
-
The maximum backoff interval.
Default:
10000
. - backOffMultiplier
-
The backoff multiplier.
Default:
2.0
. - defaultRetryable
-
Whether exceptions thrown by the listener that are not listed in the
retryableExceptions
are retryable.Default:
true
. - instanceCount
-
When set to a value greater than equal to zero, it allows customizing the instance count of this consumer (if different from
spring.cloud.stream.instanceCount
). When set to a negative value, it defaults tospring.cloud.stream.instanceCount
. SeeInstance Index and Instance Count
for more information.Default:
-1
. - instanceIndex
-
When set to a value greater than equal to zero, it allows customizing the instance index of this consumer (if different from
spring.cloud.stream.instanceIndex
). When set to a negative value, it defaults tospring.cloud.stream.instanceIndex
. Ignored ifinstanceIndexList
is provided. SeeInstance Index and Instance Count
for more information.Default:
-1
. - instanceIndexList
-
Used with binders that do not support native partitioning (such as RabbitMQ); allows an application instance to consume from more than one partition.
Default: empty.
- retryableExceptions
-
A map of Throwable class names in the key and a boolean in the value. Specify those exceptions (and subclasses) that will or won’t be retried. Also see
defaultRetriable
. Example:spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false
.Default: empty.
- useNativeDecoding
-
When set to
true
, the inbound message is deserialized directly by the client library, which must be configured correspondingly (for example, setting an appropriate Kafka producer value deserializer). When this configuration is being used, the inbound message unmarshalling is not based on thecontentType
of the binding. When native decoding is used, it is the responsibility of the producer to use an appropriate encoder (for example, the Kafka producer value serializer) to serialize the outbound message. Also, when native encoding and decoding is used, theheaderMode=embeddedHeaders
property is ignored and headers are not embedded in the message. See the producer propertyuseNativeEncoding
.Default:
false
. - multiplex
-
When set to true, the underlying binder will natively multiplex destinations on the same input binding.
Default:
false
.
Advanced Consumer Configuration
For advanced configuration of the underlying message listener container for message-driven consumers, add a single ListenerContainerCustomizer
bean to the application context.
It will be invoked after the above properties have been applied and can be used to set additional properties.
Similarly, for polled consumers, add a MessageSourceCustomizer
bean.
The following is an example for the RabbitMQ binder:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer> containerCustomizer() {
return (container, dest, group) -> container.setAdviceChain(advice1, advice2);
}
@Bean
public MessageSourceCustomizer<AmqpMessageSource> sourceCustomizer() {
return (source, dest, group) -> source.setPropertiesConverter(customPropertiesConverter);
}
Producer Properties
These properties are exposed via org.springframework.cloud.stream.binder.ProducerProperties
The following binding properties are available for output bindings only and must be prefixed with spring.cloud.stream.bindings.<bindingName>.producer.
(for example, spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=headers.id
).
Default values can be set by using the prefix spring.cloud.stream.default.producer
(for example, spring.cloud.stream.default.producer.partitionKeyExpression=headers.id
).
- autoStartup
-
Signals if this consumer needs to be started automatically
Default:
true
. - partitionKeyExpression
-
A SpEL expression that determines how to partition outbound data. If set, outbound data on this binding is partitioned.
partitionCount
must be set to a value greater than 1 to be effective. SeePartitioning
.Default: null.
- partitionKeyExtractorName
-
The name of the bean that implements
PartitionKeyExtractorStrategy
. Used to extract a key used to compute the partition id (see 'partitionSelector*'). Mutually exclusive with 'partitionKeyExpression'.Default: null.
- partitionSelectorName
-
The name of the bean that implements
PartitionSelectorStrategy
. Used to determine partition id based on partition key (see 'partitionKeyExtractor*'). Mutually exclusive with 'partitionSelectorExpression'.Default: null.
- partitionSelectorExpression
-
A SpEL expression for customizing partition selection. If neither is set, the partition is selected as the
hashCode(key) % partitionCount
, wherekey
is computed through eitherpartitionKeyExpression
.Default:
null
. - partitionCount
-
The number of target partitions for the data, if partitioning is enabled. Must be set to a value greater than 1 if the producer is partitioned. On Kafka, it is interpreted as a hint. The larger of this and the partition count of the target topic is used instead.
Default:
1
. - requiredGroups
-
A comma-separated list of groups to which the producer must ensure message delivery even if they start after it has been created (for example, by pre-creating durable queues in RabbitMQ).
- headerMode
-
When set to
none
, it disables header embedding on output. It is effective only for messaging middleware that does not support message headers natively and requires header embedding. This option is useful when producing data for non-Spring Cloud Stream applications when native headers are not supported. When set toheaders
, it uses the middleware’s native header mechanism. When set toembeddedHeaders
, it embeds headers into the message payload.Default: Depends on the binder implementation.
- useNativeEncoding
-
When set to
true
, the outbound message is serialized directly by the client library, which must be configured correspondingly (for example, setting an appropriate Kafka producer value serializer). When this configuration is being used, the outbound message marshalling is not based on thecontentType
of the binding. When native encoding is used, it is the responsibility of the consumer to use an appropriate decoder (for example, the Kafka consumer value de-serializer) to deserialize the inbound message. Also, when native encoding and decoding is used, theheaderMode=embeddedHeaders
property is ignored and headers are not embedded in the message. See the consumer propertyuseNativeDecoding
.Default:
false
. - errorChannelEnabled
-
When set to true, if the binder supports asynchroous send results, send failures are sent to an error channel for the destination. See Error Handling for more information.
Default: false.
Advanced Producer Configuration
In some cases Producer Properties are not enough to properly configure a producing MessageHandler in the binder, or may be you prefer a programmatic approach
while configuring such producing MessageHandler. Regardless of the reason, spring-cloud-stream provides ProducerMessageHandlerCustomizer
to accomplish it.
@FunctionalInterface
public interface ProducerMessageHandlerCustomizer<H extends MessageHandler> {
/**
* Configure a {@link MessageHandler} that is being created by the binder for the
* provided destination name.
* @param handler the {@link MessageHandler} from the binder.
* @param destinationName the bound destination name.
*/
void configure(H handler, String destinationName);
}
As you can see it gives you access to an actual instance of producing MessageHandler
which you can configure as you wish.
All you need to do is provide implementation of this strategy and configure it as a @Bean
.