Applications
5. Sources
5.1. Debezium Source
Debezium Engine based Change Data Capture (CDC) source.
The Debezium Source
allows capturing database change events and streaming them over different message binders such as Apache Kafka
, RabbitMQ
and all other Spring Cloud Stream supported brokers.
This source can be used with any Spring Cloud Stream message binder. It is not restricted to nor dependent on the Kafka Connect framework. Though this approach is flexible, it comes with certain limitations.
All Debezium configuration properties are supported.
Just precede any Debezium properties with the debezium.properties.
prefix.
For example, to set Debezium's connector.class
property, use the debezium.properties.connector.class
source property instead.
5.1.1. Database Support
The Debezium Source
currently supports CDC for multiple datastores: MySQL, PostgreSQL, MongoDB, Oracle, SQL Server, Db2, Vitess and Spanner databases.
5.1.2. Options
Event flattening configuration
Debezium provides a comprehensive message format that accurately details information about changes that happen in the system.
Sometimes, though, this format might not be suitable for downstream consumers that require messages formatted so that field names and values are presented in a simplified, flattened
structure.
To simplify the format of the event records that the Debezium connectors produce, you can use the Debezium event flattening message transformation. Using the flattening configuration you can produce a simple message format like this:
--debezium.properties.transforms=unwrap
--debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
--debezium.properties.transforms.unwrap.drop.tombstones=false
--debezium.properties.transforms.unwrap.delete.handling.mode=rewrite
--debezium.properties.transforms.unwrap.add.fields=name,db
Debezium Offset Storage
When a Debezium source runs, it reads information from the source and periodically records offsets
that define how much of that information it has processed.
Should the source be restarted, it will use the last recorded offset to know where in the source information it should resume reading.
Out of the box, the following offset storage configuration options are provided:
-
In-Memory
Doesn't persist the offset data but keeps it in memory. Therefore all offsets are lost when the Debezium source restarts.
--debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore
-
Local Filesystem
Store the offsets in a file on the local file system (the file can be named anything and stored anywhere). Additionally, although the connector records the offsets with every source record it produces, the engine flushes the offsets to the backing store periodically (in the example below, once each minute).
--debezium.properties.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
--debezium.properties.offset.storage.file.filename=/tmp/offsets.dat (1)
--debezium.properties.offset.flush.interval.ms=60000 (2)
1 Path to the file where offsets are to be stored. Required when offset.storage is set to the FileOffsetBackingStore.
2 Interval at which to try committing offsets. The default is 1 minute.
-
Kafka topic
Uses a Kafka topic to store offset data.
--debezium.properties.offset.storage=org.apache.kafka.connect.storage.KafkaOffsetBackingStore
--debezium.properties.offset.storage.topic=my-kafka-offset-topic (1)
--debezium.properties.offset.storage.partitions=2 (2)
--debezium.properties.offset.storage.replication.factor=1 (3)
--debezium.properties.offset.flush.interval.ms=60000 (4)
1 The name of the Kafka topic where offsets are to be stored. Required when offset.storage is set to the KafkaOffsetBackingStore.
2 The number of partitions used when creating the offset storage topic.
3 Replication factor used when creating the offset storage topic.
4 Interval at which to try committing offsets. The default is 1 minute.
One can implement the org.apache.kafka.connect.storage.OffsetBackingStore
interface to provide an offset storage bound to a custom backend key-value store.
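For example, a custom implementation can be wired in the same way as the built-in stores. The class name below is purely hypothetical and stands in for your own OffsetBackingStore implementation:
--debezium.properties.offset.storage=com.example.CustomOffsetBackingStore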
Connector properties
The table below lists all available Debezium properties for each connector.
Those properties can be used by prefixing them with the debezium.properties.
prefix.
5.1.3. Examples and Testing
The Debezium integration tests use database fixtures running on the local machine. Pre-built Debezium Docker database images are leveraged with the help of Testcontainers.
To run and debug the tests from your IDE you need to deploy the required database images from the command line. The instructions below explain how to run the pre-configured test databases from Docker images.
MySQL
Start the debezium/example-mysql
image in a Docker container:
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:2.3.3.Final
(optional) Use a mysql client to connect to the database and to create a debezium user with the required credentials:
docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';
Use the following properties to connect the Debezium Source to the MySQL DB:
debezium.properties.connector.class=io.debezium.connector.mysql.MySqlConnector (1)
debezium.properties.name=my-connector (2)
debezium.properties.topic.prefix=my-topic (2)
debezium.properties.database.server.id=85744 (2)
debezium.properties.database.user=debezium (3)
debezium.properties.database.password=dbz (3)
debezium.properties.database.hostname=localhost (3)
debezium.properties.database.port=3306 (3)
debezium.properties.schema=true (4)
debezium.properties.key.converter.schemas.enable=true (4)
debezium.properties.value.converter.schemas.enable=true (4)
debezium.properties.transforms=unwrap (5)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (5)
debezium.properties.transforms.unwrap.add.fields=name,db (5)
debezium.properties.transforms.unwrap.delete.handling.mode=none (5)
debezium.properties.transforms.unwrap.drop.tombstones=true (5)
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (6)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (6)
1 | Configures the Debezium Source to use MySqlConnector. |
2 | Metadata used to identify and dispatch the incoming events. |
3 | Connection to the MySQL server running on localhost:3306 as debezium user. |
4 | Includes the Change Event Value schema in the ChangeEvent message. |
5 | Enables the Change Event Flattening. |
6 | Source state to preserve between multiple starts. |
You can also run the DebeziumDatabasesIntegrationTest#mysql()
test using this MySQL configuration.
Disable the MySQL GenericContainer test initialization code.
PostgreSQL
Start a pre-configured postgres server from the debezium/example-postgres:2.3.3.Final
Docker image:
docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:2.3.3.Final
You can connect to this server like this:
psql -U postgres -h localhost -p 5432
Use the following properties to connect the Debezium Source to PostgreSQL:
debezium.properties.connector.class=io.debezium.connector.postgresql.PostgresConnector (1)
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)
debezium.properties.topic.prefix=my-topic (3)
debezium.properties.name=my-connector (3)
debezium.properties.database.server.id=85744 (3)
debezium.properties.database.user=postgres (4)
debezium.properties.database.password=postgres (4)
debezium.properties.database.dbname=postgres (4)
debezium.properties.database.hostname=localhost (4)
debezium.properties.database.port=5432 (4)
debezium.properties.schema=true (5)
debezium.properties.key.converter.schemas.enable=true (5)
debezium.properties.value.converter.schemas.enable=true (5)
debezium.properties.transforms=unwrap (6)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (6)
debezium.properties.transforms.unwrap.add.fields=name,db (6)
debezium.properties.transforms.unwrap.delete.handling.mode=none (6)
debezium.properties.transforms.unwrap.drop.tombstones=true (6)
1 | Configures Debezium Source to use PostgresConnector. |
2 | Configures the Debezium engine to use memory stores. |
3 | Metadata used to identify and dispatch the incoming events. |
4 | Connection to the PostgreSQL server running on localhost:5432 as postgres user. |
5 | Includes the Change Event Value schema in the message. |
6 | Enables the Change Event Flattening. |
You can also run the DebeziumDatabasesIntegrationTest#postgres()
test using this PostgreSQL configuration.
Disable the postgres GenericContainer test initialization code.
MongoDB
Start a pre-configured mongodb from the debezium/example-mongodb:2.3.3.Final
container image:
docker run -it --rm --name mongodb -p 27017:27017 -e MONGODB_USER=debezium -e MONGODB_PASSWORD=dbz debezium/example-mongodb:2.3.3.Final
Initialize the inventory collections
docker exec -it mongodb sh -c 'bash -c /usr/local/bin/init-inventory.sh'
In the mongodb
terminal output, search for a log entry like host: "3f95a8a6516e:27017"
:
2019-01-10T13:46:10.004+0000 I COMMAND [conn1] command local.oplog.rs appName: "MongoDB Shell" command: replSetInitiate { replSetInitiate: { _id: "rs0", members: [ { _id: 0.0, host: "3f95a8a6516e:27017" } ] }, lsid: { id: UUID("5f477a16-d80d-41f2-9ab4-4ebecea46773") }, $db: "admin" } numYields:0 reslen:22 locks:{ Global: { acquireCount: { r: 36, w: 20, W: 2 }, acquireWaitCount: { W: 1 }, timeAcquiringMicros: { W: 312 } }, Database: { acquireCount: { r: 6, w: 4, W: 16 } }, Collection: { acquireCount: { r: 4, w: 2 } }, oplog: { acquireCount: { r: 2, w: 3 } } } protocol:op_msg 988ms
Add a 127.0.0.1 3f95a8a6516e
entry to your /etc/hosts file.
Use the following properties to connect the Debezium Source to MongoDB:
debezium.properties.connector.class=io.debezium.connector.mongodb.MongodbSourceConnector (1)
debezium.properties.topic.prefix=my-topic
debezium.properties.name=my-connector
debezium.properties.database.server.id=85744
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)
debezium.properties.mongodb.hosts=rs0/localhost:27017 (3)
debezium.properties.topic.prefix=dbserver1 (3)
debezium.properties.mongodb.user=debezium (3)
debezium.properties.mongodb.password=dbz (3)
debezium.properties.database.whitelist=inventory (3)
debezium.properties.tasks.max=1 (4)
debezium.properties.schema=true (5)
debezium.properties.key.converter.schemas.enable=true (5)
debezium.properties.value.converter.schemas.enable=true (5)
debezium.properties.transforms=unwrap (6)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (6)
debezium.properties.transforms.unwrap.add.fields=name,db (6)
debezium.properties.transforms.unwrap.delete.handling.mode=none (6)
debezium.properties.transforms.unwrap.drop.tombstones=true (6)
1 | Configures Debezium Source to use MongoDB Connector. |
2 | Configures the Debezium engine to use memory stores. |
3 | Connection to the MongoDB running on localhost:27017 as debezium user. |
4 | debezium.io/docs/connectors/mongodb/#tasks |
5 | Includes the Change Event Value schema in the SourceRecord events. |
6 | Enables the Change Event Flattening. |
You can also run the DebeziumDatabasesIntegrationTest#mongodb()
test using this MongoDB configuration.
SQL Server
Start a sqlserver
from the microsoft/mssql-server-linux:2017-CU9-GDR2
Docker image:
docker run -it --rm --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_PID=Standard -e SA_PASSWORD=Password! -e MSSQL_AGENT_ENABLED=true microsoft/mssql-server-linux:2017-CU9-GDR2
Populate it with sample data from the Debezium SQL Server tutorial:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
cat ./inventory.sql | docker exec -i sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
Use the following properties to connect the Debezium Source to SQL Server:
debezium.properties.connector.class=io.debezium.connector.sqlserver.SqlServerConnector (1)
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)
debezium.properties.topic.prefix=my-topic (3)
debezium.properties.name=my-connector (3)
debezium.properties.database.server.id=85744 (3)
debezium.properties.database.user=sa (4)
debezium.properties.database.password=Password! (4)
debezium.properties.database.dbname=testDB (4)
debezium.properties.database.hostname=localhost (4)
debezium.properties.database.port=1433 (4)
1 | Configures Debezium Source to use SqlServerConnector. |
2 | Configures the Debezium engine to use memory state stores. |
3 | Metadata used to identify and dispatch the incoming events. |
4 | Connection to the SQL Server running on localhost:1433 as sa user. |
You can also run the DebeziumDatabasesIntegrationTest#sqlServer()
test using this SQL Server configuration.
Oracle
Start Oracle reachable from localhost and set up with the configuration, users and grants described in the Debezium Vagrant set-up
Populate it with sample data from the Debezium Oracle tutorial:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-with-oracle-jdbc/init/inventory.sql
cat ./inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1
5.2. File Source
This application polls a directory and sends new files or their contents to the output channel. The file source provides the contents of a File as a byte array by default. However, this can be customized using the --file.supplier.mode option:
-
ref Provides a java.io.File reference
-
lines Will split files line-by-line and emit a new message for each line
-
contents The default. Provides the contents of a file as a byte array
When using --file.supplier.mode=lines
, you can also provide the additional option --file.supplier.withMarkers=true
.
If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data.
The payload of these 2 additional marker messages is of type FileSplitter.FileMarker
. The option withMarkers defaults to false if not explicitly set.
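For example, the following run sketch polls a directory and emits one message per line, framed by start-of-file and end-of-file markers. The jar name and directory are illustrative, and the file.supplier.directory property name is assumed from the options listed below:
java -jar file-source.jar --file.supplier.directory=/tmp/input --file.supplier.mode=lines --file.supplier.withMarkers=true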
5.2.1. Options
The file source has the following options:
5.3. FTP Source
This source application supports transfer of files using the FTP protocol.
Files are transferred from the remote
directory to the local
directory where the app is deployed.
Messages emitted by the source are provided as a byte array by default. However, this can be
customized using the --mode
option:
-
ref Provides a
java.io.File
reference -
lines Will split files line-by-line and emit a new message for each line
-
contents The default. Provides the contents of a file as a byte array
When using --mode=lines
, you can also provide the additional option --withMarkers=true
.
If set to true
, the underlying FileSplitter
will emit additional start-of-file and end-of-file marker messages before and after the actual data.
The payload of these 2 additional marker messages is of type FileSplitter.FileMarker
. The option withMarkers
defaults to false
if not explicitly set.
See also MetadataStore options for possible shared persistent store configuration used to prevent duplicate messages on restart.
5.3.1. Input
N/A (Fetches files from an FTP server).
5.3.2. Output
mode = contents
Headers:
-
Content-Type: application/octet-stream
-
file_originalFile: <java.io.File>
-
file_name: <file name>
Payload:
A byte[]
filled with the file contents.
mode = lines
Headers:
-
Content-Type: text/plain
-
file_originalFile: <java.io.File>
-
file_name: <file name>
-
correlationId: <UUID>
(same for each line) -
sequenceNumber: <n>
-
sequenceSize: 0
(number of lines is not known until the file is read)
Payload:
A String
for each line.
The first line is optionally preceded by a message with a START
marker payload.
The last line is optionally followed by a message with an END
marker payload.
Marker presence and format are determined by the with-markers
and markers-json
properties.
mode = ref
Headers:
None.
Payload:
A java.io.File
object.
5.3.3. Options
The ftp source has the following options:
5.3.4. Examples
java -jar ftp_source.jar --ftp.supplier.remote-dir=foo --file.consumer.mode=lines --ftp.factory.host=ftpserver \
--ftp.factory.username=user --ftp.factory.password=pw --ftp.local-dir=/foo
5.4. Http Source
A source application that listens for HTTP requests and emits the body as a message payload.
If the Content-Type matches text/*
or application/json
, the payload will be a String,
otherwise the payload will be a byte array.
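As a quick sketch, assuming the default Spring Boot server port of 8080 and the default root path, a client could post JSON like this and the source would emit the body as a String payload:
curl -X POST -H "Content-Type: application/json" -d '{"name":"test"}' http://localhost:8080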
5.4.1. Payload:
If content type matches text/*
or application/json
-
String
If content type does not match text/*
or application/json
-
byte array
5.4.2. Options
The http source supports the following configuration properties:
5.5. JDBC Source
This source polls data from an RDBMS.
This source is fully based on the DataSourceAutoConfiguration
, so refer to the Spring Boot JDBC Support for more information.
5.5.1. Payload
-
Map<String, Object>
when jdbc.split == true
(default) and List<Map<String, Object>>
otherwise
5.5.2. Options
The jdbc source has the following options:
Also see the Spring Boot Documentation
for additional DataSource
properties and TriggerProperties
and MaxMessagesProperties
for polling options.
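A minimal run sketch, assuming the jdbc.supplier.query option name; the query, datasource URL and credentials below are illustrative:
java -jar jdbc-source.jar --jdbc.supplier.query='SELECT * FROM people' \
--spring.datasource.url='jdbc:mysql://localhost:3306/test' \
--spring.datasource.username=root --spring.datasource.password=secret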
5.6. JMS Source
The JMS source enables receiving messages from JMS.
5.6.1. Options
The JMS source has the following options:
5.7. Apache Kafka Source
This module consumes messages from Apache Kafka.
5.7.1. Options
The kafka source has the following options:
(See the Spring Boot documentation for Spring for Apache Kafka configuration properties)
5.8. Load Generator Source
A source that sends generated data and dispatches it to the stream.
5.8.1. Options
The load-generator source has the following options:
- load-generator.generate-timestamp
-
Whether a timestamp is generated. (Boolean, default:
false
) - load-generator.message-count
-
Message count. (Integer, default:
1000
) - load-generator.message-size
-
Message size. (Integer, default:
1000
) - load-generator.producers
-
Number of producers. (Integer, default:
1
)
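For example, the options above can be combined to generate a fixed burst of messages (the jar name is illustrative):
java -jar load-generator-source.jar --load-generator.message-count=5000 --load-generator.message-size=512 --load-generator.producers=2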
5.9. Mail Source
A source application that listens for Emails and emits the message body as a message payload.
5.9.1. Options
The mail source has the following options:
5.10. MongoDB Source
This source polls data from MongoDB.
This source is fully based on the MongoDataAutoConfiguration
, so refer to the
Spring Boot MongoDB Support
for more information.
5.10.1. Options
The mongodb source has the following options:
Also see the Spring Boot Documentation for additional MongoProperties
properties.
See TriggerProperties
for polling options.
5.11. MQTT Source
Source that enables receiving messages from MQTT.
5.11.1. Payload:
-
String
if binary setting isfalse
(default) -
byte[]
if binary setting istrue
5.11.2. Options
The mqtt source has the following options:
5.12. RabbitMQ Source
The "rabbit" source enables receiving messages from RabbitMQ.
The queue(s) must exist before the stream is deployed; they are not created automatically. You can easily create a Queue using the RabbitMQ web UI.
5.12.1. Input
N/A
5.12.2. Output
Payload
-
byte[]
5.12.3. Options
The rabbit source has the following options:
Also see the Spring Boot Documentation for additional properties for the broker connections and listener properties.
A Note About Retry
With the default ackMode (AUTO) and requeue (true) options, failed message deliveries will be retried
indefinitely.
Since there is not much processing in the rabbit source, the risk of failure in the source itself is small, unless
the downstream Binder is not connected for some reason.
Setting requeue to false will cause messages to be rejected on the first attempt (and possibly sent to a Dead Letter
Exchange/Queue if the broker is so configured).
The enableRetry option allows configuration of retry parameters such that a failed message delivery can be retried and
eventually discarded (or dead-lettered) when retries are exhausted.
The delivery thread is suspended during the retry interval(s).
Retry options are enableRetry, maxAttempts, initialRetryInterval, retryMultiplier, and maxRetryInterval.
Message deliveries failing with a MessageConversionException are never retried; the assumption being that if a message
could not be converted on the first attempt, subsequent attempts will also fail.
Such messages are discarded (or dead-lettered).
5.12.4. Build
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
5.12.5. Examples
java -jar rabbit-source.jar --rabbit.queues=
5.13. Amazon S3 Source
This source app supports transfer of files using the Amazon S3 protocol.
Files are transferred from the remote
directory (S3 bucket) to the local
directory where the application is deployed.
Messages emitted by the source are provided as a byte array by default. However, this can be
customized using the --mode
option:
-
ref Provides a
java.io.File
reference -
lines Will split files line-by-line and emit a new message for each line
-
contents The default. Provides the contents of a file as a byte array
When using --mode=lines
, you can also provide the additional option --withMarkers=true
.
If set to true
, the underlying FileSplitter
will emit additional start-of-file and end-of-file marker messages before and after the actual data.
The payload of these 2 additional marker messages is of type FileSplitter.FileMarker
. The option withMarkers
defaults to false
if not explicitly set.
See also MetadataStore options for possible shared persistent store configuration used to prevent duplicate messages on restart.
5.13.1. mode = lines
Headers:
-
Content-Type: text/plain
-
file_originalFile: <java.io.File>
-
file_name: <file name>
-
correlationId: <UUID>
(same for each line) -
sequenceNumber: <n>
-
sequenceSize: 0
(number of lines is not known until the file is read)
Payload:
A String
for each line.
The first line is optionally preceded by a message with a START
marker payload.
The last line is optionally followed by a message with an END
marker payload.
Marker presence and format are determined by the with-markers
and markers-json
properties.
5.13.2. mode = ref
Headers:
None.
Payload:
A java.io.File
object.
5.13.3. Options
The s3-source
has the following options:
5.13.4. Amazon AWS common options
The Amazon S3 Source (as all other Amazon AWS applications) is based on the Spring Cloud AWS project as a foundation, and its auto-configuration classes are used automatically by Spring Boot. Consult their documentation regarding required and useful auto-configuration properties.
5.13.5. Examples
java -jar s3-source.jar --s3.remoteDir=/tmp/foo --file.consumer.mode=lines
5.14. SFTP Source
This source application supports transfer of files using the SFTP protocol.
Files are transferred from the remote
directory to the local
directory where the app is deployed.
Messages emitted by the source are provided as a byte array by default. However, this can be
customized using the --mode
option:
-
ref Provides a
java.io.File
reference -
lines Will split files line-by-line and emit a new message for each line
-
contents The default. Provides the contents of a file as a byte array
When using --mode=lines
, you can also provide the additional option --withMarkers=true
.
If set to true
, the underlying FileSplitter
will emit additional start-of-file and end-of-file marker messages before and after the actual data.
The payload of these 2 additional marker messages is of type FileSplitter.FileMarker
. The option withMarkers
defaults to false
if not explicitly set.
See sftp-supplier
for advanced configuration options.
See also MetadataStore options for possible shared persistent store configuration used to prevent duplicate messages on restart.
5.14.1. Input
N/A (Fetches files from an SFTP server).
5.14.2. Output
mode = contents
Headers:
-
Content-Type: application/octet-stream
-
file_name: <file name>
-
file_remoteFileInfo <file metadata>
-
file_remoteHostPort: <host:port>
-
file_remoteDirectory: <relative-path>
-
file_remoteFile: <file-name>
-
sftp_selectedServer: <server-key>
(if multi-source)
Payload:
A byte[]
filled with the file contents.
mode = lines
Headers:
-
Content-Type: text/plain
-
file_name: <file name>
-
correlationId: <UUID>
(same for each line) -
sequenceNumber: <n>
-
sequenceSize: 0
(number of lines is not known until the file is read) -
file_marker : <file marker>
(if with-markers is enabled)
Payload:
A String
for each line.
The first line is optionally preceded by a message with a START
marker payload.
The last line is optionally followed by a message with an END
marker payload.
Marker presence and format are determined by the with-markers
and markers-json
properties.
mode = ref
Headers:
-
file_remoteHostPort: <host:port>
-
file_remoteDirectory: <relative-path>
-
file_remoteFile: <file-name>
-
file_originalFile: <absolute-path-of-local-file>
-
file_name <local-file-name>
-
file_relativePath
-
file_remoteFile: <remote-file-name>
-
sftp_selectedServer: <server-key>
(if multi-source)
Payload:
A java.io.File
object.
5.14.3. Options
The sftp source has the following options:
5.14.4. Examples
java -jar sftp_source.jar --sftp.supplier.remote-dir=foo --file.mode=lines --sftp.supplier.factory.host=sftpserver \
--sftp.supplier.factory.username=user --sftp.supplier.factory.password=pw --sftp.supplier.local-dir=/foo
5.15. SYSLOG
The syslog source receives SYSLOG packets over UDP, TCP, or both. RFC3164 (BSD) and RFC5424 formats are supported.
5.15.1. Options
5.16. TCP
The tcp
source acts as a server and allows a remote party to connect to it and submit data over a raw tcp socket.
TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of decoders are available, the default being 'CRLF' which is compatible with Telnet.
Messages produced by the TCP source application have a byte[]
payload.
5.16.1. Options
5.16.2. Available Decoders
- CRLF (default)
-
text terminated by carriage return (0x0d) followed by line feed (0x0a)
- LF
-
text terminated by line feed (0x0a)
- NULL
-
text terminated by a null byte (0x00)
- STXETX
-
text preceded by an STX (0x02) and terminated by an ETX (0x03)
- RAW
-
no structure - the client indicates a complete message by closing the socket
- L1
-
data preceded by a one byte (unsigned) length field (supports up to 255 bytes)
- L2
-
data preceded by a two byte (unsigned) length field (up to 2^16-1 bytes)
- L4
-
data preceded by a four byte (signed) length field (up to 2^31-1 bytes)
5.17. Time Source
The time source will simply emit a String with the current time every so often.
5.17.1. Options
The time source has the following options:
spring.integration.poller
- cron
-
Cron expression for polling. Mutually exclusive with 'fixedDelay' and 'fixedRate'. (String, default:
<none>
) - fixed-delay
-
Polling delay period. Mutually exclusive with 'cron' and 'fixedRate'. (Duration, default:
<none>
) - fixed-rate
-
Polling rate period. Mutually exclusive with 'fixedDelay' and 'cron'. (Duration, default:
<none>
) - initial-delay
-
Polling initial delay. Applied for 'fixedDelay' and 'fixedRate'; ignored for 'cron'. (Duration, default:
<none>
) - max-messages-per-poll
-
Maximum number of messages to poll per polling cycle. (Integer, default:
<none>
) - receive-timeout
-
How long to wait for messages on poll. (Duration, default:
1s
)
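For example, using the poller options above (the jar name is illustrative), the following emits the current time every five seconds:
java -jar time-source.jar --spring.integration.poller.fixed-delay=5s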
5.18. Twitter Message Source
Repeatedly retrieves the direct messages (both sent and received) within the last 30 days, sorted in reverse-chronological order.
The retrieved messages are cached (in a MetadataStore
cache) to prevent duplication.
By default an in-memory SimpleMetadataStore
is used.
The twitter.message.source.count
controls the number of returned messages.
The spring.cloud.stream.poller
properties control the message poll interval.
This must be aligned with the API's rate limits.
5.18.1. Options
5.19. Twitter Search Source
The Twitter’s Standard search API (search/tweets) allows simple queries against the indices of recent or popular Tweets. This Source
provides continuous searches against a sampling of recent Tweets published in the past 7 days. Part of the 'public' set of APIs.
Returns a collection of relevant Tweets matching a specified query.
Use the spring.cloud.stream.poller
properties to control the interval between consecutive search requests. Rate Limit - 180 requests per 30 min. window (e.g. ~6 r/m, ~ 1 req / 10 sec.)
The twitter.search
query properties allow querying by keywords and filtering the results by time and geolocation.
The twitter.search.count
and twitter.search.page
control the result pagination in accordance with the Search API.
Note: Twitter’s search service and, by extension, the Search API is not meant to be an exhaustive source of Tweets. Not all Tweets will be indexed or made available via the search interface.
5.19.1. Options
5.20. Twitter Stream Source
-
The
Filter API
returns public statuses that match one or more filter predicates. Multiple parameters allow using a single connection to the Streaming API. TIP: The track
, follow
, and locations
fields are combined with an OR operator! Queries with track=foo
and follow=1234
return Tweets matching foo
OR created by user 1234
. -
The
Sample API
returns a small random sample of all public statuses. The Tweets returned by the default access level are the same, so if two different clients connect to this endpoint, they will see the same Tweets.
The default access level allows up to 400 track keywords, 5,000 follow user Ids and 25 0.1-360 degree location boxes.
5.20.1. Options
5.21. Websocket Source
The Websocket
source produces messages through a web socket.
5.21.1. Options
5.21.2. Examples
To verify that the websocket-source receives messages from Websocket clients, you can use the following simple end-to-end setup.
Step 1: Start kafka
Step 2: Deploy websocket-source
on a specific port, say 8080
Step 3: Connect a websocket client on port 8080 path "/websocket", and send some messages.
You can start a kafka console consumer and see the messages there.
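For example, assuming Kafka runs on localhost:9092 and the source's output binding is bound to a topic named websocket-output (both illustrative), the standard console consumer can be used for verification:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic websocket-output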
5.22. XMPP Source
The "xmpp" source enables receiving messages from an XMPP Server.
5.22.1. Input
N/A
5.22.2. Output
Payload
-
byte[]
5.22.3. Options
The xmpp source has the following options:
Also see the Spring Boot Documentation for additional properties for the broker connections and listener properties.
5.22.4. Build
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
5.22.5. Examples
java -jar xmpp-source.jar --xmpp.factory.host=localhost --xmpp.factory.port=5222 --xmpp.factory.user=jane --xmpp.factory.password=secret --xmpp.factory.service-name=localhost
5.23. ZeroMQ Source
The "zeromq" source enables receiving messages from ZeroMQ.
5.23.1. Input
N/A
5.23.2. Output
Payload
-
byte[]
5.23.3. Options
The zeromq source has the following options:
Also see the Spring Boot Documentation for additional properties for the broker connections and listener properties.
5.23.4. Build
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
5.23.5. Examples
java -jar zeromq-source.jar --zeromq.supplier.connectUrl=tcp://server:port --zeromq.supplier.topics=
6. Processors
6.1. Aggregator Processor
The Aggregator processor enables an application to aggregate incoming messages into groups and release them to an output destination.
java -jar aggregator-processor-kafka-<version>.jar --aggregator.message-store-type=jdbc
Change kafka to rabbit if you want to run it against RabbitMQ.
6.1.1. Payload
If an input payload is a byte[]
and the content-type header is JSON, then the JsonBytesToMap
function tries to deserialize this payload to a Map
for better data representation on the output of the aggregator function.
Also, such a Map
data representation makes it easy to access the payload content from the SpEL expressions mentioned below.
Otherwise (including on a deserialization error), the input payload is left as-is, and it is up to the target application to convert it into the desired form.
6.1.2. Options
6.2. Bridge Processor
A processor that bridges the input and output by simply passing the incoming payload to the output.
6.2.1. Payload
Any
6.3. Filter Processor
The Filter processor enables an application to examine the incoming payload and then apply a predicate against it that decides whether the record should continue downstream.
For example, if the incoming payload is of type String
and you want to filter out anything that has less than five characters, you can run the filter processor as below.
java -jar filter-processor-kafka-<version>.jar --filter.function.expression=payload.length() > 4
Change kafka to rabbit if you want to run it against RabbitMQ.
6.3.1. Payload
You can pass any type as payload and then apply SpEL expressions against it to filter.
If the incoming type is byte[]
and the content type is set to text/plain
or application/json
, then the application converts the byte[]
into String
.
6.3.2. Options
6.4. Groovy Processor
A Processor that applies a Groovy script against messages.
6.4.1. Options
The groovy-processor processor has the following options:
- groovy-processor.script
-
Reference to a script used to process messages. (Resource, default:
<none>
) - groovy-processor.variables
-
Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default:
<none>
) - groovy-processor.variables-location
-
The location of a properties file containing custom script variable bindings. (Resource, default:
<none>
)
6.5. Header Enricher Processor
Use the header-enricher app to add message headers.
The headers are provided in the form of new line delimited key value pairs, where the keys are the header names and the values are SpEL expressions.
For example --headers='foo=payload.someProperty \n bar=payload.otherProperty'
.
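A minimal run sketch, using the property form shown above; the jar name follows the naming convention of the other examples and is illustrative:
java -jar header-enricher-processor-kafka-<version>.jar --headers='foo=payload.someProperty \n bar=payload.otherProperty'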
6.5.1. Options
The header-enricher processor has the following options:
6.6. Http Request Processor
A processor app that makes requests to an HTTP resource and emits the response body as a message payload.
6.6.1. Input
Headers
Any Required HTTP headers must be explicitly set via the headers
or headers-expression
property. See examples below.
Header values may also be used to construct:
-
the request body when referenced in the
body-expression
property. -
the HTTP method when referenced in the
http-method-expression
property. -
the URL when referenced in the
url-expression
property.
Payload
The payload is used as the request body for a POST request by default, and can be any Java type. It should be an empty String for a GET request. The payload may also be used to construct:
-
the request body when referenced in the
body-expression
property. -
the HTTP method when referenced in the
http-method-expression
property. -
the URL when referenced in the
url-expression
property.
The underlying WebClient supports Jackson JSON serialization to support any request and response types if necessary.
The expected-response-type
property, String.class
by default, may be set to any class in your application class path.
Note that user defined payload types will require adding required dependencies to your pom file.
6.6.2. Output
Headers
No HTTP message headers are mapped to the outbound Message.
Payload
The raw output object is ResponseEntity<?>; any of its fields (e.g., body
, headers
) or accessor methods (statusCode
) may be referenced as part of the reply-expression
.
By default the outbound Message payload is the response body.
Note that ResponseEntity (referenced by the expression #root
) cannot be deserialized by Jackson by default, but may be rendered as a HashMap
.
6.6.3. Options
The http-request processor has the following options:
6.7. Script Processor
Processor that transforms messages using a script. The script body is supplied directly as a property value. The language of the script can be specified (groovy/javascript/ruby/python).
6.7.1. Options
The script-processor processor has the following options:
- script-processor.language
-
Language of the text in the script property. Supported: groovy, javascript, ruby, python. (String, default:
<none>
) - script-processor.script
-
Text of the script. (String, default:
<none>
) - script-processor.variables
-
Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default:
<none>
) - script-processor.variables-location
-
The location of a properties file containing custom script variable bindings. (Resource, default:
<none>
)
6.8. Splitter Processor
The splitter app builds upon the concept of the same name in Spring Integration and allows the splitting of a single message into several distinct messages.
The processor uses a function that takes a Message<?>
as input and then produces a List<Message<?>>
as output based on various properties (see below).
You can use a SpEL expression or a delimiter to specify how you want to split the incoming message.
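For example, assuming the splitter.expression option name, the following sketch splits a comma-separated payload into one message per token:
java -jar splitter-processor-kafka-<version>.jar --splitter.expression="payload.split(',')"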
6.8.1. Payload
-
Incoming payload -
Message<?>
If the incoming type is byte[]
and the content type is set to text/plain
or application/json
, then the application converts the byte[]
into String
.
-
Outgoing payload -
List<Message<?>>
6.8.2. Options
6.9. Transform Processor
Transformer processor allows you to convert the message payload structure based on a SpEL expression.
Here is an example of how you can run this application.
java -jar transform-processor-kafka-<version>.jar \
--spel.function.expression=payload.toUpperCase()
Change kafka to rabbit if you want to run it against RabbitMQ.
6.9.1. Payload
The incoming message can contain any type of payload.
6.9.2. Options
6.10. Twitter Trend and Trend Locations Processor
Processor that can return either trending topic or the Locations of the trending topics.
The twitter.trend.trend-query-type
property allows selecting the query type.
6.10.1. Retrieve trending topic in a location (optionally)
For this mode set twitter.trend.trend-query-type
to trend
.
Processor based on Trends API. Returns the trending topics near a specific latitude, longitude location.
6.10.2. Retrieve trend Locations
For this mode set twitter.trend.trend-query-type
to trendLocation
.
Retrieve a full or nearby locations list of trending topics by location.
If the latitude
, longitude
parameters are NOT provided the processor performs the Trends Available API and returns the locations that Twitter has trending topic information for.
If the latitude
, longitude
parameters are provided the processor performs the Trends Closest API and returns the locations that Twitter has trending topic information for, closest to a specified location.
Response is an array of locations
that encode the location’s WOEID and some other human-readable information such as a canonical name and country the location belongs in.
6.10.3. Options
7. Sinks
7.1. Cassandra Sink
This sink application writes the content of each message it receives into Cassandra.
It expects a JSON String payload and uses its properties to map to table columns.
7.1.1. Payload
A JSON String or byte array representing the entity (or a list of entities) to be persisted.
7.1.2. Options
The cassandra sink has the following options:
7.2. Analytics Sink
Sink application, built on top of the Analytics Consumer, that computes analytics from the input messages and publishes the analytics as metrics to various monitoring systems. It leverages the micrometer library for providing a uniform programming experience across the most popular monitoring systems and exposes Spring Expression Language (SpEL) properties for defining how the metric Name, Values and Tags are computed from the input data.
The analytics sink can produce two metrics types:
-
Counter - reports a single metric, a count, that increments by a fixed, positive amount. Counters can be used for computing the rates of how the data changes in time.
-
Gauge - reports the current value. Typical examples for gauges would be the size of a collection or map or number of threads in a running state.
A Meter (e.g Counter or Gauge) is uniquely identified by its name
and dimensions
(the term dimensions and tags is used interchangeably). Dimensions allow a particular named metric to be sliced to drill down and reason about the data.
As a metric is uniquely identified by its name and dimensions, you can assign multiple tags (i.e. key/value pairs) to every metric, but you cannot randomly change those tags afterwards! Monitoring systems such as Prometheus will complain if a metric with the same name has different sets of tags.
Use the analytics.name
or analytics.name-expression
properties to set the name of the output analytics metrics. If not set, the metrics name defaults to the application name.
Use the analytics.tag.expression.<TAG_NAME>=<TAG_VALUE>
, property for adding one or many tags to your metrics. The TAG_NAME
used in the property definition will appear as tag name in the metrics. The TAG_VALUE is a SpEL
expression that dynamically computes the tag value from the incoming message.
The SpEL
expressions use the headers
and payload
keywords to access message’s headers and payload values.
You can use literals (e.g. 'fixed value' ) to set tags with fixed values.
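For example, the following sketch counts incoming orders and tags the counter with a customer type computed from the payload and a fixed region literal. The metric name, tag names and payload fields are illustrative:
java -jar analytics-sink-kafka-<version>.jar --analytics.name=orders --analytics.tag.expression.customerType=payload.customerType --analytics.tag.expression.region='EU'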
All Stream Applications support, out of the box, three of the most popular monitoring systems, Wavefront
, Prometheus
and InfluxDB
and you can enable each of them declaratively.
You can add support for additional monitoring systems by just adding their micrometer meter-registry dependencies to the Analytics Sink
applications.
Please visit the Spring Cloud Data Flow Stream Monitoring for detailed instructions for configuring the Monitoring Systems. The following quick snippets can help you start.
-
To enable the Prometheus meter registry, set the following properties.
management.metrics.export.prometheus.enabled=true
management.metrics.export.prometheus.rsocket.enabled=true
management.metrics.export.prometheus.rsocket.host=<YOUR PROMETHEUS-RSOCKET PROXY URI>
management.metrics.export.prometheus.rsocket.port=7001
-
To enable Wavefront meter registry, set the following properties.
management.metrics.export.wavefront.enabled=true
management.metrics.export.wavefront.api-token=YOUR WAVEFRONT KEY
management.metrics.export.wavefront.uri=YOUR WAVEFRONT URI
management.metrics.export.wavefront.source=UNIQUE NAME TO IDENTIFY YOUR APP
-
To enable InfluxDB meter registry, set the following property.
management.metrics.export.influx.enabled=true
management.metrics.export.influx.uri={influxdb-server-url}
If the Data Flow Server Monitoring is enabled then the Analytics Sink will reuse the provided metrics configurations.
The following diagram illustrates how the Analytics Sink
helps to collect business insights for a real-time stock-exchange pipeline.
7.2.1. Payload
The incoming message can contain any type of payload.
7.2.2. Options
7.3. Elasticsearch Sink
Sink that indexes documents into Elasticsearch.
This Elasticsearch sink only supports indexing JSON documents.
It consumes data from an input destination and then indexes it to Elasticsearch.
The input data can be a plain json string, or a java.util.Map
that represents the JSON.
It also accepts the data as the Elasticsearch provided XContentBuilder
.
However, this is a rare case as it is not likely the middleware keeps the records as XContentBuilder
.
This is provided mainly for direct invocation of the consumer.
7.3.1. Options
The Elasticsearch sink has the following options:
7.3.2. Examples of running this sink
-
From the folder
elasticsearch-sink
:./mvnw clean package
-
cd apps
-
cd to the proper binder generated app (Kafka or RabbitMQ)
-
./mvnw clean package
-
Make sure that you have Elasticsearch running. For example you can run it as a docker container using the following command.
docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2
-
Start the middleware (Kafka or RabbitMQ) if it is not already running.
-
java -jar target/elasticsearch-sink-<kafka|rabbit>-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.input.destination=els-in --elasticsearch.consumer.index=testing
-
Send some JSON data into the middleware destination. For example:
{"foo":"bar"}
-
Verify that the data is indexed:
curl localhost:9200/testing/_search
7.4. File Sink
The file sink app writes each message it receives to a file.
7.4.1. Payload
-
java.io.File
-
java.io.InputStream
-
byte[]
-
String
7.4.2. Options
The file-sink
has the following options:
7.5. FTP Sink
FTP sink is a simple option to push files to an FTP server from incoming messages.
It uses an ftp-outbound-adapter
, therefore incoming messages can be either a java.io.File
object, a String
(content of the file)
or an array of bytes
(file content as well).
To use this sink, you need a username and a password to login.
By default Spring Integration will use o.s.i.file.DefaultFileNameGenerator if none is specified. DefaultFileNameGenerator will determine the file name
based on the value of the file_name header (if it exists) in the MessageHeaders , or if the payload of the Message is already a java.io.File , then it will
use the original name of that file.
7.5.1. Headers
-
file_name
(See note above)
7.5.2. Payload
-
java.io.File
-
java.io.InputStream
-
byte[]
-
String
7.5.3. Output
N/A (writes to the FTP server).
7.5.4. Options
The ftp sink has the following options:
7.6. JDBC Sink
JDBC sink allows you to persist incoming payload into an RDBMS database.
The jdbc.consumer.columns
property represents pairs of COLUMN_NAME[:EXPRESSION_FOR_VALUE]
where EXPRESSION_FOR_VALUE
(together with the colon) is optional.
In this case the value is evaluated via a generated expression like payload.COLUMN_NAME
, so there is a direct mapping from object properties to the table columns.
For example, given a JSON payload like:
{
"name": "My Name",
"address": {
"city": "Big City",
"street": "Narrow Alley"
}
}
So, we can insert it into the table with name
, city
and street
structure using the configuration:
--jdbc.consumer.columns=name,city:address.city,street:address.street
This sink supports batch inserts, as far as supported by the underlying JDBC driver.
Batch inserts are configured via the batch-size
and idle-timeout
properties:
Incoming messages are aggregated until batch-size
messages are present, then inserted as a batch.
If idle-timeout
milliseconds pass with no new messages, the aggregated batch is inserted even if it is smaller than batch-size
, capping maximum latency.
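For example, the following sketch flushes inserts every 100 messages or after 5 seconds of inactivity. It assumes the batch properties use the jdbc.consumer prefix like the other options; the table and column names are illustrative:
java -jar jdbc-sink.jar --jdbc.consumer.tableName=names --jdbc.consumer.columns=name --jdbc.consumer.batch-size=100 --jdbc.consumer.idle-timeout=5000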
The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like spring.datasource.url etc. apply.
7.6.1. Examples
java -jar jdbc-sink.jar --jdbc.consumer.tableName=names \
--jdbc.consumer.columns=name \
--spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test'
7.6.2. Payload
7.6.3. Options
The jdbc sink has the following options:
7.7. Apache Kafka Sink
This module publishes messages to Apache Kafka.
7.7.1. Options
The kafka sink has the following options:
(See the Spring Boot documentation for Spring for Apache Kafka configuration properties)
7.8. Log Sink
The log
sink uses the application logger to output the data for inspection.
Please understand that the log
sink uses a type-less handler, which affects how the actual logging will be performed.
This means that if the content-type is textual, then raw payload bytes will be converted to String, otherwise raw bytes will be logged.
Please see more info in the user-guide.
7.8.1. Options
The log sink has the following options:
7.9. MongoDB Sink
This sink application ingests incoming data into MongoDB.
This application is fully based on the MongoDataAutoConfiguration
, so refer to the Spring Boot MongoDB Support for more information.
7.9.1. Input
Payload
-
Any POJO
-
String
-
byte[]
7.9.2. Options
The mongodb sink has the following options:
7.10. MQTT Sink
This module sends messages to MQTT.
7.10.1. Payload:
-
byte[]
-
String
7.10.2. Options
The mqtt sink has the following options:
7.11. Pgcopy Sink
A module that writes its incoming payload to an RDBMS using the PostgreSQL COPY command.
7.11.1. Input
Headers
Payload
-
Any
Column expression will be evaluated against the message and the expression will usually be compatible with only one type (such as a Map or bean etc.)
7.11.2. Output
N/A
7.11.3. Options
The pgcopy sink has the following options:
The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like spring.datasource.url etc. apply.
7.11.4. Build
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
For integration tests to run, start a PostgreSQL database on localhost:
docker run -e POSTGRES_PASSWORD=spring -e POSTGRES_DB=test -p 5432:5432 -d postgres:latest
7.11.5. Examples
java -jar pgcopy-sink.jar --tableName=names --columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test'
7.12. RabbitMQ Sink
This module sends messages to RabbitMQ.
7.12.1. Options
The rabbit sink has the following options:
(See the Spring Boot documentation for RabbitMQ connection properties)
7.14. Router Sink
This application routes messages to named channels.
7.14.1. Options
The router sink has the following options:
- router.default-output-binding
-
Where to send un-routable messages. (String, default:
<none>
) - router.destination-mappings
-
Destination mappings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default:
<none>
) - router.expression
-
The expression to be applied to the message to determine the channel(s) to route to. Note that the payload wire format for content types such as text, json or xml is byte[] not String!. Consult the documentation for how to handle byte array payload content. (Expression, default:
<none>
) - router.refresh-delay
-
How often to check for script changes in ms (if present); < 0 means don't refresh. (Integer, default:
60000
) - router.resolution-required
-
Whether channel resolution is required. (Boolean, default:
false
) - router.script
-
The location of a groovy script that returns channels or channel mapping resolution keys. (Resource, default:
<none>
) - router.variables
-
Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default:
<none>
) - router.variables-location
-
The location of a properties file containing custom script variable bindings. (Resource, default:
<none>
)
This router sink is based on a StreamBridge API from Spring Cloud Stream, therefore destinations can be created as needed.
In this case the defaultOutputBinding can only be reached if the key is not included in the destinationMappings.
Setting resolutionRequired = true bypasses the defaultOutputBinding and throws an exception if there is no mapping and no respective binding already declared.
You can restrict the creation of dynamic bindings using the spring.cloud.stream.dynamicDestinations
property.
By default, all resolved destinations will be bound dynamically; if this property has a comma-delimited list of destination names, only those will be bound.
Messages that resolve to a destination that is not in this list will be routed to the defaultOutputBinding
, which must also appear in the list.
The destinationMappings
are used to map the evaluation results to an actual destination name.
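For example, the following sketch routes on a type header, falls back to otherDestination for unmapped values, and avoids the byte[] payload caveat noted above by evaluating a header rather than the payload. The header name and destination names are illustrative:
java -jar router-sink-kafka-<version>.jar --router.expression="headers['type']" --router.destination-mappings='order=orderDestination\n invoice=invoiceDestination' --router.default-output-binding=otherDestination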
7.14.2. SpEL-based Routing
The expression evaluates against the message and returns either a channel name, or the key to a map of channel names.
For more information, please see the "Routers and the Spring Expression Language (SpEL)" subsection in the Spring Integration Reference manual Configuring a Generic Router section.
7.14.3. Groovy-based Routing
Instead of SpEL expressions, Groovy scripts can also be used. Let’s create a Groovy script in the file system at "file:/my/path/router.groovy", or "classpath:/my/path/router.groovy" :
println("Groovy processing payload '" + payload + "'")
if (payload.contains('a')) {
return "foo"
}
else {
return "bar"
}
If you want to pass variable values to your script, you can statically bind values using the variables option or optionally pass the path to a properties file containing the bindings using the propertiesLocation option. All properties in the file will be made available to the script as variables. You may specify both variables and propertiesLocation, in which case any duplicate values provided as variables override values provided in propertiesLocation. Note that payload and headers are implicitly bound to give you access to the data contained in a message.
For more information, see the Spring Integration Reference manual Groovy Support.
7.15. RSocket Sink
The RSocket sink sends data using the RSocket protocol's fire-and-forget strategy.
7.15.1. Options
The rsocket sink has the following options:
7.16. Amazon S3 Sink
This sink app supports transferring objects to the Amazon S3 bucket.
File payloads (and directories, recursively) are transferred from the local
directory where the application is deployed to the remote
directory (S3 bucket).
Messages accepted by this sink must contain a payload of one of the following types:
-
File
, including directories for recursive upload; -
InputStream
; -
byte[]
7.16.1. Options
The s3 sink has the following options:
The target generated application based on the AmazonS3SinkConfiguration
can be enhanced with the S3MessageHandler.UploadMetadataProvider
and/or S3ProgressListener
, which are injected into S3MessageHandler
bean.
See Spring Integration AWS support for more details.
7.16.2. Amazon AWS common options
The Amazon S3 Sink (as all other Amazon AWS applications) is based on the Spring Cloud AWS project as a foundation, and its auto-configuration classes are used automatically by Spring Boot. Consult their documentation regarding required and useful auto-configuration properties.
Some of them are about AWS credentials:
-
spring.cloud.aws.credentials.accessKey
-
spring.cloud.aws.credentials.secretKey
-
spring.cloud.aws.credentials.instanceProfile
-
spring.cloud.aws.credentials.profileName
-
spring.cloud.aws.credentials.profilePath
Others are for the AWS Region
definition:
-
cloud.aws.region.auto
-
cloud.aws.region.static
Examples
java -jar s3-sink.jar --s3.bucket=/tmp/bar
7.17. SFTP Sink
SFTP sink is a simple option to push files to an SFTP server from incoming messages.
It uses an sftp-outbound-adapter
, therefore incoming messages can be either a java.io.File
object, a String
(content of the file)
or an array of bytes
(file content as well).
To use this sink, you need a username and a password to login.
By default Spring Integration will use o.s.i.file.DefaultFileNameGenerator if none is specified. DefaultFileNameGenerator will determine the file name
based on the value of the file_name header (if it exists) in the MessageHeaders , or if the payload of the Message is already a java.io.File , then it will
use the original name of that file.
When configuring the sftp.factory.known-hosts-expression
option, the root object of the evaluation is the application context, an example might be sftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'
.
7.17.1. Input
Headers
-
file_name
(See note above)
Payload
-
java.io.File
-
java.io.InputStream
-
byte[]
-
String
7.17.2. Output
N/A (writes to the SFTP server).
7.17.3. Options
The sftp sink has the following options:
7.18. TCP Sink
This module writes messages to TCP using an Encoder.
TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of encoders are available, the default being 'CRLF'.
7.18.1. Options
The tcp sink has the following options:
7.18.2. Available Encoders
- CRLF (default)
-
text terminated by carriage return (0x0d) followed by line feed (0x0a)
- LF
-
text terminated by line feed (0x0a)
- NULL
-
text terminated by a null byte (0x00)
- STXETX
-
text preceded by an STX (0x02) and terminated by an ETX (0x03)
- RAW
-
no structure - the client indicates a complete message by closing the socket
- L1
-
data preceded by a one byte (unsigned) length field (supports up to 255 bytes)
- L2
-
data preceded by a two byte (unsigned) length field (up to 2^16-1 bytes)
- L4
-
data preceded by a four byte (signed) length field (up to 2^31-1 bytes)
7.19. Throughput Sink
Sink that will count messages and log the observed throughput at a selected interval.
7.19.1. Options
The throughput sink has the following options:
- throughput.report-every-ms
-
how often to report. (Integer, default:
1000
)
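For example, to report the observed throughput every five seconds (the jar name is illustrative):
java -jar throughput-sink-kafka-<version>.jar --throughput.report-every-ms=5000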
7.20. Twitter Message Sink
Send Direct Messages to a specified user from the authenticating user.
Requires a JSON POST body and Content-Type
header to be set to application/json
.
When a message is received from a user you may send up to 5 messages in response within a 24 hour window. Each message received resets the 24 hour window and the 5 allotted messages. Sending a 6th message within a 24 hour window or sending a message outside of a 24 hour window will count towards rate-limiting. This behavior only applies when using the POST direct_messages/events/new endpoint.
SpEL expressions are used to compute the request parameters from the input message.
7.20.1. Options
Use single quotes (' ) to wrap the literal values of the SpEL expression properties.
For example to set a fixed message text use text='Fixed Text' .
For fixed target userId use userId='666' .
7.21. Twitter Update Sink
Updates the authenticating user's current text (e.g. Tweeting).
For each update attempt, the update text is compared with the authenticating user's recent Tweets. Any attempt that would result in duplication will be blocked, resulting in a 403 error. A user cannot submit the same text twice in a row.
While not rate limited by the API, a user is limited in the number of Tweets they can create at a time. The update limit for the standard API is 300 in 3-hour windows. If the number of updates posted by the user reaches the current allowed limit this method will return an HTTP 403 error.
You can find details for the Update API here: developer.twitter.com/en/docs/tweets/post-and-engage/api-reference/post-statuses-update
7.21.1. Options
7.22. Wavefront Sink
The Wavefront sink consumes Messages<?>, converts them into metrics in the Wavefront data format and sends the metrics directly to Wavefront or a Wavefront proxy.
Supports common ETL use-cases, where existing (historical) metrics data has to be cleaned, transformed and stored in Wavefront for further analysis.
7.22.1. Options
The Wavefront sink has the following options:
7.23. Websocket Sink
A simple Websocket Sink implementation.
7.23.1. Options
The following options are supported:
7.23.2. Examples
To verify that the websocket-sink receives messages from other spring-cloud-stream apps, you can use the following simple end-to-end setup.
Step 1: Start Rabbitmq
Step 2: Deploy a time-source
Step 3: Deploy the websocket-sink
Finally start a websocket-sink in trace
mode so that you see the messages produced by the time-source
in the log:
java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
--logging.level.org.springframework.cloud.fn.consumer.websocket=TRACE
You should start seeing log messages in the console where you started the WebsocketSink like this:
Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]
7.23.3. Actuators
There is an Endpoint
that you can use to access the last n
messages sent and received. You have to
enable it by providing --endpoints.websocketconsumertrace.enabled=true
. By default it shows the last 100 messages via the
host:port/websocketconsumertrace
. Here is a sample output:
[
{
"timestamp": 1445453703508,
"info": {
"type": "text",
"direction": "out",
"id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
"payload": "2015-10-21 20:54:33"
}
},
...
{
"timestamp": 1445453703506,
"info": {
"type": "text",
"direction": "out",
"id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
"payload": "2015-10-21 20:54:32"
}
}
]
There is also a simple HTML page where you see forwarded messages in a text area. You can access
it directly via host:port
in your browser.
7.24. XMPP Sink
The "xmpp" sink enables sending messages to an XMPP server.
7.24.1. Input
-
byte[]
7.24.2. Output
Payload
N/A
7.24.3. Options
The xmpp sink has the following options:
7.24.4. Build
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
7.25. ZeroMQ Sink
The "zeromq" sink enables sending messages to a ZeroMQ socket.
7.25.1. Input
-
byte[]
7.25.2. Output
Payload
N/A
7.25.3. Options
The zeromq sink has the following options:
7.25.4. Build
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
7.25.5. Examples
java -jar zeromq-sink.jar --zeromq.consumer.connectUrl=tcp://server:port --zeromq.consumer.topic=