应用
5. 来源
5.1. Debezium 源码
基于 Debezium 引擎的变更数据捕获 (CDC) 源。
它允许捕获数据库更改事件并通过不同的消息绑定器(例如)和所有 Spring Cloud Stream 支持者代理流式传输这些事件。Debezium Source
Apache Kafka
RabbitMQ
此源可以与任何 Spring Cloud Stream 消息 Binder 一起使用。 它不受 Kafka Connect 框架的限制或依赖。尽管这种方法很灵活,但它存在一些限制。 |
支持所有 Debezium 配置属性。
只需在任何 Debezium 属性前面加上前缀即可。
例如,要设置 Debezium 的属性,请改用 source 属性。debezium.properties.
connector.class
debezium.properties.connector.class
5.1.1. 数据库支持
目前支持多个数据存储的 CDC:MySQL、PostgreSQL、MongoDB、Oracle、SQL Server、Db2、Vitess 和 Spanner 数据库。Debezium Source
5.1.2. 选项
事件扁平化配置
Debezium 提供了一种全面的消息格式,可以准确地详细说明系统中发生的更改的信息。
但是,有时这种格式可能不适合下游使用者,这可能需要格式化消息,以便以简化的结构显示字段名称和值。flattened
要简化 Debezium 连接器生成的事件记录的格式,可以使用 Debezium 事件展平消息转换。 使用讨人喜欢的配置,您可以配置简单的消息格式,如下所示:
--debezium.properties.transforms=unwrap
--debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
--debezium.properties.transforms.unwrap.drop.tombstones=false
--debezium.properties.transforms.unwrap.delete.handling.mode=rewrite
--debezium.properties.transforms.unwrap.add.fields=name,db
Debezium 胶印存储
当 Debezium 源运行时,它会从源中读取信息,并定期记录定义它已处理的信息量。
如果源重新启动,它将使用最后记录的偏移量来了解它应该在源信息中的哪个位置继续读取。
开箱即用,提供了以下偏移存储配置选项:offsets
-
内存中
Doesn't persist the offset data but keeps it in memory. Therefore all offsets are lost on debezium source restart.
--debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore
-
本地文件系统
Store the offsets in a file on the local file system (the file can be named anything and stored anywhere). Additionally, although the connector records the offsets with every source record it produces, the engine flushes the offsets to the backing store periodically (in the example below, once each minute).
--debezium.properties.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore --debezium.properties.offset.storage.file.filename=/tmp/offsets.dat (1) --debezium.properties.offset.flush.interval.ms=60000 (2)
1 要存储偏移量的文件的路径。当设置为 . offset.storage`
FileOffsetBackingStore
2 尝试提交偏移量的时间间隔。默认值为 1 分钟。 -
Kafka 主题
Uses a Kafka topic to store offset data.
--debezium.properties.offset.storage=org.apache.kafka.connect.storage.KafkaOffsetBackingStore --debezium.properties.offset.storage.topic=my-kafka-offset-topic (1) --debezium.properties.offset.storage.partitions=2 (2) --debezium.properties.offset.storage.replication.factor=1 (3) --debezium.properties.offset.flush.interval.ms=60000 (4)
1 要存储偏移量的 Kafka 主题的名称。当设置为 . offset.storage
KafkaOffsetBackingStore
2 创建 offset 存储主题时使用的分区数。 3 创建偏移存储主题时使用的复制因子。 4 尝试提交偏移量的时间间隔。默认值为 1 分钟。
可以在 中实现接口以提供绑定到自定义后端键值存储的偏移量存储。org.apache.kafka.connect.storage.OffsetBackingStore
连接器属性
下表列出了每个连接器的所有可用 Debezium 属性。
可以通过在它们前面加上前缀来使用这些属性。debezium.properties.
5.1.3. 示例和测试
debezium 集成测试使用在本地计算机上运行的数据库夹具。在 Testcontainers 的帮助下,利用预构建的 debezium docker 数据库镜像。
要从 IDE 运行和调试测试,您需要从命令行部署所需的数据库映像。 以下说明介绍了如何从 Docker 映像运行预配置的测试数据库。
MySQL (MySQL的
在 docker 中启动 :debezium/example-mysql
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:2.3.3.Final
(可选)使用 client 连接到数据库并创建具有所需凭据的用户:mysql debezium |
docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';
使用以下属性将 Debezium Source 连接到 MySQL DB:
debezium.properties.connector.class=io.debezium.connector.mysql.MySqlConnector (1)
debezium.properties.name=my-connector (2)
debezium.properties.topic.prefix=my-topic (2)
debezium.properties.database.server.id=85744 (2)
debezium.properties.database.user=debezium (3)
debezium.properties.database.password=dbz (3)
debezium.properties.database.hostname=localhost (3)
debezium.properties.database.port=3306 (3)
debezium.properties.schema=true (4)
debezium.properties.key.converter.schemas.enable=true (4)
debezium.properties.value.converter.schemas.enable=true (4)
debezium.properties.transforms=unwrap (5)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (5)
debezium.properties.transforms.unwrap.add.fields=name,db (5)
debezium.properties.transforms.unwrap.delete.handling.mode=none (5)
debezium.properties.transforms.unwrap.drop.tombstones=true (5)
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (6)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (6)
1 | 将 Debezium 源配置为使用 MySqlConnector。 |
2 | 用于标识和调度传入事件的元数据。 |
3 | 连接到以 用户身份运行的 MySQL 服务器。localhost:3306 debezium |
4 | 在消息中包含 Change Event Value 架构。ChangeEvent |
5 | 启用 Change Event Flattening。 |
6 | Source state 到 preserver 的 source 状态。 |
您也可以使用此 mysql 配置运行。DebeziumDatabasesIntegrationTest#mysql()
禁用 mysql GenericContainer 测试初始化代码。 |
PostgreSQL 数据库
从 Docker 镜像启动预配置的 postgres 服务器:debezium/example-postgres:1.0
docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:2.3.3.Final
您可以像这样连接到此服务器:
psql -U postgres -h localhost -p 5432
使用以下属性将 Debezium 源连接到 PostgreSQL:
debezium.properties.connector.class=io.debezium.connector.postgresql.PostgresConnector (1)
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)
debezium.properties.topic.prefix=my-topic (3)
debezium.properties.name=my-connector (3)
debezium.properties.database.server.id=85744 (3)
debezium.properties.database.user=postgres (4)
debezium.properties.database.password=postgres (4)
debezium.properties.database..dbname=postgres (4)
debezium.properties.database.hostname=localhost (4)
debezium.properties.database.port=5432 (4)
debezium.properties.schema=true (5)
debezium.properties.key.converter.schemas.enable=true (5)
debezium.properties.value.converter.schemas.enable=true (5)
debezium.properties.transforms=unwrap (6)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (6)
debezium.properties.transforms.unwrap.add.fields=name,db (6)
debezium.properties.transforms.unwrap.delete.handling.mode=none (6)
debezium.properties.transforms.unwrap.drop.tombstones=true (6)
1 | 配置为使用 PostgresConnector。Debezium Source |
2 | 将 Debezium 引擎配置为使用 store。memory |
3 | 用于标识和调度传入事件的元数据。 |
4 | 连接到以 用户身份运行的 PostgreSQL 服务器。localhost:5432 postgres |
5 | 在消息中包含 Change Event Value 架构。 |
6 | 启用 Chage 事件展平。 |
您还可以使用此 postgres 配置运行 。DebeziumDatabasesIntegrationTest#postgres()
禁用 postgres GenericContainer 测试初始化代码。 |
MongoDB 数据库
从容器镜像启动预配置的 mongodb:debezium/example-mongodb:2.3.3.Final
docker run -it --rm --name mongodb -p 27017:27017 -e MONGODB_USER=debezium -e MONGODB_PASSWORD=dbz debezium/example-mongodb:2.3.3.Final
初始化清单集合
docker exec -it mongodb sh -c 'bash -c /usr/local/bin/init-inventory.sh'
在终端输出中,搜索类似 :mongodb
host: "3f95a8a6516e:27017"
2019-01-10T13:46:10.004+0000 I COMMAND [conn1] command local.oplog.rs appName: "MongoDB Shell" command: replSetInitiate { replSetInitiate: { _id: "rs0", members: [ { _id: 0.0, host: "3f95a8a6516e:27017" } ] }, lsid: { id: UUID("5f477a16-d80d-41f2-9ab4-4ebecea46773") }, $db: "admin" } numYields:0 reslen:22 locks:{ Global: { acquireCount: { r: 36, w: 20, W: 2 }, acquireWaitCount: { W: 1 }, timeAcquiringMicros: { W: 312 } }, Database: { acquireCount: { r: 6, w: 4, W: 16 } }, Collection: { acquireCount: { r: 4, w: 2 } }, oplog: { acquireCount: { r: 2, w: 3 } } } protocol:op_msg 988ms
将条目添加到您的127.0.0.1 3f95a8a6516e
/etc/hosts
使用以下属性将 Debezium 源连接到 MongoDB:
debezium.properties.connector.class=io.debezium.connector.mongodb.MongodbSourceConnector (1)
debezium.properties.topic.prefix=my-topic
debezium.properties.name=my-connector
debezium.properties.database.server.id=85744
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)
debezium.properties.mongodb.hosts=rs0/localhost:27017 (3)
debezium.properties.topic.prefix=dbserver1 (3)
debezium.properties.mongodb.user=debezium (3)
debezium.properties.mongodb.password=dbz (3)
debezium.properties.database.whitelist=inventory (3)
debezium.properties.tasks.max=1 (4)
debezium.properties.schema=true (5)
debezium.properties.key.converter.schemas.enable=true (5)
debezium.properties.value.converter.schemas.enable=true (5)
debezium.properties.transforms=unwrap (6)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (6)
debezium.properties.transforms.unwrap.add.fields=name,db (6)
debezium.properties.transforms.unwrap.delete.handling.mode=none (6)
debezium.properties.transforms.unwrap.drop.tombstones=true (6)
1 | 配置为使用 MongoDB Connector。Debezium Source |
2 | 将 Debezium 引擎配置为使用 .memory |
3 | 连接到以 user.localhost:27017 debezium |
4 | debezium.io/docs/connectors/mongodb/#tasks |
5 | 在事件中包含 Change Event Value 架构。SourceRecord |
6 | 启用 Chnage 事件展平。 |
您还可以使用此 mongodb 配置运行 。DebeziumDatabasesIntegrationTest#mongodb()
SQL 服务器
从 Docker 镜像启动 a:sqlserver
debezium/example-postgres:1.0
docker run -it --rm --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_PID=Standard -e SA_PASSWORD=Password! -e MSSQL_AGENT_ENABLED=true microsoft/mssql-server-linux:2017-CU9-GDR2
使用 debezium SqlServer 教程中的示例数据填充:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
cat ./inventory.sql | docker exec -i sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
使用以下属性将 Debezium Source 连接到 SQLServer:
debezium.properties.connector.class=io.debezium.connector.sqlserver.SqlServerConnector (1)
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)
debezium.properties.topic.prefix=my-topic (3)
debezium.properties.name=my-connector (3)
debezium.properties.database.server.id=85744 (3)
debezium.properties.database.user=sa (4)
debezium.properties.database.password=Password! (4)
debezium.properties.database..dbname=testDB (4)
debezium.properties.database.hostname=localhost (4)
debezium.properties.database.port=1433 (4)
1 | 配置为使用 SqlServerConnector。Debezium Source |
2 | 将 Debezium 引擎配置为使用状态存储。memory |
3 | 用于标识和调度传入事件的元数据。 |
4 | 连接到以用户身份运行的 SQL Server。localhost:1433 sa |
您还可以使用此 SqlServer 配置运行 。DebeziumDatabasesIntegrationTest#sqlServer()
神谕
从 localhost 启动 Oracle reachable,并使用 Debezium Vagrant 设置中描述的配置、用户和授权进行设置
使用 Debezium Oracle 教程中的示例数据填充:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-with-oracle-jdbc/init/inventory.sql
cat ./inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1
5.2. 文件源
此应用程序轮询目录并将新文件或其内容发送到 output 通道。 默认情况下,file 源以字节数组的形式提供 File 的内容。 但是,这可以使用 --file.supplier.mode 选项进行自定义:
-
ref 提供 java.io.File 引用
-
lines 将逐行拆分文件并为每行发出一条新消息
-
contents 默认值。以字节数组的形式提供文件的内容
使用 时,您还可以提供附加选项 。
如果设置为 true,则底层 FileSplitter 将在实际数据之前和之后发出额外的文件开始和文件结束标记消息。
这 2 个附加标记消息的有效负载为 类型 。如果未明确设置,则选项 withMarkers 默认为 false。--file.supplier.mode=lines
--file.supplier.withMarkers=true
FileSplitter.FileMarker
5.3. FTP 源
此源应用程序支持使用 FTP 协议传输文件。
文件将从目录传输到部署应用程序的目录。
默认情况下,源发出的消息以字节数组的形式提供。但是,这可能是
使用选项进行自定义:remote
local
--mode
-
裁判提供参考
java.io.File
-
线将逐行拆分文件,并为每行发出一条新消息
-
内容默认值。以字节数组的形式提供文件的内容
使用 时,您还可以提供附加选项 。
如果设置为 ,则底层将在实际数据之前和之后发出额外的文件开头和文件结尾标记消息。
这 2 个附加标记消息的有效负载为 类型 。如果未明确设置,则选项默认为 。--mode=lines
--withMarkers=true
true
FileSplitter
FileSplitter.FileMarker
withMarkers
false
另请参阅 MetadataStore 选项,了解用于防止重新启动时出现重复消息的可能共享持久性存储配置。
5.3.1. 输入
N/A (从 FTP 服务器获取文件)。
5.3.2. 输出
mode = 内容
头:
-
Content-Type: application/octet-stream
-
file_originalFile: <java.io.File>
-
file_name: <file name>
有效载荷:
A 填充了文件内容。byte[]
模式 = 行
头:
-
Content-Type: text/plain
-
file_orginalFile: <java.io.File>
-
file_name: <file name>
-
correlationId: <UUID>
(每行相同) -
sequenceNumber: <n>
-
sequenceSize: 0
(在读取文件之前不知道行数)
有效载荷:
A 表示每行。String
第一行前面有一条带有标记有效负载的消息(可选)。
最后一行(可选)后跟带有标记有效负载的消息。START
END
标记存在和格式由 和 属性确定。with-markers
markers-json
模式 = ref
头:
没有。
有效载荷:
一个对象。java.io.File
5.3.4. 示例
java -jar ftp_source.jar --ftp.supplier.remote-dir=foo --file.consumer.mode=lines --ftp.factory.host=ftpserver \
--ftp.factory.username=user --ftp.factory.password=pw --ftp.local-dir=/foo
5.4. Http 源
一个源应用程序,用于侦听 HTTP 请求并将正文作为消息负载发出。
如果 Content-Type 与 或 匹配,则有效负载将为 String,
否则,有效负载将是一个字节数组。text/*
application/json
5.4.1. 有效载荷:
如果内容类型匹配或text/*
application/json
-
String
如果内容类型不匹配或text/*
application/json
-
byte array
5.5. JDBC 源码
此源从 RDBMS 轮询数据。
此源代码完全基于 ,因此请参阅 Spring Boot JDBC 支持以获取更多信息。DataSourceAutoConfiguration
5.5.1. 有效载荷
-
Map<String, Object>
when(默认)和jdbc.split == true
List<Map<String, Object>>
5.5.2. 选项
jdbc 源具有以下选项:
另请参阅 Spring Boot 文档,了解其他属性和轮询选项。DataSource
TriggerProperties
MaxMessagesProperties
5.7. Apache Kafka 源码
此模块使用来自 Apache Kafka 的消息。
5.7.1. 选项
kafka 源具有以下选项:
(请参阅 Spring for Apache Kafka 配置属性的 Spring Boot 文档)
5.8. 负载生成器源
发送生成的数据并将其调度到流的源。
5.8.1. 选项
load-generator 源具有以下选项:
- load-generator.generate-timestamp 生成时间戳
-
是否生成时间戳。(布尔值,默认值:
false
) - load-generator.消息计数
-
消息计数。(整数,默认值:
1000
) - load-generator.消息大小
-
消息大小。(整数,默认值:
1000
) - load-generator.producers
-
生产者数量。(整数,默认值:
1
)
5.10. MongoDB 源
此源轮询来自 MongoDB 的数据。
此源完全基于 ,因此请参阅 Spring Boot MongoDB 支持以获取更多信息。MongoDataAutoConfiguration
5.10.1. 选项
mongodb 源具有以下选项:
另请参阅 Spring Boot 文档以获取其他属性。
请参阅 和 了解轮询选项。MongoProperties
TriggerProperties
5.11. MQTT 源码
允许从 MQTT 接收消息的 Source。
5.11.1. 有效负载:
-
String
如果 Binary 设置为 (default)false
-
byte[]
如果二进制设置为true
5.12. RabbitMQ 源码
“rabbit” 源允许从 RabbitMQ 接收消息。
在部署流之前,队列必须存在;它们不是自动创建的。 您可以使用 RabbitMQ Web UI 轻松创建队列。
5.12.1. 输入
不适用
5.12.2. 输出
有效载荷
-
byte[]
5.12.3. 选项
rabbit 源具有以下选项:
另请参阅 Spring Boot 文档,了解代理连接和侦听器属性的其他属性。
关于重试的说明
使用默认的 ackMode (AUTO) 和 requeue (true) 选项,将重试失败的消息传送
无限期。
由于 rabbit source 中没有太多的处理,因此 source 本身失败的风险很小,除非
由于某种原因,下游未连接。
将 requeue 设置为 false 将导致邮件在第一次尝试时被拒绝(并可能发送到 Dead Letter
Exchange/Queue(如果代理是这样配置的)。
的 enableRetry 选项允许配置重试参数,以便可以重试失败的消息投放,并且
最终在重试用尽时丢弃(或死信)。
传递线程在重试间隔期间暂停。
重试选项包括 enableRetry、maxAttempts、initialRetryInterval、retryMultiplier 和 maxRetryInterval。
消息传送失败并显示 MessageConversionException 永远不会重试;假设是,如果消息
无法转换,则后续尝试也将失败。
此类消息将被丢弃(或死信)。Binder |
5.12.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
您可以在此处找到相应的基于 Binder 的项目。 然后你可以 cd 到其中一个文件夹并构建它:
$ ./mvnw clean package
5.12.5. 示例
java -jar rabbit-source.jar --rabbit.queues=
5.13. Amazon S3 源
此源应用程序支持使用 Amazon S3 协议传输文件。
文件从目录 (S3 存储桶) 传输到部署应用程序的目录。remote
local
默认情况下,源发出的消息以字节数组的形式提供。但是,这可能是
使用选项进行自定义:--mode
-
裁判提供参考
java.io.File
-
线将逐行拆分文件,并为每行发出一条新消息
-
内容默认值。以字节数组的形式提供文件的内容
使用 时,您还可以提供附加选项 。
如果设置为 ,则底层将在实际数据之前和之后发出额外的文件开头和文件结尾标记消息。
这 2 个附加标记消息的有效负载为 类型 。如果未明确设置,则选项默认为 。--mode=lines
--withMarkers=true
true
FileSplitter
FileSplitter.FileMarker
withMarkers
false
另请参阅 MetadataStore 选项,了解用于防止重新启动时出现重复消息的可能共享持久性存储配置。
5.13.1. 模式 = 行
头:
-
Content-Type: text/plain
-
file_orginalFile: <java.io.File>
-
file_name: <file name>
-
correlationId: <UUID>
(每行相同) -
sequenceNumber: <n>
-
sequenceSize: 0
(在读取文件之前不知道行数)
有效载荷:
A 表示每行。String
第一行前面有一条带有标记有效负载的消息(可选)。
最后一行(可选)后跟带有标记有效负载的消息。START
END
标记存在和格式由 和 属性确定。with-markers
markers-json
5.13.2. 模式 = ref
头:
没有。
有效载荷:
一个对象。java.io.File
5.13.4. Amazon AWS 常用选项
Amazon S3 源(与所有其他 Amazon AWS 应用程序一样)基于 Spring Cloud AWS 项目作为基础,其自动配置类由 Spring Boot 自动使用。 请查阅其文档,了解必需和有用的自动配置属性。
5.13.5. 示例
java -jar s3-source.jar --s3.remoteDir=/tmp/foo --file.consumer.mode=lines
5.14. SFTP 源
此源应用程序支持使用 SFTP 协议传输文件。
文件将从目录传输到部署应用程序的目录。
默认情况下,源发出的消息以字节数组的形式提供。但是,这可能是
使用选项进行自定义:remote
local
--mode
-
裁判提供参考
java.io.File
-
线将逐行拆分文件,并为每行发出一条新消息
-
内容默认值。以字节数组的形式提供文件的内容
使用 时,您还可以提供附加选项 。
如果设置为 ,则底层将在实际数据之前和之后发出额外的文件开头和文件结尾标记消息。
这 2 个附加标记消息的有效负载为 类型 。如果未明确设置,则选项默认为 。--mode=lines
--withMarkers=true
true
FileSplitter
FileSplitter.FileMarker
withMarkers
false
有关高级配置选项,请参阅 sftp-supplier
。
另请参阅 MetadataStore 选项,了解用于防止重新启动时出现重复消息的可能共享持久性存储配置。
5.14.1. 输入
N/A (从 SFTP 服务器获取文件)。
5.14.2. 输出
mode = 内容
头:
-
Content-Type: application/octet-stream
-
file_name: <file name>
-
file_remoteFileInfo <file metadata>
-
file_remoteHostPort: <host:port>
-
file_remoteDirectory: <relative-path>
-
file_remoteFile: <file-name>
-
sftp_selectedServer: <server-key>
(如果是多源)
有效载荷:
A 填充了文件内容。byte[]
模式 = 行
头:
-
Content-Type: text/plain
-
file_name: <file name>
-
correlationId: <UUID>
(每行相同) -
sequenceNumber: <n>
-
sequenceSize: 0
(在读取文件之前不知道行数) -
file_marker : <file marker>
(如果启用了 with-markers)
有效载荷:
A 表示每行。String
第一行前面有一条带有标记有效负载的消息(可选)。
最后一行(可选)后跟带有标记有效负载的消息。START
END
标记存在和格式由 和 属性确定。with-markers
markers-json
模式 = ref
头:
-
file_remoteHostPort: <host:port>
-
file_remoteDirectory: <relative-path>
-
file_remoteFile: <file-name>
-
file_originalFile: <absolute-path-of-local-file>
-
file_name <local-file-name>
-
file_relativePath
-
file_remoteFile: <remote-file-name>
-
sftp_selectedServer: <server-key>
(如果是多源)
有效载荷:
一个对象。java.io.File
5.14.4. 示例
java -jar sftp_source.jar --sftp.supplier.remote-dir=foo --file.mode=lines --sftp.supplier.factory.host=sftpserver \
--sftp.supplier.factory.username=user --sftp.supplier.factory.password=pw --sftp.supplier.local-dir=/foo
5.16. TCP 协议
源充当服务器,并允许远程方连接到它并通过原始 tcp 套接字提交数据。tcp
TCP 是一种流协议,需要一些机制来在网络上构建消息。许多解码器是 available,默认值为 'CRLF',它与 Telnet 兼容。
TCP 源应用程序生成的消息具有有效负载。byte[]
5.16.1. 选项
5.16.2. 可用的解码器
- CRLF (默认)
-
以回车 (0x0d) 后跟换行符 (0x0a) 结尾的文本
- 如果
-
由换行符终止的文本 (0x0a)
- 零
-
以 null 字节 (0x00) 结尾的文本
- STXETX
-
文本前面有 STX (0x02) 并以 ETX (0x03) 结尾
- 生
-
no structure - 客户端通过关闭套接字来指示完整的消息
- L1 系列
-
数据前面有一个单字节(无符号)长度字段(最多支持 255 个字节)
- L2 (二层)
-
数据前面有一个两字节(无符号)长度的字段(最多 2 个16-1 字节)
- L4 系列
-
数据前面有一个四字节(带符号)长度的字段(最多 2 个31-1 字节)
5.17. 时间源
时间源将每隔一段时间简单地发出一个包含当前时间的 String。
5.17.1. 选项
时间源具有以下选项:
spring.integration.poller
- cron (定时)
-
Cron 表达式进行轮询。与 'fixedDelay' 和 'fixedRate' 互斥。(字符串,默认值:
<none>
) - 固定延迟
-
轮询延迟期。与 'cron' 和 'fixedRate' 互斥。(持续时间,默认值:
<无>
) - 固定利率
-
轮询率周期。与 'fixedDelay' 和 'cron' 互斥。(持续时间,默认值:
<无>
) - 初始延迟
-
轮询初始延迟。申请了 'fixedDelay' 和 'fixedRate';对于 'cron' 而被忽略。(持续时间,默认值:
<无>
) - 最大每次轮询消息数
-
每个轮询周期要轮询的最大消息数。(整数,默认值:
<none>
) - 接收超时
-
轮询消息等待多长时间。(持续时间,默认:
1 秒
)
5.18. Twitter 消息源
重复检索过去 30 天内的直接消息(发送和接收),按时间倒序排序。
已释放的消息将缓存(在缓存中)以防止重复。
默认情况下,使用 in-memory。MetadataStore
SimpleMetadataStore
控制消息的数量或返回的消息。twitter.message.source.count
这些属性控制消息轮询间隔。
必须与使用的 API 速率限制保持一致spring.cloud.stream.poller
5.18.1. 选项
5.19. Twitter 搜索源
Twitter 的标准搜索 API(搜索/推文)允许对最近或热门推文的索引进行简单查询。这提供了对过去 7 天内发布的最近推文样本的连续搜索。“公共”API 集的一部分。Source
返回与指定查询匹配的相关推文的集合。
使用属性可控制连续搜索请求之间的间隔。速率限制 - 每 30 分钟窗口 180 个请求(例如 ~6 r/m,~ 1 个请求/10 秒)spring.cloud.stream.poller
查询属性允许按关键字进行查询,并按时间和地理位置筛选结果。twitter.search
和 根据搜索 API 控制结果分页。twitter.search.count
twitter.search.page
注意:Twitter 的搜索服务以及 Search API 并不是详尽的推文来源。并非所有推文都会被编入索引或通过搜索界面提供。
5.19.1. 选项
5.20. Twitter 流源
-
返回与一个或多个筛选条件谓词匹配的公有状态。 多个参数允许使用与 Streaming API 的单个连接。 提示:、 、 和 字段与 OR 运算符组合在一起! 查询并返回与 OR 匹配的推文 由 用户 创建。
Filter API
track
follow
locations
track=foo
follow=1234
test
1234
-
这将返回所有公共状态的小型随机样本。 默认访问级别返回的推文是相同的,因此,如果两个不同的客户端连接到此终端节点,它们将看到相同的推文。
Sample API
默认访问级别最多允许 400 个跟踪关键词、5,000 个关注用户 ID 和 25 个 0.1-360 度位置框。
5.20.1. 选项
5.21. Websocket 源码
通过 Web 套接字生成消息的源。Websocket
5.21.1. 选项
5.21.2. 示例
要验证 websocket-source 是否从 Websocket 客户端接收消息,您可以使用以下简单的端到端设置。
第 1 步:启动 Kafka
第 2 步:在特定端口上部署,例如 8080websocket-source
第 3 步:在 8080 端口路径 “/websocket” 上连接一个 websocket 客户端,并发送一些消息。
您可以启动 Kafka 控制台使用者并在其中查看消息。
5.22. XMPP 源
“xmpp” 源允许从 XMPP 服务器接收消息。
5.22.1. 输入
不适用
5.22.2. 输出
有效载荷
-
byte[]
5.22.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
您可以在此处找到相应的基于 Binder 的项目。 然后你可以 cd 到其中一个文件夹并构建它:
$ ./mvnw clean package
5.22.5. 示例
java -jar xmpp-source.jar --xmpp.factory.host=localhost --xmpp.factory.port=5222 --xmpp.factory.user=jane --xmpp.factory.password=secret --xmpp.factory.service-name=localhost
5.23. ZeroMQ 源码
“zeromq” 源允许从 ZeroMQ 接收消息。
5.23.1. 输入
不适用
5.23.2. 输出
有效载荷
-
byte[]
5.23.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
您可以在此处找到相应的基于 Binder 的项目。 然后你可以 cd 到其中一个文件夹并构建它:
$ ./mvnw clean package
5.23.5. 例子
java -jar zeromq-source.jar --zeromq.supplier.connectUrl=tcp://server:port --zeromq.supplier.topics=
6. 处理器
6.1. 聚合器处理器
聚合器处理器使应用程序能够将传入消息聚合到组中,并将其发布到输出目标中。
java -jar aggregator-processor-kafka-<version>.jar --aggregator.message-store-type=jdbc
如果要对 RabbitMQ 运行 kafka,请将 kafka 更改为 rabbit。
6.1.1. 负载
如果输入有效负载是 a 且 content-type 标头是 JSON,则函数会尝试将此有效负载反序列化为 a,以便在聚合器函数的输出上更好地表示数据。
此外,这样的数据表示可以轻松地从下面提到的 SPEL 表达式中访问有效负载内容。
否则(包括反序列化错误),输入有效负载将保持原样 - 并且是将其转换为所需形式的目标应用程序配置。byte[]
JsonBytesToMap
Map
Map
6.1.2. 选项
6.3. Filter处理器
筛选器处理器使应用程序能够检查传入的有效负载,然后对其应用谓词,以决定是否需要继续记录。
例如,如果传入的有效负载是 type 并且您想要过滤掉少于 5 个字符的任何内容,则可以运行过滤器处理器,如下所示。String
java -jar filter-processor-kafka-<version>.jar --filter.function.expression=payload.length() > 4
如果要对 RabbitMQ 运行 kafka,请将 kafka 更改为 rabbit。
6.3.1. 负载
您可以将任何类型作为有效负载传递,然后对其应用 SpEL 表达式以进行筛选。
如果传入类型为 且内容类型设置为 或 ,则应用程序会将 转换为 。byte[]
text/plain
application/json
byte[]
String
6.3.2. 选项
6.4. Groovy 处理器
对消息应用 Groovy 脚本的处理器。
6.4.1. 选项
groovy-processor 处理器具有以下选项:
- groovy-processor.script
-
对用于处理消息的脚本的引用。(资源,默认值:
<无>
) - groovy-processor.variables
-
变量绑定作为名称-值对的新行分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:
<无>
) - groovy-processor.variables-位置
-
包含自定义脚本变量绑定的属性文件的位置。(资源,默认值:
<无>
)
6.5. Header Enricher 处理器
使用 header-enricher 应用程序添加消息标头。
标头以换行分隔的键值对的形式提供,其中键是标头名称,值是 SpEL 表达式。
例如。--headers='foo=payload.someProperty \n bar=payload.otherProperty'
6.6. Http 请求处理器
一个处理器应用程序,它向 HTTP 资源发出请求并将响应正文作为消息负载发出。
6.6.1. 输入
头
任何必需的 HTTP 标头都必须通过 or 属性显式设置。请参阅下面的示例。
标头值也可用于构造:headers
headers-expression
-
在属性中引用时的请求正文。
body-expression
-
在属性中引用的 HTTP 方法。
http-method-expression
-
在属性中引用时的 URL。
url-expression
有效载荷
默认情况下,有效负载用作 POST 请求的请求正文,可以是任何 Java 类型。 对于 GET 请求,它应为空 String。 payload 还可用于构造:
-
在属性中引用时的请求正文。
body-expression
-
在属性中引用的 HTTP 方法。
http-method-expression
-
在属性中引用时的 URL。
url-expression
底层 WebClient 支持 Jackson JSON 序列化,以在必要时支持任何请求和响应类型。
默认情况下,该属性可以设置为应用程序类路径中的任何类。
请注意,用户定义的有效负载类型需要向 pom 文件添加所需的依赖项。expected-response-type
String.class
6.6.2. 输出
头
没有 HTTP 消息标头映射到出站消息。
有效载荷
原始输出对象是 ResponseEntity<?>它的任何字段(例如、、)或访问器方法 () 都可以作为 .
默认情况下,出站 Message 负载是响应正文。
请注意, ResponseEntity (由表达式引用) 默认情况下不能由 Jackson 反序列化,但可以呈现为 .body
headers
statusCode
reply-expression
#root
HashMap
6.6.3. 选项
http-request 处理器有以下选项:
processors.adoc 中未解析的指令 - include::/home/runner/work/stream-applications/stream-applications/stream-applications/stream-applications/stream-applications-release-train/stream-applications-docs/../../applications/processor/image-recognition-processor/README.adoc[tags=ref-doc]
processors.adoc 中未解析的指令 - include::/home/runner/work/stream-applications/stream-applications/stream-applications/stream-applications/stream-applications-release-train/stream-applications-docs/../../applications/processor/object-detection-processor/README.adoc[tags=ref-doc]
processors.adoc 中未解析的指令 - include::/home/runner/work/stream-applications/stream-applications/stream-applications/stream-applications/stream-applications-release-train/stream-applications-docs/../../applications/processor/semantic-segmentation-processor/README.adoc[tags=ref-doc]
6.7. 脚本处理器
使用脚本转换消息的处理器。脚本正文是直接提供的 作为属性值。可以指定脚本的语言 (groovy/javascript/ruby/python)。
6.7.1. 选项
script-processor 处理器具有以下选项:
- script-processor.language 语言
-
script 属性中文本的语言。支持:groovy、javascript、ruby、python。(字符串,默认值:
<none>
) - script-processor.脚本
-
脚本的文本。(字符串,默认值:
<none>
) - script-processor.variables
-
变量绑定作为名称-值对的新行分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:
<无>
) - 脚本处理器.变量位置
-
包含自定义脚本变量绑定的属性文件的位置。(资源,默认值:
<无>
)
6.8. 分路器处理器
splitter 应用程序构建在 Spring Integration 中的同名概念之上,并允许将单个消息拆分为多个不同的消息。
处理器使用一个函数,该函数将 a 作为输入,然后根据各种属性生成 as 输出(见下文)。
您可以使用 SPEL 表达式或分隔符来指定要如何拆分传入消息。Message<?>
List<Message<?>
6.8.1. 负载
-
传入有效负载 -
Message<?
>
如果传入类型为 且内容类型设置为 或 ,则应用程序会将 转换为 。byte[]
text/plain
application/json
byte[]
String
-
传出有效负载 -
List<Message<?>
6.8.2. 选项
6.9. 变换处理器
Transformer 处理器允许您根据 SPEL 表达式转换消息有效负载结构。
下面是如何运行此应用程序的示例。
java -jar transform-processor-kafka-<version>.jar \
--spel.function.expression=payload.toUpperCase()
如果要对 RabbitMQ 运行 kafka,请将 kafka 更改为 rabbit。
6.9.1. 负载
传入消息可以包含任何类型的有效负载。
6.9.2. 选项
6.10. Twitter 趋势和趋势位置处理器
可以返回热门主题或热门主题的 Locations 的处理器。
该属性 允许 选择查询类型。twitter.trend.trend-query-type
6.10.1. 在某个位置检索热门话题(可选)
对于此模式,设置为 。twitter.trend.trend-query-type
trend
基于 Trends API 的处理器。 返回特定纬度、经度位置附近的热门主题。
6.10.2. 获取趋势位置
对于此模式,设置为 。twitter.trend.trend-query-type
trendLocation
按位置检索热门主题的完整列表或附近的位置列表。
如果未提供 , 参数,则处理器将执行 Trends Available API 并返回 Twitter 具有其热门主题信息的位置。latitude
longitude
如果提供了 , 参数,则处理器将执行 Trends Closest API 并返回 Twitter 具有其热门主题信息、最接近指定位置的位置。latitude
longitude
响应是一个数组,用于编码位置的 WOEID 和一些其他人类可读的信息,例如位置所属的规范名称和国家/地区。locations
6.10.3. 选项
7. 水槽
7.1. Cassandra 接收器
此接收器应用程序将其收到的每条消息的内容写入 Cassandra。
它需要 JSON String 的有效负载,并使用其属性映射到表列。
7.1.1. 负载
一个 JSON 字符串或字节数组,表示要持久保存的实体(或实体列表)。
7.2. Analytics 接收器
Sink 应用程序,构建在 Analytics Consumer 之上,用于计算输入消息的分析,并将分析作为指标发布到各种监控系统。它利用千分尺库在最流行的监控系统中提供统一的编程体验,并公开 Spring 表达式语言 (SpEL) 属性,用于定义如何从输入数据计算指标 Name、Values 和 Tags。
分析接收器可以生成两种指标类型:
仪表(例如 Counter 或 Gauge)由其 和 唯一标识(术语 dimensions 和 tags 可互换使用)。维度允许对特定的命名量度进行切片,以便向下钻取和推理数据。name
dimensions
由于量度由其 和 唯一标识,因此您可以为每个量度分配多个标签(e.g. key/值对),但之后无法随机更改这些标签!如果具有相同名称的指标具有不同的标签集,Prometheus 等监控系统将会抱怨。name dimensions |
使用 or 属性设置输出分析指标的名称。如果未设置,则指标名称默认为应用程序名称。analytics.name
analytics.name-expression
使用 , 属性向量度添加一个或多个标签。在属性定义中使用的将在量度中显示为标记名称。TAG_VALUE 是一个表达式,用于动态计算传入消息的标签值。analytics.tag.expression.<TAG_NAME>=<TAG_VALUE>
TAG_NAME
SpEL
表达式使用 and 关键字来访问消息的标头和有效负载值。SpEL
headers
payload
你可以使用 literals (例如 ) 来设置具有固定值的标签。'fixed value' |
所有 Stream 应用程序都支持三种最流行的监控系统,并且您可以通过声明方式启用它们。
您只需将 micrometer meter-registry 依赖项添加到应用程序中,即可添加对其他监控系统的支持。Wavefront
Prometheus
InfluxDB
Analytics Sink
请访问 Spring Cloud 数据流流监控,了解有关配置监控系统的详细说明。以下快速代码段可以帮助您开始。
-
要启用 Prometheus 计量注册表,请设置以下属性。
management.metrics.export.prometheus.enabled=true
management.metrics.export.prometheus.rsocket.enabled=true
management.metrics.export.prometheus.rsocket.host=<YOUR PROMETHEUS-RSOKET PROXI URI
management.metrics.export.prometheus.rsocket.port=7001
-
要启用 Wavefront 仪表注册表,请设置以下属性。
management.metrics.export.wavefront.enabled=true
management.metrics.export.wavefront.api-token=YOUR WAVEFRONT KEY
management.metrics.export.wavefront.uri=YOUR WAVEFRONT URI
management.metrics.export.wavefront.source=UNIQUE NAME TO IDENTIFY YOUR APP
-
要启用 InfluxDB 计量注册表,请设置以下属性。
management.metrics.export.influx.enabled=true management.metrics.export.influx.uri={influxdb-server-url}
如果启用了 Data Flow Server Monitoring,则将重复使用提供的指标配置。Analytics Sink |
下图说明了如何帮助收集股票交易业务内部的实时管道。Analytics Sink
7.2.1. 负载
传入消息可以包含任何类型的有效负载。
7.2.2. 选项
7.3. Elasticsearch 接收器
Sink,用于将文档索引到 Elasticsearch 中。
此 Elasticsearch 接收器仅支持为 JSON 文档编制索引。
它使用来自输入目标的数据,然后将其索引到 Elasticsearch。
输入数据可以是纯 json 字符串,也可以是表示 JSON 的 a。
它还接受 Elasticsearch 提供的数据。
但是,这种情况很少见,因为中间件不太可能将记录保存为 。
这主要用于消费者的直接调用。java.util.Map
XContentBuilder
XContentBuilder
7.3.2. 运行此 sink 的示例
-
从文件夹 :
elasticsearch-sink
./mvnw clean package
-
CD 应用程序
-
cd 复制到适当的 Binder 生成的应用程序(Kafka 或 RabbitMQ)
-
./mvnw clean package
-
确保您正在运行 Elasticsearch。例如,您可以使用以下命令将其作为 docker 容器运行。
docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2
-
如果中间件(Kafka 或 RabbitMQ)尚未运行,请启动它。
-
java -jar target/elasticsearch-sink-<kafka|rabbit>-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.input.destination=els-in --elasticsearch.consumer.index=testing
-
将一些 JSON 数据发送到中间件目标。例如:
{"foo":"bar"}
-
验证数据是否已编制索引:
curl localhost:9200/testing/_search
7.5. FTP 接收器
FTP 接收器是将文件从传入邮件推送到 FTP 服务器的简单选项。
它使用 ,因此传入消息可以是对象、(文件内容)
或 (file content 的数组)。ftp-outbound-adapter
java.io.File
String
bytes
要使用此接收器,您需要用户名和密码才能登录。
默认情况下,如果未指定任何名称,则 Spring Integration 将使用。 将确定文件名
根据 中的标头值(如果存在),或者如果 的有效负载已经是 ,则它将
使用该文件的原始名称。o.s.i.file.DefaultFileNameGenerator DefaultFileNameGenerator file_name MessageHeaders Message java.io.File |
7.5.1. 标头
-
file_name
(见上面的注释)
7.5.2. 负载
-
java.io.File
-
java.io.InputStream
-
byte[]
-
String
7.5.3. 输出
N/A (写入 FTP 服务器)。
7.6. JDBC 接收器
JDBC sink 允许您将传入的有效负载持久化到 RDBMS 数据库中。
该属性表示 where (连同冒号) 是可选的成对。
在这种情况下,值是通过生成的表达式(如 )计算的,因此这样我们就可以从对象属性直接映射到表列。
例如,我们有一个 JSON 有效负载,如下所示:jdbc.consumer.columns
COLUMN_NAME[:EXPRESSION_FOR_VALUE]
EXPRESSION_FOR_VALUE
payload.COLUMN_NAME
{
"name": "My Name",
"address": {
"city": "Big City",
"street": "Narrow Alley"
}
}
因此,我们可以使用 ,并使用配置将其插入到表中:name
city
street
--jdbc.consumer.columns=name,city:address.city,street:address.street
只要底层 JDBC 驱动程序支持,此接收器就支持批量插入。
批量插入通过 和 属性进行配置:
传入消息将聚合,直到消息出现,然后作为批处理插入。
如果毫秒过去了,没有新消息,则即使聚合批处理小于 ,也会插入聚合批处理,从而限制最大延迟。batch-size
idle-timeout
batch-size
idle-timeout
batch-size
该模块还使用 Spring Boot 的DataSource支持来配置数据库连接,因此诸如etc.之类的属性适用。spring.datasource.url |
7.6.1. 示例
java -jar jdbc-sink.jar --jdbc.consumer.tableName=names \
--jdbc.consumer.columns=name \
--spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test
7.6.2. 负载
7.8. 日志接收器
sink 使用应用程序记录器输出数据以供检查。log
请理解 sink 使用无类型的处理程序,这会影响实际日志记录的执行方式。
这意味着,如果 content-type 是 textual,则原始有效负载字节将转换为 String,否则将记录原始字节。
请参阅用户指南中的更多信息。log
7.9. MongoDB 接收器
此接收器应用程序将传入数据提取到 MongoDB 中。
此应用程序完全基于 ,因此请参阅 Spring Boot MongoDB 支持以获取更多信息。MongoDataAutoConfiguration
7.9.1. 输入
有效载荷
-
任何 POJO
-
String
-
byte[]
7.11. pgcopy 接收器
使用 PostgreSQL COPY 命令将其传入负载写入 RDBMS 的模块。
7.11.1. 输入
头
有效载荷
-
任何
Column expression 将根据消息进行评估,并且表达式通常只与一种类型(例如 Map 或 bean 等)兼容。
7.11.2. 输出
不适用
7.11.3. 选项
jdbc sink 具有以下选项:
该模块还使用 Spring Boot 的DataSource支持来配置数据库连接,因此诸如etc.之类的属性适用。spring.datasource.url |
7.11.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
您可以在此处找到相应的基于 Binder 的项目。 然后,你可以 cd 到其中一个文件夹并构建它:
$ ./mvnw clean package
要运行集成测试,请在 localhost 上启动 PostgreSQL 数据库:
docker run -e POSTGRES_PASSWORD=spring -e POSTGRES_DB=test -p 5432:5432 -d postgres:latest
7.11.5. 示例
java -jar pgcopy-sink.jar --tableName=names --columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test
7.12. RabbitMQ 接收器
该模块向 RabbitMQ 发送消息。
7.12.1. 选项
rabbit sink 具有以下选项:
(有关 RabbitMQ 连接属性,请参阅 Spring Boot 文档)
7.14. 路由器接收器
此应用程序将消息路由到命名通道。
7.14.1. 选项
router sink 有以下选项:
- router.default-output-binding 文件
-
将不可路由的消息发送到何处。(字符串,默认值:
<none>
) - router.destination 映射
-
目标映射为名称-值对的新行分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:
<无>
) - router.expression 的
-
要应用于消息的表达式,以确定要路由到的通道。请注意,文本、json 或 xml 等内容类型的负载连线格式是 byte[] 而不是 String!。有关如何处理字节数组有效负载内容的文档。(表达式,默认值:
<none>
) - router.refresh-delay
-
检查脚本更改的频率(以毫秒为单位)(如果存在);< 0 表示不刷新。(整数,默认值:
60000
) - router.resolution-required
-
是否需要通道解析。(布尔值,默认值:
false
) - router.script 的
-
返回通道或通道映射解析键的 groovy 脚本的位置。(资源,默认值:
<无>
) - router.variables 的
-
变量绑定作为名称-值对的新行分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:
<无>
) - router.variables-位置
-
包含自定义脚本变量绑定的属性文件的位置。(资源,默认值:
<无>
)
此路由器接收器基于 Spring Cloud Stream 的 API,因此可以根据需要创建目标。
在这种情况下,只有当 key 未包含在 中时,才能到达 a 。
如果没有 map 并且没有声明相应的绑定,则 this 会忽略并引发异常。StreamBridge defaultOutputBinding destinationMappings resolutionRequired = true defaultOutputBinding |
您可以使用 该属性限制动态绑定的创建。
默认情况下,所有解析的目标都将动态绑定;如果此属性具有以逗号分隔的目标名称列表,则只会绑定这些名称。
解析到不在此列表中的目标的邮件将被路由到 ,该目标也必须出现在列表中。spring.cloud.stream.dynamicDestinations
defaultOutputBinding
用于将评估结果映射到实际目标名称。destinationMappings
7.14.2. 基于 SPEL 的路由
该表达式根据消息进行计算,并返回通道名称或通道名称映射的键。
有关更多信息,请参见 Spring 集成参考手册配置通用路由器部分中的“路由器和 Spring 表达式语言(SPEL)”小节。
7.14.3. 基于 Groovy 的路由
还可以使用 Groovy 脚本代替 SpEL 表达式。让我们在文件系统的 “file:/my/path/router.groovy” 或 “classpath:/my/path/router.groovy” 处创建一个 Groovy 脚本:
println("Groovy processing payload '" + payload + "'")
if (payload.contains('a')) {
return "foo"
}
else {
return "bar"
}
如果要将变量值传递给脚本,可以使用 variables 选项静态绑定值,或者使用 propertiesLocation 选项选择性地将路径传递给包含绑定的属性文件。 文件中的所有属性都将作为变量提供给脚本。您可以同时指定 variables 和 propertiesLocation,在这种情况下,作为变量提供的任何重复值都会覆盖 propertiesLocation 中提供的值。 请注意,payload 和 headers 是隐式绑定的,以允许您访问消息中包含的数据。
有关更多信息,请参阅 Spring 集成参考手册 Groovy 支持。
7.16. Amazon S3 接收器
此接收器应用程序支持将对象传输到 Amazon S3 存储桶。
文件负载(和递归目录)将传输到部署应用程序的目录(S3 存储桶)中。remote
local
此接收器接受的消息必须包含以下 as 之一:payload
-
File
,包括用于递归上传的目录; -
InputStream
; -
byte[]
7.16.1. 选项
s3 接收器具有以下选项:
基于 的目标生成的应用程序可以通过注入到 bean 中的 and/或 进行增强。
有关更多详细信息,请参阅 Spring 集成 AWS 支持。AmazonS3SinkConfiguration
S3MessageHandler.UploadMetadataProvider
S3ProgressListener
S3MessageHandler
7.16.2. Amazon AWS 通用选项
Amazon S3 Sink(与所有其他 Amazon AWS 应用程序一样)基于 Spring Cloud AWS 项目作为其基础,其自动配置 类由 Spring Boot 自动使用。 请查阅其文档,了解必需和有用的自动配置属性。
其中一些与 AWS 凭证有关:
-
spring.cloud.aws.credentials.accessKey
-
spring.cloud.aws.credentials.secretKey
-
spring.cloud.aws.credentials.instanceProfile
-
spring.cloud.aws.credentials.profileName
-
spring.cloud.aws.credentials.profilePath
其他用于 AWS 定义:Region
-
cloud.aws.region.auto
-
cloud.aws.region.static
例子
java -jar s3-sink.jar --s3.bucket=/tmp/bar
7.17. SFTP 接收器
SFTP 接收器是将文件从传入邮件推送到 SFTP 服务器的简单选项。
它使用 ,因此传入消息可以是对象、(文件内容)
或 (file content 的数组)。sftp-outbound-adapter
java.io.File
String
bytes
要使用此接收器,您需要用户名和密码才能登录。
默认情况下,如果未指定任何名称,则 Spring Integration 将使用。 将确定文件名
根据 中的标头值(如果存在),或者如果 的有效负载已经是 ,则它将
使用该文件的原始名称。o.s.i.file.DefaultFileNameGenerator DefaultFileNameGenerator file_name MessageHeaders Message java.io.File |
配置选项时,评估的根对象是应用程序上下文,例如。sftp.factory.known-hosts-expression
sftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'
7.17.1. 输入
头
-
file_name
(见上面的注释)
有效载荷
-
java.io.File
-
java.io.InputStream
-
byte[]
-
String
7.17.2. 输出
N/A(写入 SFTP 服务器)。
7.18. TCP 接收器
此模块使用 Encoder 将消息写入 TCP。
TCP 是一种流协议,需要一些机制来在网络上构建消息。许多编码器是 available,默认值为 'CRLF'。
7.18.2. 可用的编码器
- CRLF (默认)
-
以回车 (0x0d) 后跟换行符 (0x0a) 结尾的文本
- 如果
-
由换行符终止的文本 (0x0a)
- 零
-
以 null 字节 (0x00) 结尾的文本
- STXETX
-
文本前面有 STX (0x02) 并以 ETX (0x03) 结尾
- 生
-
no structure - 客户端通过关闭套接字来指示完整的消息
- L1 系列
-
数据前面有一个单字节(无符号)长度字段(最多支持 255 个字节)
- L2 (二层)
-
数据前面有一个两字节(无符号)长度的字段(最多 2 个16-1 字节)
- L4 系列
-
数据前面有一个四字节(带符号)长度的字段(最多 2 个31-1 字节)
7.19. 吞吐量接收器
Sink,它将对消息进行计数,并按选定的时间间隔记录观察到的吞吐量。
7.19.1. 选项
吞吐量接收器具有以下选项:
- 吞吐量.报告每毫秒
-
报告的频率。(整数,默认值:
1000
)
7.20. Twitter 消息接收器
从身份验证用户向指定用户发送私信。
需要将 JSON POST 正文和标头设置为 。Content-Type
application/json
收到用户的消息后,您可以在 24 小时内发送最多 5 条消息作为响应。 收到的每条消息都会重置 24 小时窗口和分配的 5 条消息。 在 24 小时内发送第 6 条消息或在 24 小时时段外发送消息将计入速率限制。 此行为仅在使用 POST direct_messages/events/new 端点时适用。 |
SPEL 表达式用于计算输入消息中的请求参数。
7.20.1. 选项
使用单引号 () 将表达式属性的文字值括起来。
例如,要设置固定的消息文本,请使用 。
对于固定目标 userId,请使用 。' SpEL text='Fixed Text' userId='666' |
7.21. Twitter 更新接收器
更新身份验证用户的当前文本(例如 Tweeting)。
对于每次更新尝试,都会将更新文本与进行身份验证的用户最近的推文进行比较。 任何会导致重复的尝试都将被阻止,从而导致 403 错误。 用户不能连续两次提交相同的文本。 |
虽然不受 API 的速率限制,但用户一次可以创建的推文数量受到限制。 标准 API 的更新限制为 3 小时内 300 个。 如果用户发布的更新数量达到当前允许的限制,此方法将返回 HTTP 403 错误。
您可以在此处找到 Update API 的详细信息:developer.twitter.com/en/docs/tweets/post-and-engage/api-reference/post-statuses-update
7.21.1. 选项
7.22. 波前沉
Wavefront 接收器使用 Messages<?>,将其转换为 Wavefront 数据格式的指标,并将指标直接发送到 Wavefront 或 Wavefront 代理。
支持常见的 ETL 使用案例,其中现有(历史)指标数据必须清理、转换并存储在 Wavefront 中以供进一步分析。
7.23. Websocket 接收器
一个简单的 Websocket Sink 实现。
7.23.2. 示例
要验证 websocket-sink 是否接收来自其他 spring-cloud-stream 应用程序的消息,你可以使用 遵循简单的端到端设置。
第 1 步:启动 Rabbitmq
第 2 步:部署time-source
第 3 步:部署websocket-sink
最后,在 mode 中启动 websocket-sink,以便您可以在日志中看到 the 产生的消息:trace
time-source
java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
--logging.level.org.springframework.cloud.fn.consumer.websocket=TRACE
您应该开始在启动 WebsocketSink 的控制台中看到日志消息,如下所示:
Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]
7.23.3. 执行器
您可以使用它来访问发送和接收的最后一条消息。您必须
通过提供 来启用它。默认情况下,它通过 .下面是一个示例输出:Endpoint
n
--endpoints.websocketconsumertrace.enabled=true
host:port/websocketconsumertrace
[
{
"timestamp": 1445453703508,
"info": {
"type": "text",
"direction": "out",
"id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
"payload": "2015-10-21 20:54:33"
}
},
...
{
"timestamp": 1445453703506,
"info": {
"type": "text",
"direction": "out",
"id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
"payload": "2015-10-21 20:54:32"
}
}
]
还有一个简单的 HTML 页面,您可以在其中的文本区域看到转发的消息。您可以访问
它直接通过您的浏览器。host:port
7.24. XMPP 接收器
“xmpp” 接收器允许向 XMPP 服务器发送消息。
7.24.1. 输入
-
byte[]
7.24.2. 输出
有效载荷
不适用
7.24.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
您可以在此处找到相应的基于 Binder 的项目。 然后你可以 cd 到其中一个文件夹并构建它:
$ ./mvnw clean package
7.25. ZeroMQ 接收器
“zeromq” sink 支持将消息发送到 ZeroMQ 套接字。
7.25.1. 输入
-
byte[]
7.25.2. 输出
有效载荷
不适用
7.25.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
您可以在此处找到相应的基于 Binder 的项目。 然后你可以 cd 到其中一个文件夹并构建它:
$ ./mvnw clean package
7.25.5. 示例
java -jar zeromq-sink.jar --zeromq.consumer.connectUrl=tcp://server:port --zeromq.consumer.topic=