Applications
5. Sources
5.1. CDC Source
The Change Data Capture (CDC) source captures and streams change events from various databases. Currently, it supports MySQL, PostgreSQL, MongoDB, Oracle and SQL Server databases.
Built upon the Debezium Embedded Connector, the CDC Source allows capturing and streaming database changes over different message binders such as Apache Kafka, RabbitMQ and all other brokers supported by Spring Cloud Stream.
It supports all Debezium configuration properties. Just add the cdc.config. prefix to the existing Debezium properties. For example, to set Debezium’s connector.class property, use the cdc.config.connector.class source property instead.
We provide convenient shortcuts for the most frequently used Debezium properties. For example, instead of the long cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnector Debezium property, you can use our cdc.connector=mysql shortcut. The table below lists all available shortcuts along with the Debezium properties they represent.
The Debezium properties (e.g. cdc.config.XXX) always have precedence over the shortcuts!
The CDC Source introduces a new default BackingOffsetStore configuration, based on the MetadataStore service. The latter provides various microservice-friendly ways of storing the offset metadata.
5.1.1. Options
Properties grouped by prefix:
cdc
- config: Spring pass-through wrapper for Debezium configuration properties. All properties with a 'cdc.config.' prefix are native Debezium properties. The prefix is removed, converting them into Debezium io.debezium.config.Configuration. (Map<String, String>, default: <none>)
- connector: Shortcut for the cdc.config.connector.class property. Either of those can be used as long as they do not contradict each other. (ConnectorType, default: <none>, possible values: mysql, postgres, mongodb, oracle, sqlserver)
- name: Unique name for this sourceConnector instance. (String, default: <none>)
- schema: Include the schema as part of the outbound message. (Boolean, default: false)
cdc.flattening
- add-fields: Comma-separated list of metadata fields to add to the flattened message. The fields will be prefixed with "__" or "__<struct>__", depending on the specification of the struct. (String, default: <none>)
- add-headers: Comma-separated list of metadata fields to add to the header of the flattened message. The fields will be prefixed with "__" or "__<struct>__". (String, default: <none>)
- delete-handling-mode: Options for handling deleted records: (1) none - pass the records through, (2) drop - remove the records and (3) rewrite - add a '__deleted' field to the records. (DeleteHandlingMode, default: <none>, possible values: drop, rewrite, none)
- drop-tombstones: By default Debezium generates tombstone records to enable Kafka compaction on deleted records. Setting drop-tombstones to true suppresses the tombstone records. (Boolean, default: true)
- enabled: Enable flattening the source record events (https://debezium.io/docs/configuration/event-flattening). (Boolean, default: true)
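The three delete-handling modes described above can be illustrated with a small, self-contained sketch. It is not the Debezium implementation: the `Change` type is hypothetical, and writing the `__deleted` value as the strings "true" and "false" on every record is an assumption made for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative model of the none / drop / rewrite delete-handling modes. */
final class DeleteHandlingSketch {

    enum Mode { NONE, DROP, REWRITE }

    /** Hypothetical flattened change record: its fields plus a "was deleted" flag. */
    static final class Change {
        final Map<String, Object> fields;
        final boolean deleted;

        Change(Map<String, Object> fields, boolean deleted) {
            this.fields = fields;
            this.deleted = deleted;
        }
    }

    static List<Map<String, Object>> apply(Mode mode, List<Change> changes) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Change change : changes) {
            if (mode == Mode.DROP && change.deleted) {
                continue; // drop: remove deleted records from the output
            }
            Map<String, Object> fields = new LinkedHashMap<>(change.fields);
            if (mode == Mode.REWRITE) {
                // rewrite: keep the record and add a '__deleted' field (value format assumed)
                fields.put("__deleted", String.valueOf(change.deleted));
            }
            out.add(fields); // none: records pass through unchanged
        }
        return out;
    }

    public static void main(String[] args) {
        List<Change> changes = List.of(
                new Change(Map.<String, Object>of("id", 1), false),
                new Change(Map.<String, Object>of("id", 2), true));
        for (Mode mode : Mode.values()) {
            System.out.println(mode + " -> " + apply(mode, changes));
        }
    }
}
```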
cdc.offset
- commit-timeout: Maximum number of milliseconds to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed in a future attempt. (Duration, default: 5000ms)
- flush-interval: Interval at which to try committing offsets. The default is 1 minute. (Duration, default: 60000ms)
- policy: Offset storage commit policy. (OffsetPolicy, default: <none>)
- storage: The Kafka connector tracks the number of processed records and regularly stores the count (as "offsets") in a preconfigured metadata storage. On restart the connector resumes reading from the last recorded source offset. (OffsetStorageType, default: <none>, possible values: memory, file, kafka, metadata)
cdc.stream.header
- convert-connect-headers: When true, the org.apache.kafka.connect.header.Header entries are converted into message headers, with Header#key() as the name and Header#value() as the value. (Boolean, default: true)
- offset: Serializes the source record's offset metadata into the outbound message header under cdc.offset. (Boolean, default: false)
metadata.store.dynamo-db
- create-delay: Delay between create table retries. (Integer, default: 1)
- create-retries: Retry number for create table request. (Integer, default: 25)
- read-capacity: Read capacity on the table. (Long, default: 1)
- table: Table name for metadata. (String, default: <none>)
- time-to-live: TTL for table entries. (Integer, default: <none>)
- write-capacity: Write capacity on the table. (Long, default: 1)
metadata.store.jdbc
- region: Unique grouping identifier for messages persisted with this store. (String, default: DEFAULT)
- table-prefix: Prefix for the custom table name. (String, default: <none>)
metadata.store.mongo-db
- collection: MongoDB collection name for metadata. (String, default: metadataStore)
metadata.store
- type: Indicates the type of metadata store to configure (default is 'memory'). You must include the corresponding Spring Integration dependency to use a persistent store. (StoreType, default: <none>, possible values: mongodb, redis, dynamodb, jdbc, zookeeper, hazelcast, memory)
metadata.store.zookeeper
- connect-string: Zookeeper connect string in form HOST:PORT. (String, default: 127.0.0.1:2181)
- encoding: Encoding to use when storing data in Zookeeper. (Charset, default: UTF-8)
- retry-interval: Retry interval for Zookeeper operations in milliseconds. (Integer, default: 1000)
- root: Root node - store entries are children of this node. (String, default: /SpringIntegration-MetadataStore)
Debezium property Shortcut mapping
The table below lists all available shortcuts along with the Debezium properties they represent.
Shortcut | Original | Description |
---|---|---|
cdc.connector | cdc.config.connector.class | |
cdc.name | cdc.config.name | |
cdc.offset.flush-interval | cdc.config.offset.flush.interval.ms | |
cdc.offset.commit-timeout | cdc.config.offset.flush.timeout.ms | |
cdc.offset.policy | cdc.config.offset.commit.policy | |
cdc.offset.storage | cdc.config.offset.storage | |
cdc.flattening.drop-tombstones | cdc.config.drop.tombstones | |
cdc.flattening.delete-handling-mode | cdc.config.delete.handling.mode | |
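To make the prefix and precedence rules concrete, here is a minimal sketch of how application properties could be resolved into Debezium properties. It is an illustration only, not the CDC Source code: the `CdcPropertyResolver` class is hypothetical, it handles only the `cdc.connector` and `cdc.name` shortcuts, and it lists only the connector classes quoted in this section. Other shortcuts need value conversions that are not shown.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative resolver: shortcuts first, then native cdc.config.* properties override them. */
final class CdcPropertyResolver {

    private static final String PREFIX = "cdc.config.";

    // Connector classes quoted in this section; other connector types are omitted here.
    private static final Map<String, String> CONNECTOR_CLASSES = Map.of(
            "mysql", "io.debezium.connector.mysql.MySqlConnector",
            "postgres", "io.debezium.connector.postgresql.PostgresConnector",
            "sqlserver", "io.debezium.connector.sqlserver.SqlServerConnector");

    static Map<String, String> resolve(Map<String, String> appProperties) {
        Map<String, String> debezium = new LinkedHashMap<>();

        // 1. Apply the shortcuts.
        String connector = appProperties.get("cdc.connector");
        if (connector != null && CONNECTOR_CLASSES.containsKey(connector)) {
            debezium.put("connector.class", CONNECTOR_CLASSES.get(connector));
        }
        String name = appProperties.get("cdc.name");
        if (name != null) {
            debezium.put("name", name);
        }

        // 2. Strip the cdc.config. prefix from native properties; they win over shortcuts.
        for (Map.Entry<String, String> entry : appProperties.entrySet()) {
            if (entry.getKey().startsWith(PREFIX)) {
                debezium.put(entry.getKey().substring(PREFIX.length()), entry.getValue());
            }
        }
        return debezium;
    }

    public static void main(String[] args) {
        Map<String, String> app = new LinkedHashMap<>();
        app.put("cdc.connector", "mysql");
        app.put("cdc.name", "my-sql-connector");
        app.put("cdc.config.database.port", "3306");
        System.out.println(resolve(app));
    }
}
```

Applying the shortcuts before the prefixed properties is what lets a native `cdc.config.XXX` value replace a shortcut that targets the same Debezium property.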
5.1.2. Database Support
The CDC Source uses the Debezium utilities, and currently supports CDC for five datastores: MySQL, PostgreSQL, MongoDB, Oracle and SQL Server databases.
5.1.3. Examples and Testing
The CdcSourceIntegrationTest, CdcDeleteHandlingIntegrationTest and CdcFlatteningIntegrationTest integration tests use test database fixtures running on the local machine.
We use pre-built Debezium Docker database images.
The Maven builds create the test database fixtures with the help of the docker-maven-plugin.
To run and debug the tests from your IDE you need to deploy the required database images from the command line. The instructions below explain how to run pre-configured test databases from Docker images.
MySQL
Start the debezium/example-mysql image in Docker:
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.0
Use the following properties to connect the CDC Source to the MySQL database:
cdc.connector=mysql (1)
cdc.name=my-sql-connector (2)
cdc.config.database.server.id=85744 (2)
cdc.config.database.server.name=my-app-connector (2)
cdc.config.database.user=debezium (3)
cdc.config.database.password=dbz (3)
cdc.config.database.hostname=localhost (3)
cdc.config.database.port=3306 (3)
cdc.schema=true (4)
cdc.flattening.enabled=true (5)
1 | Configures the CDC Source to use MySqlConnector. (equivalent to setting cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnector ). |
2 | Metadata used to identify and dispatch the incoming events. |
3 | Connection to the MySQL server running on localhost:3306 as debezium user. |
4 | Includes the Change Event Value schema in the SourceRecord events. |
5 | Enables the CDC Event Flattening. |
You can also run the CdcSourceIntegrationTests#CdcMysqlTests using this MySQL configuration.
PostgreSQL
Start a pre-configured postgres server from the debezium/example-postgres:1.0 Docker image:
docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:1.0
You can connect to this server like this:
psql -U postgres -h localhost -p 5432
Use the following properties to connect the CDC Source to PostgreSQL:
cdc.connector=postgres (1)
cdc.offset.storage=memory (2)
cdc.name=my-sql-connector (3)
cdc.config.database.server.id=85744 (3)
cdc.config.database.server.name=my-app-connector (3)
cdc.config.database.user=postgres (4)
cdc.config.database.password=postgres (4)
cdc.config.database.dbname=postgres (4)
cdc.config.database.hostname=localhost (4)
cdc.config.database.port=5432 (4)
cdc.schema=true (5)
cdc.flattening.enabled=true (6)
1 | Configures the CDC Source to use PostgresConnector. Equivalent to setting cdc.config.connector.class=io.debezium.connector.postgresql.PostgresConnector. |
2 | Configures the Debezium engine to use the memory backing offset store (equivalent to cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore). |
3 | Metadata used to identify and dispatch the incoming events. |
4 | Connection to the PostgreSQL server running on localhost:5432 as postgres user. |
5 | Includes the Change Event Value schema in the SourceRecord events. |
6 | Enables the CDC Event Flattening. |
You can also run the CdcSourceIntegrationTests#CdcPostgresTests using this PostgreSQL configuration.
MongoDB
Start a pre-configured mongodb from the debezium/example-mongodb:0.10 Docker image:
docker run -it --rm --name mongodb -p 27017:27017 -e MONGODB_USER=debezium -e MONGODB_PASSWORD=dbz debezium/example-mongodb:0.10
Initialize the inventory collections:
docker exec -it mongodb sh -c 'bash -c /usr/local/bin/init-inventory.sh'
In the mongodb terminal output, search for a log entry like host: "3f95a8a6516e:27017":
2019-01-10T13:46:10.004+0000 I COMMAND [conn1] command local.oplog.rs appName: "MongoDB Shell" command: replSetInitiate { replSetInitiate: { _id: "rs0", members: [ { _id: 0.0, host: "3f95a8a6516e:27017" } ] }, lsid: { id: UUID("5f477a16-d80d-41f2-9ab4-4ebecea46773") }, $db: "admin" } numYields:0 reslen:22 locks:{ Global: { acquireCount: { r: 36, w: 20, W: 2 }, acquireWaitCount: { W: 1 }, timeAcquiringMicros: { W: 312 } }, Database: { acquireCount: { r: 6, w: 4, W: 16 } }, Collection: { acquireCount: { r: 4, w: 2 } }, oplog: { acquireCount: { r: 2, w: 3 } } } protocol:op_msg 988ms
Add a 127.0.0.1 3f95a8a6516e entry to your /etc/hosts file.
Use the following properties to connect the CDC Source to MongoDB:
cdc.connector=mongodb (1)
cdc.offset.storage=memory (2)
cdc.config.mongodb.hosts=rs0/localhost:27017 (3)
cdc.config.mongodb.name=dbserver1 (3)
cdc.config.mongodb.user=debezium (3)
cdc.config.mongodb.password=dbz (3)
cdc.config.database.whitelist=inventory (3)
cdc.config.tasks.max=1 (4)
cdc.schema=true (5)
cdc.flattening.enabled=true (6)
1 | Configures the CDC Source to use the MongoDB Connector. This maps into cdc.config.connector.class=io.debezium.connector.mongodb.MongoDbConnector. |
2 | Configures the Debezium engine to use the memory backing offset store (equivalent to cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore). |
3 | Connection to the MongoDB running on localhost:27017 as debezium user. |
4 | debezium.io/docs/connectors/mongodb/#tasks |
5 | Includes the Change Event Value schema in the SourceRecord events. |
6 | Enables the CDC Event Flattening. |
You can also run the CdcSourceIntegrationTests#CdcPostgresTests using this configuration.
SQL Server
Start a sqlserver from the microsoft/mssql-server-linux:2017-CU9-GDR2 Docker image:
docker run -it --rm --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_PID=Standard -e SA_PASSWORD=Password! -e MSSQL_AGENT_ENABLED=true microsoft/mssql-server-linux:2017-CU9-GDR2
Populate with sample data from Debezium’s SQL Server tutorial:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
cat ./inventory.sql | docker exec -i sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
Use the following properties to connect the CDC Source to SQL Server:
cdc.connector=sqlserver (1)
cdc.offset.storage=memory (2)
cdc.name=my-sql-connector (3)
cdc.config.database.server.id=85744 (3)
cdc.config.database.server.name=my-app-connector (3)
cdc.config.database.user=sa (4)
cdc.config.database.password=Password! (4)
cdc.config.database.dbname=testDB (4)
cdc.config.database.hostname=localhost (4)
cdc.config.database.port=1433 (4)
1 | Configures the CDC Source to use SqlServerConnector. Equivalent to setting cdc.config.connector.class=io.debezium.connector.sqlserver.SqlServerConnector. |
2 | Configures the Debezium engine to use the memory backing offset store (equivalent to cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore). |
3 | Metadata used to identify and dispatch the incoming events. |
4 | Connection to the SQL Server running on localhost:1433 as sa user. |
You can also run the CdcSourceIntegrationTests#CdcSqlServerTests using this SQL Server configuration.
Oracle
Start an Oracle instance reachable from localhost and set up with the configuration, users and grants described in the Debezium Vagrant set-up.
Populate with sample data from Debezium’s Oracle tutorial:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-with-oracle-jdbc/init/inventory.sql
cat ./inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1
5.2. File Source
This application polls a directory and sends new files or their contents to the output channel. The file source provides the contents of a File as a byte array by default. However, this can be customized using the --file.consumer.mode option:
- ref: Provides a java.io.File reference
- lines: Will split files line-by-line and emit a new message for each line
- contents: The default. Provides the contents of a file as a byte array
When using --file.consumer.mode=lines, you can also provide the additional option --file.consumer.with-markers=true.
If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data.
The payload of these 2 additional marker messages is of type FileSplitter.FileMarker. The option with-markers defaults to false if not explicitly set.
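The effect of the lines mode with and without markers can be pictured with a small stand-alone sketch. It does not use Spring Integration: the `LinesModeSketch` class is hypothetical, and plain strings prefixed with START: and END: stand in for the FileSplitter.FileMarker payloads.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model of 'lines' mode: one message per line, optionally wrapped in markers. */
final class LinesModeSketch {

    static List<String> split(String fileName, List<String> lines, boolean withMarkers) {
        List<String> messages = new ArrayList<>();
        if (withMarkers) {
            messages.add("START:" + fileName); // stand-in for the start-of-file marker
        }
        messages.addAll(lines);                // one message per line of the file
        if (withMarkers) {
            messages.add("END:" + fileName);   // stand-in for the end-of-file marker
        }
        return messages;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("first line", "second line");
        System.out.println(split("data.txt", lines, false));
        System.out.println(split("data.txt", lines, true));
    }
}
```

With markers enabled, a consumer sees two extra messages per file and can use them to detect file boundaries.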
5.2.1. Options
The file source has the following options:
Properties grouped by prefix:
file.consumer
- markers-json: When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)
- mode: The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default: <none>, possible values: ref, lines, contents)
- with-markers: Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default: <none>)
file.supplier
- delay-when-empty: Duration of delay when no new files are detected. (Duration, default: 1s)
- directory: The directory to poll for new files. (File, default: <none>)
- filename-pattern: A simple ant pattern to match files. (String, default: <none>)
- filename-regex: A regex pattern to match files. (Pattern, default: <none>)
- prevent-duplicates: Set to true to include an AcceptOnceFileListFilter which prevents duplicates. (Boolean, default: true)
metadata.store.dynamo-db
- create-delay: Delay between create table retries. (Integer, default: 1)
- create-retries: Retry number for create table request. (Integer, default: 25)
- read-capacity: Read capacity on the table. (Long, default: 1)
- table: Table name for metadata. (String, default: <none>)
- time-to-live: TTL for table entries. (Integer, default: <none>)
- write-capacity: Write capacity on the table. (Long, default: 1)
metadata.store.jdbc
- region: Unique grouping identifier for messages persisted with this store. (String, default: DEFAULT)
- table-prefix: Prefix for the custom table name. (String, default: <none>)
metadata.store.mongo-db
- collection: MongoDB collection name for metadata. (String, default: metadataStore)
metadata.store
- type: Indicates the type of metadata store to configure (default is 'memory'). You must include the corresponding Spring Integration dependency to use a persistent store. (StoreType, default: <none>, possible values: mongodb, redis, dynamodb, jdbc, zookeeper, hazelcast, memory)
metadata.store.zookeeper
- connect-string: Zookeeper connect string in form HOST:PORT. (String, default: 127.0.0.1:2181)
- encoding: Encoding to use when storing data in Zookeeper. (Charset, default: UTF-8)
- retry-interval: Retry interval for Zookeeper operations in milliseconds. (Integer, default: 1000)
- root: Root node - store entries are children of this node. (String, default: /SpringIntegration-MetadataStore)
5.3. FTP Source
This source application supports transfer of files using the FTP protocol.
Files are transferred from the remote directory to the local directory where the app is deployed.
Messages emitted by the source are provided as a byte array by default. However, this can be customized using the --file.consumer.mode option:
- ref: Provides a java.io.File reference
- lines: Will split files line-by-line and emit a new message for each line
- contents: The default. Provides the contents of a file as a byte array
When using --file.consumer.mode=lines, you can also provide the additional option --file.consumer.with-markers=true.
If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data.
The payload of these 2 additional marker messages is of type FileSplitter.FileMarker. The option with-markers defaults to false if not explicitly set.
See also MetadataStore options for possible shared persistent store configuration used to prevent duplicate messages on restart.
5.3.2. Output
mode = contents
5.3.3. Options
The ftp source has the following options:
Properties grouped by prefix:
file.consumer
- markers-json: When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default: true)
- mode: The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default: <none>, possible values: ref, lines, contents)
- with-markers: Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default: <none>)
ftp.factory
- cache-sessions: Cache sessions. (Boolean, default: <none>)
- client-mode: The client mode to use for the FTP session. (ClientMode, default: <none>, possible values: ACTIVE, PASSIVE)
- host: The host name of the server. (String, default: localhost)
- password: The password to use to connect to the server. (String, default: <none>)
- port: The port of the server. (Integer, default: 21)
- username: The username to use to connect to the server. (String, default: <none>)
ftp.supplier
- auto-create-local-dir: Set to true to create the local directory if it does not exist. (Boolean, default: true)
- delay-when-empty: Duration of delay when no new files are detected. (Duration, default: 1s)
- delete-remote-files: Set to true to delete remote files after successful transfer. (Boolean, default: false)
- filename-pattern: A filter pattern to match the names of files to transfer. (String, default: <none>)
- filename-regex: A filter regex pattern to match the names of files to transfer. (Pattern, default: <none>)
- local-dir: The local directory to use for file transfers. (File, default: <none>)
- preserve-timestamp: Set to true to preserve the original timestamp. (Boolean, default: true)
- remote-dir: The remote FTP directory. (String, default: /)
- remote-file-separator: The remote file separator. (String, default: /)
- tmp-file-suffix: The suffix to use while the transfer is in progress. (String, default: .tmp)
metadata.store.dynamo-db
- create-delay: Delay between create table retries. (Integer, default: 1)
- create-retries: Retry number for create table request. (Integer, default: 25)
- read-capacity: Read capacity on the table. (Long, default: 1)
- table: Table name for metadata. (String, default: <none>)
- time-to-live: TTL for table entries. (Integer, default: <none>)
- write-capacity: Write capacity on the table. (Long, default: 1)
metadata.store.jdbc
- region: Unique grouping identifier for messages persisted with this store. (String, default: DEFAULT)
- table-prefix: Prefix for the custom table name. (String, default: <none>)
metadata.store.mongo-db
- collection: MongoDB collection name for metadata. (String, default: metadataStore)
metadata.store
- type: Indicates the type of metadata store to configure (default is 'memory'). You must include the corresponding Spring Integration dependency to use a persistent store. (StoreType, default: <none>, possible values: mongodb, redis, dynamodb, jdbc, zookeeper, hazelcast, memory)
metadata.store.zookeeper
- connect-string: Zookeeper connect string in form HOST:PORT. (String, default: 127.0.0.1:2181)
- encoding: Encoding to use when storing data in Zookeeper. (Charset, default: UTF-8)
- retry-interval: Retry interval for Zookeeper operations in milliseconds. (Integer, default: 1000)
- root: Root node - store entries are children of this node. (String, default: /SpringIntegration-MetadataStore)
5.3.4. Examples
java -jar ftp_source.jar --ftp.supplier.remote-dir=foo --file.consumer.mode=lines --ftp.factory.host=ftpserver \
--ftp.factory.username=user --ftp.factory.password=pw --ftp.supplier.local-dir=/foo
5.4. Http Source
A source application that listens for HTTP requests and emits the body as a message payload.
If the Content-Type matches text/* or application/json, the payload will be a String, otherwise the payload will be a byte array.
Payload:
- If content type matches text/* or application/json: String
- If content type does not match text/* or application/json: byte array
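The payload rule above can be written down as a short, self-contained sketch. It only illustrates the decision; the `HttpPayloadSketch` class is hypothetical, and decoding textual bodies as UTF-8 and ignoring media type parameters such as charset are assumptions of this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

/** Illustrative payload selection: String for text/* and application/json, bytes otherwise. */
final class HttpPayloadSketch {

    static boolean isTextual(String contentType) {
        if (contentType == null) {
            return false;
        }
        // Drop parameters such as "; charset=UTF-8" before comparing the media type.
        String mediaType = contentType.split(";", 2)[0].trim().toLowerCase(Locale.ROOT);
        return mediaType.startsWith("text/") || mediaType.equals("application/json");
    }

    static Object toPayload(String contentType, byte[] body) {
        // UTF-8 decoding is an assumption made for this sketch.
        return isTextual(contentType) ? new String(body, StandardCharsets.UTF_8) : body;
    }

    public static void main(String[] args) {
        byte[] body = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);
        System.out.println(toPayload("application/json", body).getClass().getSimpleName());
        System.out.println(toPayload("application/octet-stream", body).getClass().getSimpleName());
    }
}
```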
5.4.2. Options
The http source supports the following configuration properties:
Properties grouped by prefix:
http.cors
- allow-credentials: Whether the browser should include any cookies associated with the domain of the request being annotated. (Boolean, default: <none>)
- allowed-headers: List of request headers that can be used during the actual request. (String[], default: <none>)
- allowed-origins: List of allowed origins, e.g. https://domain1.com. (String[], default: <none>)
5.5. JDBC Source
This source polls data from an RDBMS.
This source is fully based on the DataSourceAutoConfiguration, so refer to the Spring Boot JDBC Support for more information.
Payload
- Map<String, Object> when jdbc.supplier.split == true (default) and List<Map<String, Object>> otherwise
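The two payload shapes can be shown with a minimal sketch that works on an already fetched result set. The `JdbcSplitSketch` class and its method are hypothetical and only model the shape of the emitted payloads, not the polling or the SQL access.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Illustrative model of the split option: one payload per row, or one payload for all rows. */
final class JdbcSplitSketch {

    static List<Object> toPayloads(List<Map<String, Object>> rows, boolean split) {
        if (split) {
            // split == true: each row (a Map<String, Object>) becomes its own message payload
            return new ArrayList<Object>(rows);
        }
        // split == false: the whole result (a List<Map<String, Object>>) is a single payload
        return Collections.<Object>singletonList(rows);
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = List.of(
                Map.<String, Object>of("id", 1, "name", "a"),
                Map.<String, Object>of("id", 2, "name", "b"));
        System.out.println(toPayloads(rows, true).size() + " payloads when split");
        System.out.println(toPayloads(rows, false).size() + " payload when not split");
    }
}
```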
5.5.2. Options
The jdbc source has the following options:
Properties grouped by prefix:
jdbc.supplier
- max-rows: Maximum number of rows to process per query. (Integer, default: 0)
- query: The query to use to select data. (String, default: <none>)
- split: Whether to split the SQL result as individual messages. (Boolean, default: true)
- update: An SQL update statement to execute for marking polled messages as 'seen'. (String, default: <none>)
spring.datasource
- driver-class-name: Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default: <none>)
- password: Login password of the database. (String, default: <none>)
- url: JDBC URL of the database. (String, default: <none>)
- username: Login username of the database. (String, default: <none>)
spring.integration.poller
- cron: Cron expression for polling. Mutually exclusive with 'fixedDelay' and 'fixedRate'. (String, default: <none>)
- fixed-delay: Polling delay period. Mutually exclusive with 'cron' and 'fixedRate'. (Duration, default: <none>)
- fixed-rate: Polling rate period. Mutually exclusive with 'fixedDelay' and 'cron'. (Duration, default: <none>)
- initial-delay: Polling initial delay. Applied for 'fixedDelay' and 'fixedRate'; ignored for 'cron'. (Duration, default: <none>)
- max-messages-per-poll: Maximum number of messages to poll per polling cycle. (Integer, default: <none>)
- receive-timeout: How long to wait for messages on poll. (Duration, default: 1s)
Also see the Spring Boot Documentation for additional DataSource properties, and TriggerProperties and MaxMessagesProperties for polling options.
5.6. JMS Source
The JMS source enables receiving messages from JMS.
5.6.1. Options
The JMS source has the following options:
Properties grouped by prefix:
jms.supplier
- client-id: Client id for durable subscriptions. (String, default: <none>)
- destination: The destination from which to receive messages (queue or topic). (String, default: <none>)
- message-selector: A selector for messages. (String, default: <none>)
- session-transacted: True to enable transactions and select a DefaultMessageListenerContainer, false to select a SimpleMessageListenerContainer. (Boolean, default: true)
- subscription-durable: True for a durable subscription. (Boolean, default: <none>)
- subscription-name: The name of a durable or shared subscription. (String, default: <none>)
- subscription-shared: True for a shared subscription. (Boolean, default: <none>)
spring.jms
- jndi-name: Connection factory JNDI name. When set, takes precedence over other connection factory auto-configurations. (String, default: <none>)
- pub-sub-domain: Whether the default destination type is topic. (Boolean, default: false)
spring.jms.listener
- acknowledge-mode: Acknowledge mode of the container. By default, the listener is transacted with automatic acknowledgment. (AcknowledgeMode, default: <none>, possible values: AUTO, CLIENT, DUPS_OK)
- auto-startup: Start the container automatically on startup. (Boolean, default: true)
- concurrency: Minimum number of concurrent consumers. (Integer, default: <none>)
- max-concurrency: Maximum number of concurrent consumers. (Integer, default: <none>)
- receive-timeout: Timeout to use for receive calls. Use -1 for a no-wait receive or 0 for no timeout at all. The latter is only feasible if not running within a transaction manager and is generally discouraged since it prevents clean shutdown. (Duration, default: 1s)
5.7. Load Generator Source
A source that sends generated data and dispatches it to the stream.
5.7.1. Options
The load-generator source has the following options:
- load-generator.generate-timestamp: Whether a timestamp is generated. (Boolean, default: false)
- load-generator.message-count: Message count. (Integer, default: 1000)
- load-generator.message-size: Message size. (Integer, default: 1000)
- load-generator.producers: Number of producers. (Integer, default: 1)
5.8. Mail Source
A source application that listens for emails and emits the message body as a message payload.
5.8.1. Options
The mail source has the following options:
- mail.supplier.charset: The charset for byte[] mail-to-string transformation. (String, default: UTF-8)
- mail.supplier.delete: Set to true to delete email after download. (Boolean, default: false)
- mail.supplier.expression: Configure a SpEL expression to select messages. (String, default: true)
- mail.supplier.idle-imap: Set to true to use IdleImap Configuration. (Boolean, default: false)
- mail.supplier.java-mail-properties: JavaMail properties as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)
- mail.supplier.mark-as-read: Set to true to mark email as read. (Boolean, default: false)
- mail.supplier.url: Mail connection URL for connection to Mail server e.g. 'imaps://username:password@imap.server.com:993/Inbox'. (URLName, default: <none>)
- mail.supplier.user-flag: The flag to mark messages when the server does not support \Recent. (String, default: <none>)
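The new-line delimited format used by mail.supplier.java-mail-properties matches what java.util.Properties can load. The sketch below parses the example value from the option description. The `JavaMailPropertiesSketch` class is hypothetical, and using Properties.load for the conversion is an assumption of this example, not a statement about how the application converts the value.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Illustrative parsing of a new-line delimited list of name=value pairs. */
final class JavaMailPropertiesSketch {

    static Properties parse(String value) {
        Properties properties = new Properties();
        try {
            // Properties.load ignores leading whitespace on each line.
            properties.load(new StringReader(value));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = parse("foo=bar\n baz=car");
        System.out.println(properties.getProperty("foo"));
        System.out.println(properties.getProperty("baz"));
    }
}
```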
5.9. MongoDB Source
This source polls data from MongoDB.
This source is fully based on the MongoDataAutoConfiguration, so refer to the Spring Boot MongoDB Support for more information.
5.9.1. Options
The mongodb source has the following options:
Properties grouped by prefix:
mongodb.supplier
- collection: The MongoDB collection to query. (String, default: <none>)
- query: The MongoDB query. (String, default: { })
- query-expression: The SpEL expression in MongoDB query DSL style. (Expression, default: <none>)
- split: Whether to split the query result as individual messages. (Boolean, default: true)
- update-expression: The SpEL expression in MongoDB update DSL style. (Expression, default: <none>)
spring.data.mongodb
- additional-hosts: Additional server hosts. Cannot be set with URI or if 'host' is not specified. Additional hosts will use the default mongo port of 27017, if you want to use a different port you can use the "host:port" syntax. (List<String>, default: <none>)
- authentication-database: Authentication database name. (String, default: <none>)
- auto-index-creation: Whether to enable auto-index creation. (Boolean, default: <none>)
- database: Database name. (String, default: <none>)
- field-naming-strategy: Fully qualified name of the FieldNamingStrategy to use. (Class<?>, default: <none>)
- host: Mongo server host. Cannot be set with URI. (String, default: <none>)
- password: Login password of the mongo server. Cannot be set with URI. (Character[], default: <none>)
- port: Mongo server port. Cannot be set with URI. (Integer, default: <none>)
- replica-set-name: Required replica set name for the cluster. Cannot be set with URI. (String, default: <none>)
- uri: Mongo database URI. Overrides host, port, username, password, and database. (String, default: mongodb://localhost/test)
- username: Login user of the mongo server. Cannot be set with URI. (String, default: <none>)
- uuid-representation: Representation to use when converting a UUID to a BSON binary value. (UuidRepresentation, default: java-legacy, possible values: UNSPECIFIED, STANDARD, C_SHARP_LEGACY, JAVA_LEGACY, PYTHON_LEGACY)
Also see the Spring Boot Documentation for additional MongoProperties properties.
See TriggerProperties for polling options.
5.10. MQTT Source
Source that enables receiving messages from MQTT.
5.10.2. Options
The mqtt source has the following options:
Properties grouped by prefix:
mqtt
- clean-session: whether the client and server should remember state across restarts and reconnects. (Boolean, default: true)
- connection-timeout: the connection timeout in seconds. (Integer, default: 30)
- keep-alive-interval: the ping interval in seconds. (Integer, default: 60)
- password: the password to use when connecting to the broker. (String, default: guest)
- persistence: 'memory' or 'file'. (String, default: memory)
- persistence-directory: Persistence directory. (String, default: /tmp/paho)
- ssl-properties: MQTT Client SSL properties. (Map<String, String>, default: <none>)
- url: location of the mqtt broker(s) (comma-delimited list). (String[], default: [tcp://localhost:1883])
- username: the username to use when connecting to the broker. (String, default: guest)
mqtt.supplier
- binary: true to leave the payload as bytes. (Boolean, default: false)
- charset: the charset used to convert bytes to String (when binary is false). (String, default: UTF-8)
- client-id: identifies the client. (String, default: stream.client.id.source)
- qos: the qos; a single value for all topics or a comma-delimited list to match the topics. (Integer[], default: [0])
- topics: the topic(s) (comma-delimited) to which the source will subscribe. (String[], default: [stream.mqtt])
5.11. RabbitMQ Source
The "rabbit" source enables receiving messages from RabbitMQ.
The queue(s) must exist before the stream is deployed; they are not created automatically. You can easily create a Queue using the RabbitMQ web UI.
5.11.3. Options
The rabbit source has the following options:
Properties grouped by prefix:
rabbit.supplier
- enable-retry
-
true to enable retry. (Boolean, default:
false
) - initial-retry-interval
-
Initial retry interval when retry is enabled. (Integer, default:
1000
) - mapped-request-headers
-
Headers that will be mapped. (String[], default:
[STANDARD_REQUEST_HEADERS]
) - max-attempts
-
The maximum delivery attempts when retry is enabled. (Integer, default:
3
) - max-retry-interval
-
Max retry interval when retry is enabled. (Integer, default:
30000
) - own-connection
-
When true, use a separate connection based on the boot properties. (Boolean, default:
false
) - queues
-
The queues to which the source will listen for messages. (String[], default:
<none>
) - requeue
-
Whether rejected messages should be requeued. (Boolean, default:
true
) - retry-multiplier
-
Retry backoff multiplier when retry is enabled. (Double, default:
2
) - transacted
-
Whether the channel is transacted. (Boolean, default:
false
)
spring.rabbitmq
- address-shuffle-mode
-
Mode used to shuffle configured addresses. (AddressShuffleMode, default:
none
, possible values:NONE
,RANDOM
,INORDER
) - addresses
-
Comma-separated list of addresses to which the client should connect. When set, the host and port are ignored. (String, default:
<none>
) - channel-rpc-timeout
-
Continuation timeout for RPC calls in channels. Set it to zero to wait forever. (Duration, default:
10m
) - connection-timeout
-
Connection timeout. Set it to zero to wait forever. (Duration, default:
<none>
) - host
-
RabbitMQ host. Ignored if an address is set. (String, default:
localhost
) - password
-
Login to authenticate against the broker. (String, default:
guest
) - port
-
RabbitMQ port. Ignored if an address is set. Default to 5672, or 5671 if SSL is enabled. (Integer, default:
<none>
) - publisher-confirm-type
-
Type of publisher confirms to use. (ConfirmType, default:
<none>
, possible values:SIMPLE
,CORRELATED
,NONE
) - publisher-returns
-
Whether to enable publisher returns. (Boolean, default:
false
) - requested-channel-max
-
Number of channels per connection requested by the client. Use 0 for unlimited. (Integer, default:
2047
) - requested-heartbeat
-
Requested heartbeat timeout; zero for none. If a duration suffix is not specified, seconds will be used. (Duration, default:
<none>
) - username
-
Login user to authenticate to the broker. (String, default:
guest
) - virtual-host
-
Virtual host to use when connecting to the broker. (String, default:
<none>
)
Also see the Spring Boot Documentation for additional properties for the broker connections and listener properties.
A Note About Retry
With the default ackMode (AUTO) and requeue (true) options, failed message deliveries will be retried
indefinitely.
Since there is not much processing in the rabbit source, the risk of failure in the source itself is small, unless
the downstream Binder is not connected for some reason.
Setting requeue to false will cause messages to be rejected on the first attempt (and possibly sent to a Dead Letter
Exchange/Queue if the broker is so configured).
The enableRetry option allows configuration of retry parameters such that a failed message delivery can be retried and
eventually discarded (or dead-lettered) when retries are exhausted.
The delivery thread is suspended during the retry interval(s).
Retry options are enableRetry, maxAttempts, initialRetryInterval, retryMultiplier, and maxRetryInterval.
Message deliveries failing with a MessageConversionException are never retried; the assumption being that if a message
could not be converted on the first attempt, subsequent attempts will also fail.
Such messages are discarded (or dead-lettered).
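As a rough illustration of how the retry options interact, the sketch below computes the waits between delivery attempts for the defaults listed in the options above (initial-retry-interval 1000, retry-multiplier 2, max-retry-interval 30000, max-attempts 3). It assumes a capped exponential back-off, that intervals are in milliseconds, and that max-attempts counts the first delivery; the class and method names are hypothetical and this is not the source's own implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: illustrates capped exponential back-off only.
class RetryBackoffSketch {

    // Returns the waits (assumed to be milliseconds) between consecutive delivery attempts.
    static List<Long> waits(int maxAttempts, long initialInterval, double multiplier, long maxInterval) {
        List<Long> result = new ArrayList<>();
        double next = initialInterval;
        // Assumption: maxAttempts includes the first delivery, so there are maxAttempts - 1 waits.
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            result.add(Math.min((long) next, maxInterval));
            next = next * multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // Defaults from the options list: two waits before retries are exhausted.
        System.out.println(waits(3, 1000, 2.0, 30000));
        // A longer run shows the interval being capped at max-retry-interval.
        System.out.println(waits(8, 1000, 2.0, 30000));
    }
}
```

Because the delivery thread is suspended during each wait, the sum of these values is a lower bound on how long a failing message blocks the listener before it is discarded or dead-lettered.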
5.12. Amazon S3 Source
This source app supports transfer of files using the Amazon S3 protocol.
Files are transferred from the remote
directory (S3 bucket) to the local
directory where the application is deployed.
Messages emitted by the source are provided as a byte array by default. However, this can be
customized using the --mode
option:
-
ref Provides a java.io.File reference
-
lines Will split files line-by-line and emit a new message for each line
-
contents The default. Provides the contents of a file as a byte array
When using --mode=lines
, you can also provide the additional option --withMarkers=true
.
If set to true
, the underlying FileSplitter
will emit additional start-of-file and end-of-file marker messages before and after the actual data.
The payload of these 2 additional marker messages is of type FileSplitter.FileMarker
. The option withMarkers
defaults to false
if not explicitly set.
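The effect of the lines mode and the withMarkers option can be pictured with the plain-Java sketch below. It does not use Spring Integration's FileSplitter; the marker messages are stood in for by simple strings, and the class name, method name, and marker layout are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration; the real source delegates to Spring Integration's FileSplitter.
class LinesModeSketch {

    // Splits file content into one message per line, optionally wrapped in marker messages.
    static List<String> split(String fileName, String content, boolean withMarkers) {
        List<String> messages = new ArrayList<>();
        if (withMarkers) {
            // Stand-in for the start-of-file FileSplitter.FileMarker payload.
            messages.add("START:" + fileName);
        }
        int lineCount = 0;
        for (String line : content.split("\\R")) {
            messages.add(line);
            lineCount++;
        }
        if (withMarkers) {
            // Stand-in for the end-of-file FileSplitter.FileMarker payload.
            messages.add("END:" + fileName + ":" + lineCount);
        }
        return messages;
    }

    public static void main(String[] args) {
        System.out.println(split("data.txt", "a\nb\nc", false));
        System.out.println(split("data.txt", "a\nb\nc", true));
    }
}
```

With markers enabled, a downstream consumer sees two extra messages per file and can use them to detect file boundaries.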
See also MetadataStore options for possible shared persistent store configuration used to prevent duplicate messages on restart.
5.12.3. Options
The s3 source has the following options:
Properties grouped by prefix:
file.consumer
- markers-json
-
When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default:
true
) - mode
-
The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default:
<none>
, possible values:ref
,lines
,contents
) - with-markers
-
Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default:
<none>
)
metadata.store.dynamo-db
- create-delay
-
Delay between create table retries. (Integer, default:
1
) - create-retries
-
Retry number for create table request. (Integer, default:
25
) - read-capacity
-
Read capacity on the table. (Long, default:
1
) - table
-
Table name for metadata. (String, default:
<none>
) - time-to-live
-
TTL for table entries. (Integer, default:
<none>
) - write-capacity
-
Write capacity on the table. (Long, default:
1
)
metadata.store.jdbc
- region
-
Unique grouping identifier for messages persisted with this store. (String, default:
DEFAULT
) - table-prefix
-
Prefix for the custom table name. (String, default:
<none>
)
metadata.store.mongo-db
- collection
-
MongoDB collection name for metadata. (String, default:
metadataStore
)
metadata.store
- type
-
Indicates the type of metadata store to configure (default is 'memory'). You must include the corresponding Spring Integration dependency to use a persistent store. (StoreType, default:
<none>
, possible values:mongodb
,redis
,dynamodb
,jdbc
,zookeeper
,hazelcast
,memory
)
metadata.store.zookeeper
- connect-string
-
Zookeeper connect string in form HOST:PORT. (String, default:
127.0.0.1:2181
) - encoding
-
Encoding to use when storing data in Zookeeper. (Charset, default:
UTF-8
) - retry-interval
-
Retry interval for Zookeeper operations in milliseconds. (Integer, default:
1000
) - root
-
Root node - store entries are children of this node. (String, default:
/SpringIntegration-MetadataStore
)
s3.common
- endpoint-url
-
Optional endpoint url to connect to s3 compatible storage. (String, default:
<none>
) - path-style-access
-
Use path style access. (Boolean, default:
false
)
s3.supplier
- auto-create-local-dir
-
Create or not the local directory. (Boolean, default:
true
) - delete-remote-files
-
Delete or not remote files after processing. (Boolean, default:
false
) - filename-pattern
-
The pattern to filter remote files. (String, default:
<none>
) - filename-regex
-
The regexp to filter remote files. (Pattern, default:
<none>
) - list-only
-
Set to true to return s3 object metadata without copying file to a local directory. (Boolean, default:
false
) - local-dir
-
The local directory to store files. (File, default:
<none>
) - preserve-timestamp
-
To transfer or not the timestamp of the remote file to the local one. (Boolean, default:
true
) - remote-dir
-
AWS S3 bucket resource. (String, default:
bucket
) - remote-file-separator
-
Remote File separator. (String, default:
/
) - tmp-file-suffix
-
Temporary file suffix. (String, default:
.tmp
)
5.12.4. Amazon AWS common options
The Amazon S3 Source (like all other Amazon AWS applications) is built on the Spring Cloud AWS project, and its auto-configuration classes are used automatically by Spring Boot. Consult the Spring Cloud AWS documentation regarding required and useful auto-configuration properties.
Some of them are about AWS credentials:
-
cloud.aws.credentials.accessKey
-
cloud.aws.credentials.secretKey
-
cloud.aws.credentials.instanceProfile
-
cloud.aws.credentials.profileName
-
cloud.aws.credentials.profilePath
Others are for AWS Region
definition:
-
cloud.aws.region.auto
-
cloud.aws.region.static
And for AWS Stack:
-
cloud.aws.stack.auto
-
cloud.aws.stack.name
5.13. SFTP Source
This source application supports transfer of files using the SFTP protocol.
Files are transferred from the remote
directory to the local
directory where the app is deployed.
Messages emitted by the source are provided as a byte array by default. However, this can be
customized using the --mode
option:
-
ref Provides a java.io.File reference
-
lines Will split files line-by-line and emit a new message for each line
-
contents The default. Provides the contents of a file as a byte array
When using --mode=lines
, you can also provide the additional option --withMarkers=true
.
If set to true
, the underlying FileSplitter
will emit additional start-of-file and end-of-file marker messages before and after the actual data.
The payload of these 2 additional marker messages is of type FileSplitter.FileMarker
. The option withMarkers
defaults to false
if not explicitly set.
See sftp-supplier
for advanced configuration options.
See also MetadataStore options for possible shared persistent store configuration used to prevent duplicate messages on restart.
5.13.2. Output
mode = contents
mode = lines
mode = ref
5.13.3. Options
The sftp source has the following options:
Properties grouped by prefix:
file.consumer
- markers-json
-
When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default:
true
) - mode
-
The FileReadingMode to use for file reading sources. Values are 'ref' - The File object, 'lines' - a message per line, or 'contents' - the contents as bytes. (FileReadingMode, default:
<none>
, possible values:ref
,lines
,contents
) - with-markers
-
Set to true to emit start of file/end of file marker messages before/after the data. Only valid with FileReadingMode 'lines'. (Boolean, default:
<none>
)
metadata.store.dynamo-db
- create-delay
-
Delay between create table retries. (Integer, default:
1
) - create-retries
-
Retry number for create table request. (Integer, default:
25
) - read-capacity
-
Read capacity on the table. (Long, default:
1
) - table
-
Table name for metadata. (String, default:
<none>
) - time-to-live
-
TTL for table entries. (Integer, default:
<none>
) - write-capacity
-
Write capacity on the table. (Long, default:
1
)
metadata.store.jdbc
- region
-
Unique grouping identifier for messages persisted with this store. (String, default:
DEFAULT
) - table-prefix
-
Prefix for the custom table name. (String, default:
<none>
)
metadata.store.mongo-db
- collection
-
MongoDB collection name for metadata. (String, default:
metadataStore
)
metadata.store
- type
-
Indicates the type of metadata store to configure (default is 'memory'). You must include the corresponding Spring Integration dependency to use a persistent store. (StoreType, default:
<none>
, possible values:mongodb
,redis
,dynamodb
,jdbc
,zookeeper
,hazelcast
,memory
)
metadata.store.zookeeper
- connect-string
-
Zookeeper connect string in form HOST:PORT. (String, default:
127.0.0.1:2181
) - encoding
-
Encoding to use when storing data in Zookeeper. (Charset, default:
UTF-8
) - retry-interval
-
Retry interval for Zookeeper operations in milliseconds. (Integer, default:
1000
) - root
-
Root node - store entries are children of this node. (String, default:
/SpringIntegration-MetadataStore
)
sftp.supplier
- auto-create-local-dir
-
Set to true to create the local directory if it does not exist. (Boolean, default:
true
) - delay-when-empty
-
Duration of delay when no new files are detected. (Duration, default:
1s
) - delete-remote-files
-
Set to true to delete remote files after successful transfer. (Boolean, default:
false
) - directories
-
A list of factory "name.directory" pairs. (String[], default:
<none>
) - factories
-
A map of factory names to factories. (Map<String, Factory>, default:
<none>
) - fair
-
True for fair rotation of multiple servers/directories. This is false by default so if a source has more than one entry, these will be received before the other sources are visited. (Boolean, default:
false
) - filename-pattern
-
A filter pattern to match the names of files to transfer. (String, default:
<none>
) - filename-regex
-
A filter regex pattern to match the names of files to transfer. (Pattern, default:
<none>
) - list-only
-
Set to true to return file metadata without the entire payload. (Boolean, default:
false
) - local-dir
-
The local directory to use for file transfers. (File, default:
<none>
) - max-fetch
-
The maximum number of remote files to fetch per poll; default unlimited. Does not apply when listing files or building task launch requests. (Integer, default:
<none>
) - preserve-timestamp
-
Set to true to preserve the original timestamp. (Boolean, default:
true
) - remote-dir
-
The remote FTP directory. (String, default:
/
) - remote-file-separator
-
The remote file separator. (String, default:
/
) - rename-remote-files-to
-
A SpEL expression resolving to the new name remote files must be renamed to after successful transfer. (Expression, default:
<none>
) - stream
-
Set to true to stream the file rather than copy to a local directory. (Boolean, default:
false
) - tmp-file-suffix
-
The suffix to use while the transfer is in progress. (String, default:
.tmp
)
sftp.supplier.factory
- allow-unknown-keys
-
True to allow an unknown or changed key. (Boolean, default:
false
) - host
-
The host name of the server. (String, default:
localhost
) - known-hosts-expression
-
A SpEL expression resolving to the location of the known hosts file. (Expression, default:
<none>
) - pass-phrase
-
Passphrase for user's private key. (String, default:
<empty string>
) - password
-
The password to use to connect to the server. (String, default:
<none>
) - port
-
The port of the server. (Integer, default:
22
) - private-key
-
Resource location of user's private key. (Resource, default:
<none>
) - username
-
The username to use to connect to the server. (String, default:
<none>
)
5.14. SYSLOG
The syslog source receives SYSLOG packets over UDP, TCP, or both. RFC3164 (BSD) and RFC5424 formats are supported.
5.14.1. Options
- syslog.supplier.buffer-size
-
the buffer size used when decoding messages; larger messages will be rejected. (Integer, default:
2048
) - syslog.supplier.nio
-
whether or not to use NIO (when supporting a large number of connections). (Boolean, default:
false
) - syslog.supplier.port
-
The port to listen on. (Integer, default:
1514
) - syslog.supplier.protocol
-
Protocol used for SYSLOG (tcp or udp). (Protocol, default:
<none>
, possible values:tcp
,udp
,both
) - syslog.supplier.reverse-lookup
-
whether or not to perform a reverse lookup on the incoming socket. (Boolean, default:
false
) - syslog.supplier.rfc
-
'5424' or '3164' - the syslog format according to the RFC; 3164 is aka 'BSD' format. (String, default:
3164
) - syslog.supplier.socket-timeout
-
the socket timeout. (Integer, default:
0
)
5.15. TCP
The tcp
source acts as a server and allows a remote party to connect to it and submit data over a raw tcp socket.
TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of decoders are available, the default being 'CRLF' which is compatible with Telnet.
Messages produced by the TCP source application have a byte[]
payload.
5.15.1. Options
Properties grouped by prefix:
tcp
- nio
-
Whether or not to use NIO. (Boolean, default:
false
) - port
-
The port on which to listen; 0 for the OS to choose a port. (Integer, default:
1234
) - reverse-lookup
-
Perform a reverse DNS lookup on the remote IP Address; if false, just the IP address is included in the message headers. (Boolean, default:
false
) - socket-timeout
-
The timeout (ms) before closing the socket when no data is received. (Integer, default:
120000
) - use-direct-buffers
-
Whether or not to use direct buffers. (Boolean, default:
false
)
5.15.2. Available Decoders
- CRLF (default)
-
text terminated by carriage return (0x0d) followed by line feed (0x0a)
- LF
-
text terminated by line feed (0x0a)
- NULL
-
text terminated by a null byte (0x00)
- STXETX
-
text preceded by an STX (0x02) and terminated by an ETX (0x03)
- RAW
-
no structure - the client indicates a complete message by closing the socket
- L1
-
data preceded by a one byte (unsigned) length field (supports up to 255 bytes)
- L2
-
data preceded by a two byte (unsigned) length field (up to 2^16-1 bytes)
- L4
-
data preceded by a four byte (signed) length field (up to 2^31-1 bytes)
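To show what length-header framing means on the wire, here is a minimal sketch of reading frames in the L2 style: a two-byte unsigned length followed by that many payload bytes. It assumes the length is encoded in network (big-endian) byte order, and the class and method names are hypothetical; it is not the decoder the tcp source itself uses.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of two-byte length-header framing.
class LengthHeaderSketch {

    // Reads one frame from the buffer: two-byte unsigned length, then the payload.
    static byte[] readL2Frame(ByteBuffer buffer) {
        // ByteBuffer reads big-endian by default; convert the short to 0..65535.
        int length = Short.toUnsignedInt(buffer.getShort());
        byte[] payload = new byte[length];
        buffer.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        // Two frames back to back: "hello" (length 5) and "ok" (length 2).
        ByteBuffer wire = ByteBuffer.wrap(new byte[] {
            0x00, 0x05, 'h', 'e', 'l', 'l', 'o',
            0x00, 0x02, 'o', 'k'
        });
        System.out.println(new String(readL2Frame(wire), StandardCharsets.US_ASCII));
        System.out.println(new String(readL2Frame(wire), StandardCharsets.US_ASCII));
    }
}
```

Unlike the terminator-based decoders (CRLF, LF, NULL, STXETX), a length header lets the payload contain any byte value, which is why it suits binary data.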
5.16. Time Source
The time source will simply emit a String with the current time every so often.
5.16.1. Options
The time source has the following options:
- spring.integration.poller.cron
-
Cron expression for polling. Mutually exclusive with 'fixedDelay' and 'fixedRate'. (String, default:
<none>
) - spring.integration.poller.fixed-delay
-
Polling delay period. Mutually exclusive with 'cron' and 'fixedRate'. (Duration, default:
<none>
) - spring.integration.poller.fixed-rate
-
Polling rate period. Mutually exclusive with 'fixedDelay' and 'cron'. (Duration, default:
<none>
) - spring.integration.poller.initial-delay
-
Polling initial delay. Applied for 'fixedDelay' and 'fixedRate'; ignored for 'cron'. (Duration, default:
<none>
) - spring.integration.poller.max-messages-per-poll
-
Maximum number of messages to poll per polling cycle. (Integer, default:
<none>
) - spring.integration.poller.receive-timeout
-
How long to wait for messages on poll. (Duration, default:
1s
) - time.date-format
-
Format for the date value. (String, default:
MM/dd/yy HH:mm:ss
)
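The default time.date-format pattern can be tried out with the sketch below. It assumes the pattern follows java.text.SimpleDateFormat conventions; the class and method names are hypothetical, and the fixed time zone and locale are only there to make the example deterministic.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical helper: formats a date with a SimpleDateFormat-style pattern.
class TimeFormatSketch {

    static String format(Date date, String pattern, TimeZone zone) {
        SimpleDateFormat formatter = new SimpleDateFormat(pattern, Locale.ROOT);
        formatter.setTimeZone(zone);
        return formatter.format(date);
    }

    public static void main(String[] args) {
        String pattern = "MM/dd/yy HH:mm:ss"; // default listed for time.date-format
        // The Unix epoch in UTC prints 01/01/70 00:00:00.
        System.out.println(format(new Date(0L), pattern, TimeZone.getTimeZone("UTC")));
        // The current time in the JVM's default zone.
        System.out.println(format(new Date(), pattern, TimeZone.getDefault()));
    }
}
```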
5.17. Twitter Message Source
Repeatedly retrieves the direct messages (both sent and received) within the last 30 days, sorted in reverse-chronological order.
The retrieved messages are cached (in a MetadataStore
cache) to prevent duplications.
By default an in-memory SimpleMetadataStore
is used.
The twitter.message.source.count
controls the number of returned messages.
The spring.cloud.stream.poller
properties control the message poll interval.
The poll interval must be aligned with the rate limit of the API used.
5.17.1. Options
Properties grouped by prefix:
spring.integration.poller
- cron
-
Cron expression for polling. Mutually exclusive with 'fixedDelay' and 'fixedRate'. (String, default:
<none>
) - fixed-delay
-
Polling delay period. Mutually exclusive with 'cron' and 'fixedRate'. (Duration, default:
<none>
) - fixed-rate
-
Polling rate period. Mutually exclusive with 'fixedDelay' and 'cron'. (Duration, default:
<none>
) - initial-delay
-
Polling initial delay. Applied for 'fixedDelay' and 'fixedRate'; ignored for 'cron'. (Duration, default:
<none>
) - max-messages-per-poll
-
Maximum number of messages to poll per polling cycle. (Integer, default:
<none>
) - receive-timeout
-
How long to wait for messages on poll. (Duration, default:
1s
)
twitter.connection
- access-token
-
Your Twitter token. (String, default:
<none>
) - access-token-secret
-
Your Twitter token secret. (String, default:
<none>
) - consumer-key
-
Your Twitter key. (String, default:
<none>
) - consumer-secret
-
Your Twitter secret. (String, default:
<none>
) - debug-enabled
-
Enables Twitter4J debug mode. (Boolean, default:
false
) - raw-json
-
Enable caching the original (raw) JSON objects as returned by the Twitter APIs. When set to false, the result uses Twitter4J's JSON representations. When set to true, the result uses the original Twitter API's JSON representations. (Boolean, default:
true
)
5.18. Twitter Search Source
Twitter’s Standard search API (search/tweets) allows simple queries against the indices of recent or popular Tweets. This Source
provides continuous searches against a sampling of recent Tweets published in the past 7 days. Part of the 'public' set of APIs.
Returns a collection of relevant Tweets matching a specified query.
Use the spring.cloud.stream.poller
properties to control the interval between consecutive search requests. Rate Limit - 180 requests per 30 min. window (e.g. ~6 r/m, ~ 1 req / 10 sec.)
The twitter.search
query properties allow querying by keywords and filtering the results by time and geolocation.
The twitter.search.count
and twitter.search.page
control the result pagination in accordance with the Search API.
Note: Twitter’s search service and, by extension, the Search API is not meant to be an exhaustive source of Tweets. Not all Tweets will be indexed or made available via the search interface.
5.18.1. Options
Properties grouped by prefix:
spring.integration.poller
- cron
-
Cron expression for polling. Mutually exclusive with 'fixedDelay' and 'fixedRate'. (String, default:
<none>
) - fixed-delay
-
Polling delay period. Mutually exclusive with 'cron' and 'fixedRate'. (Duration, default:
<none>
) - fixed-rate
-
Polling rate period. Mutually exclusive with 'fixedDelay' and 'cron'. (Duration, default:
<none>
) - initial-delay
-
Polling initial delay. Applied for 'fixedDelay' and 'fixedRate'; ignored for 'cron'. (Duration, default:
<none>
) - max-messages-per-poll
-
Maximum number of messages to poll per polling cycle. (Integer, default:
<none>
) - receive-timeout
-
How long to wait for messages on poll. (Duration, default:
1s
)
twitter.connection
- access-token
-
Your Twitter token. (String, default:
<none>
) - access-token-secret
-
Your Twitter token secret. (String, default:
<none>
) - consumer-key
-
Your Twitter key. (String, default:
<none>
) - consumer-secret
-
Your Twitter secret. (String, default:
<none>
) - debug-enabled
-
Enables Twitter4J debug mode. (Boolean, default:
false
) - raw-json
-
Enable caching the original (raw) JSON objects as returned by the Twitter APIs. When set to false, the result uses Twitter4J's JSON representations. When set to true, the result uses the original Twitter API's JSON representations. (Boolean, default:
true
)
twitter.search
- count
-
Number of tweets to return per page (e.g. per single request), up to a max of 100. (Integer, default:
100
) - lang
-
Restricts searched tweets to the given language, given by an ISO 639-1 code (http://en.wikipedia.org/wiki/ISO_639-1). (String, default:
<none>
) - page
-
Number of pages (e.g. requests) to search backwards (from the most recent to the oldest tweets) before starting the search from the most recent tweets again. The total number of tweets searched backwards is (page * count). (Integer, default:
3
) - query
-
Search tweets by search query string. (String, default:
<none>
) - restart-from-most-recent-on-empty-response
-
Restart search from the most recent tweets on empty response. Applied only after the first restart (e.g. when since_id != UNBOUNDED) (Boolean, default:
false
) - result-type
-
Specifies what type of search results you would prefer to receive. The current default is "mixed". Valid values include: mixed (include both popular and real-time results in the response), recent (return only the most recent results in the response), popular (return only the most popular results in the response). (ResultType, default:
<none>
, possible values:popular
,mixed
,recent
) - since
-
If specified, returns tweets posted since the given date. Date should be formatted as YYYY-MM-DD. (String, default:
<none>
)
5.19. Twitter Stream Source
-
The Filter API returns public statuses that match one or more filter predicates. Multiple parameters allow using a single connection to the Streaming API. TIP: The track, follow, and locations fields are combined with an OR operator! Queries with track=foo and follow=1234 return Tweets matching foo OR created by user 1234.
-
The Sample API returns a small random sample of all public statuses. The Tweets returned by the default access level are the same, so if two different clients connect to this endpoint, they will see the same Tweets.
The default access level allows up to 400 track keywords, 5,000 follow user Ids and 25 0.1-360 degree location boxes.
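The OR semantics of the Filter API parameters can be pictured with the local sketch below. The actual matching happens on Twitter's side; this code only mimics the combination rule for track and follow, using a simple case-insensitive substring check as a stand-in for keyword matching. All names here are hypothetical.

```java
import java.util.List;
import java.util.Locale;

// Local illustration of the OR combination; not how the Streaming API matches tweets.
class FilterPredicateSketch {

    // True when the text contains any tracked keyword OR the author is a followed user.
    static boolean matches(String text, long authorId, List<String> track, List<Long> follow) {
        String lower = text.toLowerCase(Locale.ROOT);
        boolean trackHit = track.stream()
                .anyMatch(keyword -> lower.contains(keyword.toLowerCase(Locale.ROOT)));
        boolean followHit = follow.contains(authorId);
        return trackHit || followHit;
    }

    public static void main(String[] args) {
        List<String> track = List.of("foo");
        List<Long> follow = List.of(1234L);
        System.out.println(matches("some foo text", 1L, track, follow));   // keyword match only
        System.out.println(matches("unrelated", 1234L, track, follow));    // followed user only
        System.out.println(matches("unrelated", 1L, track, follow));       // neither
    }
}
```

Adding more values to either field therefore widens the stream rather than narrowing it.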
5.19.1. Options
Properties grouped by prefix:
twitter.connection
- access-token
-
Your Twitter token. (String, default:
<none>
) - access-token-secret
-
Your Twitter token secret. (String, default:
<none>
) - consumer-key
-
Your Twitter key. (String, default:
<none>
) - consumer-secret
-
Your Twitter secret. (String, default:
<none>
) - debug-enabled
-
Enables Twitter4J debug mode. (Boolean, default:
false
) - raw-json
-
Enable caching the original (raw) JSON objects as returned by the Twitter APIs. When set to false, the result uses Twitter4J's JSON representations. When set to true, the result uses the original Twitter API's JSON representations. (Boolean, default:
true
)
twitter.stream.filter
- count
-
Indicates the number of previous statuses to stream before transitioning to the live stream. (Integer, default:
0
) - filter-level
-
The filter level limits what tweets appear in the stream to those with a minimum filterLevel attribute value. One of either none, low, or medium. (FilterLevel, default:
<none>
) - follow
-
Specifies the users, by ID, to receive public tweets from. (List<Long>, default:
<none>
) - language
-
Specifies the tweets language of the stream. (List<String>, default:
<none>
) - locations
-
Locations to track. Internally represented as a 2D array. For example, the bounding box 52.38, 4.90, 51.51, -0.12 is invalid: the first pair must be the SW corner of the box. (List<BoundingBox>, default:
<none>
) - track
-
Specifies keywords to track. (List<String>, default:
<none>
)
5.20. Websocket Source
The Websocket
source produces messages through a web socket.
5.20.1. Options
Properties grouped by prefix:
5.21. ZeroMQ Source
The "zeromq" source enables receiving messages from ZeroMQ.
5.21.3. Options
The zeromq source has the following options:
- zeromq.supplier.bind-port
-
Bind Port for creating a ZeroMQ Socket; 0 selects a random port. (Integer, default:
0
) - zeromq.supplier.connect-url
-
Connection URL to the ZeroMQ Socket. (String, default:
<none>
) - zeromq.supplier.consume-delay
-
The delay to consume from the ZeroMQ Socket when no data received. (Duration, default:
1s
) - zeromq.supplier.socket-type
-
The Socket Type the connection should make. (SocketType, default:
<none>
, possible values:PAIR
,PUB
,SUB
,REQ
,REP
,DEALER
,ROUTER
,PULL
,PUSH
,XPUB
,XSUB
,STREAM
) - zeromq.supplier.topics
-
The Topics to subscribe to. (String[], default:
[]
)
Also see the Spring Boot Documentation for additional properties for the broker connections and listener properties.
6. Processors
6.1. Aggregator Processor
Aggregator processor enables an application to aggregate incoming messages into groups and release them into an output destination.
java -jar aggregator-processor-kafka-<version>.jar --aggregator.message-store-type=jdbc
Change kafka to rabbit if you want to run it against RabbitMQ.
6.1.2. Options
Properties grouped by prefix:
aggregator
- aggregation
-
SpEL expression for aggregation strategy. Default is collection of payloads. (Expression, default:
<none>
) - correlation
-
SpEL expression for correlation key. Default to correlationId header. (Expression, default:
<none>
) - group-timeout
-
SpEL expression for the timeout after which uncompleted groups expire. (Expression, default:
<none>
) - message-store-entity
-
Persistence message store entity: table prefix in RDBMS, collection name in MongoDb, etc. (String, default:
<none>
) - message-store-type
-
Message store type. (String, default:
<none>
) - release
-
SpEL expression for release strategy. Default is based on the sequenceSize header. (Expression, default:
<none>
)
spring.data.mongodb
- authentication-database
-
Authentication database name. (String, default:
<none>
) - auto-index-creation
-
Whether to enable auto-index creation. (Boolean, default:
<none>
) - database
-
Database name. (String, default:
<none>
) - field-naming-strategy
-
Fully qualified name of the FieldNamingStrategy to use. (Class<?>, default:
<none>
) - grid-fs-database
-
<documentation missing> (String, default:
<none>
) - host
-
Mongo server host. Cannot be set with URI. (String, default:
<none>
) - password
-
Login password of the mongo server. Cannot be set with URI. (Character[], default:
<none>
) - port
-
Mongo server port. Cannot be set with URI. (Integer, default:
<none>
) - replica-set-name
-
Required replica set name for the cluster. Cannot be set with URI. (String, default:
<none>
) - uri
-
Mongo database URI. Cannot be set with host, port, credentials and replica set name. (String, default:
mongodb://localhost/test
) - username
-
Login user of the mongo server. Cannot be set with URI. (String, default:
<none>
) - uuid-representation
-
Representation to use when converting a UUID to a BSON binary value. (UuidRepresentation, default:
java-legacy
, possible values:UNSPECIFIED
,STANDARD
,C_SHARP_LEGACY
,JAVA_LEGACY
,PYTHON_LEGACY
)
spring.datasource
- continue-on-error
-
Whether to stop if an error occurs while initializing the database. (Boolean, default:
false
) - data
-
Data (DML) script resource references. (List<String>, default:
<none>
) - data-password
-
Password of the database to execute DML scripts (if different). (String, default:
<none>
) - data-username
-
Username of the database to execute DML scripts (if different). (String, default:
<none>
) - driver-class-name
-
Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default:
<none>
) - embedded-database-connection
-
Connection details for an embedded database. Defaults to the most suitable embedded database that is available on the classpath. (EmbeddedDatabaseConnection, default:
<none>
, possible values:NONE
,H2
,DERBY
,HSQL
,HSQLDB
) - generate-unique-name
-
Whether to generate a random datasource name. (Boolean, default:
true
) - initialization-mode
-
Mode to apply when determining if DataSource initialization should be performed using the available DDL and DML scripts. (DataSourceInitializationMode, default:
embedded
, possible values:ALWAYS
,EMBEDDED
,NEVER
) - jndi-name
-
JNDI location of the datasource. Class, url, username and password are ignored when set. (String, default:
<none>
) - name
-
Datasource name to use if "generate-unique-name" is false. Defaults to "testdb" when using an embedded database, otherwise null. (String, default:
<none>
) - password
-
Login password of the database. (String, default:
<none>
) - platform
-
Platform to use in the DDL or DML scripts (such as schema-${platform}.sql or data-${platform}.sql). (String, default:
all
) - schema
-
Schema (DDL) script resource references. (List<String>, default:
<none>
) - schema-password
-
Password of the database to execute DDL scripts (if different). (String, default:
<none>
) - schema-username
-
Username of the database to execute DDL scripts (if different). (String, default:
<none>
) - separator
-
Statement separator in SQL initialization scripts. (String, default:
;
) - sql-script-encoding
-
SQL scripts encoding. (Charset, default:
<none>
) - type
-
Fully qualified name of the connection pool implementation to use. By default, it is auto-detected from the classpath. (Class<DataSource>, default:
<none>
) - url
-
JDBC URL of the database. (String, default:
<none>
) - username
-
Login username of the database. (String, default:
<none>
)
spring.mongodb.embedded
- features
-
Comma-separated list of features to enable. Uses the defaults of the configured version by default. (Set<Feature>, default:
[sync_delay]
) - version
-
Version of Mongo to use. (String, default:
3.5.5
)
spring.redis
- client-name
-
Client name to be set on connections with CLIENT SETNAME. (String, default:
<none>
) - client-type
-
Type of client to use. By default, auto-detected according to the classpath. (ClientType, default:
<none>
, possible values:LETTUCE
,JEDIS
) - connect-timeout
-
Connection timeout. (Duration, default:
<none>
) - database
-
Database index used by the connection factory. (Integer, default:
0
) - host
-
Redis server host. (String, default:
localhost
) - password
-
Login password of the redis server. (String, default:
<none>
) - port
-
Redis server port. (Integer, default:
6379
) - ssl
-
Whether to enable SSL support. (Boolean, default:
false
) - timeout
-
Read timeout. (Duration, default:
<none>
) - url
-
Connection URL. Overrides host, port, and password. User is ignored. Example: redis://user:password@example.com:6379 (String, default:
<none>
) - username
-
Login username of the redis server. (String, default:
<none>
)
6.2. Bridge Processor
A processor that bridges the input and output by simply passing the incoming payload through to the output.
6.3. Filter Processor
The filter processor examines each incoming payload and applies a predicate to it; the predicate decides whether the record continues downstream.
For example, if the incoming payload is of type String
and you want to filter out anything that has fewer than five characters, you can run the filter processor as below.
java -jar filter-processor-kafka-<version>.jar --filter.function.expression='payload.length() > 4'
Change kafka to rabbit if you want to run it against RabbitMQ.
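The filtering behavior described above can be sketched outside of Spring. This is a minimal Python illustration, not the processor's implementation: the function names are hypothetical, and a plain Python function stands in for the SpEL expression payload.length() > 4.

```python
from typing import Callable, Iterable, Iterator


def filter_payloads(payloads: Iterable[str],
                    predicate: Callable[[str], bool]) -> Iterator[str]:
    """Yield only the payloads for which the predicate returns True."""
    for payload in payloads:
        if predicate(payload):
            yield payload


def longer_than_four(payload: str) -> bool:
    """Python stand-in (assumption) for the SpEL expression payload.length() > 4."""
    return len(payload) > 4


# "hi" and "four" have fewer than five characters, so they are dropped.
kept = list(filter_payloads(["hi", "hello", "four", "stream"], longer_than_four))
```

Records for which the predicate is false are simply not forwarded; nothing is sent in their place.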
6.4. Groovy Processor
A Processor that applies a Groovy script against messages.
6.4.1. Options
The groovy-processor processor has the following options:
- groovy-processor.script
-
Reference to a script used to process messages. (Resource, default:
<none>
) - groovy-processor.variables
-
Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default:
<none>
) - groovy-processor.variables-location
-
The location of a properties file containing custom script variable bindings. (Resource, default:
<none>
)
6.5. Header Enricher Processor
Use the header-enricher app to add message headers.
The headers are provided in the form of new line delimited key value pairs, where the keys are the header names and the values are SpEL expressions.
For example --headers='foo=payload.someProperty \n bar=payload.otherProperty'
.
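To make the property format concrete, here is a small Python sketch that splits such a newline-delimited string into header names and their (unevaluated) expressions. The function name is hypothetical, and the sketch only parses the text; evaluating the SpEL expressions against a message is left to the application.

```python
from typing import Dict


def parse_header_expressions(spec: str) -> Dict[str, str]:
    """Split a newline-delimited 'name=expression' string into a dict."""
    headers: Dict[str, str] = {}
    for line in spec.splitlines():
        line = line.strip()
        if not line:
            continue  # ignore blank lines
        # Split on the first '=' only, so expressions may contain '='.
        name, _, expression = line.partition("=")
        headers[name.strip()] = expression.strip()
    return headers


# In a Python string literal, \n is a real newline character.
parsed = parse_header_expressions(
    "foo=payload.someProperty \n bar=payload.otherProperty")
```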
6.5.1. Options
The header-enricher processor has the following options:
- header.enricher.headers
-
\n separated properties representing headers in which values are SpEL expressions, e.g. foo='bar' \n baz=payload.baz. (Properties, default:
<none>
) - header.enricher.overwrite
-
Set to true to overwrite any existing message headers. (Boolean, default:
false
)
6.6. Http Request Processor
A processor app that makes requests to an HTTP resource and emits the response body as a message payload.
6.6.1. Input
Headers
Any required HTTP headers must be explicitly set via the headers
or headers-expression
property. See examples below.
Header values may also be used to construct:
-
the request body when referenced in the
body-expression
property. -
the HTTP method when referenced in the
http-method-expression
property. -
the URL when referenced in the
url-expression
property.
Payload
The payload is used as the request body for a POST request by default, and can be any Java type. It should be an empty String for a GET request. The payload may also be used to construct:
-
the request body when referenced in the
body-expression
property. -
the HTTP method when referenced in the
http-method-expression
property. -
the URL when referenced in the
url-expression
property.
The underlying WebClient supports Jackson JSON serialization to support any request and response types if necessary.
The expected-response-type
property, String.class
by default, may be set to any class in your application class path.
Note that user defined payload types will require adding required dependencies to your pom file.
6.6.2. Output
Payload
The raw output object is ResponseEntity<?>. Any of its fields (e.g., body
, headers
) or accessor methods (statusCode
) may be referenced as part of the reply-expression
.
By default the outbound Message payload is the response body.
Note that ResponseEntity (referenced by the expression #root
) cannot be deserialized by Jackson by default, but may be rendered as a HashMap
.
6.6.4. Options
Properties grouped by prefix:
http.request
- body-expression
-
A SpEL expression to derive the request body from the incoming message. (Expression, default:
<none>
) - expected-response-type
-
The type used to interpret the response. (Class<?>, default:
<none>
) - headers-expression
-
A SpEL expression used to derive the http headers map to use. (Expression, default:
<none>
) - http-method-expression
-
A SpEL expression to derive the request method from the incoming message. (Expression, default:
<none>
) - reply-expression
-
A SpEL expression used to compute the final result, applied against the whole HTTP org.springframework.http.ResponseEntity. (Expression, default:
<none>
) - timeout
-
Request timeout in milliseconds. (Long, default:
30000
) - url-expression
-
A SpEL expression against incoming message to determine the URL to use. (Expression, default:
<none>
)
spring.codec
- log-request-details
-
Whether to log form data at DEBUG level, and headers at TRACE level. (Boolean, default:
false
) - max-in-memory-size
-
Limit on the number of bytes that can be buffered whenever the input stream needs to be aggregated. This applies only to the auto-configured WebFlux server and WebClient instances. By default this is not set, in which case individual codec defaults apply. Most codecs are limited to 256K by default. (DataSize, default:
<none>
)
6.7. Image Recognition Processor
A processor that uses an Inception model to classify images in real time into different categories (i.e., labels).
The model implements a deep Convolutional Neural Network that can achieve reasonable performance on hard visual recognition tasks, matching or exceeding human performance in some domains such as image recognition.
The input of the model is an image as a binary array.
The output is a JSON message in this format:
{
"labels" : [
{"giant panda":0.98649305}
]
}
The result contains the name of the recognized category (the label) along with the confidence that the image represents this category.
If the response-size
is set to a value higher than 1, then the result will include the top response-size
probable labels. For example response-size=3
would return:
{
"labels": [
{"giant panda":0.98649305},
{"badger":0.010562794},
{"ice bear":0.001130851}
]
}
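The effect of response-size can be illustrated with a short Python sketch that ranks class scores and keeps the top entries in the JSON shape shown above. The function name and the extra "lesser panda" score are hypothetical; the real processor obtains its scores from the TensorFlow model.

```python
import json
from typing import Dict


def top_labels(scores: Dict[str, float], response_size: int) -> str:
    """Return the response_size most confident labels as a JSON document."""
    ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True)
    labels = [{name: confidence} for name, confidence in ranked[:response_size]]
    return json.dumps({"labels": labels})


# Hypothetical model scores; only the three highest are reported.
scores = {
    "badger": 0.010562794,
    "giant panda": 0.98649305,
    "ice bear": 0.001130851,
    "lesser panda": 0.0005,
}
result = json.loads(top_labels(scores, response_size=3))
```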
Payload
If the incoming type is byte[]
and the content type is set to application/octet-stream
, then the application processes the input byte[]
image and outputs an augmented byte[]
image payload and a JSON header.
6.7.2. Options
- image.recognition.cache-model
-
cache the pre-trained tensorflow model. (Boolean, default:
true
) - image.recognition.debug-output
-
<documentation missing> (Boolean, default:
false
) - image.recognition.debug-output-path
-
<documentation missing> (String, default:
image-recognition-result.png
) - image.recognition.model
-
pre-trained tensorflow image recognition model. Note that the model must match the selected model type! (String, default:
https://storage.googleapis.com/mobilenet_v2/checkpoints/mobilenet_v2_1.4_224.tgz#mobilenet_v2_1.4_224_frozen.pb
) - image.recognition.model-type
-
Supports three different pre-trained TensorFlow image recognition models: Inception, MobileNetV1 and MobileNetV2. (1) Inception: the graph uses 'input' as input and 'output' as output. (2) MobileNetV2 pre-trained models (https://github.com/tensorflow/models/tree/master/research/slim/nets/mobilenet#pretrained-models): the normalized image size is always square (H=W); the graph uses 'input' as input and 'MobilenetV2/Predictions/Reshape_1' as output. (3) MobileNetV1 pre-trained models (https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet_v1.md#pre-trained-models): the graph uses 'input' as input and 'MobilenetV1/Predictions/Reshape_1' as output. (ModelType, default:
<none>
, possible values:inception
,mobilenetv1
,mobilenetv2
) - image.recognition.normalized-image-size
-
Normalized image size. (Integer, default:
224
) - image.recognition.response-size
-
Number of top labels to include in the response. (Integer, default:
5
)
6.8. Object Detection Processor
The Object Detection processor provides out-of-the-box support for the TensorFlow Object Detection API. It allows for real-time localization and identification of multiple objects in a single image or image stream. The Object Detection processor is built on top of the Object Detection Function.
You have to provide the Processor with a pre-trained object detection model, and the corresponding object labels.
Here are some sensible configuration defaults:
-
object.detection.model
:storage.googleapis.com/scdf-tensorflow-models/object-detection/faster_rcnn_resnet101_coco_2018_01_28_frozen_inference_graph.pb
-
object.detection.labels
:storage.googleapis.com/scdf-tensorflow-models/object-detection/mscoco_label_map.pbtxt
-
object.detection.with-masks
:false
The following diagram shows a Spring Cloud Data Flow streaming pipeline that predicts, in real time, the object types in the input image stream.
The processor's input is an image byte array, and the output is an augmented image and a header, called detected_objects
, that provides textual description of the detected objects:
{
"labels" : [
{"name":"person", "confidence":0.9996774,"x1":0.0,"y1":0.3940161,"x2":0.9465165,"y2":0.5592592,"cid":1},
{"name":"person", "confidence":0.9996604,"x1":0.047891676,"y1":0.03169123,"x2":0.941098,"y2":0.2085562,"cid":1},
{"name":"backpack", "confidence":0.96534747,"x1":0.15588468,"y1":0.85957795,"x2":0.5091308,"y2":0.9908878,"cid":23},
{"name":"backpack", "confidence":0.963343,"x1":0.1273736,"y1":0.57658505,"x2":0.47765,"y2":0.6986431,"cid":23}
]
}
The detected_objects
header format is:
-
name, confidence - human-readable name of the detected object (its label) with its confidence as a float in the range [0, 1]
-
x1, y1, x2, y2 - the bounding box of the detected object, represented as
(x1, y1, x2, y2)
. The coordinates are relative to the size of the image. -
cid - Classification identifier as defined in the provided labels configuration file.
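A downstream consumer can read the detected_objects header with any JSON parser. The Python sketch below counts detections per object name, using the header value from the example above; the function name is hypothetical and not part of the processor.

```python
import json
from collections import Counter
from typing import Dict


def count_detected_objects(header_json: str) -> Dict[str, int]:
    """Count how many detections were reported for each object name."""
    labels = json.loads(header_json)["labels"]
    return dict(Counter(obj["name"] for obj in labels))


# Header value copied from the example above.
detected_objects = """
{
  "labels" : [
    {"name":"person", "confidence":0.9996774,"x1":0.0,"y1":0.3940161,"x2":0.9465165,"y2":0.5592592,"cid":1},
    {"name":"person", "confidence":0.9996604,"x1":0.047891676,"y1":0.03169123,"x2":0.941098,"y2":0.2085562,"cid":1},
    {"name":"backpack", "confidence":0.96534747,"x1":0.15588468,"y1":0.85957795,"x2":0.5091308,"y2":0.9908878,"cid":23},
    {"name":"backpack", "confidence":0.963343,"x1":0.1273736,"y1":0.57658505,"x2":0.47765,"y2":0.6986431,"cid":23}
  ]
}
"""
counts = count_detected_objects(detected_objects)
```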
Payload
The incoming type is byte[]
, and the content type is application/octet-stream
. The processor processes the input byte[]
image and outputs an augmented byte[]
image payload and a JSON header (detected_objects
).
6.8.2. Options
- object.detection.cache-model
-
<documentation missing> (Boolean, default:
true
) - object.detection.confidence
-
<documentation missing> (Float, default:
0.4
) - object.detection.debug-output
-
<documentation missing> (Boolean, default:
false
) - object.detection.debug-output-path
-
<documentation missing> (String, default:
object-detection-result.png
) - object.detection.labels
-
Labels URI. (String, default:
https://storage.googleapis.com/scdf-tensorflow-models/object-detection/mscoco_label_map.pbtxt
) - object.detection.model
-
pre-trained tensorflow object detection model. (String, default:
https://download.tensorflow.org/models/object_detection/ssdlite_mobilenet_v2_coco_2018_05_09.tar.gz#frozen_inference_graph.pb
) - object.detection.response-size
-
<documentation missing> (Integer, default:
<none>
) - object.detection.with-masks
-
<documentation missing> (Boolean, default:
false
)
6.9. Semantic Segmentation Processor
Image Semantic Segmentation based on the state-of-the-art DeepLab TensorFlow model.
The Semantic Segmentation
is the process of associating each pixel of an image with a class label (such as flower, person, road, sky, ocean, or car).
Unlike the Instance Segmentation
, which produces instance-aware region masks, the Semantic Segmentation
produces class-aware masks.
For implementing Instance Segmentation
consult the Object Detection Service instead.
The Semantic Segmentation Processor
uses the Semantic Segmentation Function library and the TensorFlow Service.
Payload
The incoming type is byte[]
, and the content type is application/octet-stream
. The processor processes the input byte[]
image and outputs an augmented byte[]
image payload and a JSON header.
The processor's input is an image byte array, and the output is an augmented image byte array and a JSON header semantic_segmentation
in this format:
[
[ 0, 0, 0 ],
[ 127, 127, 127 ],
[ 255, 255, 255 ]
...
]
The JSON output header represents the color pixel map computed from the input image.
6.9.2. Options
- semantic.segmentation.color-map-uri
-
Every pre-trained model is based on certain object color maps. The pre-defined options are: - classpath:/colormap/citymap_colormap.json - classpath:/colormap/ade20k_colormap.json - classpath:/colormap/black_white_colormap.json - classpath:/colormap/mapillary_colormap.json (String, default:
classpath:/colormap/citymap_colormap.json
) - semantic.segmentation.debug-output
-
Save the output image in the local debugOutputPath path. (Boolean, default:
false
) - semantic.segmentation.debug-output-path
-
<documentation missing> (String, default:
semantic-segmentation-result.png
) - semantic.segmentation.mask-transparency
-
The alpha color of the computed segmentation mask image. (Float, default:
0.45
) - semantic.segmentation.model
-
pre-trained tensorflow semantic segmentation model. (String, default:
https://download.tensorflow.org/models/deeplabv3_mnv2_cityscapes_train_2018_02_05.tar.gz#frozen_inference_graph.pb
) - semantic.segmentation.output-type
-
Specifies the output image type. You can return either the input image with the computed mask overlay, or the mask alone. (OutputType, default:
<none>
, possible values:blended
,mask
)
6.10. Script Processor
Processor that transforms messages using a script. The script body is supplied directly as a property value. The language of the script can be specified (groovy/javascript/ruby/python).
6.10.1. Options
The script-processor processor has the following options:
- script-processor.language
-
Language of the text in the script property. Supported: groovy, javascript, ruby, python. (String, default:
<none>
) - script-processor.script
-
Text of the script. (String, default:
<none>
) - script-processor.variables
-
Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default:
<none>
) - script-processor.variables-location
-
The location of a properties file containing custom script variable bindings. (Resource, default:
<none>
)
6.11. Splitter Processor
The splitter app builds upon the concept of the same name in Spring Integration and allows the splitting of a single message into several distinct messages.
The processor uses a function that takes a Message<?>
as input and then produces a List<Message<?>>
as output based on various properties (see below).
You can use a SpEL expression or a delimiter to specify how you want to split the incoming message.
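As a rough illustration of delimiter-based splitting, the Python sketch below breaks a string payload on any of the delimiter characters and attaches sequence information to each part. It is a sketch under stated assumptions, not the Spring Integration implementation: messages are modeled as plain dictionaries, each character in delimiters is treated as a separate delimiter, empty tokens are dropped, and the header names correlationId, sequenceNumber and sequenceSize are used for the sequence details.

```python
import re
import uuid
from typing import Dict, List


def split_message(payload: str, delimiters: str,
                  apply_sequence: bool = True) -> List[Dict]:
    """Split a string payload into one message per token."""
    # Build a character class so that each delimiter character splits the text.
    pattern = "[" + re.escape(delimiters) + "]"
    parts = [part for part in re.split(pattern, payload) if part]
    correlation_id = str(uuid.uuid4())  # shared by all parts of one input
    messages = []
    for index, part in enumerate(parts, start=1):
        headers = {}
        if apply_sequence:
            headers = {
                "correlationId": correlation_id,
                "sequenceNumber": index,
                "sequenceSize": len(parts),
            }
        messages.append({"payload": part, "headers": headers})
    return messages


messages = split_message("a,b;c", delimiters=",;")
```

The sequence headers are what allow a later aggregation step to reassemble the parts in order, which is the purpose of the splitter.apply-sequence option.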
Payload
-
Incoming payload -
Message<?>
If the incoming type is byte[]
and the content type is set to text/plain
or application/json
, then the application converts the byte[]
into String
.
-
Outgoing payload -
List<Message<?>>
6.11.2. Options
- splitter.apply-sequence
-
Add correlation/sequence information in headers to facilitate later aggregation. (Boolean, default:
true
) - splitter.charset
-
The charset to use when converting bytes in text-based files to String. (String, default:
<none>
) - splitter.delimiters
-
When expression is null, delimiters to use when tokenizing String payloads. (String, default:
<none>
) - splitter.expression
-
A SpEL expression for splitting payloads. (String, default:
<none>
) - splitter.file-markers
-
Set to true or false to use a FileSplitter (to split text-based files by line) that includes (or not) beginning/end of file markers. (Boolean, default:
<none>
) - splitter.markers-json
-
When 'fileMarkers == true', specify if they should be produced as FileSplitter.FileMarker objects or JSON. (Boolean, default:
true
)
6.12. Transform Processor
The transform processor lets you convert the message payload structure based on a SpEL expression.
Here is an example of how you can run this application.
java -jar transform-processor-kafka-<version>.jar --spel.function.expression='payload.toUpperCase()'
Change kafka to rabbit if you want to run it against RabbitMQ.
6.13. Twitter Trend and Trend Locations Processor
Processor that returns either trending topics or the locations of the trending topics.
The twitter.trend.trend-query-type
property selects the query type.
6.13.1. Retrieve trending topic in a location (optionally)
For this mode set twitter.trend.trend-query-type
to trend
.
Processor based on Trends API. Returns the trending topics near a specific latitude, longitude location.
6.13.2. Retrieve trend Locations
For this mode set twitter.trend.trend-query-type
to trendLocation
.
Retrieves either the full list of locations that have trending topics or the locations nearest to a given point.
If the latitude
, longitude
parameters are NOT provided, the processor calls the Trends Available API and returns the locations that Twitter has trending topic information for.
If the latitude
, longitude
parameters are provided, the processor calls the Trends Closest API and returns the locations that Twitter has trending topic information for, closest to the specified location.
The response is an array of locations
that encode the location’s WOEID and some other human-readable information such as a canonical name and the country the location belongs to.
6.13.3. Options
Properties grouped by prefix:
twitter.trend.closest
- lat
-
If provided with a lon parameter the available trend locations will be sorted by distance, nearest to furthest, to the co-ordinate pair. The valid range for latitude is -90.0 to +90.0 (South is negative, North is positive) inclusive. (Expression, default:
<none>
) - lon
-
If provided with a lat parameter the available trend locations will be sorted by distance, nearest to furthest, to the co-ordinate pair. The valid range for longitude is -180.0 to +180.0 (West is negative, East is positive) inclusive. (Expression, default:
<none>
)
twitter.trend
- location-id
-
The Yahoo! Where On Earth ID of the location to return trending information for. Global information is available by using 1 as the WOEID. (Expression, default:
payload
) - trend-query-type
-
<documentation missing> (TrendQueryType, default:
<none>
, possible values:trend
,trendLocation
)
7. Sinks
7.1. Cassandra Sink
This sink application writes the content of each message it receives into Cassandra.
It expects a payload of JSON String and uses its properties to map to table columns.
Payload
A JSON String or byte array representing the entity (or a list of entities) to be persisted.
7.1.2. Options
The cassandra sink has the following options:
- spring.cassandra.compression
-
Compression supported by the Cassandra binary protocol. (Compression, default:
none
, possible values:LZ4
,SNAPPY
,NONE
) - spring.cassandra.config
-
Location of the configuration file to use. (Resource, default:
<none>
) - spring.cassandra.contact-points
-
Cluster node addresses in the form 'host:port', or a simple 'host' to use the configured port. (List<String>, default:
[127.0.0.1:9042]
) - spring.cassandra.keyspace-name
-
Keyspace name to use. (String, default:
<none>
) - spring.cassandra.local-datacenter
-
Datacenter that is considered "local". Contact points should be from this datacenter. (String, default:
<none>
) - spring.cassandra.password
-
Login password of the server. (String, default:
<none>
) - spring.cassandra.port
-
Port to use if a contact point does not specify one. (Integer, default:
9042
) - spring.cassandra.schema-action
-
Schema action to take at startup. (String, default:
none
) - spring.cassandra.session-name
-
Name of the Cassandra session. (String, default:
<none>
) - spring.cassandra.ssl
-
Enable SSL support. (Boolean, default:
false
) - spring.cassandra.username
-
Login user of the server. (String, default:
<none>
)
7.2. Analytics Sink
Sink application, built on top of the Analytics Consumer, that computes analytics from the input messages and publishes the analytics as metrics to various monitoring systems. It leverages the micrometer library for providing a uniform programming experience across the most popular monitoring systems and exposes Spring Expression Language (SpEL) properties for defining how the metric Name, Values and Tags are computed from the input data.
The analytics sink can produce two metrics types:
-
Counter - reports a single metric, a count, that increments by a fixed, positive amount. Counters can be used for computing the rates of how the data changes in time.
-
Gauge - reports the current value. Typical examples for gauges would be the size of a collection or map or number of threads in a running state.
A Meter (e.g. Counter or Gauge) is uniquely identified by its name
and dimensions
(the terms dimensions and tags are used interchangeably). Dimensions allow a particular named metric to be sliced to drill down and reason about the data.
Because a metric is uniquely identified by its name and dimensions, you can assign multiple tags (key/value pairs) to every metric, but you cannot change those tags arbitrarily afterwards. Monitoring systems such as Prometheus will complain if a metric with the same name has different sets of tags.
Use the analytics.name
or analytics.name-expression
properties to set the name of the output analytics metrics. If not set, the metrics name defaults to the application's name.
Use the analytics.tag.expression.<TAG_NAME>=<TAG_VALUE>
property to add one or more tags to your metrics. The TAG_NAME
used in the property definition will appear as tag name in the metrics. The TAG_VALUE is a SpEL
expression that dynamically computes the tag value from the incoming message.
The SpEL
expressions use the headers
and payload
keywords to access the message’s headers and payload values.
You can use literals (e.g. 'fixed value' ) to set tags with fixed values.
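The way a meter is identified by its name together with its tags can be sketched in a few lines of Python. This is an illustration only, not Micrometer: the CounterRegistry class, the "stocks" metric name, the sample messages and the tag names are all hypothetical, and Python lambdas stand in for the SpEL tag expressions.

```python
from collections import defaultdict
from typing import Dict


class CounterRegistry:
    """Keeps one running total per unique (name, tags) combination."""

    def __init__(self) -> None:
        self._totals = defaultdict(float)

    def increment(self, name: str, tags: Dict[str, str],
                  amount: float = 1.0) -> None:
        # The meter identity is the name plus the sorted tag pairs.
        meter_id = (name, tuple(sorted(tags.items())))
        self._totals[meter_id] += amount

    def total(self, name: str, tags: Dict[str, str]) -> float:
        return self._totals[(name, tuple(sorted(tags.items())))]


# Stand-ins for analytics.tag.expression.<TAG_NAME> properties (assumption).
tag_expressions = {
    "symbol": lambda message: message["payload"]["symbol"],  # like payload.symbol
    "source": lambda message: "stream",                      # like a literal 'stream'
}

messages = [
    {"headers": {}, "payload": {"symbol": "AAPL"}},
    {"headers": {}, "payload": {"symbol": "VMW"}},
    {"headers": {}, "payload": {"symbol": "AAPL"}},
]

registry = CounterRegistry()
for message in messages:
    tags = {tag: expr(message) for tag, expr in tag_expressions.items()}
    registry.increment("stocks", tags)
```

Each distinct tag combination yields its own counter, which is why the set of tag names for a given metric name should stay stable.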
All Stream Applications support, out of the box, three of the most popular monitoring systems, Wavefront
, Prometheus
and InfluxDB
and you can enable each of them declaratively.
You can add support for additional monitoring systems by just adding their micrometer meter-registry dependencies to the Analytics Sink
applications.
See the Spring Cloud Data Flow Stream Monitoring documentation for detailed instructions on configuring the monitoring systems. The following quick snippets can help you start.
-
To enable the Prometheus meter registry, set the following properties.
management.metrics.export.prometheus.enabled=true
management.metrics.export.prometheus.rsocket.enabled=true
management.metrics.export.prometheus.rsocket.host=<YOUR PROMETHEUS-RSOCKET PROXY URI>
management.metrics.export.prometheus.rsocket.port=7001
-
To enable Wavefront meter registry, set the following properties.
management.metrics.export.wavefront.enabled=true
management.metrics.export.wavefront.api-token=YOUR WAVEFRONT KEY
management.metrics.export.wavefront.uri=YOUR WAVEFRONT URI
management.metrics.export.wavefront.source=UNIQUE NAME TO IDENTIFY YOUR APP
-
To enable InfluxDB meter registry, set the following property.
management.metrics.export.influx.enabled=true
management.metrics.export.influx.uri={influxdb-server-url}
If the Data Flow Server Monitoring is enabled then the Analytics Sink will reuse the provided metrics configurations.
The following diagram illustrates how the Analytics Sink
helps collect business insights for a real-time stock-exchange pipeline.
7.2.2. Options
Properties grouped by prefix:
analytics
- amount-expression
-
A SpEL expression to compute the output metrics value (e.g. amount). It defaults to 1.0 (Expression, default:
<none>
) - meter-type
-
Micrometer meter type used to report the metrics to the backend. (MeterType, default:
<none>
, possible values:counter
,gauge
) - name
-
The name of the output metrics. The 'name' and 'nameExpression' are mutually exclusive. Only one of them can be set. (String, default:
<none>
) - name-expression
-
A SpEL expression to compute the output metrics name from the input message. The 'name' and 'nameExpression' are mutually exclusive. Only one of them can be set. (Expression, default:
<none>
)
analytics.tag
- expression
-
Computes tags from SpEL expression. Single SpEL expression can produce an array of values, which in turn means distinct name/value tags. Every name/value tag will produce a separate meter increment. Tag expression format is: analytics.tag.expression.[tag-name]=[SpEL expression] (Map<String, Expression>, default:
<none>
) - fixed
-
DEPRECATED: Please use the analytics.tag.expression with a literal SpEL expression. Custom, fixed tags. Those tags have constant values, created once and then sent along with every published metric. The convention to define a fixed tag is: analytics.tag.fixed.[tag-name]=[tag-value] (Map<String, String>, default:
<none>
)
7.3. Elasticsearch Sink
Sink that indexes documents into Elasticsearch.
This Elasticsearch sink only supports indexing JSON documents.
It consumes data from an input destination and then indexes it to Elasticsearch.
The input data can be a plain json string, or a java.util.Map
that represents the JSON.
It also accepts the data as the Elasticsearch provided XContentBuilder
.
However, this is a rare case as it is not likely the middleware keeps the records as XContentBuilder
.
This is provided mainly for direct invocation of the consumer.
7.3.1. Options
The Elasticsearch sink has the following options:
- elasticsearch.consumer.async
-
Indicates whether the indexing operation is async or not. By default indexing is done synchronously. (Boolean, default:
false
) - elasticsearch.consumer.batch-size
-
Number of items to index for each request. It defaults to 1. For values greater than 1 bulk indexing API will be used. (Integer, default:
1
) - elasticsearch.consumer.group-timeout
-
Timeout in milliseconds after which message group is flushed when bulk indexing is active. It defaults to -1, meaning no automatic flush of idle message groups occurs. (Long, default:
-1
) - elasticsearch.consumer.id
-
The id of the document to index. If set, the INDEX_ID header value overrides this property on a per message basis. (Expression, default:
<none>
) - elasticsearch.consumer.index
-
Name of the index. If set, the INDEX_NAME header value overrides this property on a per message basis. (String, default:
<none>
) - elasticsearch.consumer.routing
-
Indicates the shard to route to. If not provided, Elasticsearch will default to a hash of the document id. (String, default:
<none>
) - elasticsearch.consumer.timeout-seconds
-
Timeout for the shard to be available. If not set, it defaults to 1 minute set by the Elasticsearch client. (Long, default:
0
)
7.3.2. Examples of running this sink
-
From the folder
elasticsearch-sink
:./mvnw clean package
-
cd apps
-
cd to the proper binder generated app (Kafka or RabbitMQ)
-
./mvnw clean package
-
Make sure that you have Elasticsearch running. For example you can run it as a docker container using the following command.
docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2
-
Start the middleware (Kafka or RabbitMQ) if it is not already running.
-
java -jar target/elasticsearch-sink-<kafka|rabbit>-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.input.destination=els-in --elasticsearch.consumer.index=testing
-
Send some JSON data into the middleware destination. For example:
{"foo":"bar"}
-
Verify that the data is indexed:
curl localhost:9200/testing/_search
7.4. File Sink
The file sink app writes each message it receives to a file.
7.4.2. Options
The file-sink
has the following options:
- file.consumer.binary
-
A flag to indicate whether adding a newline after the write should be suppressed. (Boolean, default:
false
) - file.consumer.charset
-
The charset to use when writing text content. (String, default:
UTF-8
) - file.consumer.directory
-
The parent directory of the target file. (File, default:
<none>
) - file.consumer.directory-expression
-
The expression to evaluate for the parent directory of the target file. (String, default:
<none>
) - file.consumer.mode
-
The FileExistsMode to use if the target file already exists. (FileExistsMode, default:
<none>
, possible values:APPEND
,APPEND_NO_FLUSH
,FAIL
,IGNORE
,REPLACE
,REPLACE_IF_MODIFIED
) - file.consumer.name
-
The name of the target file. (String, default:
file-consumer
) - file.consumer.name-expression
-
The expression to evaluate for the name of the target file. (String, default:
<none>
) - file.consumer.suffix
-
The suffix to append to file name. (String, default:
<empty string>
)
7.5. FTP Sink
FTP sink is a simple option to push files to an FTP server from incoming messages.
It uses an ftp-outbound-adapter
, therefore incoming messages can be either a java.io.File
object, a String
(content of the file)
or an array of bytes
(file content as well).
To use this sink, you need a username and a password to login.
By default Spring Integration will use o.s.i.file.DefaultFileNameGenerator if none is specified. DefaultFileNameGenerator will determine the file name
based on the value of the file_name header (if it exists) in the MessageHeaders , or if the payload of the Message is already a java.io.File , then it will
use the original name of that file.
7.5.4. Options
The ftp sink has the following options:
Properties grouped by prefix:
ftp.consumer
- auto-create-dir
-
Whether or not to create the remote directory. (Boolean, default:
true
) - filename-expression
-
A SpEL expression to generate the remote file name. (String, default:
<none>
) - mode
-
Action to take if the remote file already exists. (FileExistsMode, default:
<none>
, possible values:APPEND
,APPEND_NO_FLUSH
,FAIL
,IGNORE
,REPLACE
,REPLACE_IF_MODIFIED
) - remote-dir
-
The remote FTP directory. (String, default:
/
) - remote-file-separator
-
The remote file separator. (String, default:
/
) - temporary-remote-dir
-
A temporary directory where the file will be written if use-temporary-filename is true. (String, default:
/
) - tmp-file-suffix
-
The suffix to use while the transfer is in progress. (String, default:
.tmp
) - use-temporary-filename
-
Whether or not to write to a temporary file and rename. (Boolean, default:
true
)
ftp.factory
- cache-sessions
-
Cache sessions. (Boolean, default:
<none>
) - client-mode
-
The client mode to use for the FTP session. (ClientMode, default:
<none>
, possible values:ACTIVE
,PASSIVE
) - host
-
The host name of the server. (String, default:
localhost
) - password
-
The password to use to connect to the server. (String, default:
<none>
) - port
-
The port of the server. (Integer, default:
21
) - username
-
The username to use to connect to the server. (String, default:
<none>
)
7.6. JDBC Sink
JDBC sink allows you to persist incoming payload into an RDBMS database.
The jdbc.consumer.columns
property represents pairs of COLUMN_NAME[:EXPRESSION_FOR_VALUE]
where EXPRESSION_FOR_VALUE
(together with the colon) is optional.
In this case the value is evaluated via generated expression like payload.COLUMN_NAME
, so this way we have a direct mapping from object properties to the table column.
For example we have a JSON payload like:
{
"name": "My Name",
"address": {
"city": "Big City",
"street": "Narrow Alley"
}
}
So, we can insert it into the table with name
, city
and street
structure using the configuration:
--jdbc.consumer.columns=name,city:address.city,street:address.street
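The mapping above can be sketched in a few lines of Python. This is an illustration only, not the sink's implementation: the real sink evaluates SpEL expressions, whereas the hypothetical `parse_columns` and `resolve` helpers below only understand dotted property paths.

```python
def parse_columns(spec):
    """Split 'name,city:address.city' into {column: expression}."""
    columns = {}
    for pair in spec.split(","):
        column, _, expression = pair.strip().partition(":")
        # Without an explicit expression, fall back to payload.COLUMN_NAME.
        columns[column] = expression or column
    return columns


def resolve(payload, path):
    """Follow a dotted path such as 'address.city' through nested dicts."""
    value = payload
    for key in path.split("."):
        value = value[key]
    return value


payload = {
    "name": "My Name",
    "address": {"city": "Big City", "street": "Narrow Alley"},
}
columns = parse_columns("name,city:address.city,street:address.street")
# One value per target column, ready to bind to an INSERT statement.
row = {column: resolve(payload, path) for column, path in columns.items()}
print(row)
```

Each entry of `row` corresponds to one column of the target table, which is the direct object-to-column mapping described above.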
This sink supports batch inserts, as far as supported by the underlying JDBC driver.
Batch inserts are configured via the batch-size
and idle-timeout
properties:
Incoming messages are aggregated until batch-size
messages are present, then inserted as a batch.
If idle-timeout
milliseconds pass with no new messages, the aggregated batch is inserted even if it is smaller than batch-size
, capping maximum latency.
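The interplay of batch-size and idle-timeout can be modelled as follows. This is a minimal sketch, not the sink's code: the `BatchBuffer` class and its method names are hypothetical, time is passed in explicitly to keep the example deterministic, and treating a negative timeout as "disabled" is an assumption based on the default value of -1.

```python
class BatchBuffer:
    """Collects messages and releases them by size or by idle time."""

    def __init__(self, batch_size, idle_timeout_ms):
        self.batch_size = batch_size
        self.idle_timeout_ms = idle_timeout_ms
        self.pending = []
        self.last_arrival_ms = None

    def add(self, message, now_ms):
        """Buffer a message; return a full batch, or None if still filling."""
        self.pending.append(message)
        self.last_arrival_ms = now_ms
        if len(self.pending) >= self.batch_size:
            return self._release()
        return None

    def poll(self, now_ms):
        """Return a partial batch once idle_timeout_ms passed with no new message."""
        if not self.pending or self.idle_timeout_ms < 0:
            return None
        if now_ms - self.last_arrival_ms >= self.idle_timeout_ms:
            return self._release()
        return None

    def _release(self):
        batch, self.pending = self.pending, []
        return batch


buffer = BatchBuffer(batch_size=3, idle_timeout_ms=1000)
first = buffer.add("a", now_ms=0)      # buffered, nothing released
second = buffer.add("b", now_ms=100)   # buffered, nothing released
full = buffer.add("c", now_ms=200)     # size threshold reached
buffer.add("d", now_ms=300)
early = buffer.poll(now_ms=900)        # only 600 ms idle so far
late = buffer.poll(now_ms=1300)        # 1000 ms idle: partial batch released
```

With the default batch-size of 1, every call to `add` would release immediately, so idle-timeout only matters once batch-size is greater than 1.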
The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like spring.datasource.url etc. apply.
7.6.1. Examples
java -jar jdbc-sink.jar --jdbc.consumer.tableName=names --jdbc.consumer.columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test'
7.6.2. Options
The jdbc sink has the following options:
Properties grouped by prefix:
jdbc.consumer
- batch-size
-
Threshold in number of messages when data will be flushed to database table. (Integer, default:
1
) - columns
-
The comma separated colon-based pairs of column names and SpEL expressions for values to insert/update. Names are used at initialization time to issue the DDL. (String, default:
payload:payload.toString()
) - idle-timeout
-
Idle timeout in milliseconds when data is automatically flushed to database table. (Long, default:
-1
) - initialize
-
'true', 'false' or the location of a custom initialization script for the table. (String, default:
false
) - table-name
-
The name of the table to write into. (String, default:
messages
)
spring.datasource
- driver-class-name
-
Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default:
<none>
) - password
-
Login password of the database. (String, default:
<none>
) - url
-
JDBC URL of the database. (String, default:
<none>
) - username
-
Login username of the database. (String, default:
<none>
)
7.7. Log Sink
The log
sink uses the application logger to output the data for inspection.
Note that the log
sink uses a type-less handler, which affects how the actual logging is performed.
If the content-type is textual, the raw payload bytes are converted to a String; otherwise, the raw bytes are logged.
Please see more info in the user-guide.
7.7.1. Options
The log sink has the following options:
- log.expression
-
A SpEL expression (against the incoming message) to evaluate as the logged message. (String, default:
payload
) - log.level
-
The level at which to log messages. (Level, default:
<none>
, possible values:FATAL
,ERROR
,WARN
,INFO
,DEBUG
,TRACE
) - log.name
-
The name of the logger to use. (String, default:
<none>
)
7.8. MongoDB Sink
This sink application ingests incoming data into MongoDB.
This application is fully based on the MongoDataAutoConfiguration
, so refer to the Spring Boot MongoDB Support for more information.
7.8.2. Options
The mongodb sink has the following options:
Properties grouped by prefix:
mongodb.consumer
- collection
-
The MongoDB collection to store data. (String, default:
<none>
) - collection-expression
-
The SpEL expression to evaluate MongoDB collection. (Expression, default:
<none>
)
spring.data.mongodb
- additional-hosts
-
Additional server hosts. Cannot be set with URI or if 'host' is not specified. Additional hosts will use the default mongo port of 27017, if you want to use a different port you can use the "host:port" syntax. (List<String>, default:
<none>
) - authentication-database
-
Authentication database name. (String, default:
<none>
) - auto-index-creation
-
Whether to enable auto-index creation. (Boolean, default:
<none>
) - database
-
Database name. (String, default:
<none>
) - field-naming-strategy
-
Fully qualified name of the FieldNamingStrategy to use. (Class<?>, default:
<none>
) - host
-
Mongo server host. Cannot be set with URI. (String, default:
<none>
) - password
-
Login password of the mongo server. Cannot be set with URI. (Character[], default:
<none>
) - port
-
Mongo server port. Cannot be set with URI. (Integer, default:
<none>
) - replica-set-name
-
Required replica set name for the cluster. Cannot be set with URI. (String, default:
<none>
) - uri
-
Mongo database URI. Overrides host, port, username, password, and database. (String, default:
mongodb://localhost/test
) - username
-
Login user of the mongo server. Cannot be set with URI. (String, default:
<none>
) - uuid-representation
-
Representation to use when converting a UUID to a BSON binary value. (UuidRepresentation, default:
java-legacy
, possible values:UNSPECIFIED
,STANDARD
,C_SHARP_LEGACY
,JAVA_LEGACY
,PYTHON_LEGACY
)
7.9. MQTT Sink
This module sends messages to MQTT.
7.9.2. Options
The mqtt sink has the following options:
Properties grouped by prefix:
mqtt
- clean-session
-
whether the client and server should remember state across restarts and reconnects. (Boolean, default:
true
) - connection-timeout
-
the connection timeout in seconds. (Integer, default:
30
) - keep-alive-interval
-
the ping interval in seconds. (Integer, default:
60
) - password
-
the password to use when connecting to the broker. (String, default:
guest
) - persistence
-
'memory' or 'file'. (String, default:
memory
) - persistence-directory
-
Persistence directory. (String, default:
/tmp/paho
) - ssl-properties
-
MQTT Client SSL properties. (Map<String, String>, default:
<none>
) - url
-
location of the mqtt broker(s) (comma-delimited list). (String[], default:
[tcp://localhost:1883]
) - username
-
the username to use when connecting to the broker. (String, default:
guest
)
mqtt.consumer
- async
-
whether or not to use async sends. (Boolean, default:
false
) - charset
-
the charset used to convert a String payload to byte[]. (String, default:
UTF-8
) - client-id
-
identifies the client. (String, default:
stream.client.id.sink
) - qos
-
the quality of service to use. (Integer, default:
1
) - retained
-
whether to set the 'retained' flag. (Boolean, default:
false
) - topic
-
the topic to which the sink will publish. (String, default:
stream.mqtt
)
7.10. Pgcopy Sink
A module that writes its incoming payload to an RDBMS using the PostgreSQL COPY command.
7.10.3. Options
The pgcopy sink has the following options:
- spring.datasource.driver-class-name
-
Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default:
<none>
) - spring.datasource.password
-
Login password of the database. (String, default:
<none>
) - spring.datasource.url
-
JDBC URL of the database. (String, default:
<none>
) - spring.datasource.username
-
Login username of the database. (String, default:
<none>
)
The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like spring.datasource.url etc. apply.
7.10.4. Build
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
For integration tests to run, start a PostgreSQL database on localhost:
docker run -e POSTGRES_PASSWORD=spring -e POSTGRES_DB=test -p 5432:5432 -d postgres:latest
7.11. RabbitMQ Sink
This module sends messages to RabbitMQ.
7.11.1. Options
The rabbit sink has the following options:
(See the Spring Boot documentation for RabbitMQ connection properties)
Properties grouped by prefix:
rabbit
- converter-bean-name
-
The bean name for a custom message converter; if omitted, a SimpleMessageConverter is used. If 'jsonConverter', a Jackson2JsonMessageConverter bean will be created for you. (String, default:
<none>
) - exchange
-
Exchange name - overridden by exchangeNameExpression, if supplied. (String, default:
<empty string>
) - exchange-expression
-
A SpEL expression that evaluates to an exchange name. (Expression, default:
<none>
) - headers-mapped-last
-
When mapping headers for the outbound message, determine whether the headers are mapped before the message is converted, or afterwards. (Boolean, default:
true
) - mapped-request-headers
-
Headers that will be mapped. (String[], default:
[*]
) - own-connection
-
When true, use a separate connection based on the boot properties. (Boolean, default:
false
) - persistent-delivery-mode
-
Default delivery mode when 'amqp_deliveryMode' header is not present, true for PERSISTENT. (Boolean, default:
false
) - routing-key
-
Routing key - overridden by routingKeyExpression, if supplied. (String, default:
<none>
) - routing-key-expression
-
A SpEL expression that evaluates to a routing key. (Expression, default:
<none>
)
spring.rabbitmq
- address-shuffle-mode
-
Mode used to shuffle configured addresses. (AddressShuffleMode, default:
none
, possible values:NONE
,RANDOM
,INORDER
) - addresses
-
Comma-separated list of addresses to which the client should connect. When set, the host and port are ignored. (String, default:
<none>
) - channel-rpc-timeout
-
Continuation timeout for RPC calls in channels. Set it to zero to wait forever. (Duration, default:
10m
) - connection-timeout
-
Connection timeout. Set it to zero to wait forever. (Duration, default:
<none>
) - host
-
RabbitMQ host. Ignored if an address is set. (String, default:
localhost
) - password
-
Login to authenticate against the broker. (String, default:
guest
) - port
-
RabbitMQ port. Ignored if an address is set. Default to 5672, or 5671 if SSL is enabled. (Integer, default:
<none>
) - publisher-confirm-type
-
Type of publisher confirms to use. (ConfirmType, default:
<none>
, possible values:SIMPLE
,CORRELATED
,NONE
) - publisher-returns
-
Whether to enable publisher returns. (Boolean, default:
false
) - requested-channel-max
-
Number of channels per connection requested by the client. Use 0 for unlimited. (Integer, default:
2047
) - requested-heartbeat
-
Requested heartbeat timeout; zero for none. If a duration suffix is not specified, seconds will be used. (Duration, default:
<none>
) - username
-
Login user to authenticate to the broker. (String, default:
guest
) - virtual-host
-
Virtual host to use when connecting to the broker. (String, default:
<none>
)
7.12. Redis Sink
Sends messages to Redis.
7.12.1. Options
The redis sink has the following options:
Properties grouped by prefix:
redis.consumer
- key
-
A literal key name to use when storing to a key. (String, default:
<none>
) - key-expression
-
A SpEL expression to use for storing to a key. (String, default:
<none>
) - queue
-
A literal queue name to use when storing in a queue. (String, default:
<none>
) - queue-expression
-
A SpEL expression to use for queue. (String, default:
<none>
) - topic
-
A literal topic name to use when publishing to a topic. (String, default:
<none>
) - topic-expression
-
A SpEL expression to use for topic. (String, default:
<none>
)
spring.data.redis
- client-name
-
Client name to be set on connections with CLIENT SETNAME. (String, default:
<none>
) - client-type
-
Type of client to use. By default, auto-detected according to the classpath. (ClientType, default:
<none>
, possible values:LETTUCE
,JEDIS
) - connect-timeout
-
Connection timeout. (Duration, default:
<none>
) - database
-
Database index used by the connection factory. (Integer, default:
0
) - host
-
Redis server host. (String, default:
localhost
) - password
-
Login password of the redis server. (String, default:
<none>
) - port
-
Redis server port. (Integer, default:
6379
) - ssl
-
Whether to enable SSL support. (Boolean, default:
false
) - timeout
-
Read timeout. (Duration, default:
<none>
) - url
-
Connection URL. Overrides host, port, and password. User is ignored. Example: redis://user:[email protected]:6379 (String, default:
<none>
) - username
-
Login username of the redis server. (String, default:
<none>
)
spring.data.redis.jedis.pool
- enabled
-
Whether to enable the pool. Enabled automatically if "commons-pool2" is available. With Jedis, pooling is implicitly enabled in sentinel mode and this setting only applies to single node setup. (Boolean, default:
<none>
) - max-active
-
Maximum number of connections that can be allocated by the pool at a given time. Use a negative value for no limit. (Integer, default:
8
) - max-idle
-
Maximum number of "idle" connections in the pool. Use a negative value to indicate an unlimited number of idle connections. (Integer, default:
8
) - max-wait
-
Maximum amount of time a connection allocation should block before throwing an exception when the pool is exhausted. Use a negative value to block indefinitely. (Duration, default:
-1ms
) - min-idle
-
Target for the minimum number of idle connections to maintain in the pool. This setting only has an effect if both it and time between eviction runs are positive. (Integer, default:
0
) - time-between-eviction-runs
-
Time between runs of the idle object evictor thread. When positive, the idle object evictor thread starts, otherwise no idle object eviction is performed. (Duration, default:
<none>
)
spring.data.redis.lettuce.pool
- enabled
-
Whether to enable the pool. Enabled automatically if "commons-pool2" is available. With Jedis, pooling is implicitly enabled in sentinel mode and this setting only applies to single node setup. (Boolean, default:
<none>
) - max-active
-
Maximum number of connections that can be allocated by the pool at a given time. Use a negative value for no limit. (Integer, default:
8
) - max-idle
-
Maximum number of "idle" connections in the pool. Use a negative value to indicate an unlimited number of idle connections. (Integer, default:
8
) - max-wait
-
Maximum amount of time a connection allocation should block before throwing an exception when the pool is exhausted. Use a negative value to block indefinitely. (Duration, default:
-1ms
) - min-idle
-
Target for the minimum number of idle connections to maintain in the pool. This setting only has an effect if both it and time between eviction runs are positive. (Integer, default:
0
) - time-between-eviction-runs
-
Time between runs of the idle object evictor thread. When positive, the idle object evictor thread starts, otherwise no idle object eviction is performed. (Duration, default:
<none>
)
spring.data.redis.sentinel
- master
-
Name of the Redis server. (String, default:
<none>
) - nodes
-
Comma-separated list of "host:port" pairs. (List<String>, default:
<none>
) - password
-
Password for authenticating with sentinel(s). (String, default:
<none>
) - username
-
Login username for authenticating with sentinel(s). (String, default:
<none>
)
7.13. Router Sink
This application routes messages to named channels.
7.13.1. Options
The router sink has the following options:
- router.default-output-channel
-
Where to send un-routable messages. (String, default:
nullChannel
) - router.destination-mappings
-
Destination mappings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default:
<none>
) - router.expression
-
The expression to be applied to the message to determine the channel(s) to route to. Note that the payload wire format for content types such as text, json or xml is byte[], not String. Consult the documentation for how to handle byte array payload content. (Expression, default:
<none>
) - router.refresh-delay
-
How often to check for script changes in ms (if present); < 0 means don't refresh. (Integer, default:
60000
) - router.resolution-required
-
Whether or not channel resolution is required. (Boolean, default:
false
) - router.script
-
The location of a groovy script that returns channels or channel mapping resolution keys. (Resource, default:
<none>
) - router.variables
-
Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default:
<none>
) - router.variables-location
-
The location of a properties file containing custom script variable bindings. (Resource, default:
<none>
)
Since this is a dynamic router, destinations are created as needed; therefore, by default the defaultOutputChannel
and resolutionRequired will only be used if the Binder has some problem binding to the destination.
You can restrict the creation of dynamic bindings using the spring.cloud.stream.dynamicDestinations
property.
By default, all resolved destinations will be bound dynamically; if this property has a comma-delimited list of
destination names, only those will be bound.
Messages that resolve to a destination that is not in this list will be routed to the defaultOutputChannel
, which
must also appear in the list.
destinationMappings
are used to map the evaluation results to an actual destination name.
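The resolution rules described above can be summarised in a short sketch. It is a simplified model under stated assumptions, not the router's implementation: `resolve_destination` is a hypothetical helper, and the destination names used in the example are made up.

```python
def resolve_destination(result, destination_mappings, dynamic_destinations,
                        default_output_channel):
    """Pick the destination for one evaluation result."""
    # destinationMappings translate the evaluation result into a destination name.
    destination = destination_mappings.get(result, result)
    # An empty allow-list means every resolved destination may be bound dynamically.
    if not dynamic_destinations or destination in dynamic_destinations:
        return destination
    # Anything outside the allow-list goes to the default output channel.
    return default_output_channel


mappings = {"foo": "orders"}
allowed = ["orders", "unrouted"]  # the default channel must be listed too

mapped = resolve_destination("foo", mappings, allowed, "unrouted")
rejected = resolve_destination("baz", mappings, allowed, "unrouted")
unrestricted = resolve_destination("baz", mappings, [], "unrouted")
```

Here `allowed` plays the role of spring.cloud.stream.dynamicDestinations and `mappings` the role of destinationMappings.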
7.13.2. SpEL-based Routing
The expression evaluates against the message and returns either a channel name, or the key to a map of channel names.
For more information, please see the "Routers and the Spring Expression Language (SpEL)" subsection in the Spring Integration Reference manual Configuring a Generic Router section.
Starting with Spring Cloud Stream 2.0, the message wire format for json , text and xml content types is byte[] , not String !
This is a breaking change from Spring Cloud Stream 1.x, which treated those types as Strings.
Depending on the content type, different techniques for handling the byte[] payloads are available. For plain text
content types, one can convert the octet payload into a string using the new String(payload) SpEL expression. For json
types, the #jsonPath() SpEL utility
already supports string and byte array content interchangeably. The same applies to the xml content type and the
#xpath() SpEL utility.
For example for text
content type one should use:
new String(payload).contains('a');
and for json
content type SpEL expressions like this:
#jsonPath(payload, '$.person.name')
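As an analogy only, the same two techniques look like this in plain Python. The sample payloads are made up, and the snippet does not involve SpEL at all; it merely shows why a byte payload has to be decoded before string operations, while a JSON parser can often consume the bytes directly.

```python
import json

text_payload = b"banana"
json_payload = b'{"person": {"name": "Jane"}}'

# Comparable in spirit to new String(payload).contains('a'):
# decode the bytes first, then apply the string operation.
contains_a = "a" in text_payload.decode("utf-8")

# Comparable in spirit to #jsonPath(payload, '$.person.name'):
# json.loads accepts bytes as well as str.
name = json.loads(json_payload)["person"]["name"]
```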
7.13.3. Groovy-based Routing
Instead of SpEL expressions, Groovy scripts can also be used. Let’s create a Groovy script in the file system at "file:/my/path/router.groovy", or "classpath:/my/path/router.groovy" :
println("Groovy processing payload '" + payload + "'")
if (payload.contains('a')) {
return "foo"
}
else {
return "bar"
}
If you want to pass variable values to your script, you can statically bind values using the variables option, or pass the path to a properties file containing the bindings using the variables-location option. All properties in the file are made available to the script as variables. You may specify both variables and variables-location, in which case any duplicate values provided as variables override values provided in variables-location. Note that payload and headers are implicitly bound to give you access to the data contained in a message.
For more information, see the Spring Integration Reference manual Groovy Support.
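The precedence rule for script variables can be illustrated with a small sketch. The `parse_properties` helper is hypothetical and handles only simple name=value lines; real Java properties files support more syntax.

```python
def parse_properties(text):
    """Parse newline-delimited name=value pairs into a dict."""
    bindings = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        name, _, value = line.partition("=")
        bindings[name.strip()] = value.strip()
    return bindings


file_bindings = parse_properties("foo=from-file\nbaz=car")  # properties file content
inline_bindings = parse_properties("foo=bar")               # the variables option

# Inline variables are applied last, so they win over duplicates from the file.
script_variables = {**file_bindings, **inline_bindings}
```

The script would then see `foo` with the inline value and `baz` with the value from the file.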
7.14. RSocket Sink
RSocket sink that sends data using the RSocket protocol's fire-and-forget strategy.
7.14.1. Options
The rsocket sink has the following options:
- rsocket.consumer.host
-
RSocket host. (String, default:
localhost
) - rsocket.consumer.port
-
RSocket port. (Integer, default:
7000
) - rsocket.consumer.route
-
Route used for RSocket. (String, default:
<none>
) - rsocket.consumer.uri
-
URI that can be used for websocket based transport. (URI, default:
<none>
)
7.15. Amazon S3 Sink
This sink app supports transferring objects to an Amazon S3 bucket.
File payloads (and directories, recursively) are transferred from the local
directory where the application is deployed to the remote
directory (S3 bucket).
Messages accepted by this sink must contain a payload
of one of the following types:
-
File
, including directories for recursive upload; -
InputStream
; -
byte[]
7.15.1. Options
The s3 sink has the following options:
Properties grouped by prefix:
s3.common
- endpoint-url
-
Optional endpoint url to connect to s3 compatible storage. (String, default:
<none>
) - path-style-access
-
Use path style access. (Boolean, default:
false
)
s3.consumer
- acl
-
S3 Object access control list. (CannedAccessControlList, default:
<none>
, possible values:private
,public-read
,public-read-write
,authenticated-read
,log-delivery-write
,bucket-owner-read
,bucket-owner-full-control
,aws-exec-read
) - acl-expression
-
Expression to evaluate S3 Object access control list. (Expression, default:
<none>
) - bucket
-
AWS bucket for target file(s) to store. (String, default:
<none>
) - bucket-expression
-
Expression to evaluate AWS bucket name. (Expression, default:
<none>
) - key-expression
-
Expression to evaluate S3 Object key. (Expression, default:
<none>
)
The target generated application based on the AmazonS3SinkConfiguration
can be enhanced with the S3MessageHandler.UploadMetadataProvider
and/or S3ProgressListener
, which are injected into S3MessageHandler
bean.
See Spring Integration AWS support for more details.
7.15.2. Amazon AWS common options
The Amazon S3 Sink (as all other Amazon AWS applications) is based on the Spring Cloud AWS project as a foundation, and its auto-configuration classes are used automatically by Spring Boot. Consult their documentation regarding required and useful auto-configuration properties.
Some of them are about AWS credentials:
-
cloud.aws.credentials.accessKey
-
cloud.aws.credentials.secretKey
-
cloud.aws.credentials.instanceProfile
-
cloud.aws.credentials.profileName
-
cloud.aws.credentials.profilePath
Others are for the AWS Region
definition:
-
cloud.aws.region.auto
-
cloud.aws.region.static
And for AWS Stack
:
-
cloud.aws.stack.auto
-
cloud.aws.stack.name
7.16. SFTP Sink
SFTP sink is a simple option to push files to an SFTP server from incoming messages.
It uses an sftp-outbound-adapter
, therefore incoming messages can be either a java.io.File
object, a String
(content of the file)
or an array of bytes
(file content as well).
To use this sink, you need a username and a password to login.
By default Spring Integration will use o.s.i.file.DefaultFileNameGenerator if none is specified. DefaultFileNameGenerator will determine the file name
based on the value of the file_name header (if it exists) in the MessageHeaders , or if the payload of the Message is already a java.io.File , then it will
use the original name of that file.
When configuring the sftp.factory.known-hosts-expression
option, the root object of the evaluation is the application context. For example: sftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'
.
7.16.3. Options
The sftp sink has the following options:
Properties grouped by prefix:
sftp.consumer
- auto-create-dir
-
Whether to create the remote directory. (Boolean, default:
true
) - filename-expression
-
A SpEL expression to generate the remote file name. (String, default:
<none>
) - mode
-
Action to take if the remote file already exists. (FileExistsMode, default:
<none>
, possible values:APPEND
,APPEND_NO_FLUSH
,FAIL
,IGNORE
,REPLACE
,REPLACE_IF_MODIFIED
) - remote-dir
-
The remote FTP directory. (String, default:
/
) - remote-file-separator
-
The remote file separator. (String, default:
/
) - temporary-remote-dir
-
A temporary directory where the file will be written if 'isUseTemporaryFilename()' is true. (String, default:
/
) - tmp-file-suffix
-
The suffix to use while the transfer is in progress. (String, default:
.tmp
) - use-temporary-filename
-
Whether to write to a temporary file and rename. (Boolean, default:
true
)
sftp.consumer.factory
- allow-unknown-keys
-
True to allow an unknown or changed key. (Boolean, default:
false
) - cache-sessions
-
Cache sessions. (Boolean, default:
<none>
) - host
-
The host name of the server. (String, default:
localhost
) - known-hosts-expression
-
A SpEL expression resolving to the location of the known hosts file. (Expression, default:
<none>
) - pass-phrase
-
Passphrase for user's private key. (String, default:
<empty string>
) - password
-
The password to use to connect to the server. (String, default:
<none>
) - port
-
The port of the server. (Integer, default:
22
) - private-key
-
Resource location of user's private key. (Resource, default:
<none>
) - username
-
The username to use to connect to the server. (String, default:
<none>
)
7.17. TCP Sink
This module writes messages to TCP using an Encoder.
TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of encoders are available, the default being 'CRLF'.
7.17.1. Options
The tcp sink has the following options:
Properties grouped by prefix:
tcp.consumer
- charset
-
The charset used when converting from bytes to String. (String, default:
UTF-8
) - close
-
Whether to close the socket after each message. (Boolean, default:
false
) - encoder
-
The encoder to use when sending messages. (Encoding, default:
<none>
, possible values:CRLF
,LF
,NULL
,STXETX
,RAW
,L1
,L2
,L4
) - host
-
The host to which this sink will connect. (String, default:
<none>
)
tcp
- nio
-
Whether or not to use NIO. (Boolean, default:
false
) - port
-
The port on which to listen; 0 for the OS to choose a port. (Integer, default:
1234
) - reverse-lookup
-
Perform a reverse DNS lookup on the remote IP Address; if false, just the IP address is included in the message headers. (Boolean, default:
false
) - socket-timeout
-
The timeout (ms) before closing the socket when no data is received. (Integer, default:
120000
) - use-direct-buffers
-
Whether or not to use direct buffers. (Boolean, default:
false
)
7.17.2. Available Encoders
- CRLF (default)
-
text terminated by carriage return (0x0d) followed by line feed (0x0a)
- LF
-
text terminated by line feed (0x0a)
- NULL
-
text terminated by a null byte (0x00)
- STXETX
-
text preceded by an STX (0x02) and terminated by an ETX (0x03)
- RAW
-
no structure - the client indicates a complete message by closing the socket
- L1
-
data preceded by a one byte (unsigned) length field (supports up to 255 bytes)
- L2
-
data preceded by a two byte (unsigned) length field (up to 2^16-1 bytes)
- L4
-
data preceded by a four byte (signed) length field (up to 2^31-1 bytes)
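The framing performed by each encoder can be sketched as below. The `frame` function is a hypothetical illustration, not the sink's code, and it assumes that the length headers are written in network byte order (big-endian).

```python
import struct


def frame(data: bytes, encoder: str = "CRLF") -> bytes:
    """Wrap a payload the way the named encoder would put it on the wire."""
    if encoder == "CRLF":
        return data + b"\r\n"            # 0x0d 0x0a terminator
    if encoder == "LF":
        return data + b"\n"              # 0x0a terminator
    if encoder == "NULL":
        return data + b"\x00"            # null-byte terminator
    if encoder == "STXETX":
        return b"\x02" + data + b"\x03"  # STX ... ETX
    if encoder == "RAW":
        return data                      # no framing; socket close ends the message
    # Length-header encoders: 1-byte unsigned, 2-byte unsigned, 4-byte signed.
    formats = {"L1": ">B", "L2": ">H", "L4": ">i"}
    if encoder in formats:
        # struct.pack raises struct.error if the length does not fit the header.
        return struct.pack(formats[encoder], len(data)) + data
    raise ValueError(f"unknown encoder: {encoder}")


crlf = frame(b"hi")
stxetx = frame(b"hi", "STXETX")
l2 = frame(b"hi", "L2")
```

The receiving side has to be configured with the matching decoder, otherwise it cannot tell where one message ends and the next begins.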
7.18. Throughput Sink
Sink that will count messages and log the observed throughput at a selected interval.
7.19. Twitter Message Sink
Send Direct Messages to a specified user from the authenticating user.
Requires a JSON POST body and Content-Type
header to be set to application/json
.
When a message is received from a user you may send up to 5 messages in response within a 24 hour window. Each message received resets the 24 hour window and the 5 allotted messages. Sending a 6th message within a 24 hour window or sending a message outside of a 24 hour window will count towards rate-limiting. This behavior only applies when using the POST direct_messages/events/new endpoint.
SpEL expressions are used to compute the request parameters from the input message.
7.19.1. Options
Use single quotes (' ) to wrap the literal values of the SpEL expression properties.
For example to set a fixed message text use text='Fixed Text' .
For fixed target userId use userId='666' .
- twitter.message.update.media-id
-
A media id to associate with the message. A Direct Message may only reference a single media id. (Expression, default:
<none>
) - twitter.message.update.screen-name
-
The screen name of the user to whom to send the direct message. (Expression, default:
<none>
) - twitter.message.update.text
-
The direct message text. URL encode as necessary. Max length of 10,000 characters. (Expression, default:
payload
) - twitter.message.update.user-id
-
The user id of the user to whom to send the direct message. (Expression, default:
<none>
)
7.20. Twitter Update Sink
Updates the authenticating user’s current text (e.g. Tweeting).
For each update attempt, the update text is compared with the authenticating user’s recent Tweets. Any attempt that would result in duplication will be blocked, resulting in a 403 error. A user cannot submit the same text twice in a row.
While not rate limited by the API, a user is limited in the number of Tweets they can create at a time. The update limit for the standard API is 300 per 3-hour window. If the number of updates posted by the user reaches the current allowed limit, this method will return an HTTP 403 error.
You can find details for the Update API here: developer.twitter.com/en/docs/tweets/post-and-engage/api-reference/post-statuses-update
7.20.1. Options
Properties grouped by prefix:
twitter.update
- attachment-url
-
(SpEL expression) In order for a URL to not be counted in the text body of an extended Tweet, provide a URL as a Tweet attachment. This URL must be a Tweet permalink, or Direct Message deep link. Arbitrary, non-Twitter URLs must remain in the text. URLs passed to the attachment_url parameter not matching either a Tweet permalink or Direct Message deep link will fail at Tweet creation and cause an exception. (Expression, default:
<none>
) - display-coordinates
-
(SpEL expression) Whether or not to put a pin on the exact coordinates a Tweet has been sent from. (Expression, default:
<none>
) - in-reply-to-status-id
-
(SpEL expression) The ID of an existing Tweet that the update is in reply to. Note: This parameter will be ignored unless the author of the Tweet this parameter references is mentioned within the text. Therefore, you must include @username, where username is the author of the referenced Tweet, within the update. When inReplyToStatusId is set, auto_populate_reply_metadata is automatically set as well. The latter ensures that leading @mentions are looked up from the original Tweet and added to the new Tweet from there. This appends @mentions to the metadata of an extended Tweet as a reply chain grows, until the limit on @mentions is reached. In cases where the original Tweet has been deleted, the reply will fail. (Expression, default:
<none>
) - media-ids
-
(SpEL expression) A comma-delimited list of media_ids to associate with the Tweet. You may include up to 4 photos or 1 animated GIF or 1 video in a Tweet. See Uploading Media for further details on uploading media. (Expression, default:
<none>
) - place-id
-
(SpEL expression) A place in the world. (Expression, default:
<none>
) - text
-
(SpEL expression) The text of the update. URL encode as necessary. t.co link wrapping will affect character counts. Defaults to the message's payload. (Expression, default:
payload
)
twitter.update.location
- lat
-
The latitude of the location this Tweet refers to. This parameter will be ignored unless it is inside the range -90.0 to +90.0 (North is positive) inclusive. It will also be ignored if there is no corresponding long parameter. (Expression, default:
<none>
) - lon
-
The longitude of the location this Tweet refers to. The valid ranges for longitude are -180.0 to +180.0 (East is positive) inclusive. This parameter will be ignored if outside that range, if it is not a number, if geo_enabled is disabled, or if there is no corresponding lat parameter. (Expression, default:
<none>
)
7.21. Wavefront Sink
The Wavefront sink consumes a Message<?>, converts it into a metric in Wavefront data format and sends the metric directly to Wavefront or a Wavefront proxy.
Supports common ETL use-cases, where existing (historical) metrics data has to be cleaned, transformed and stored in Wavefront for further analysis.
7.21.1. Options
The Wavefront sink has the following options:
- wavefront.api-token
-
Wavefront API access token. (String, default:
<none>
) - wavefront.metric-expression
-
A SpEL expression that evaluates to a metric value. (Expression, default:
<none>
) - wavefront.metric-name
-
The name of the metric. Defaults to the application name. (String, default:
<none>
) - wavefront.proxy-uri
-
The URL of the Wavefront proxy. (String, default:
<none>
) - wavefront.source
-
Unique application, host, container, or instance that emits metrics. (String, default:
<none>
) - wavefront.tag-expression
-
Collection of custom metadata associated with the metric. Point tags cannot be empty. Valid characters for keys: alphanumeric, hyphen ('-'), underscore ('_'), dot ('.'). For values, any character is allowed, including spaces. To include a double quote, escape it with a backslash. A backslash cannot be the last character in the tag value. The maximum allowed length for a combination of a point tag key and value is 254 characters (255 including the '=' separating key and value). If the value is longer, the point is rejected and logged. (Map<String, Expression>, default:
<none>
) - wavefront.timestamp-expression
-
A SpEL expression that evaluates to a timestamp of the metric (optional). (Expression, default:
<none>
) - wavefront.uri
-
The URL of the Wavefront environment. (String, default:
<none>
)
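The point-tag rules listed under wavefront.tag-expression can be illustrated with a small validator and formatter. This is a sketch under stated assumptions, not the sink's own implementation: the class WavefrontLineSketch and its methods are hypothetical, "alphanumeric" is taken to mean ASCII letters and digits, the length limit is applied to the unescaped value, and the output follows the general Wavefront line layout of metric name, value, optional timestamp, source, and point tags.

```java
import java.util.Map;
import java.util.StringJoiner;
import java.util.regex.Pattern;

/** Hypothetical illustration of the point-tag rules; the sink performs its own formatting. */
final class WavefrontLineSketch {

    // Assumption: "alphanumeric" means ASCII letters and digits; hyphen, underscore and dot are also allowed.
    private static final Pattern VALID_KEY = Pattern.compile("[A-Za-z0-9._-]+");

    // Maximum combined length of key and value, not counting the '=' separator.
    private static final int MAX_TAG_LENGTH = 254;

    private WavefrontLineSketch() {
    }

    /** Checks one tag against the documented key, value and length rules. */
    public static boolean isValidTag(String key, String value) {
        if (key == null || value == null || value.isEmpty()) {
            return false; // point tags cannot be empty
        }
        if (!VALID_KEY.matcher(key).matches()) {
            return false; // key contains a character outside the allowed set
        }
        if (value.endsWith("\\")) {
            return false; // a backslash cannot be the last character of the value
        }
        return key.length() + value.length() <= MAX_TAG_LENGTH;
    }

    /** Builds one metric line; epochSeconds may be null because the timestamp is optional. */
    public static String format(String metricName, double value, Long epochSeconds,
            String source, Map<String, String> tags) {
        StringJoiner line = new StringJoiner(" ");
        line.add(metricName).add(Double.toString(value));
        if (epochSeconds != null) {
            line.add(Long.toString(epochSeconds));
        }
        line.add("source=" + source);
        for (Map.Entry<String, String> tag : tags.entrySet()) {
            if (!isValidTag(tag.getKey(), tag.getValue())) {
                throw new IllegalArgumentException("Invalid point tag: " + tag.getKey());
            }
            // Double quotes inside the value are escaped with a backslash.
            String escaped = tag.getValue().replace("\"", "\\\"");
            line.add(tag.getKey() + "=\"" + escaped + "\"");
        }
        return line.toString();
    }

    public static void main(String[] args) {
        // prints: request.count 42.0 1445424778 source=my-app env="prod"
        System.out.println(format("request.count", 42.0, 1445424778L, "my-app", Map.of("env", "prod")));
    }
}
```

In the sink, the corresponding pieces come from wavefront.metric-name, wavefront.metric-expression, wavefront.timestamp-expression, wavefront.source, and wavefront.tag-expression. The sketch throws on an invalid tag to make the rule visible, whereas the option description states that an over-long point is rejected and logged.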
7.22. Websocket Sink
A simple Websocket Sink implementation.
7.22.1. Options
The following options are supported:
- websocket.consumer.log-level
-
The log level for Netty channels. Default is WARN. (String, default:
<none>
) - websocket.consumer.path
-
The path on which a WebsocketSink consumer needs to connect. Default is /websocket. (String, default:
/websocket
) - websocket.consumer.port
-
The port on which the Netty server listens. Default is 9292. (Integer, default:
9292
) - websocket.consumer.ssl
-
Whether or not to create an io.netty.handler.ssl.SslContext. (Boolean, default:
false
) - websocket.consumer.threads
-
The number of threads for the Netty io.netty.channel.EventLoopGroup. Default is 1. (Integer, default:
1
)
7.22.2. Examples
To verify that the websocket-sink receives messages from other spring-cloud-stream apps, you can use the following simple end-to-end setup.
Step 3: Deploy the websocket-sink
Finally, start a websocket-sink in trace
mode so that you see the messages produced by the time-source
in the log:
java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
--logging.level.org.springframework.cloud.fn.consumer.websocket=TRACE
In the console where you started the WebsocketSink, you should start seeing log messages like this:
Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]
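To observe the forwarded messages from the client side as well, a WebSocket client can connect to the sink. The sketch below uses the JDK's java.net.http.WebSocket API (Java 11 or later) and assumes the default websocket.consumer.port (9292) and websocket.consumer.path (/websocket) on localhost. The class WebsocketSinkClientSketch and its methods are hypothetical names chosen for illustration.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical client that prints text frames pushed by a running websocket-sink. */
final class WebsocketSinkClientSketch {

    private WebsocketSinkClientSketch() {
    }

    /** Builds the endpoint URI from the sink's port, path and ssl settings. */
    public static URI endpoint(String host, int port, String path, boolean ssl) {
        return URI.create((ssl ? "wss" : "ws") + "://" + host + ":" + port + path);
    }

    /** Connects to the given endpoint and prints incoming text for up to the given number of seconds. */
    public static void printMessages(URI uri, long seconds) throws InterruptedException {
        CountDownLatch closed = new CountDownLatch(1);

        WebSocket.Listener listener = new WebSocket.Listener() {
            @Override
            public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
                // Large messages may arrive in several fragments; each fragment is printed as received.
                System.out.println("Received: " + data);
                webSocket.request(1); // ask for the next message
                return null;
            }

            @Override
            public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
                closed.countDown();
                return null;
            }

            @Override
            public void onError(WebSocket webSocket, Throwable error) {
                error.printStackTrace();
                closed.countDown();
            }
        };

        // join() throws if the connection cannot be established, for example when the sink is not running.
        WebSocket socket = HttpClient.newHttpClient()
                .newWebSocketBuilder()
                .buildAsync(uri, listener)
                .join();

        closed.await(seconds, TimeUnit.SECONDS);
        socket.abort(); // release the connection once the observation window ends
    }
}
```

With the sink running, calling printMessages(endpoint("localhost", 9292, "/websocket", false), 30) would print whatever the sink forwards during those 30 seconds. If websocket.consumer.ssl is enabled, pass true for the last argument of endpoint; certificate trust is not handled in this sketch.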
7.22.3. Actuators
There is an Endpoint
that you can use to access the last n
messages sent and received. You have to
enable it by providing --endpoints.websocketconsumertrace.enabled=true
. By default, it shows the last 100 messages at
host:port/websocketconsumertrace
. Here is a sample output:
[
{
"timestamp": 1445453703508,
"info": {
"type": "text",
"direction": "out",
"id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
"payload": "2015-10-21 20:54:33"
}
},
...
{
"timestamp": 1445453703506,
"info": {
"type": "text",
"direction": "out",
"id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
"payload": "2015-10-21 20:54:32"
}
}
]
There is also a simple HTML page where you can see the forwarded messages in a text area. You can access
it directly via host:port
in your browser.
7.23. ZeroMQ Sink
The "zeromq" sink enables sending messages to a ZeroMQ socket.
7.23.3. Options
The zeromq sink has the following options:
- zeromq.consumer.connect-url
-
Connection URL for connecting to the ZeroMQ Socket. (String, default:
<none>
) - zeromq.consumer.socket-type
-
The Socket Type the connection should establish. (SocketType, default:
<none>
, possible values:PAIR
,PUB
,SUB
,REQ
,REP
,DEALER
,ROUTER
,PULL
,PUSH
,XPUB
,XSUB
,STREAM
) - zeromq.consumer.topic
-
A Topic SpEL expression to evaluate a topic before sending messages to subscribers. (Expression, default:
<none>
)