应用
5. 来源
5.1. Debezium 源码
基于 Debezium 引擎的变更数据捕获 (CDC) 源。
它允许捕获数据库更改事件并通过不同的消息绑定器(例如)和所有 Spring Cloud Stream 支持者代理流式传输这些事件。Debezium Source
Apache Kafka
RabbitMQ
此源可以与任何 Spring Cloud Stream 消息 Binder 一起使用。 它不受 Kafka Connect 框架的限制或依赖。尽管这种方法很灵活,但它存在一些限制。 |
支持所有 Debezium 配置属性。
只需在任何 Debezium 属性前面加上前缀即可。
例如,要设置 Debezium 的属性,请改用 source 属性。debezium.properties.
connector.class
debezium.properties.connector.class
5.1.1. 数据库支持
目前支持多个数据存储的 CDC:MySQL、PostgreSQL、MongoDB、Oracle、SQL Server、Db2、Vitess 和 Spanner 数据库。Debezium Source
5.1.2. 选项
按前缀分组的属性:
德贝兹
- debezium-native-configuration
-
<缺少文档>(属性,默认值:
<无>
) - 标头格式
-
{@link ChangeEvent} 标头格式。默认为 'JSON'。(DebeziumFormat,默认值:
<none>
,可能的值:JSON,AVRO,PROTOBUF) - 偏移提交策略
-
定义何时应将偏移量提交到 offset 存储的策略。(DebeziumOffsetCommitPolicy,默认值:
<none>
,可能的值:ALWAYS,PERIODIC,DEFAULT
)
- payload 格式
-
{@link 更改事件}Key 和 Payload 格式。默认为 'JSON'。(DebeziumFormat,默认值:
<none>
,可能的值:JSON,AVRO,PROTOBUF) - 性能
-
用于 debezium 配置属性的 Spring pass-trough 包装器。所有带有 'debezium.properties.*' 前缀的属性都是原生 Debezium 属性。(Map<String, String>,默认值:
<none>
)
debezium.supplier
- 复制标头
-
将 Change Event headers (更改事件标头) 复制到 Message headers (消息标头) 中。(布尔值,默认值:
true
)
事件扁平化配置
Debezium 提供了一种全面的消息格式,可以准确地详细说明系统中发生的更改的信息。
但是,有时这种格式可能不适合下游使用者,这可能需要格式化消息,以便以简化的结构显示字段名称和值。flattened
要简化 Debezium 连接器生成的事件记录的格式,可以使用 Debezium 事件展平消息转换。 使用讨人喜欢的配置,您可以配置简单的消息格式,如下所示:
--debezium.properties.transforms=unwrap
--debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
--debezium.properties.transforms.unwrap.drop.tombstones=false
--debezium.properties.transforms.unwrap.delete.handling.mode=rewrite
--debezium.properties.transforms.unwrap.add.fields=name,db
Debezium 胶印存储
当 Debezium 源运行时,它会从源中读取信息,并定期记录定义它已处理的信息量。
如果源重新启动,它将使用最后记录的偏移量来了解它应该在源信息中的哪个位置继续读取。
开箱即用,提供了以下偏移存储配置选项:offsets
-
内存中
Doesn't persist the offset data but keeps it in memory. Therefore all offsets are lost on debezium source restart.
--debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore
-
本地文件系统
Store the offsets in a file on the local file system (the file can be named anything and stored anywhere). Additionally, although the connector records the offsets with every source record it produces, the engine flushes the offsets to the backing store periodically (in the example below, once each minute).
--debezium.properties.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore --debezium.properties.offset.storage.file.filename=/tmp/offsets.dat (1) --debezium.properties.offset.flush.interval.ms=60000 (2)
1 要存储偏移量的文件的路径。当设置为 . offset.storage`
FileOffsetBackingStore
2 尝试提交偏移量的时间间隔。默认值为 1 分钟。 -
Kafka 主题
Uses a Kafka topic to store offset data.
--debezium.properties.offset.storage=org.apache.kafka.connect.storage.KafkaOffsetBackingStore --debezium.properties.offset.storage.topic=my-kafka-offset-topic (1) --debezium.properties.offset.storage.partitions=2 (2) --debezium.properties.offset.storage.replication.factor=1 (3) --debezium.properties.offset.flush.interval.ms=60000 (4)
1 要存储偏移量的 Kafka 主题的名称。当设置为 . offset.storage
KafkaOffsetBackingStore
2 创建 offset 存储主题时使用的分区数。 3 创建偏移存储主题时使用的复制因子。 4 尝试提交偏移量的时间间隔。默认值为 1 分钟。
可以在 中实现接口以提供绑定到自定义后端键值存储的偏移量存储。org.apache.kafka.connect.storage.OffsetBackingStore
连接器属性
下表列出了每个连接器的所有可用 Debezium 属性。
可以通过在它们前面加上前缀来使用这些属性。debezium.properties.
5.1.3. 示例和测试
debezium 集成测试使用在本地计算机上运行的数据库夹具。在 Testcontainers 的帮助下,利用预构建的 debezium docker 数据库镜像。
要从 IDE 运行和调试测试,您需要从命令行部署所需的数据库映像。 以下说明介绍了如何从 Docker 映像运行预配置的测试数据库。
MySQL (MySQL的
在 docker 中启动 :debezium/example-mysql
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:2.3.3.Final
(可选)使用 client 连接到数据库并创建具有所需凭据的用户:mysql debezium |
docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';
使用以下属性将 Debezium Source 连接到 MySQL DB:
debezium.properties.connector.class=io.debezium.connector.mysql.MySqlConnector (1)
debezium.properties.name=my-connector (2)
debezium.properties.topic.prefix=my-topic (2)
debezium.properties.database.server.id=85744 (2)
debezium.properties.database.user=debezium (3)
debezium.properties.database.password=dbz (3)
debezium.properties.database.hostname=localhost (3)
debezium.properties.database.port=3306 (3)
debezium.properties.schema=true (4)
debezium.properties.key.converter.schemas.enable=true (4)
debezium.properties.value.converter.schemas.enable=true (4)
debezium.properties.transforms=unwrap (5)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (5)
debezium.properties.transforms.unwrap.add.fields=name,db (5)
debezium.properties.transforms.unwrap.delete.handling.mode=none (5)
debezium.properties.transforms.unwrap.drop.tombstones=true (5)
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (6)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (6)
1 | 将 Debezium 源配置为使用 MySqlConnector。 |
2 | 用于标识和调度传入事件的元数据。 |
3 | 连接到以 用户身份运行的 MySQL 服务器。localhost:3306 debezium |
4 | 在消息中包含 Change Event Value 架构。ChangeEvent |
5 | 启用 Change Event Flattening。 |
6 | Source state 到 preserver 的 source 状态。 |
您也可以使用此 mysql 配置运行。DebeziumDatabasesIntegrationTest#mysql()
禁用 mysql GenericContainer 测试初始化代码。 |
PostgreSQL 数据库
从 Docker 镜像启动预配置的 postgres 服务器:debezium/example-postgres:1.0
docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:2.3.3.Final
您可以像这样连接到此服务器:
psql -U postgres -h localhost -p 5432
使用以下属性将 Debezium 源连接到 PostgreSQL:
debezium.properties.connector.class=io.debezium.connector.postgresql.PostgresConnector (1)
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)
debezium.properties.topic.prefix=my-topic (3)
debezium.properties.name=my-connector (3)
debezium.properties.database.server.id=85744 (3)
debezium.properties.database.user=postgres (4)
debezium.properties.database.password=postgres (4)
debezium.properties.database..dbname=postgres (4)
debezium.properties.database.hostname=localhost (4)
debezium.properties.database.port=5432 (4)
debezium.properties.schema=true (5)
debezium.properties.key.converter.schemas.enable=true (5)
debezium.properties.value.converter.schemas.enable=true (5)
debezium.properties.transforms=unwrap (6)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (6)
debezium.properties.transforms.unwrap.add.fields=name,db (6)
debezium.properties.transforms.unwrap.delete.handling.mode=none (6)
debezium.properties.transforms.unwrap.drop.tombstones=true (6)
1 | 配置为使用 PostgresConnector。Debezium Source |
2 | 将 Debezium 引擎配置为使用 store。memory |
3 | 用于标识和调度传入事件的元数据。 |
4 | 连接到以 用户身份运行的 PostgreSQL 服务器。localhost:5432 postgres |
5 | 在消息中包含 Change Event Value 架构。 |
6 | 启用 Chage 事件展平。 |
您还可以使用此 postgres 配置运行 。DebeziumDatabasesIntegrationTest#postgres()
禁用 postgres GenericContainer 测试初始化代码。 |
MongoDB 数据库
从容器镜像启动预配置的 mongodb:debezium/example-mongodb:2.3.3.Final
docker run -it --rm --name mongodb -p 27017:27017 -e MONGODB_USER=debezium -e MONGODB_PASSWORD=dbz debezium/example-mongodb:2.3.3.Final
初始化清单集合
docker exec -it mongodb sh -c 'bash -c /usr/local/bin/init-inventory.sh'
在终端输出中,搜索类似 :mongodb
host: "3f95a8a6516e:27017"
2019-01-10T13:46:10.004+0000 I COMMAND [conn1] command local.oplog.rs appName: "MongoDB Shell" command: replSetInitiate { replSetInitiate: { _id: "rs0", members: [ { _id: 0.0, host: "3f95a8a6516e:27017" } ] }, lsid: { id: UUID("5f477a16-d80d-41f2-9ab4-4ebecea46773") }, $db: "admin" } numYields:0 reslen:22 locks:{ Global: { acquireCount: { r: 36, w: 20, W: 2 }, acquireWaitCount: { W: 1 }, timeAcquiringMicros: { W: 312 } }, Database: { acquireCount: { r: 6, w: 4, W: 16 } }, Collection: { acquireCount: { r: 4, w: 2 } }, oplog: { acquireCount: { r: 2, w: 3 } } } protocol:op_msg 988ms
将条目添加到您的127.0.0.1 3f95a8a6516e
/etc/hosts
使用以下属性将 Debezium 源连接到 MongoDB:
debezium.properties.connector.class=io.debezium.connector.mongodb.MongodbSourceConnector (1)
debezium.properties.topic.prefix=my-topic
debezium.properties.name=my-connector
debezium.properties.database.server.id=85744
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)
debezium.properties.mongodb.hosts=rs0/localhost:27017 (3)
debezium.properties.topic.prefix=dbserver1 (3)
debezium.properties.mongodb.user=debezium (3)
debezium.properties.mongodb.password=dbz (3)
debezium.properties.database.whitelist=inventory (3)
debezium.properties.tasks.max=1 (4)
debezium.properties.schema=true (5)
debezium.properties.key.converter.schemas.enable=true (5)
debezium.properties.value.converter.schemas.enable=true (5)
debezium.properties.transforms=unwrap (6)
debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState (6)
debezium.properties.transforms.unwrap.add.fields=name,db (6)
debezium.properties.transforms.unwrap.delete.handling.mode=none (6)
debezium.properties.transforms.unwrap.drop.tombstones=true (6)
1 | 配置为使用 MongoDB Connector。Debezium Source |
2 | 将 Debezium 引擎配置为使用 .memory |
3 | 连接到以 user.localhost:27017 debezium |
4 | debezium.io/docs/connectors/mongodb/#tasks |
5 | 在事件中包含 Change Event Value 架构。SourceRecord |
6 | 启用 Chnage 事件展平。 |
您还可以使用此 mongodb 配置运行 。DebeziumDatabasesIntegrationTest#mongodb()
SQL 服务器
从 Docker 镜像启动 a:sqlserver
debezium/example-postgres:1.0
docker run -it --rm --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_PID=Standard -e SA_PASSWORD=Password! -e MSSQL_AGENT_ENABLED=true microsoft/mssql-server-linux:2017-CU9-GDR2
使用 debezium SqlServer 教程中的示例数据填充:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
cat ./inventory.sql | docker exec -i sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
使用以下属性将 Debezium Source 连接到 SQLServer:
debezium.properties.connector.class=io.debezium.connector.sqlserver.SqlServerConnector (1)
debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory (2)
debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore (2)
debezium.properties.topic.prefix=my-topic (3)
debezium.properties.name=my-connector (3)
debezium.properties.database.server.id=85744 (3)
debezium.properties.database.user=sa (4)
debezium.properties.database.password=Password! (4)
debezium.properties.database..dbname=testDB (4)
debezium.properties.database.hostname=localhost (4)
debezium.properties.database.port=1433 (4)
1 | 配置为使用 SqlServerConnector。Debezium Source |
2 | 将 Debezium 引擎配置为使用状态存储。memory |
3 | 用于标识和调度传入事件的元数据。 |
4 | 连接到以用户身份运行的 SQL Server。localhost:1433 sa |
您还可以使用此 SqlServer 配置运行 。DebeziumDatabasesIntegrationTest#sqlServer()
神谕
从 localhost 启动 Oracle reachable,并使用 Debezium Vagrant 设置中描述的配置、用户和授权进行设置
使用 Debezium Oracle 教程中的示例数据填充:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-with-oracle-jdbc/init/inventory.sql
cat ./inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1
5.2. 文件源
此应用程序轮询目录并将新文件或其内容发送到 output 通道。 默认情况下,file 源以字节数组的形式提供 File 的内容。 但是,这可以使用 --file.supplier.mode 选项进行自定义:
-
ref 提供 java.io.File 引用
-
lines 将逐行拆分文件并为每行发出一条新消息
-
contents 默认值。以字节数组的形式提供文件的内容
使用 时,您还可以提供附加选项 。
如果设置为 true,则底层 FileSplitter 将在实际数据之前和之后发出额外的文件开始和文件结束标记消息。
这 2 个附加标记消息的有效负载为 类型 。如果未明确设置,则选项 withMarkers 默认为 false。--file.supplier.mode=lines
--file.supplier.withMarkers=true
FileSplitter.FileMarker
5.2.1. 选项
文件源具有以下选项:
按前缀分组的属性:
file.consumer 文件
- 标记 JSON
-
当 'fileMarkers == true' 时,指定它们是应生成为 FileSplitter.FileMarker 对象还是 JSON。(布尔值,默认值:
true
) - 模式
-
用于文件读取源的 FileReadingMode。值包括 'ref' - 文件对象,'lines' - 每行一条消息,或 'contents' - 以字节为单位的内容。(FileReadingMode,默认值:
<none>
,可能的值:ref,lines,contents
)
- with-markers
-
设置为 true 可在数据之前/之后发出文件开头/文件结尾标记消息。仅对 FileReadingMode 'lines' 有效。(布尔值,默认值:
<none>
)
文件.supplier
- empty-时延迟
-
未检测到新文件时的延迟持续时间。(持续时间,默认:
1 秒
) - 目录
-
用于轮询新文件的目录。(文件,默认值:
<无>
) - 文件名模式
-
用于匹配文件的简单 ant 模式。(字符串,默认值:
<none>
) - 文件名-regex
-
用于匹配文件的正则表达式模式。(模式,默认值:
<无>
) - 防止重复
-
设置为 true 以包含防止重复的 AcceptOnceFileListFilter。(布尔值,默认值:
true
)
元数据.store.dynamo-db
- 创建延迟
-
创建表重试之间的延迟。(整数,默认值:
1
) - 创建重试
-
创建表请求的重试次数。(整数,默认值:
25
) - read-capacity (读取容量)
-
读取表上的 capacity。(长,默认值:
1
) - 桌子
-
元数据的表名称。(字符串,默认值:
<none>
) - 生存时间
-
表条目的 TTL。(整数,默认值:
<none>
) - 写入容量
-
表上的写入容量。(长,默认值:
1
)
元数据.store.jdbc
- 地区
-
此存储中保留的消息的唯一分组标识符。(字符串,默认值:
DEFAULT
) - 表前缀
-
自定义表名称的前缀。(字符串,默认值:
<none>
)
元数据.store.mongo-db
- 收集
-
元数据的 MongoDB 集合名称。(字符串,默认值:
metadataStore
)
metadata.store.redis
- 钥匙
-
元数据的 Redis 键。(字符串,默认值:
<none>
)
metadata.store
- 类型
-
指示要配置的元数据存储的类型(默认为 'memory')。您必须包含相应的 Spring 集成依赖项才能使用持久存储。(StoreType,默认值:
<none>
,可能的值:mongodb,redis,dynamodb,jdbc,zookeeper,hazelcast,memory)
元数据.store.zookeeper
- 连接字符串
-
HOST:PORT 形式的 Zookeeper 连接字符串。(字符串,默认值:
127.0.0.1:2181
) - 编码
-
在 Zookeeper 中存储数据时使用的编码。(字符集,默认值:
UTF-8
) - 重试间隔
-
Zookeeper 操作的重试间隔(以毫秒为单位)。(整数,默认值:
1000
) - 根
-
根节点 - 存储条目是此节点的子项。(字符串,默认值:
/SpringIntegration-MetadataStore
)
5.3. FTP 源
此源应用程序支持使用 FTP 协议传输文件。
文件将从目录传输到部署应用程序的目录。
默认情况下,源发出的消息以字节数组的形式提供。但是,这可能是
使用选项进行自定义:remote
local
--mode
-
裁判提供参考
java.io.File
-
线将逐行拆分文件,并为每行发出一条新消息
-
内容默认值。以字节数组的形式提供文件的内容
使用 时,您还可以提供附加选项 。
如果设置为 ,则底层将在实际数据之前和之后发出额外的文件开头和文件结尾标记消息。
这 2 个附加标记消息的有效负载为 类型 。如果未明确设置,则选项默认为 。--mode=lines
--withMarkers=true
true
FileSplitter
FileSplitter.FileMarker
withMarkers
false
另请参阅 MetadataStore 选项,了解用于防止重新启动时出现重复消息的可能共享持久性存储配置。
5.3.1. 输入
N/A (从 FTP 服务器获取文件)。
5.3.2. 输出
mode = 内容
头:
-
Content-Type: application/octet-stream
-
file_originalFile: <java.io.File>
-
file_name: <file name>
有效载荷:
A 填充了文件内容。byte[]
模式 = 行
头:
-
Content-Type: text/plain
-
file_orginalFile: <java.io.File>
-
file_name: <file name>
-
correlationId: <UUID>
(每行相同) -
sequenceNumber: <n>
-
sequenceSize: 0
(在读取文件之前不知道行数)
有效载荷:
A 表示每行。String
第一行前面有一条带有标记有效负载的消息(可选)。
最后一行(可选)后跟带有标记有效负载的消息。START
END
标记存在和格式由 和 属性确定。with-markers
markers-json
模式 = ref
头:
没有。
有效载荷:
一个对象。java.io.File
5.3.3. 选项
ftp 源具有以下选项:
按前缀分组的属性:
file.consumer 文件
- 标记 JSON
-
当 'fileMarkers == true' 时,指定它们是应生成为 FileSplitter.FileMarker 对象还是 JSON。(布尔值,默认值:
true
) - 模式
-
用于文件读取源的 FileReadingMode。值包括 'ref' - 文件对象,'lines' - 每行一条消息,或 'contents' - 以字节为单位的内容。(FileReadingMode,默认值:
<none>
,可能的值:ref,lines,contents
)
- with-markers
-
设置为 true 可在数据之前/之后发出文件开头/文件结尾标记消息。仅对 FileReadingMode 'lines' 有效。(布尔值,默认值:
<none>
)
ftp.factory
- 缓存会话
-
缓存会话。(布尔值,默认值:
<none>
) - 客户端模式
-
用于 FTP 会话的客户端模式。(ClientMode,默认值:
<none>
,可能的值:ACTIVE,PASSIVE
)
- 主机
-
服务器的主机名。(字符串,默认值:
localhost
) - 密码
-
用于连接到服务器的密码。(字符串,默认值:
<none>
) - 港口
-
服务器的端口。(整数,默认值:
21
) - 用户名
-
用于连接到服务器的用户名。(字符串,默认值:
<none>
)
ftp.supplier
- 自动创建本地目录
-
如果本地目录不存在,则设置为 true 以创建本地目录。(布尔值,默认值:
true
) - empty-时延迟
-
未检测到新文件时的延迟持续时间。(持续时间,默认:
1 秒
) - 删除远程文件
-
设置为 true 可在成功传输后删除远程文件。(布尔值,默认值:
false
) - 文件名模式
-
用于匹配要传输的文件的名称的筛选模式。(字符串,默认值:
<none>
) - 文件名-regex
-
用于匹配要传输的文件的名称的筛选器正则表达式模式。(模式,默认值:
<无>
) - 本地目录
-
用于文件传输的本地目录。(文件,默认值:
<无>
) - preserve-timestamp
-
设置为 true 可保留原始时间戳。(布尔值,默认值:
true
) - 远程目录
-
远程 FTP 目录。(字符串,默认值:
/
) - 远程文件分隔符
-
远程文件分隔符。(字符串,默认值:
/
) - tmp-file-suffix 文件后缀
-
传输过程中要使用的后缀。(字符串,默认值:
.tmp
)
元数据.store.dynamo-db
- 创建延迟
-
创建表重试之间的延迟。(整数,默认值:
1
) - 创建重试
-
创建表请求的重试次数。(整数,默认值:
25
) - read-capacity (读取容量)
-
读取表上的 capacity。(长,默认值:
1
) - 桌子
-
元数据的表名称。(字符串,默认值:
<none>
) - 生存时间
-
表条目的 TTL。(整数,默认值:
<none>
) - 写入容量
-
表上的写入容量。(长,默认值:
1
)
元数据.store.jdbc
- 地区
-
此存储中保留的消息的唯一分组标识符。(字符串,默认值:
DEFAULT
) - 表前缀
-
自定义表名称的前缀。(字符串,默认值:
<none>
)
元数据.store.mongo-db
- 收集
-
元数据的 MongoDB 集合名称。(字符串,默认值:
metadataStore
)
metadata.store.redis
- 钥匙
-
元数据的 Redis 键。(字符串,默认值:
<none>
)
metadata.store
- 类型
-
指示要配置的元数据存储的类型(默认为 'memory')。您必须包含相应的 Spring 集成依赖项才能使用持久存储。(StoreType,默认值:
<none>
,可能的值:mongodb,redis,dynamodb,jdbc,zookeeper,hazelcast,memory)
元数据.store.zookeeper
- 连接字符串
-
HOST:PORT 形式的 Zookeeper 连接字符串。(字符串,默认值:
127.0.0.1:2181
) - 编码
-
在 Zookeeper 中存储数据时使用的编码。(字符集,默认值:
UTF-8
) - 重试间隔
-
Zookeeper 操作的重试间隔(以毫秒为单位)。(整数,默认值:
1000
) - 根
-
根节点 - 存储条目是此节点的子项。(字符串,默认值:
/SpringIntegration-MetadataStore
)
5.3.4. 示例
java -jar ftp_source.jar --ftp.supplier.remote-dir=foo --file.consumer.mode=lines --ftp.factory.host=ftpserver \
--ftp.factory.username=user --ftp.factory.password=pw --ftp.local-dir=/foo
5.4. Http 源
一个源应用程序,用于侦听 HTTP 请求并将正文作为消息负载发出。
如果 Content-Type 与 或 匹配,则有效负载将为 String,
否则,有效负载将是一个字节数组。text/*
application/json
5.4.1. 有效载荷:
如果内容类型匹配或text/*
application/json
-
String
如果内容类型不匹配或text/*
application/json
-
byte array
5.4.2. 选项
http 源支持以下配置属性:
按前缀分组的属性:
http.cors 网站
- 允许凭据
-
浏览器是否应包含与正在批注的请求的域关联的任何 Cookie。(布尔值,默认值:
<none>
) - 允许的标头
-
在实际请求期间可以使用的请求标头列表。(String[],默认值:
<none>
) - 允许的来源
-
允许的源列表,例如 https://domain1.com。(String[],默认值:
<none>
)
http
- mapped-request-headers 请求标头
-
将要映射的标头。(String[],默认值:
<none>
) - 路径模式
-
HTTP 端点路径映射。(字符串,默认值:
/
)
服务器
- 港口
-
服务器 HTTP 端口。(整数,默认值:
8080
)
5.5. JDBC 源码
此源从 RDBMS 轮询数据。
此源代码完全基于 ,因此请参阅 Spring Boot JDBC 支持以获取更多信息。DataSourceAutoConfiguration
5.5.1. 有效载荷
-
Map<String, Object>
when(默认)和jdbc.split == true
List<Map<String, Object>>
5.5.2. 选项
jdbc 源具有以下选项:
按前缀分组的属性:
jdbc.supplier
- 最大行数
-
要为查询处理的最大行数。(整数,默认值:
0
) - 查询
-
用于选择数据的查询。(字符串,默认值:
<none>
) - 分裂
-
是否将 SQL 结果拆分为单个消息。(布尔值,默认值:
true
) - 更新
-
一个 SQL 更新语句,用于将轮询的消息标记为 “seen”。(字符串,默认值:
<none>
)
spring.datasource
- 驱动程序类名称
-
JDBC 驱动程序的完全限定名称。默认情况下,根据 URL 自动检测。(字符串,默认值:
<none>
) - 密码
-
数据库的登录密码。(字符串,默认值:
<none>
) - 网址
-
数据库的 JDBC URL。(字符串,默认值:
<none>
) - 用户名
-
数据库的登录用户名。(字符串,默认值:
<none>
)
spring.integration.poller
- cron (定时)
-
Cron 表达式进行轮询。与 'fixedDelay' 和 'fixedRate' 互斥。(字符串,默认值:
<none>
) - 固定延迟
-
轮询延迟期。与 'cron' 和 'fixedRate' 互斥。(持续时间,默认值:
<无>
) - 固定利率
-
轮询率周期。与 'fixedDelay' 和 'cron' 互斥。(持续时间,默认值:
<无>
) - 初始延迟
-
轮询初始延迟。申请了 'fixedDelay' 和 'fixedRate';对于 'cron' 而被忽略。(持续时间,默认值:
<无>
) - 最大每次轮询消息数
-
每个轮询周期要轮询的最大消息数。(整数,默认值:
<none>
) - 接收超时
-
轮询消息等待多长时间。(持续时间,默认:
1 秒
)
另请参阅 Spring Boot 文档,了解其他属性和轮询选项。DataSource
TriggerProperties
MaxMessagesProperties
5.6. JMS 源
JMS 源允许从 JMS 接收消息。
5.6.1. 选项
JMS 源具有以下选项:
按前缀分组的属性:
jms.supplier
- 客户端 ID
-
长期订阅的客户端 ID。(字符串,默认值:
<none>
) - 目的地
-
接收消息的目标 (队列或主题)。(字符串,默认值:
<none>
) - 消息选择器
-
消息的选择器。(字符串,默认值:
<none>
) - 会话事务处理
-
如果为true,则启用事务并选择DefaultMessageListenerContainer,如果为false,则选择SimpleMessageListenerContainer。(布尔值,默认值:
true
) - subscription-durable
-
对于长期订阅,为 True。(布尔值,默认值:
<none>
) - 订阅名称
-
长期订阅或共享订阅的名称。(字符串,默认值:
<none>
) - 订阅共享
-
对于共享订阅,则为 True。(布尔值,默认值:
<none>
)
spring.jms 的
- jndi-名称
-
连接工厂 JNDI 名称。设置后,优先于其他连接出厂自动配置。(字符串,默认值:
<none>
) - 发布子域
-
默认目标类型是否为 topic。(布尔值,默认值:
false
)
spring.jms.listener 中
- 确认模式
-
容器的确认模式。默认情况下,侦听器使用自动确认进行交易。(AcknowledgeMode,默认值:
<none>
,可能的值:AUTO,CLIENT
DUPS_OK
) - 自动启动
-
启动时自动启动容器。(布尔值,默认值:
true
) - 并发
-
最小并发使用者数。如果未指定 max-concurrency,则 minimum 也将用作最大值。(整数,默认值:
<none>
) - 最大并发
-
最大并发使用者数。(整数,默认值:
<none>
) - 接收超时
-
用于接收调用的超时。使用 -1 表示无等待接收,使用 0 表示完全没有超时。后者仅在不在事务管理器内运行时才可行,并且通常不鼓励使用,因为它会阻止干净关闭。(持续时间,默认:
1 秒
)
5.7. Apache Kafka 源码
此模块使用来自 Apache Kafka 的消息。
5.7.1. 选项
kafka 源具有以下选项:
(请参阅 Spring for Apache Kafka 配置属性的 Spring Boot 文档)
按前缀分组的属性:
kafka.supplier
- ack 丢弃
-
是否确认 'RecordFilterStrategy' 之后的丢弃记录。(布尔值,默认值:
false
) - 记录过滤器
-
'RecordFilterStrategy' 的 SPEL 表达式,其中 'ConsumerRecord' 作为根对象。(表达式,默认值:
<none>
) - 主题模式
-
Apache Kafka 主题模式进行订阅。(模式,默认值:
<无>
) - 主题
-
要订阅的 Apache Kafka 主题。(String[],默认值:
<none>
)
spring.kafka
- 引导服务器
-
用于建立与 Kafka 集群的初始连接的 host:port 对的逗号分隔列表。除非被覆盖,否则适用于所有零部件。(List<String>,默认值:
<none>
) - 客户端 ID
-
发出请求时传递给服务器的 ID。用于服务器端日志记录。(字符串,默认值:
<none>
) - 性能
-
其他属性,生产者和使用者通用,用于配置客户端。(Map<String, String>,默认值:
<none>
)
spring.kafka.consumer
- 自动提交间隔
-
如果 'enable.auto.commit' 设置为 true,则使用者偏移量自动提交到 Kafka 的频率。(持续时间,默认值:
<无>
) - 自动偏移重置
-
当 Kafka 中没有初始偏移量或当前偏移量在服务器上不再存在时该怎么办。(字符串,默认值:
<none>
) - 引导服务器
-
用于建立与 Kafka 集群的初始连接的 host:port 对的逗号分隔列表。覆盖使用者的 global 属性。(List<String>,默认值:
<none>
) - 客户端 ID
-
发出请求时传递给服务器的 ID。用于服务器端日志记录。(字符串,默认值:
<none>
) - 启用自动提交
-
是否在后台定期提交 Consumer 的偏移量。(布尔值,默认值:
<none>
) - fetch-max-wait
-
如果没有足够的数据来立即满足 “fetch-min-size” 给出的要求,则服务器在响应 fetch 请求之前阻止的最长时间。(持续时间,默认值:
<无>
) - 最小 fetch-size
-
服务器应为 fetch 请求返回的最小数据量。(DataSize,默认值:
<none>
) - 组 ID
-
标识此使用者所属的使用者组的唯一字符串。(字符串,默认值:
<none>
) - 心跳间隔
-
与使用者协调器的检测信号之间的预期时间。(持续时间,默认值:
<无>
) - 隔离级别
-
用于读取以事务方式写入的消息的隔离级别。(IsolationLevel,默认值:
read-uncommitted
,可能的值:READ_UNCOMMITTED,READ_COMMITTED
) - 键解串器
-
键的 Deserializer 类。(类<?>,默认值:
<无>
) - 最大轮询记录
-
对 poll() 的单次调用中返回的最大记录数。(整数,默认值:
<none>
) - 性能
-
用于配置客户端的其他特定于使用者的属性。(Map<String, String>,默认值:
<none>
) - 值反序列化器
-
值的 Deserializer 类。(类<?>,默认值:
<无>
)
spring.kafka.listener 中
- ack 计数
-
ackMode 为 “COUNT” 或 “COUNT_TIME” 时偏移提交之间的记录数。(整数,默认值:
<none>
) - ack 模式
-
侦听器 AckMode 的 AckMode 中。参见 spring-kafka 文档。(AckMode,默认值:
<none>
,可能的值:RECORD,BATCH,TIME,COUNT,COUNT_TIME,MANUAL,MANUAL_IMMEDIATE)
- ack 时间
-
当 ackMode 为 “TIME” 或 “COUNT_TIME” 时,偏移量提交之间的时间。(持续时间,默认值:
<无>
) - 异步 ack
-
支持异步记录确认。仅当spring.kafka.listener.ack-mode为manual或manual-immediate时适用。(布尔值,默认值:
<none>
) - 自动启动
-
是否自动启动容器。(布尔值,默认值:
true
) - 客户端 ID
-
侦听器的使用者 client.id 属性的前缀。(字符串,默认值:
<none>
) - 并发
-
要在侦听器容器中运行的线程数。(整数,默认值:
<none>
) - idle-between-polls 轮询
-
Consumer.poll(Duration) 调用之间的休眠间隔。(持续时间,默认值:
0
) - 空闲事件间隔
-
发布空闲使用者事件(未收到数据)之间的时间。(持续时间,默认值:
<无>
) - 空闲分区事件间隔
-
发布空闲分区使用者事件之间的时间(未收到分区的数据)。(持续时间,默认值:
<无>
) - 立即停止
-
容器是在处理当前记录之后停止,还是在处理上一次轮询中的所有记录之后停止。(布尔值,默认值:
false
) - 日志容器配置
-
是否在初始化期间记录容器配置(INFO 级别)。(布尔值,默认值:
<none>
) - 缺失主题致命
-
如果代理上不存在至少一个已配置的主题,则容器是否应无法启动。(布尔值,默认值:
false
) - 监控间隔
-
无响应使用者的检查间隔时间。如果未指定 duration 后缀,则将使用秒。(持续时间,默认值:
<无>
) - 无轮询阈值
-
应用于 “pollTimeout” 的乘数,用于确定使用者是否无响应。(浮点型,默认值:
<none>
) - 轮询超时
-
轮询使用者时使用的超时。(持续时间,默认值:
<无>
) - 类型
-
侦听器类型。(类型,默认:
single
)
5.8. 负载生成器源
发送生成的数据并将其调度到流的源。
5.8.1. 选项
load-generator 源具有以下选项:
- load-generator.generate-timestamp 生成时间戳
-
是否生成时间戳。(布尔值,默认值:
false
) - load-generator.消息计数
-
消息计数。(整数,默认值:
1000
) - load-generator.消息大小
-
消息大小。(整数,默认值:
1000
) - load-generator.producers
-
生产者数量。(整数,默认值:
1
)
5.9. 邮件源
一个源应用程序,用于侦听 Emails 并将消息正文作为消息负载发出。
5.9.1. 选项
邮件源具有以下选项:
- mail.supplier.charset
-
用于 byte[] mail-to-string 转换的 charset。(字符串,默认值:
UTF-8
) - 邮件.supplier.delete
-
设置为 true 可在下载后删除电子邮件。(布尔值,默认值:
false
) - mail.supplier.expression
-
配置 SpEL 表达式以选择消息。(字符串,默认值:
true
) - 邮件.supplier.idle-IMAP
-
设置为 true 以使用 IdleImap 配置。(布尔值,默认值:
false
) - mail.supplier.java 邮件属性
-
JavaMail 属性作为名称-值对的新行分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:
<无>
) - mail.supplier.mark-as-read
-
设置为 true 可将电子邮件标记为已读。(布尔值,默认值:
false
) - 邮件.supplier.url
-
用于连接到邮件服务器的邮件连接 URL,例如 'imaps://username:[email protected]:993/Inbox'。(URLName,默认值:
<none>
) - mail.supplier.user-flag
-
当服务器不支持 \Recently 时标记邮件的标志。(字符串,默认值:
<none>
)
5.10. MongoDB 源
此源轮询来自 MongoDB 的数据。
此源完全基于 ,因此请参阅 Spring Boot MongoDB 支持以获取更多信息。MongoDataAutoConfiguration
5.10.1. 选项
mongodb 源具有以下选项:
按前缀分组的属性:
mongodb.supplier
- 收集
-
要查询的 MongoDB 集合。(字符串,默认值:
<none>
) - 查询
-
MongoDB 查询。(字符串,默认值:
{ }
) - 查询表达式
-
MongoDB 中的 SpEL 表达式查询 DSL 样式。(表达式,默认值:
<none>
) - 分裂
-
是否将查询结果拆分为单个消息。(布尔值,默认值:
true
) - update-expression (更新表达式)
-
MongoDB 中的 SpEL 表达式更新 DSL 样式。(表达式,默认值:
<none>
)
spring.data.mongodb
- 附加主机
-
其他服务器主机。不能使用 URI 设置,或者如果未指定 'host' 。其他主机将使用默认的 mongo 端口 27017。如果您想使用不同的端口,可以使用 “host:port” 语法。(List<String>,默认值:
<none>
) - 身份验证数据库
-
身份验证数据库名称。(字符串,默认值:
<none>
) - 自动索引创建
-
是否启用自动索引创建。(布尔值,默认值:
<none>
) - 数据库
-
数据库名称。覆盖 URI 中的数据库。(字符串,默认值:
<none>
) - 字段命名策略
-
要使用的 FieldNamingStrategy 的完全限定名称。(类<?>,默认值:
<无>
) - 主机
-
Mongo 服务器主机。不能使用 URI 设置。(字符串,默认值:
<none>
) - 密码
-
mongo 服务器的登录密码。不能使用 URI 设置。(字符[],默认值:
<无>
) - 港口
-
Mongo 服务器端口。不能使用 URI 设置。(整数,默认值:
<none>
) - 副本集名称
-
集群所需的副本集名称。不能使用 URI 设置。(字符串,默认值:
<none>
) - URI
-
Mongo 数据库 URI。覆盖主机、端口、用户名和密码。(字符串,默认值:
mongodb://localhost/test
) - 用户名
-
mongo 服务器的登录用户。不能使用 URI 设置。(字符串,默认值:
<none>
) - uuid 表示
-
将 UUID 转换为 BSON 二进制值时使用的表示形式。(UuidRepresentation,默认值:
java-legacy
,可能的值:UNSPECIFIED,STANDARD,C_SHARP_LEGACY,JAVA_LEGACY,PYTHON_LEGACY)
spring.data.mongodb.gridfs
- 桶
-
GridFS 存储桶名称。(字符串,默认值:
<none>
) - 数据库
-
GridFS 数据库名称。(字符串,默认值:
<none>
)
spring.data.mongodb.ssl
- 捆
-
SSL 捆绑包名称。(字符串,默认值:
<none>
) - 启用
-
是否启用 SSL 支持。如果提供了 “bundle”,则自动启用,除非另有说明。(布尔值,默认值:
<none>
)
另请参阅 Spring Boot 文档以获取其他属性。
请参阅 和 了解轮询选项。MongoProperties
TriggerProperties
5.11. MQTT 源码
允许从 MQTT 接收消息的 Source。
5.11.1. 有效负载:
-
String
如果 Binary 设置为 (default)false
-
byte[]
如果二进制设置为true
5.11.2. 选项
mqtt 源具有以下选项:
按前缀分组的属性:
MQTT 协议
- clean-session
-
客户端和服务器是否应在重启和重新连接时记住状态。(布尔值,默认值:
true
) - 连接超时
-
连接超时(以秒为单位)。(整数,默认值:
30
) - 保持活动间隔
-
ping 间隔(以秒为单位)。(整数,默认值:
60
) - 密码
-
连接到代理时使用的密码。(字符串,默认值:
guest
) - 坚持
-
'memory' 或 'file'。(字符串,默认值:
memory
) - 持久性目录
-
Persistence 目录。(字符串,默认值:
/tmp/paho
) - SSL 属性
-
MQTT 客户端 SSL 属性。(Map<String, String>,默认值:
<none>
) - 网址
-
MQTT 代理的位置(逗号分隔的列表)。(String[],默认值:
[tcp://localhost:1883]
) - 用户名
-
连接到 broker 时要使用的用户名。(字符串,默认值:
guest
)
mqtt.supplier (英语)
- 二元的
-
true 将有效负载保留为 bytes。(布尔值,默认值:
false
) - 字符集
-
用于将字节转换为 String 的字符集(当 binary 为 false 时)。(字符串,默认值:
UTF-8
) - 客户端 ID
-
标识客户端。(字符串,默认:
stream.client.id.source
) - QoS
-
QoS;所有主题的单个值或以逗号分隔的列表以匹配主题。(整数 [],默认值:
[0]
) - 主题
-
源将订阅的主题(逗号分隔)。(String[],默认值:
[stream.mqtt]
)
5.12. RabbitMQ 源码
“rabbit” 源允许从 RabbitMQ 接收消息。
在部署流之前,队列必须存在;它们不是自动创建的。 您可以使用 RabbitMQ Web UI 轻松创建队列。
5.12.1. 输入
不适用
5.12.2. 输出
有效载荷
-
byte[]
5.12.3. 选项
rabbit 源具有以下选项:
按前缀分组的属性:
rabbit.supplier 的
- enable-retry (启用重试)
-
如果为 true,则启用重试。(布尔值,默认值:
false
) - 初始重试间隔
-
启用重试时的初始重试间隔。(整数,默认值:
1000
) - mapped-request-headers 请求标头
-
将要映射的标头。(String[],默认值:
[STANDARD_REQUEST_HEADERS]
) - 最大尝试次数
-
启用重试时的最大传递尝试次数。(整数,默认值:
3
) - 最大重试间隔
-
启用重试时的最大重试间隔。(整数,默认值:
30000
) - own-connection
-
如果为 true,则根据引导属性使用单独的连接。(布尔值,默认值:
false
) - 队列
-
源将侦听消息的队列。(String[],默认值:
<none>
) - 重新排队
-
是否应将被拒绝的邮件重新排队。(布尔值,默认值:
true
) - 重试乘数
-
启用 retry 时重试回退乘数。(双精度,默认值:
2
) - 交易
-
通道是否事务处理。(布尔值,默认值:
false
)
spring.rabbitmq 的
- address-shuffle-mode 地址随机模式
-
用于对配置的地址进行随机排序的模式。(AddressShuffleMode,默认值:
none
,可能的值:NONE,RANDOM,INORDER) - 地址
-
客户端应连接到的地址的逗号分隔列表。设置后,将忽略 host 和 port。(字符串,默认值:
<none>
) - 通道 rpc-timeout
-
通道中 RPC 调用的继续超时。将其设为零可永久等待。(持续时间,默认:
10 分钟
) - 连接超时
-
连接超时。将其设为零可永久等待。(持续时间,默认值:
<无>
) - 主机
-
RabbitMQ 主机。如果设置了地址,则忽略。(字符串,默认值:
localhost
) - 密码
-
登录以对代理进行身份验证。(字符串,默认值:
guest
) - 港口
-
RabbitMQ 端口。如果设置了地址,则忽略。默认为 5672,如果启用了 SSL,则为 5671。(整数,默认值:
<none>
) - publisher-confirm-type
-
确认使用的发布者类型。(ConfirmType,默认值:
<none>
,可能的值:SIMPLE,CORRELATED,NONE) - publisher-returns
-
是否启用发布者退货。(布尔值,默认值:
false
) - 请求通道最大值
-
客户端请求的每个连接的通道数。使用 0 表示无限制。(整数,默认值:
2047
) - 请求的心跳
-
请求的检测信号超时;零表示无。如果未指定 duration 后缀,则将使用秒。(持续时间,默认值:
<无>
) - 用户名
-
登录用户以向 broker 进行身份验证。(字符串,默认值:
guest
) - 虚拟主机
-
连接到 broker 时使用的虚拟主机。(字符串,默认值:
<none>
)
spring.rabbitmq.listener.simple
- 确认模式
-
容器的确认模式。(AcknowledgeMode,默认值:
<none>
,可能的值:NONE,MANUAL,AUTO) - 自动启动
-
是否在启动时自动启动容器。(布尔值,默认值:
true
) - 批量大小
-
Batch size,表示为容器要使用的物理消息数。(整数,默认值:
<none>
) - 并发
-
侦听器调用程序线程的最小数量。(整数,默认值:
<none>
) - 已启用 consumer-batch
-
容器是否根据 'receive-timeout' 和 'batch-size' 创建一批消息。强制将 'de-batching-enabled' 强制设置为 true,以将创建者创建的批处理的内容作为离散记录包含在批处理中。(布尔值,默认值:
false
) - 已启用 de-batching
-
容器是应将批处理消息显示为离散消息,还是使用批处理调用侦听器。(布尔值,默认值:
true
) - default-requeue-rejected (默认重新排队被拒绝)
-
默认情况下,是否将被拒绝的投放重新排队。(布尔值,默认值:
<none>
) - 空闲事件间隔
-
应多久发布一次空闲容器事件。(持续时间,默认值:
<无>
) - 最大并发
-
侦听器调用程序线程的最大数量。(整数,默认值:
<none>
) - 缺少队列致命
-
如果容器声明的队列在代理上不可用,是否失败,和/或在运行时删除一个或多个队列时是否停止容器。(布尔值,默认值:
true
) - 预取
-
每个使用者可以未完成的未确认消息的最大数量。(整数,默认值:
<none>
)
spring.rabbitmq.listener 的
- 类型
-
侦听器容器类型。(ContainerType,默认值:
simple
,可能的值:SIMPLE,DIRECT,STREAM
)
另请参阅 Spring Boot 文档,了解代理连接和侦听器属性的其他属性。
关于重试的说明
使用默认的 ackMode (AUTO) 和 requeue (true) 选项,将重试失败的消息传送
无限期。
由于 rabbit source 中没有太多的处理,因此 source 本身失败的风险很小,除非
由于某种原因,下游未连接。
将 requeue 设置为 false 将导致邮件在第一次尝试时被拒绝(并可能发送到 Dead Letter
Exchange/Queue(如果代理是这样配置的)。
的 enableRetry 选项允许配置重试参数,以便可以重试失败的消息投放,并且
最终在重试用尽时丢弃(或死信)。
传递线程在重试间隔期间暂停。
重试选项包括 enableRetry、maxAttempts、initialRetryInterval、retryMultiplier 和 maxRetryInterval。
消息传送失败并显示 MessageConversionException 永远不会重试;假设是,如果消息
无法转换,则后续尝试也将失败。
此类消息将被丢弃(或死信)。Binder |
5.12.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
您可以在此处找到相应的基于 Binder 的项目。 然后你可以 cd 到其中一个文件夹并构建它:
$ ./mvnw clean package
5.12.5. 示例
java -jar rabbit-source.jar --rabbit.queues=
5.13. Amazon S3 源
此源应用程序支持使用 Amazon S3 协议传输文件。
文件从目录 (S3 存储桶) 传输到部署应用程序的目录。remote
local
默认情况下,源发出的消息以字节数组的形式提供。但是,这可能是
使用选项进行自定义:--mode
-
裁判提供参考
java.io.File
-
线将逐行拆分文件,并为每行发出一条新消息
-
内容默认值。以字节数组的形式提供文件的内容
使用 时,您还可以提供附加选项 。
如果设置为 ,则底层将在实际数据之前和之后发出额外的文件开头和文件结尾标记消息。
这 2 个附加标记消息的有效负载为 类型 。如果未明确设置,则选项默认为 。--mode=lines
--withMarkers=true
true
FileSplitter
FileSplitter.FileMarker
withMarkers
false
另请参阅 MetadataStore 选项,了解用于防止重新启动时出现重复消息的可能共享持久性存储配置。
5.13.1. 模式 = 行
头:
-
Content-Type: text/plain
-
file_orginalFile: <java.io.File>
-
file_name: <file name>
-
correlationId: <UUID>
(每行相同) -
sequenceNumber: <n>
-
sequenceSize: 0
(在读取文件之前不知道行数)
有效载荷:
A 表示每行。String
第一行前面有一条带有标记有效负载的消息(可选)。
最后一行(可选)后跟带有标记有效负载的消息。START
END
标记存在和格式由 和 属性确定。with-markers
markers-json
5.13.2. 模式 = ref
头:
没有。
有效载荷:
一个对象。java.io.File
5.13.3. 选项
有以下选项:s3-source
按前缀分组的属性:
file.consumer 文件
- 标记 JSON
-
当 'fileMarkers == true' 时,指定它们是应生成为 FileSplitter.FileMarker 对象还是 JSON。(布尔值,默认值:
true
) - 模式
-
用于文件读取源的 FileReadingMode。值包括 'ref' - 文件对象,'lines' - 每行一条消息,或 'contents' - 以字节为单位的内容。(FileReadingMode,默认值:
<none>
,可能的值:ref,lines,contents
)
- with-markers
-
设置为 true 可在数据之前/之后发出文件开头/文件结尾标记消息。仅对 FileReadingMode 'lines' 有效。(布尔值,默认值:
<none>
)
元数据.store.dynamo-db
- 创建延迟
-
创建表重试之间的延迟。(整数,默认值:
1
) - 创建重试
-
创建表请求的重试次数。(整数,默认值:
25
) - read-capacity (读取容量)
-
读取表上的 capacity。(长,默认值:
1
) - 桌子
-
元数据的表名称。(字符串,默认值:
<none>
) - 生存时间
-
表条目的 TTL。(整数,默认值:
<none>
) - 写入容量
-
表上的写入容量。(长,默认值:
1
)
元数据.store.jdbc
- 地区
-
此存储中保留的消息的唯一分组标识符。(字符串,默认值:
DEFAULT
) - 表前缀
-
自定义表名称的前缀。(字符串,默认值:
<none>
)
元数据.store.mongo-db
- 收集
-
元数据的 MongoDB 集合名称。(字符串,默认值:
metadataStore
)
metadata.store.redis
- 钥匙
-
元数据的 Redis 键。(字符串,默认值:
<none>
)
metadata.store
- 类型
-
指示要配置的元数据存储的类型(默认为 'memory')。您必须包含相应的 Spring 集成依赖项才能使用持久存储。(StoreType,默认值:
<none>
,可能的值:mongodb,redis,dynamodb,jdbc,zookeeper,hazelcast,memory)
元数据.store.zookeeper
- 连接字符串
-
HOST:PORT 形式的 Zookeeper 连接字符串。(字符串,默认值:
127.0.0.1:2181
) - 编码
-
在 Zookeeper 中存储数据时使用的编码。(字符集,默认值:
UTF-8
) - 重试间隔
-
Zookeeper 操作的重试间隔(以毫秒为单位)。(整数,默认值:
1000
) - 根
-
根节点 - 存储条目是此节点的子项。(字符串,默认值:
/SpringIntegration-MetadataStore
)
s3.supplier
- 自动创建本地目录
-
创建或不创建本地目录。(布尔值,默认值:
true
) - 删除远程文件
-
处理后删除或不删除远程文件。(布尔值,默认值:
false
) - 文件名模式
-
用于筛选远程文件的模式。(字符串,默认值:
<none>
) - 文件名-regex
-
用于过滤远程文件的 regexp。(模式,默认值:
<无>
) - 仅列表
-
设置为 true 可返回 s3 对象元数据,而不将文件复制到本地目录。(布尔值,默认值:
false
) - 本地目录
-
用于存储文件的本地目录。(文件,默认值:
<无>
) - preserve-timestamp
-
将远程文件的时间戳传输到本地文件。(布尔值,默认值:
true
) - 远程目录
-
AWS S3 存储桶资源。(字符串,默认值:
bucket
) - 远程文件分隔符
-
远程文件分隔符。(字符串,默认值:
/
) - tmp-file-suffix 文件后缀
-
临时文件后缀。(字符串,默认值:
.tmp
)
spring.cloud.aws.credentials
- 访问密钥
-
要用于静态提供程序的访问密钥。(字符串,默认值:
<none>
) - 实例配置文件
-
配置实例配置文件凭证提供程序,无需进一步配置。(布尔值,默认值:
false
) - 轮廓
-
AWS 配置文件。(配置文件,默认值:
<无>
) - 密钥
-
要用于静态提供程序的密钥。(字符串,默认值:
<none>
)
spring.cloud.aws.region
- 实例配置文件
-
配置实例配置文件区域提供程序,无需进一步配置。(布尔值,默认值:
false
) - 轮廓
-
AWS 配置文件。(配置文件,默认值:
<无>
) - 静态的
-
<缺少文档>(字符串,默认值:
<none>
)
spring.cloud.aws.s3 的
- 已启用加速模式
-
在访问 S3 时启用使用加速终端节点的选项。Accelerate 终端节点允许使用 Amazon CloudFront 的全球分布式边缘站点更快地传输对象。(布尔值,默认值:
<none>
) - 已启用校验和验证
-
用于禁用对 S3 中存储的对象的校验和进行验证的选项。(布尔值,默认值:
<none>
) - 已启用 chunked-encoding
-
在对 {@link software.amazon.awssdk.services.s3.model.PutObjectRequest} 和 {@link software.amazon.awssdk.services.s3.model.UploadPartRequest} 的请求负载进行签名时启用使用分块编码的选项。(布尔值,默认值:
<none>
) - 已启用跨区域
-
启用跨区域存储桶访问。(布尔值,默认值:
<none>
) - 端点
-
覆盖默认终端节点。(URI,默认值:
<none>
) - 启用路径样式访问
-
启用使用路径样式访问而不是 DNS 样式访问来访问 S3 对象的选项。DNS 样式访问是首选,因为它将在访问 S3 时实现更好的负载平衡。(布尔值,默认值:
<none>
) - 地区
-
覆盖默认区域。(字符串,默认值:
<none>
) - use-arn-region-enabled 已启用
-
如果 S3 资源 ARN 作为 S3 操作的目标传入,而该操作的区域与客户端配置的区域不同,则必须将此标志设置为“true”,以允许客户端对 ARN 中指定的区域进行跨区域调用,否则将引发异常。(布尔值,默认值:
<none>
)
5.13.4. Amazon AWS 常用选项
Amazon S3 源(与所有其他 Amazon AWS 应用程序一样)基于 Spring Cloud AWS 项目作为基础,其自动配置类由 Spring Boot 自动使用。 请查阅其文档,了解必需和有用的自动配置属性。
5.13.5. 示例
java -jar s3-source.jar --s3.remoteDir=/tmp/foo --file.consumer.mode=lines
5.14. SFTP 源
此源应用程序支持使用 SFTP 协议传输文件。
文件将从目录传输到部署应用程序的目录。
默认情况下,源发出的消息以字节数组的形式提供。但是,这可能是
使用选项进行自定义:remote
local
--mode
-
裁判提供参考
java.io.File
-
线将逐行拆分文件,并为每行发出一条新消息
-
内容默认值。以字节数组的形式提供文件的内容
使用 时,您还可以提供附加选项 。
如果设置为 ,则底层将在实际数据之前和之后发出额外的文件开头和文件结尾标记消息。
这 2 个附加标记消息的有效负载为 类型 。如果未明确设置,则选项默认为 。--mode=lines
--withMarkers=true
true
FileSplitter
FileSplitter.FileMarker
withMarkers
false
有关高级配置选项,请参阅 sftp-supplier
。
另请参阅 MetadataStore 选项,了解用于防止重新启动时出现重复消息的可能共享持久性存储配置。
5.14.1. 输入
N/A (从 SFTP 服务器获取文件)。
5.14.2. 输出
mode = 内容
头:
-
Content-Type: application/octet-stream
-
file_name: <file name>
-
file_remoteFileInfo <file metadata>
-
file_remoteHostPort: <host:port>
-
file_remoteDirectory: <relative-path>
-
file_remoteFile: <file-name>
-
sftp_selectedServer: <server-key>
(如果是多源)
有效载荷:
A 填充了文件内容。byte[]
模式 = 行
头:
-
Content-Type: text/plain
-
file_name: <file name>
-
correlationId: <UUID>
(每行相同) -
sequenceNumber: <n>
-
sequenceSize: 0
(在读取文件之前不知道行数) -
file_marker : <file marker>
(如果启用了 with-markers)
有效载荷:
A 表示每行。String
第一行前面有一条带有标记有效负载的消息(可选)。
最后一行(可选)后跟带有标记有效负载的消息。START
END
标记存在和格式由 和 属性确定。with-markers
markers-json
模式 = ref
头:
-
file_remoteHostPort: <host:port>
-
file_remoteDirectory: <relative-path>
-
file_remoteFile: <file-name>
-
file_originalFile: <absolute-path-of-local-file>
-
file_name <local-file-name>
-
file_relativePath
-
file_remoteFile: <remote-file-name>
-
sftp_selectedServer: <server-key>
(如果是多源)
有效载荷:
一个对象。java.io.File
5.14.3. 选项
ftp 源具有以下选项:
按前缀分组的属性:
file.consumer 文件
- 标记 JSON
-
当 'fileMarkers == true' 时,指定它们是应生成为 FileSplitter.FileMarker 对象还是 JSON。(布尔值,默认值:
true
) - 模式
-
用于文件读取源的 FileReadingMode。值包括 'ref' - 文件对象,'lines' - 每行一条消息,或 'contents' - 以字节为单位的内容。(FileReadingMode,默认值:
<none>
,可能的值:ref,lines,contents
)
- with-markers
-
设置为 true 可在数据之前/之后发出文件开头/文件结尾标记消息。仅对 FileReadingMode 'lines' 有效。(布尔值,默认值:
<none>
)
元数据.store.dynamo-db
- 创建延迟
-
创建表重试之间的延迟。(整数,默认值:
1
) - 创建重试
-
创建表请求的重试次数。(整数,默认值:
25
) - read-capacity (读取容量)
-
读取表上的 capacity。(长,默认值:
1
) - 桌子
-
元数据的表名称。(字符串,默认值:
<none>
) - 生存时间
-
表条目的 TTL。(整数,默认值:
<none>
) - 写入容量
-
表上的写入容量。(长,默认值:
1
)
元数据.store.jdbc
- 地区
-
此存储中保留的消息的唯一分组标识符。(字符串,默认值:
DEFAULT
) - 表前缀
-
自定义表名称的前缀。(字符串,默认值:
<none>
)
元数据.store.mongo-db
- 收集
-
元数据的 MongoDB 集合名称。(字符串,默认值:
metadataStore
)
metadata.store.redis
- 钥匙
-
元数据的 Redis 键。(字符串,默认值:
<none>
)
metadata.store
- 类型
-
指示要配置的元数据存储的类型(默认为 'memory')。您必须包含相应的 Spring 集成依赖项才能使用持久存储。(StoreType,默认值:
<none>
,可能的值:mongodb,redis,dynamodb,jdbc,zookeeper,hazelcast,memory)
元数据.store.zookeeper
- 连接字符串
-
HOST:PORT 形式的 Zookeeper 连接字符串。(字符串,默认值:
127.0.0.1:2181
) - 编码
-
在 Zookeeper 中存储数据时使用的编码。(字符集,默认值:
UTF-8
) - 重试间隔
-
Zookeeper 操作的重试间隔(以毫秒为单位)。(整数,默认值:
1000
) - 根
-
根节点 - 存储条目是此节点的子项。(字符串,默认值:
/SpringIntegration-MetadataStore
)
sftp.supplier
- 自动创建本地目录
-
如果本地目录不存在,则设置为 true 以创建本地目录。(布尔值,默认值:
true
) - empty-时延迟
-
未检测到新文件时的延迟持续时间。(持续时间,默认:
1 秒
) - 删除远程文件
-
设置为 true 可在成功传输后删除远程文件。(布尔值,默认值:
false
) - 目录
-
工厂 “name.directory” 对的列表。(String[],默认值:
<none>
) - 工厂
-
工厂名称到工厂的映射。(Map<String,Factory>,默认值:
<none>
) - 公平
-
True 表示多个服务器/目录的公平轮换。默认情况下,这是 false,因此如果源具有多个条目,则将在访问其他源之前收到这些条目。(布尔值,默认值:
false
) - 文件名模式
-
用于匹配要传输的文件的名称的筛选模式。(字符串,默认值:
<none>
) - 文件名-regex
-
用于匹配要传输的文件的名称的筛选器正则表达式模式。(模式,默认值:
<无>
) - 仅列表
-
设置为 true 可返回文件元数据,而不返回整个负载。(布尔值,默认值:
false
) - 本地目录
-
用于文件传输的本地目录。(文件,默认值:
<无>
) - max-fetch (最大获取)
-
每次轮询要获取的最大远程文件数;默认 unlimited 。在列出文件或构建任务启动请求时不适用。(整数,默认值:
<none>
) - preserve-timestamp
-
设置为 true 可保留原始时间戳。(布尔值,默认值:
true
) - 远程目录
-
远程 FTP 目录。(字符串,默认值:
/
) - 远程文件分隔符
-
远程文件分隔符。(字符串,默认值:
/
) - 重命名远程文件
-
成功传输后,必须将解析为新名称 remote files 的 SPEL 表达式重命名为该表达式。(表达式,默认值:
<none>
) - 流
-
设置为 true 可流式传输文件,而不是复制到本地目录。(布尔值,默认值:
false
) - tmp-file-suffix 文件后缀
-
传输过程中要使用的后缀。(字符串,默认值:
.tmp
)
sftp.supplier.factory
- 允许未知密钥
-
如果为 True,则允许未知或更改的键。(布尔值,默认值:
false
) - 主机
-
服务器的主机名。(字符串,默认值:
localhost
) - 已知主机表达式
-
解析为已知主机文件位置的 SpEL 表达式。(表达式,默认值:
<none>
) - 密码短语
-
用户私钥的密码。(字符串,默认值:
<空字符串>
) - 密码
-
用于连接到服务器的密码。(字符串,默认值:
<none>
) - 港口
-
服务器的端口。(整数,默认值:
22
) - 私钥
-
用户私钥的资源位置。(资源,默认值:
<无>
) - 用户名
-
用于连接到服务器的用户名。(字符串,默认值:
<none>
)
sftp.supplier.排序方式
- 属性
-
要排序依据的文件列表条目的属性(FILENAME、ATIME:上次访问时间、MTIME:上次修改时间)。(属性,默认值:
<无>
) - 迪尔
-
排序方向(ASC 或 DESC)。(Dir,默认值:
<none>
)
5.14.4. 示例
java -jar sftp_source.jar --sftp.supplier.remote-dir=foo --file.mode=lines --sftp.supplier.factory.host=sftpserver \
--sftp.supplier.factory.username=user --sftp.supplier.factory.password=pw --sftp.supplier.local-dir=/foo
5.15. 系统日志
syslog 源通过 UDP 和/或 TCP 接收 SYSLOG 数据包。支持 RFC3164 (BSD) 和 RFC5424 格式。
5.15.1. 选项
- syslog.supplier.buffer-size
-
解码消息时使用的缓冲区大小;较大的邮件将被拒绝。(整数,默认值:
2048
) - syslog.supplier.nio
-
是否使用 NIO(当支持大量连接时)。(布尔值,默认值:
false
) - syslog.supplier.port
-
要侦听的端口。(整数,默认值:
1514
) - syslog.supplier.protocol
-
用于 SYSLOG 的协议(tcp 或 udp)。(协议,默认值:
<none>
,可能的值:tcp,udp,both
)
- syslog.supplier.reverse-lookup
-
是否对 incoming socket 执行反向查找。(布尔值,默认值:
false
) - syslog.supplier.rfc
-
'5424' 或 '3164' - 根据 RFC 的 syslog 格式;3164 又名“BSD”格式。(字符串,默认值:
3164
) - syslog.supplier.socket-超时
-
套接字超时。(整数,默认值:
0
)
5.16. TCP 协议
源充当服务器,并允许远程方连接到它并通过原始 tcp 套接字提交数据。tcp
TCP 是一种流协议,需要一些机制来在网络上构建消息。许多解码器是 available,默认值为 'CRLF',它与 Telnet 兼容。
TCP 源应用程序生成的消息具有有效负载。byte[]
5.16.1. 选项
按前缀分组的属性:
TCP 协议
- 蔚来
-
是否使用 NIO。(布尔值,默认值:
false
) - 港口
-
要侦听的端口;0 让操作系统选择一个端口。(整数,默认值:
1234
) - 反向查找
-
对远程 IP 地址执行反向 DNS 查找;如果为 false,则邮件报头中仅包含 IP 地址。(布尔值,默认值:
false
) - 套接字超时
-
未收到数据时关闭套接字之前的超时 (ms)。(整数,默认值:
120000
) - 使用直接缓冲区
-
是否使用直接缓冲区。(布尔值,默认值:
false
)
tcp.supplier
- 缓冲区大小
-
解码消息时使用的缓冲区大小;较大的邮件将被拒绝。(整数,默认值:
2048
) - 译码器
-
接收消息时使用的解码器。(编码,默认值:
<none>
,可能的值:CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)
5.16.2. 可用的解码器
- CRLF (默认)
-
以回车 (0x0d) 后跟换行符 (0x0a) 结尾的文本
- 如果
-
由换行符终止的文本 (0x0a)
- 零
-
以 null 字节 (0x00) 结尾的文本
- STXETX
-
文本前面有 STX (0x02) 并以 ETX (0x03) 结尾
- 生
-
no structure - 客户端通过关闭套接字来指示完整的消息
- L1 系列
-
数据前面有一个单字节(无符号)长度字段(最多支持 255 个字节)
- L2 (二层)
-
数据前面有一个两字节(无符号)长度的字段(最多 2 个16-1 字节)
- L4 系列
-
数据前面有一个四字节(带符号)长度的字段(最多 2 个31-1 字节)
5.17. 时间源
时间源将每隔一段时间简单地发出一个包含当前时间的 String。
5.17.1. 选项
时间源具有以下选项:
- spring.integration.poller.cron
-
Cron 表达式进行轮询。与 'fixedDelay' 和 'fixedRate' 互斥。(字符串,默认值:
<none>
) - spring.integration.poller.fixed-delay
-
轮询延迟期。与 'cron' 和 'fixedRate' 互斥。(持续时间,默认值:
<无>
) - spring.integration.poller.fixed-rate
-
轮询率周期。与 'fixedDelay' 和 'cron' 互斥。(持续时间,默认值:
<无>
) - spring.integration.poller.initial-delay
-
轮询初始延迟。申请了 'fixedDelay' 和 'fixedRate';对于 'cron' 而被忽略。(持续时间,默认值:
<无>
) - 每次轮询 spring.integration.poller.max 条消息数
-
每个轮询周期要轮询的最大消息数。(整数,默认值:
<none>
) - spring.integration.poller.receive-timeout
-
轮询消息等待多长时间。(持续时间,默认:
1 秒
) - time.date-格式
-
日期值的格式。(字符串,默认值:
MM/dd/yy HH:mm:ss
)
5.18. Twitter 消息源
重复检索过去 30 天内的直接消息(发送和接收),按时间倒序排序。
已释放的消息将缓存(在缓存中)以防止重复。
默认情况下,使用 in-memory。MetadataStore
SimpleMetadataStore
控制消息的数量或返回的消息。twitter.message.source.count
这些属性控制消息轮询间隔。
必须与使用的 API 速率限制保持一致spring.cloud.stream.poller
5.18.1. 选项
按前缀分组的属性:
spring.integration.poller
- cron (定时)
-
Cron 表达式进行轮询。与 'fixedDelay' 和 'fixedRate' 互斥。(字符串,默认值:
<none>
) - 固定延迟
-
轮询延迟期。与 'cron' 和 'fixedRate' 互斥。(持续时间,默认值:
<无>
) - 固定利率
-
轮询率周期。与 'fixedDelay' 和 'cron' 互斥。(持续时间,默认值:
<无>
) - 初始延迟
-
轮询初始延迟。申请了 'fixedDelay' 和 'fixedRate';对于 'cron' 而被忽略。(持续时间,默认值:
<无>
) - 最大每次轮询消息数
-
每个轮询周期要轮询的最大消息数。(整数,默认值:
<none>
) - 接收超时
-
轮询消息等待多长时间。(持续时间,默认:
1 秒
)
推特连接
- 访问令牌
-
您的 Twitter 令牌。(字符串,默认值:
<none>
) - 访问令牌密钥
-
您的 Twitter 令牌密钥。(字符串,默认值:
<none>
) - consumer-key (使用者密钥)
-
您的 Twitter 密钥。(字符串,默认值:
<none>
) - 消费者密钥
-
您的 Twitter 秘密。(字符串,默认值:
<none>
) - 已启用调试
-
启用 Twitter4J 调试模式。(布尔值,默认值:
false
) - raw-json 格式
-
启用缓存 Twitter API 返回的原始(原始)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 json 表示形式。当设置为 True 时,结果将使用原始的 Twitter APISs json 表示形式。(布尔值,默认值:
true
)
推特.message.source
- 计数
-
要返回的最大事件数。默认为 20。最大 50 (整数,默认值:
20
)
5.19. Twitter 搜索源
Twitter 的标准搜索 API(搜索/推文)允许对最近或热门推文的索引进行简单查询。这提供了对过去 7 天内发布的最近推文样本的连续搜索。“公共”API 集的一部分。Source
返回与指定查询匹配的相关推文的集合。
使用属性可控制连续搜索请求之间的间隔。速率限制 - 每 30 分钟窗口 180 个请求(例如 ~6 r/m,~ 1 个请求/10 秒)spring.cloud.stream.poller
查询属性允许按关键字进行查询,并按时间和地理位置筛选结果。twitter.search
和 根据搜索 API 控制结果分页。twitter.search.count
twitter.search.page
注意:Twitter 的搜索服务以及 Search API 并不是详尽的推文来源。并非所有推文都会被编入索引或通过搜索界面提供。
5.19.1. 选项
按前缀分组的属性:
spring.integration.poller
- cron (定时)
-
Cron 表达式进行轮询。与 'fixedDelay' 和 'fixedRate' 互斥。(字符串,默认值:
<none>
) - 固定延迟
-
轮询延迟期。与 'cron' 和 'fixedRate' 互斥。(持续时间,默认值:
<无>
) - 固定利率
-
轮询率周期。与 'fixedDelay' 和 'cron' 互斥。(持续时间,默认值:
<无>
) - 初始延迟
-
轮询初始延迟。申请了 'fixedDelay' 和 'fixedRate';对于 'cron' 而被忽略。(持续时间,默认值:
<无>
) - 最大每次轮询消息数
-
每个轮询周期要轮询的最大消息数。(整数,默认值:
<none>
) - 接收超时
-
轮询消息等待多长时间。(持续时间,默认:
1 秒
)
推特连接
- 访问令牌
-
您的 Twitter 令牌。(字符串,默认值:
<none>
) - 访问令牌密钥
-
您的 Twitter 令牌密钥。(字符串,默认值:
<none>
) - consumer-key (使用者密钥)
-
您的 Twitter 密钥。(字符串,默认值:
<none>
) - 消费者密钥
-
您的 Twitter 秘密。(字符串,默认值:
<none>
) - 已启用调试
-
启用 Twitter4J 调试模式。(布尔值,默认值:
false
) - raw-json 格式
-
启用缓存 Twitter API 返回的原始(原始)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 json 表示形式。当设置为 True 时,结果将使用原始的 Twitter APISs json 表示形式。(布尔值,默认值:
true
)
推特搜索
- 计数
-
每个页面要返回的推文数(例如,每个请求),最多 100 条。(整数,默认值:
100
) - 朗
-
将搜索的推文限制为给定的语言,由 http://en.wikipedia.org/wiki/ISO_639-1 指定。(字符串,默认值:
<none>
) - 页
-
在再次从最近的推文开始搜索之前,要向后搜索(从最新到最早的推文)的页面数(例如请求)。向后搜索的推文总数为 (page * count) (整数,默认值:
3
) - 查询
-
按搜索查询字符串搜索推文。(字符串,默认值:
<none>
) - 从最近空响应重新启动
-
从最新的 tweets 重新开始搜索 empty response。仅在第一次重启后应用(例如,当 since_id != UNBOUNDED 时)(布尔值,默认值:
false
) - result-type
-
指定您希望接收的搜索结果类型。当前默认值为 “mixed”。有效值包括:mixed :在响应中同时包含常用结果和实时结果。recent :仅返回响应中的最新结果 popular :仅返回响应中最受欢迎的结果(ResultType,默认值:
<none>
,可能的值:popular,mixed,recent)
- 因为
-
如果指定,则返回自给定日期以来的推文。日期的格式应为 YYYY-MM-DD。(字符串,默认值:
<none>
)
twitter.search.geocode
- 纬度
-
用户的纬度。(Double,默认值:
-1
) - 经度
-
用户的经度。(Double,默认值:
-1
) - 半径
-
围绕(纬度、经度)点的半径(以公里为单位)。(Double,默认值:
-1
)
5.20. Twitter 流源
-
返回与一个或多个筛选条件谓词匹配的公有状态。 多个参数允许使用与 Streaming API 的单个连接。 提示:、 、 和 字段与 OR 运算符组合在一起! 查询并返回与 OR 匹配的推文 由 用户 创建。
Filter API
track
follow
locations
track=foo
follow=1234
test
1234
-
这将返回所有公共状态的小型随机样本。 默认访问级别返回的推文是相同的,因此,如果两个不同的客户端连接到此终端节点,它们将看到相同的推文。
Sample API
默认访问级别最多允许 400 个跟踪关键词、5,000 个关注用户 ID 和 25 个 0.1-360 度位置框。
5.20.1. 选项
按前缀分组的属性:
推特连接
- 访问令牌
-
您的 Twitter 令牌。(字符串,默认值:
<none>
) - 访问令牌密钥
-
您的 Twitter 令牌密钥。(字符串,默认值:
<none>
) - consumer-key (使用者密钥)
-
您的 Twitter 密钥。(字符串,默认值:
<none>
) - 消费者密钥
-
您的 Twitter 秘密。(字符串,默认值:
<none>
) - 已启用调试
-
启用 Twitter4J 调试模式。(布尔值,默认值:
false
) - raw-json 格式
-
启用缓存 Twitter API 返回的原始(原始)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 json 表示形式。当设置为 True 时,结果将使用原始的 Twitter APISs json 表示形式。(布尔值,默认值:
true
)
推特.stream.filter
- 计数
-
指示在过渡到实时流之前要流式传输的先前状态的数量。(整数,默认值:
0
) - filter-level (过滤器级别)
-
筛选条件级别将流中显示的推文限制为具有最小 filterLevel 属性值的推文。none (无)、low (低) 或 medium (中等) 之一。(FilterLevel,默认值:
<none>
) - 跟随
-
按 ID 指定要从中接收公共推文的用户。(List<Long>,默认值:
<none>
) - 语言
-
指定流的推文语言。(List<String>,默认值:
<none>
) - 地点
-
要跟踪的位置。内部表示为 2D 数组。边界框无效:52.38、4.90、51.51、-0.12。第一对必须是框的 SW 角(List<BoundingBox>,默认值:
<none>
) - 跟踪
-
指定要跟踪的关键字。(List<String>,默认值:
<none>
)
推特
- 类型
-
<缺少文档>(StreamType,默认值:
<none>
,可能的值:sample,filter,firehose,link
)
5.21. Websocket 源码
通过 Web 套接字生成消息的源。Websocket
5.21.1. 选项
按前缀分组的属性:
websocket.supplier
- 允许的来源
-
允许的源。(字符串,默认值:
*
) - 路径
-
公开服务器 WebSocket 处理程序的路径。(字符串,默认值:
/websocket
)
websocket.supplier.sock-js
- 使
-
在服务器上启用 Sockjs 服务。默认值为 'false' (布尔值,默认值:
false
)
5.21.2. 示例
要验证 websocket-source 是否从 Websocket 客户端接收消息,您可以使用以下简单的端到端设置。
第 1 步:启动 Kafka
第 2 步:在特定端口上部署,例如 8080websocket-source
第 3 步:在 8080 端口路径 “/websocket” 上连接一个 websocket 客户端,并发送一些消息。
您可以启动 Kafka 控制台使用者并在其中查看消息。
5.22. XMPP 源
“xmpp” 源允许从 XMPP 服务器接收消息。
5.22.1. 输入
不适用
5.22.2. 输出
有效载荷
-
byte[]
5.22.3. 选项
xmpp 源具有以下选项:
按前缀分组的属性:
xmpp.工厂
- 主机
-
要连接的 XMPP 主机服务器。(字符串,默认值:
<none>
) - 密码
-
已连接用户的密码。(字符串,默认值:
<none>
) - 港口
-
用于连接到主机的端口。- 默认客户端端口:5222(整数,默认值:
5222
) - 资源
-
要在 XMPP 主机上绑定到的资源。- 可以为空,如果未设置,服务器将生成一个(字符串,默认值:
<none>
) - 安全模式
-
<缺少文档> (SecurityMode,默认值:
<none>
,可能的值:required,ifpossible,disabled
)
- 服务名称
-
要为 XMPP 域设置的服务名称。(字符串,默认值:
<none>
) - 订阅模式
-
<缺少文档>(SubscriptionMode,默认值:
<none>
,可能的值:accept_all,reject_all,manual)
- 用户
-
连接应连接的 User。(字符串,默认值:
<none>
)
xmpp.supplier
- 有效载荷表达式
-
<缺少文档>(表达式,默认值:
<none>
) - 节过滤器
-
<缺少文档>(StanzaFilter,默认值:
<none>
)
另请参阅 Spring Boot 文档,了解代理连接和侦听器属性的其他属性。
5.22.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
您可以在此处找到相应的基于 Binder 的项目。 然后你可以 cd 到其中一个文件夹并构建它:
$ ./mvnw clean package
5.22.5. 示例
java -jar xmpp-source.jar --xmpp.factory.host=localhost --xmpp.factory.port=5222 --xmpp.factory.user=jane --xmpp.factory.password=secret --xmpp.factory.service-name=localhost
5.23. ZeroMQ 源码
“zeromq” 源允许从 ZeroMQ 接收消息。
5.23.1. 输入
不适用
5.23.2. 输出
有效载荷
-
byte[]
5.23.3. 选项
zeromq 源具有以下选项:
- zeromq.supplier.bind-port
-
Bind Port 用于创建 ZeroMQ Socket;0 选择随机端口。(整数,默认值:
0
) - zeromq.supplier.connect-url
-
ZeroMQ 套接字的连接 URL。(字符串,默认值:
<none>
) - zeromq.supplier.consume-delay
-
未收到数据时从 ZeroMQ Socket 消耗的延迟。(持续时间,默认:
1 秒
) - zeromq.supplier.socket类型
-
连接应进行的 Socket Type。(SocketType,默认值:
<none>
,可能的值:PAIR,PUBSUB
,
REQ,REP,DEALER,ROUTER,PULL,PUSH,XPUB,XSUB,STREAM,CLIENT,SERVER,RADIO,DISH,CHANNEL,PEER,RAW,SCATTER,GATHER
)
- zeromq.supplier.topics
-
要订阅的主题。(String[],默认值:
[]
)
另请参阅 Spring Boot 文档,了解代理连接和侦听器属性的其他属性。
5.23.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
您可以在此处找到相应的基于 Binder 的项目。 然后你可以 cd 到其中一个文件夹并构建它:
$ ./mvnw clean package
5.23.5. 例子
java -jar zeromq-source.jar --zeromq.supplier.connectUrl=tcp://server:port --zeromq.supplier.topics=
6. 处理器
6.1. 聚合器处理器
聚合器处理器使应用程序能够将传入消息聚合到组中,并将其发布到输出目标中。
java -jar aggregator-processor-kafka-<version>.jar --aggregator.message-store-type=jdbc
如果要对 RabbitMQ 运行 kafka,请将 kafka 更改为 rabbit。
6.1.1. 负载
如果输入有效负载是 a 且 content-type 标头是 JSON,则函数会尝试将此有效负载反序列化为 a,以便在聚合器函数的输出上更好地表示数据。
此外,这样的数据表示可以轻松地从下面提到的 SPEL 表达式中访问有效负载内容。
否则(包括反序列化错误),输入有效负载将保持原样 - 并且是将其转换为所需形式的目标应用程序配置。byte[]
JsonBytesToMap
Map
Map
6.1.2. 选项
按前缀分组的属性:
聚合
- 集合体
-
聚合策略的 SpEL 表达式。默认值是有效负载的集合。(表达式,默认值:
<none>
) - 相关
-
相关键的 SPEL 表达式。默认为 correlationId 标头。(表达式,默认值:
<none>
) - 组超时
-
超时到过期未完成的组的 SPEL 表达式。(表达式,默认值:
<none>
) - 消息存储实体
-
持久性消息存储实体:RDBMS 中的表前缀、MongoDB 中的集合名称等(字符串,默认值:
<none>
) - 消息存储类型
-
消息存储类型。(字符串,默认值:
<none>
) - 释放
-
用于发布策略的 SPEL 表达式。默认基于 sequenceSize 标头。(表达式,默认值:
<none>
)
spring.data.mongodb
- 附加主机
-
其他服务器主机。不能使用 URI 设置,或者如果未指定 'host' 。其他主机将使用默认的 mongo 端口 27017。如果您想使用不同的端口,可以使用 “host:port” 语法。(List<String>,默认值:
<none>
) - 身份验证数据库
-
身份验证数据库名称。(字符串,默认值:
<none>
) - 自动索引创建
-
是否启用自动索引创建。(布尔值,默认值:
<none>
) - 数据库
-
数据库名称。覆盖 URI 中的数据库。(字符串,默认值:
<none>
) - 字段命名策略
-
要使用的 FieldNamingStrategy 的完全限定名称。(类<?>,默认值:
<无>
) - 主机
-
Mongo 服务器主机。不能使用 URI 设置。(字符串,默认值:
<none>
) - 密码
-
mongo 服务器的登录密码。不能使用 URI 设置。(字符[],默认值:
<无>
) - 港口
-
Mongo 服务器端口。不能使用 URI 设置。(整数,默认值:
<none>
) - 副本集名称
-
集群所需的副本集名称。不能使用 URI 设置。(字符串,默认值:
<none>
) - URI
-
Mongo 数据库 URI。覆盖主机、端口、用户名和密码。(字符串,默认值:
mongodb://localhost/test
) - 用户名
-
mongo 服务器的登录用户。不能使用 URI 设置。(字符串,默认值:
<none>
) - uuid 表示
-
将 UUID 转换为 BSON 二进制值时使用的表示形式。(UuidRepresentation,默认值:
java-legacy
,可能的值:UNSPECIFIED,STANDARD,C_SHARP_LEGACY,JAVA_LEGACY,PYTHON_LEGACY)
spring.data.mongodb.gridfs
- 桶
-
GridFS 存储桶名称。(字符串,默认值:
<none>
) - 数据库
-
GridFS 数据库名称。(字符串,默认值:
<none>
)
spring.data.mongodb.ssl
- 捆
-
SSL 捆绑包名称。(字符串,默认值:
<none>
) - 启用
-
是否启用 SSL 支持。如果提供了 “bundle”,则自动启用,除非另有说明。(布尔值,默认值:
<none>
)
spring.data.redis 的
- 客户端名称
-
要在使用 CLIENT SETNAME 的连接上设置的客户端名称。(字符串,默认值:
<none>
) - 客户端类型
-
要使用的客户端类型。默认情况下,根据 Classpath 自动检测。(ClientType,默认值:
<none>
,可能的值:LETTUCE,JEDIS
)
- 连接超时
-
连接超时。(持续时间,默认值:
<无>
) - 数据库
-
连接工厂使用的数据库索引。(整数,默认值:
0
) - 主机
-
Redis 服务器主机。(字符串,默认值:
localhost
) - 密码
-
redis 服务器的登录密码。(字符串,默认值:
<none>
) - 港口
-
Redis 服务器端口。(整数,默认值:
6379
) - 超时
-
读取超时。(持续时间,默认值:
<无>
) - 网址
-
连接 URL。覆盖主机、端口、用户名和密码。示例:redis://user:[email protected]:6379 (字符串,默认值:
<none>
) - 用户名
-
redis 服务器的登录用户名。(字符串,默认值:
<none>
)
spring.data.redis.cluster
- 最大重定向数
-
在整个集群中执行命令时要遵循的最大重定向数。(整数,默认值:
<none>
) - 节点
-
要从中引导的 “host:port” 对的逗号分隔列表。这表示群集节点的“初始”列表,并且至少需要一个条目。(List<String>,默认值:
<none>
)
spring.data.redis.jedis.pool
- 启用
-
是否启用池。如果“commons-pool2”可用,则自动启用。使用 Jedis,在 Sentinel 模式下隐式启用池化,此设置仅适用于单节点设置。(布尔值,默认值:
<none>
) - 最大活动
-
池在给定时间可分配的最大连接数。使用负值表示无限制。(整数,默认值:
8
) - max-idle (最大空闲)
-
池中 “空闲” 连接的最大数量。使用负值表示无限数量的空闲连接。(整数,默认值:
8
) - 最大等待
-
当池耗尽时,连接分配在引发异常之前应阻止的最长时间。使用负值可无限期阻止。(持续时间,默认值:
-1ms
) - 最小空闲
-
目标是要在池中维护的最小空闲连接数。仅当 it 和驱逐运行之间的时间均为正数时,此设置才有效。(整数,默认值:
0
) - 驱逐运行之间的时间
-
空闲对象 evictor 线程运行之间的时间。当为正数时,空闲对象驱逐线程将启动,否则不执行空闲对象驱逐。(持续时间,默认值:
<无>
)
spring.data.redis.lettuce.pool
- 启用
-
是否启用池。如果“commons-pool2”可用,则自动启用。使用 Jedis,在 Sentinel 模式下隐式启用池化,此设置仅适用于单节点设置。(布尔值,默认值:
<none>
) - 最大活动
-
池在给定时间可分配的最大连接数。使用负值表示无限制。(整数,默认值:
8
) - max-idle (最大空闲)
-
池中 “空闲” 连接的最大数量。使用负值表示无限数量的空闲连接。(整数,默认值:
8
) - 最大等待
-
当池耗尽时,连接分配在引发异常之前应阻止的最长时间。使用负值可无限期阻止。(持续时间,默认值:
-1ms
) - 最小空闲
-
目标是要在池中维护的最小空闲连接数。仅当 it 和驱逐运行之间的时间均为正数时,此设置才有效。(整数,默认值:
0
) - 驱逐运行之间的时间
-
空闲对象 evictor 线程运行之间的时间。当为正数时,空闲对象驱逐线程将启动,否则不执行空闲对象驱逐。(持续时间,默认值:
<无>
)
spring.data.redis.lettuce
- shutdown-timeout (关闭超时)
-
关闭超时。(持续时间,默认值:
100 毫秒
)
spring.data.redis.sentinel
- 主人
-
Redis 服务器的名称。(字符串,默认值:
<none>
) - 节点
-
以逗号分隔的 “host:port” 对列表。(List<String>,默认值:
<none>
) - 密码
-
用于使用 Sentinel 进行身份验证的密码。(字符串,默认值:
<none>
) - 用户名
-
用于向 Sentinel 进行身份验证的登录用户名。(字符串,默认值:
<none>
)
spring.data.redis.ssl
- 捆
-
SSL 捆绑包名称。(字符串,默认值:
<none>
) - 启用
-
是否启用 SSL 支持。如果提供了 “bundle”,则自动启用,除非另有说明。(布尔值,默认值:
<none>
)
spring.datasource
- 驱动程序类名称
-
JDBC 驱动程序的完全限定名称。默认情况下,根据 URL 自动检测。(字符串,默认值:
<none>
) - 密码
-
数据库的登录密码。(字符串,默认值:
<none>
) - 网址
-
数据库的 JDBC URL。(字符串,默认值:
<none>
) - 用户名
-
数据库的登录用户名。(字符串,默认值:
<none>
)
6.3. Filter处理器
筛选器处理器使应用程序能够检查传入的有效负载,然后对其应用谓词,以决定是否需要继续记录。
例如,如果传入的有效负载是 type 并且您想要过滤掉少于 5 个字符的任何内容,则可以运行过滤器处理器,如下所示。String
java -jar filter-processor-kafka-<version>.jar --filter.function.expression=payload.length() > 4
如果要对 RabbitMQ 运行 kafka,请将 kafka 更改为 rabbit。
6.3.1. 负载
您可以将任何类型作为有效负载传递,然后对其应用 SpEL 表达式以进行筛选。
如果传入类型为 且内容类型设置为 或 ,则应用程序会将 转换为 。byte[]
text/plain
application/json
byte[]
String
6.3.2. 选项
- filter.function.expression
-
要针对要筛选的请求消息应用的布尔 SpEL 表达式。(表达式,默认值:
<none>
)
6.4. Groovy 处理器
对消息应用 Groovy 脚本的处理器。
6.4.1. 选项
groovy-processor 处理器具有以下选项:
- groovy-processor.script
-
对用于处理消息的脚本的引用。(资源,默认值:
<无>
) - groovy-processor.variables
-
变量绑定作为名称-值对的新行分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:
<无>
) - groovy-processor.variables-位置
-
包含自定义脚本变量绑定的属性文件的位置。(资源,默认值:
<无>
)
6.5. Header Enricher 处理器
使用 header-enricher 应用程序添加消息标头。
标头以换行分隔的键值对的形式提供,其中键是标头名称,值是 SpEL 表达式。
例如。--headers='foo=payload.someProperty \n bar=payload.otherProperty'
6.5.1. 选项
header-enricher 处理器具有以下选项:
- header.enricher.headers
-
\n 分隔的属性表示标头,其中的值是 SpEL 表达式,例如 foo='bar' \n baz=payload.baz。(属性,默认值:
<无>
) - header.enricher.overwrite
-
设置为 true 可覆盖任何现有邮件标头。(布尔值,默认值:
false
)
6.6. Http 请求处理器
一个处理器应用程序,它向 HTTP 资源发出请求并将响应正文作为消息负载发出。
6.6.1. 输入
头
任何必需的 HTTP 标头都必须通过 or 属性显式设置。请参阅下面的示例。
标头值也可用于构造:headers
headers-expression
-
在属性中引用时的请求正文。
body-expression
-
在属性中引用的 HTTP 方法。
http-method-expression
-
在属性中引用时的 URL。
url-expression
有效载荷
默认情况下,有效负载用作 POST 请求的请求正文,可以是任何 Java 类型。 对于 GET 请求,它应为空 String。 payload 还可用于构造:
-
在属性中引用时的请求正文。
body-expression
-
在属性中引用的 HTTP 方法。
http-method-expression
-
在属性中引用时的 URL。
url-expression
底层 WebClient 支持 Jackson JSON 序列化,以在必要时支持任何请求和响应类型。
默认情况下,该属性可以设置为应用程序类路径中的任何类。
请注意,用户定义的有效负载类型需要向 pom 文件添加所需的依赖项。expected-response-type
String.class
6.6.2. 输出
头
没有 HTTP 消息标头映射到出站消息。
有效载荷
原始输出对象是 ResponseEntity<?>它的任何字段(例如、、)或访问器方法 () 都可以作为 .
默认情况下,出站 Message 负载是响应正文。
请注意, ResponseEntity (由表达式引用) 默认情况下不能由 Jackson 反序列化,但可以呈现为 .body
headers
statusCode
reply-expression
#root
HashMap
6.6.3. 选项
http-request 处理器有以下选项:
6.6.4. 选项
按前缀分组的属性:
http.request 请求
- 身体表情
-
一个 SPEL 表达式,用于从传入消息派生请求正文。(表达式,默认值:
<none>
) - 预期响应类型
-
用于解释响应的类型。(类<?>,默认值:
<无>
) - headers-表达式
-
用于派生要使用的 http 标头映射的 SPEL 表达式。(表达式,默认值:
<none>
) - http-method-expression 表达式
-
一个 SpEL 表达式,用于从传入消息中派生请求方法。(表达式,默认值:
<none>
) - 回复表达式
-
用于计算最终结果的 SPEL 表达式,应用于整个 http {@link org.springframework.http.ResponseEntity}。(表达式,默认值:
<none>
) - 超时
-
请求超时(以毫秒为单位)。(长,默认值:
30000
) - url 表达式
-
针对传入消息的 SPEL 表达式,用于确定要使用的 URL。(表达式,默认值:
<none>
)
spring.codec
- 日志请求详细信息
-
是否在 DEBUG 级别记录表单数据,在 TRACE 级别记录标头。(布尔值,默认值:
false
) - 最大内存大小
-
每当需要聚合 input stream 时可以缓冲的字节数的限制。这仅适用于自动配置的 WebFlux 服务器和 WebClient 实例。默认情况下,未设置此项,在这种情况下,将应用单个编解码器的默认值。默认情况下,大多数编解码器限制为 256K。(DataSize,默认值:
<none>
)
6.7. 图像识别处理器
使用 Inception 模型进行分类的处理器 在实时图像中分为不同的类别(例如标签)。
模型的输入是二进制数组的图像。
输出是以下格式的 JSON 消息:
{
"labels" : [
{"giant panda":0.98649305}
]
}
Result 包含已识别类别的名称(例如 label)以及图像表示此类别的置信度(例如 confidence)。
如果 the 设置为大于 1 的值,则结果将包含最可能的标签。例如,将返回:response-seize
response-seize
response-size=3
{
"labels": [
{"giant panda":0.98649305},
{"badger":0.010562794},
{"ice bear":0.001130851}
]
}
6.7.1. 负载
如果传入类型为 且内容类型设置为 ,则应用程序会将输入图像处理为并输出增强图像有效负载和 json 标头。byte[]
application/octet-stream
byte[]
byte[]
6.7.2. 选项
- image.recognition.cache-model
-
缓存预先训练的 TensorFlow 模型。(布尔值,默认值:
true
) - image.recognition.debug-output
-
<缺少文档> (布尔值,默认值:
false
) - image.recognition.debug-output-path
-
<缺少文档> (字符串,默认值:
image-recognition-result.png
) - image.recognition.model
-
预先训练的 TensorFlow 图像识别模型。请注意,模型必须与所选模型类型匹配!(字符串,默认值:
https://storage.googleapis.com/mobilenet_v2/checkpoints/mobilenet_v2_1.4_224.tgz#mobilenet_v2_1.4_224_frozen.pb
) - image.recognition.model-type
-
支持三种不同的预训练 tensorflow 图像识别模型:Inception、MobileNetV1 和 MobileNetV2 1。Inception Graph 使用 'input' 作为输入,使用 'output' 作为输出。2. MobileNetV2 预训练模型:https://github.com/tensorflow/models/tree/master/research/slim/nets/mobilenet#pretrained-models - 标准化图像大小始终为正方形(例如 H=W) - 图形使用“input”作为输入,“MobilenetV2/Predictions/Reshape_1”作为输出。3. MobileNetV1 预训练模型:https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet_v1.md#pre-trained-models - 图形使用“input”作为输入,“MobilenetV1/Predictions/Reshape_1”作为输出。(ModelType,默认值:
<none>
,可能的值:inception,mobilenetv1,mobilenetv2
) - image.recognition.normalized-image-size
-
标准化图像大小。(整数,默认值:
224
) - image.recognition.response-size
-
已识别图像的数量。(整数,默认值:
5
)
6.8. 对象检测处理器
Object Detection 处理器为 TensorFlow Object Detection API 提供开箱即用的支持。它允许在单个图像或图像流中实时定位和识别多个对象。Object Detection 处理器构建在对象检测功能之上。
以下是一些合理的配置默认值:
-
object.detection.model
:storage.googleapis.com/scdf-tensorflow-models/object-detection/faster_rcnn_resnet101_coco_2018_01_28_frozen_inference_graph.pb
-
object.detection.labels
:storage.googleapis.com/scdf-tensorflow-models/object-detection/mscoco_label_map.pbtxt
-
object.detection.with-masks
:false
下图显示了一个 Spring Cloud Data Flow,即流式管道,它实时预测输入图像流中的对象类型。
处理器的输入是一个图像字节数组,输出是一个增强的图像,以及一个名为 的标头,它提供检测到的对象的文本描述:detected_objects
{
"labels" : [
{"name":"person", "confidence":0.9996774,"x1":0.0,"y1":0.3940161,"x2":0.9465165,"y2":0.5592592,"cid":1},
{"name":"person", "confidence":0.9996604,"x1":0.047891676,"y1":0.03169123,"x2":0.941098,"y2":0.2085562,"cid":1},
{"name":"backpack", "confidence":0.96534747,"x1":0.15588468,"y1":0.85957795,"x2":0.5091308,"y2":0.9908878,"cid":23},
{"name":"backpack", "confidence":0.963343,"x1":0.1273736,"y1":0.57658505,"x2":0.47765,"y2":0.6986431,"cid":23}
]
}
标头格式为:detected_objects
-
object-name:confidence - 检测到的对象(例如标签)的人类可读名称,其置信度为 [0-1] 之间的浮点数
-
x1, y1, x2, y2 - 响应还提供表示为 的检测到的对象的边界框。坐标是相对于图像大小的大小的。
(x1, y1, x2, y2)
-
cid - 在提供的标签配置文件中定义的分类标识符。
6.8.1. 负载
传入类型为 ,内容类型为 。处理器处理输入图像并输出增强图像有效负载和 JSON 标头 ()。byte[]
application/octet-stream
byte[]
byte[]
detected_objects
6.8.2. 选项
- 对象.detection.cache-model
-
<缺少文档> (布尔值,默认值:
true
) - 对象.detection.confidence
-
<缺少文档> (浮点数,默认值:
0.4
) - object.detection.debug-output
-
<缺少文档> (布尔值,默认值:
false
) - object.detection.debug-output-path
-
<缺少文档> (字符串,默认值:
object-detection-result.png
) - 对象.detection.labels
-
标签 URI。(字符串,默认值:
https://storage.googleapis.com/scdf-tensorflow-models/object-detection/mscoco_label_map.pbtxt
) - 对象.detection.model
-
预先训练的 TensorFlow 对象检测模型。(字符串,默认值:
https://download.tensorflow.org/models/object_detection/ssdlite_mobilenet_v2_coco_2018_05_09.tar.gz#frozen_inference_graph.pb
) - object.detection.response-size 对象
-
<缺少文档> (整数,默认值:
<none>
) - 对象.detection.with-masks
-
<缺少文档> (布尔值,默认值:
false
)
6.9. 语义分割处理器
基于最先进的 DeepLab Tensorflow 模型的图像语义分割。
这是将图像的每个像素与类标签(如花朵、人物、道路、天空、海洋或汽车)相关联的过程。
与生成实例感知区域掩码的 不同,生成类感知掩码的 。
有关实施的信息,请改用对象检测服务。Semantic Segmentation
Instance Segmentation
Semantic Segmentation
Instance Segmentation
它使用语义分割函数库和 TensorFlow 服务。Semantic Segmentation Processor
6.9.1. 负载
传入类型为 ,内容类型为 。处理器处理输入图像并输出增强图像有效负载和 json 标头。byte[]
application/octet-stream
byte[]
byte[]
处理器的输入是一个图像字节数组,输出是一个增强的图像字节数组,以及一个格式为 JSON 标头:semantic_segmentation
[
[ 0, 0, 0 ],
[ 127, 127, 127 ],
[ 255, 255, 255 ],
...
]
输出标头 json 格式表示从输入图像计算的彩色像素图。
6.9.2. 选项
- 语义.segmentation.color-map-uri
-
每个预训练模型都基于特定的对象颜色映射。预定义的选项包括: - classpath:/colormap/citymap_colormap.json - classpath:/colormap/ade20k_colormap.json - classpath:/colormap/black_white_colormap.json - classpath:/colormap/mapillary_colormap.json (字符串,默认值:
classpath:/colormap/citymap_colormap.json
) - 语义.segmentation.debug-output
-
save output image 在本地 debugOutputPath 路径中。(布尔值,默认值:
false
) - semantic.segmentation.debug-output-path
-
<缺少文档> (字符串,默认值:
semantic-segmentation-result.png
) - semantic.segmentation.mask-transparency
-
计算的分段蒙版图像的 Alpha 颜色。(浮点数,默认值:
0.45
) - 语义.segmentation.model
-
预先训练的 TensorFlow 语义分割模型。(字符串,默认值:
https://download.tensorflow.org/models/deeplabv3_mnv2_cityscapes_train_2018_02_05.tar.gz#frozen_inference_graph.pb
) - semantic.segmentation.output-type
-
指定输出图像类型。您可以返回带有计算的蒙版叠加的输入图像,也可以单独返回蒙版。(OutputType,默认值:
<none>
,可能的值:blended,mask
)
6.10. 脚本处理器
使用脚本转换消息的处理器。脚本正文是直接提供的 作为属性值。可以指定脚本的语言 (groovy/javascript/ruby/python)。
6.10.1. 选项
script-processor 处理器具有以下选项:
- script-processor.language 语言
-
script 属性中文本的语言。支持:groovy、javascript、ruby、python。(字符串,默认值:
<none>
) - script-processor.脚本
-
脚本的文本。(字符串,默认值:
<none>
) - script-processor.variables
-
变量绑定作为名称-值对的新行分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:
<无>
) - 脚本处理器.变量位置
-
包含自定义脚本变量绑定的属性文件的位置。(资源,默认值:
<无>
)
6.11. 分路器处理器
splitter 应用程序构建在 Spring Integration 中的同名概念之上,并允许将单个消息拆分为多个不同的消息。
处理器使用一个函数,该函数将 a 作为输入,然后根据各种属性生成 as 输出(见下文)。
您可以使用 SPEL 表达式或分隔符来指定要如何拆分传入消息。Message<?>
List<Message<?>
6.11.1. 负载
-
传入有效负载 -
Message<?
>
如果传入类型为 且内容类型设置为 或 ,则应用程序会将 转换为 。byte[]
text/plain
application/json
byte[]
String
-
传出有效负载 -
List<Message<?>
6.11.2. 选项
- splitter.apply-sequence
-
在 header 中添加关联/序列信息,以方便以后的聚合。(布尔值,默认值:
true
) - splitter.charset
-
将基于文本的文件中的字节转换为 String 时使用的字符集。(字符串,默认值:
<none>
) - splitter.delimiters
-
当 expression 为 null 时,在对 {@link String} 有效负载进行标记时使用的分隔符。(字符串,默认值:
<none>
) - splitter.expression
-
用于拆分有效负载的 SPEL 表达式。(字符串,默认值:
<none>
) - splitter.file-markers 文件标记
-
设置为 true 或 false 以使用包含(或不包含)文件开头/结尾标记的 {@code FileSplitter}(按行拆分基于文本的文件)。(布尔值,默认值:
<none>
) - splitter.markers-json 文件
-
当 'fileMarkers == true' 时,指定它们是应生成为 FileSplitter.FileMarker 对象还是 JSON。(布尔值,默认值:
true
)
6.12. 转换处理器
Transformer 处理器允许您根据 SPEL 表达式转换消息有效负载结构。
下面是如何运行此应用程序的示例。
java -jar transform-processor-kafka-<version>.jar \
--spel.function.expression=payload.toUpperCase()
如果要对 RabbitMQ 运行 kafka,请将 kafka 更改为 rabbit。
6.12.1. 负载
传入消息可以包含任何类型的有效负载。
6.12.2. 选项
- spel.function.expression
-
要应用的 SpEL 表达式。(字符串,默认值:
<none>
)
6.13. Twitter 趋势和趋势位置处理器
可以返回热门主题或热门主题的 Locations 的处理器。
该属性 允许 选择查询类型。twitter.trend.trend-query-type
6.13.1. 在某个位置检索热门话题(可选)
对于此模式,设置为 。twitter.trend.trend-query-type
trend
基于 Trends API 的处理器。 返回特定纬度、经度位置附近的热门主题。
6.13.2. 获取趋势位置
对于此模式,设置为 。twitter.trend.trend-query-type
trendLocation
按位置检索热门主题的完整列表或附近的位置列表。
如果未提供 , 参数,则处理器将执行 Trends Available API 并返回 Twitter 具有其热门主题信息的位置。latitude
longitude
如果提供了 , 参数,则处理器将执行 Trends Closest API 并返回 Twitter 具有其热门主题信息、最接近指定位置的位置。latitude
longitude
响应是一个数组,用于编码位置的 WOEID 和一些其他人类可读的信息,例如位置所属的规范名称和国家/地区。locations
6.13.3. 选项
按前缀分组的属性:
推特.trend.closest
- 纬度
-
如果提供 long 参数,则可用的趋势位置将按距离(距离坐标对最近到最远)进行排序。经度的有效范围为 -180.0 到 +180.0(西为负,东为正)。(表达式,默认值:
<none>
) - 离子
-
如果提供了 lat 参数,则可用的趋势位置将按距离(从最近到最远的距离)进行排序。经度的有效范围为 -180.0 到 +180.0(西为负,东为正)。(表达式,默认值:
<none>
)
推特
- 位置 ID
-
雅虎Where On Earth ID 要返回其趋势信息的位置。全局信息可通过使用 1 作为 WOEID 来获得。(表达式,默认值:
payload
) - 趋势查询类型
-
<缺少文档>(TrendQueryType,默认值:
<none>
,可能的值:trend,trendLocation
)
7. 水槽
7.1. Cassandra 接收器
此接收器应用程序将其收到的每条消息的内容写入 Cassandra。
它需要 JSON String 的有效负载,并使用其属性映射到表列。
7.1.1. 负载
一个 JSON 字符串或字节数组,表示要持久保存的实体(或实体列表)。
7.1.2. 选项
cassandra sink 具有以下选项:
按前缀分组的属性:
cassandra.cluster
- 创建密钥空间
-
在应用程序启动时创建(或不创建)密钥空间的标志。(布尔值,默认值:
false
) - 实体基础包
-
用于扫描使用表注释注释的实体的基础包。(String[],默认值:
[]
) - init-script (初始化脚本)
-
使用 CQL 脚本(以 ';' 分隔)来初始化密钥空间架构的资源。(资源,默认值:
<无>
) - skip-ssl 验证
-
用于验证服务器的 SSL 证书的标志。(布尔值,默认值:
false
)
Cassandra
- 一致性级别
-
写入操作的一致性级别。(ConsistencyLevel,默认值:
<none>
) - 引入查询
-
摄取 Cassandra 查询。(字符串,默认值:
<none>
) - query-type 查询类型
-
Cassandra 接收器的 QueryType。(类型,默认值:
<none>
,可能的值:INSERT,UPDATE,DELETE,STATEMENT
)
- 语句表达式
-
Cassandra 查询 DSL 样式的表达式。(表达式,默认值:
<none>
) - ttl
-
Time-to-live 选项的 WriteOptions 的 WriteOptions 的 Time-to-live 选项。(整数,默认值:
0
)
spring.cassandra
- 压缩
-
Cassandra 二进制协议支持的压缩。(压缩,默认值:
none
,可能的值:LZ4,SNAPPY,NONE) - 配置
-
要使用的配置文件的位置。(资源,默认值:
<无>
) - 触点
-
群集节点地址采用 'host:port' 格式,或简单的 'host' 以使用配置的端口。(List<String>,默认值:
[127.0.0.1:9042]
) - keyspace-name (键空间名称)
-
要使用的 Keyspace 名称。(字符串,默认值:
<none>
) - 本地数据中心
-
被视为 “local” 的数据中心。联系点应来自此数据中心。(字符串,默认值:
<none>
) - 密码
-
服务器的登录密码。(字符串,默认值:
<none>
) - 港口
-
如果联系点未指定端口,则要使用的端口。(整数,默认值:
9042
) - 架构操作
-
启动时要采取的架构操作。(字符串,默认值:
无
) - 会话名称
-
Cassandra 会话的名称。(字符串,默认值:
<none>
) - 用户名
-
服务器的登录用户。(字符串,默认值:
<none>
)
spring.cassandra.connection
- 连接超时
-
建立驱动程序连接时使用的超时。(持续时间,默认:
5 秒
) - init-query-timeout (初始化查询超时)
-
用于作为初始化过程的一部分运行的内部查询的超时,就在打开连接之后。(持续时间,默认:
5 秒
)
spring.cassandra.control连接
- 超时
-
用于控制查询的超时。(持续时间,默认:
5 秒
)
spring.cassandra.pool
- 心跳间隔
-
检测信号间隔,在此间隔之后,将在空闲连接上发送消息,以确保它仍处于活动状态。(持续时间,默认:
30 秒
) - 空闲超时
-
删除空闲连接之前的空闲超时。(持续时间,默认:
5 秒
)
spring.cassandra.request
- 一致性
-
查询一致性级别。(DefaultConsistencyLevel,默认值:
<none>
,可能的值:ANY
、ONE
、TWO
、THREE、
QUORUM、
ALL
LOCAL_ONE
、LOCAL_QUORUM
、EACH_QUORUM
、SERIAL
LOCAL_SERIAL
) - 页面大小
-
在单个网络往返中将同时检索多少行。(整数,默认值:
5000
) - 串行一致性
-
查询串行一致性级别。(DefaultConsistencyLevel,默认值:
<none>
,可能的值:ANY
、ONE
、TWO
、THREE、
QUORUM、
ALL
LOCAL_ONE
、LOCAL_QUORUM
、EACH_QUORUM
、SERIAL
LOCAL_SERIAL
) - 超时
-
驱动程序等待请求完成的时间。(持续时间,默认:
2 秒
)
spring.cassandra.request.throttler
- 排水间隔
-
限制程序尝试取消请求排队的频率。将此设置得足够高,以便每次尝试都将处理队列中的多个条目,但不会将请求延迟太多。(持续时间,默认值:
<无>
) - 最大并发请求数
-
允许并行执行的最大请求数。(整数,默认值:
<none>
) - 最大队列大小
-
超过限制阈值时可以排队的最大请求数。(整数,默认值:
<none>
) - 每秒最大请求数
-
允许的最大请求速率。(整数,默认值:
<none>
) - 类型
-
请求限制类型。(ThrottlerType,默认值:
none
,可能的值:CONCURRENCY_LIMITING,RATE_LIMITING,NONE
)
spring.cassandra.ssl 中
- 捆
-
SSL 捆绑包名称。(字符串,默认值:
<none>
) - 启用
-
是否启用 SSL 支持。(布尔值,默认值:
<none>
)
7.2. Analytics 接收器
Sink 应用程序,构建在 Analytics Consumer 之上,用于计算输入消息的分析,并将分析作为指标发布到各种监控系统。它利用千分尺库在最流行的监控系统中提供统一的编程体验,并公开 Spring 表达式语言 (SpEL) 属性,用于定义如何从输入数据计算指标 Name、Values 和 Tags。
分析接收器可以生成两种指标类型:
仪表(例如 Counter 或 Gauge)由其 和 唯一标识(术语 dimensions 和 tags 可互换使用)。维度允许对特定的命名量度进行切片,以便向下钻取和推理数据。name
dimensions
由于量度由其 和 唯一标识,因此您可以为每个量度分配多个标签(e.g. key/值对),但之后无法随机更改这些标签!如果具有相同名称的指标具有不同的标签集,Prometheus 等监控系统将会抱怨。name dimensions |
使用 or 属性设置输出分析指标的名称。如果未设置,则指标名称默认为应用程序名称。analytics.name
analytics.name-expression
使用 , 属性向量度添加一个或多个标签。在属性定义中使用的将在量度中显示为标记名称。TAG_VALUE 是一个表达式,用于动态计算传入消息的标签值。analytics.tag.expression.<TAG_NAME>=<TAG_VALUE>
TAG_NAME
SpEL
表达式使用 and 关键字来访问消息的标头和有效负载值。SpEL
headers
payload
你可以使用 literals (例如 ) 来设置具有固定值的标签。'fixed value' |
所有 Stream 应用程序都支持三种最流行的监控系统,并且您可以通过声明方式启用它们。
您只需将 micrometer meter-registry 依赖项添加到应用程序中,即可添加对其他监控系统的支持。Wavefront
Prometheus
InfluxDB
Analytics Sink
请访问 Spring Cloud 数据流流监控,了解有关配置监控系统的详细说明。以下快速代码段可以帮助您开始。
-
要启用 Prometheus 计量注册表,请设置以下属性。
management.metrics.export.prometheus.enabled=true
management.metrics.export.prometheus.rsocket.enabled=true
management.metrics.export.prometheus.rsocket.host=<YOUR PROMETHEUS-RSOKET PROXI URI
management.metrics.export.prometheus.rsocket.port=7001
-
要启用 Wavefront 仪表注册表,请设置以下属性。
management.metrics.export.wavefront.enabled=true
management.metrics.export.wavefront.api-token=YOUR WAVEFRONT KEY
management.metrics.export.wavefront.uri=YOUR WAVEFRONT URI
management.metrics.export.wavefront.source=UNIQUE NAME TO IDENTIFY YOUR APP
-
要启用 InfluxDB 计量注册表,请设置以下属性。
management.metrics.export.influx.enabled=true management.metrics.export.influx.uri={influxdb-server-url}
如果启用了 Data Flow Server Monitoring,则将重复使用提供的指标配置。Analytics Sink |
下图说明了如何帮助收集股票交易业务内部的实时管道。Analytics Sink
7.2.1. 负载
传入消息可以包含任何类型的有效负载。
7.2.2. 选项
按前缀分组的属性:
分析学
- 金额表达式
-
用于计算输出度量值(例如 amount)的 SPEL 表达式。它默认为 1.0(表达式,默认值:
<none>
) - 米型
-
Micrometer meter 类型,用于向后端报告指标。(MeterType,默认值:
<none>
,可能的值:counter,gauge
) - 名字
-
输出指标的名称。'name' 和 'nameExpression' 是互斥的。只能设置其中一个。(字符串,默认值:
<none>
) - 名称表达式
-
一个 SPEL 表达式,用于计算输入消息的输出指标名称。'name' 和 'nameExpression' 是互斥的。只能设置其中一个。(表达式,默认值:
<none>
)
analytics.tag
- 表达
-
从 SpEL 表达式计算标签。单个 SPEL 表达式可以生成一个值数组,这反过来意味着不同的名称/值标签。每个 name/value 标签都会产生一个单独的计量增量。标签表达式格式为:analytics.tag.expression。[tag-name]=[SPEL 表达式](Map<String,表达式>,默认值:
<none>
) - 固定
-
已弃用:请将 analytics.tag.expression 与文本 SPEL 表达式一起使用。自定义、固定的 Tags。这些标签具有常量值,创建一次,然后与每个发布的指标一起发送。定义固定标记的约定为:<code> analytics.tag.fixed。[标签名称]=[标签值] </code> (Map<String, String>,默认值:
<无>
)
7.3. Elasticsearch 接收器
Sink,用于将文档索引到 Elasticsearch 中。
此 Elasticsearch 接收器仅支持为 JSON 文档编制索引。
它使用来自输入目标的数据,然后将其索引到 Elasticsearch。
输入数据可以是纯 json 字符串,也可以是表示 JSON 的 a。
它还接受 Elasticsearch 提供的数据。
但是,这种情况很少见,因为中间件不太可能将记录保存为 。
这主要用于消费者的直接调用。java.util.Map
XContentBuilder
XContentBuilder
7.3.1. 选项
Elasticsearch 接收器具有以下选项:
按前缀分组的属性:
elasticsearch.consumer
- 异步
-
指示索引操作是否为异步操作。默认情况下,索引是同步完成的。(布尔值,默认值:
false
) - 批量大小
-
每个请求要编制索引的项目数。它默认为 1。对于大于 1 的值,将使用批量索引 API。(整数,默认值:
1
) - 组超时
-
超时(以毫秒为单位),当批量索引处于活动状态时,将刷新消息组。它默认为 -1,这意味着不会自动刷新空闲消息组。(长整型,默认值:
-1
) - 身份证
-
要编制索引的文档的 ID。如果设置,则 INDEX_ID Headers 值将基于每条消息覆盖此属性。(表达式,默认值:
<none>
) - 指数
-
索引的名称。如果设置,则 INDEX_NAME Headers 值将基于每条消息覆盖此属性。(字符串,默认值:
<none>
) - 路由
-
指示要路由到的分片。如果未提供,Elasticsearch 将默认使用文档 ID 的哈希值。(字符串,默认值:
<none>
) - timeout-seconds (超时秒)
-
分片可用的超时时间。如果未设置,则默认为 Elasticsearch 客户端设置的 1 分钟。(长整型,默认值:
0
)
spring.elasticsearch
- 连接超时
-
与 Elasticsearch 通信时使用的连接超时。(持续时间,默认:
1 秒
) - 密码
-
用于使用 Elasticsearch 进行身份验证的密码。(字符串,默认值:
<none>
) - path-prefix (路径前缀)
-
添加到发送到 Elasticsearch 的每个请求的路径的前缀。(字符串,默认值:
<none>
) - 套接字保持活动状态
-
是否开启 client 和 Elasticsearch 之间的 socket keep alive。(布尔值,默认值:
false
) - 套接字超时
-
与 Elasticsearch 通信时使用的套接字超时。(持续时间,默认:
30 秒
) - URI
-
要使用的 Elasticsearch 实例的逗号分隔列表。(List<String>,默认值:
[http://localhost:9200]
) - 用户名
-
用于使用 Elasticsearch 进行身份验证的用户名。(字符串,默认值:
<none>
)
spring.elasticsearch.restclient.sniffer
- 失败后延迟
-
在失败后计划的嗅探执行延迟。(持续时间,默认:
1 分钟
) - 间隔
-
连续普通探查执行之间的间隔。(持续时间,默认:
5 分钟
)
spring.elasticsearch.restclient.ssl
- 捆
-
SSL 捆绑包名称。(字符串,默认值:
<none>
)
7.3.2. 运行此 sink 的示例
-
从文件夹 :
elasticsearch-sink
./mvnw clean package
-
CD 应用程序
-
cd 复制到适当的 Binder 生成的应用程序(Kafka 或 RabbitMQ)
-
./mvnw clean package
-
确保您正在运行 Elasticsearch。例如,您可以使用以下命令将其作为 docker 容器运行。
docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2
-
如果中间件(Kafka 或 RabbitMQ)尚未运行,请启动它。
-
java -jar target/elasticsearch-sink-<kafka|rabbit>-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.input.destination=els-in --elasticsearch.consumer.index=testing
-
将一些 JSON 数据发送到中间件目标。例如:
{"foo":"bar"}
-
验证数据是否已编制索引:
curl localhost:9200/testing/_search
7.4. 文件接收器
文件接收器应用将其收到的每条消息写入文件。
7.4.1. 负载
-
java.io.File
-
java.io.InputStream
-
byte[]
-
String
7.4.2. 选项
有以下选项:file-sink
- 文件.consumer.binary
-
一个标志,用于指示是否应禁止在写入后添加换行符。(布尔值,默认值:
false
) - 文件.consumer.charset
-
编写文本内容时使用的字符集。(字符串,默认值:
UTF-8
) - 文件.consumer.directory
-
目标文件的父目录。(文件,默认值:
<无>
) - 文件.consumer.directory-expression
-
要为目标文件的父目录计算的表达式。(字符串,默认值:
<none>
) - 文件.consumer.mode
-
如果目标文件已存在,则要使用的 FileExistsMode。(FileExistsMode,默认值:
<none>
,可能的值:APPEND
APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE,REPLACE_IF_MODIFIED
) - file.consumer.name
-
目标文件的名称。(字符串,默认值:
file-consumer
) - file.consumer.name-表达式
-
要计算目标文件名称的表达式。(字符串,默认值:
<none>
) - file.consumer.suffix
-
要附加到文件名的后缀。(字符串,默认值:
<空字符串>
)
7.5. FTP 接收器
FTP 接收器是将文件从传入邮件推送到 FTP 服务器的简单选项。
它使用 ,因此传入消息可以是对象、(文件内容)
或 (file content 的数组)。ftp-outbound-adapter
java.io.File
String
bytes
要使用此接收器,您需要用户名和密码才能登录。
默认情况下,如果未指定任何名称,则 Spring Integration 将使用。 将确定文件名
根据 中的标头值(如果存在),或者如果 的有效负载已经是 ,则它将
使用该文件的原始名称。o.s.i.file.DefaultFileNameGenerator DefaultFileNameGenerator file_name MessageHeaders Message java.io.File |
7.5.1. 标头
-
file_name
(见上面的注释)
7.5.2. 负载
-
java.io.File
-
java.io.InputStream
-
byte[]
-
String
7.5.3. 输出
N/A (写入 FTP 服务器)。
7.5.4. 选项
ftp 接收器有以下选项:
按前缀分组的属性:
ftp.consumer 的
- 自动创建目录
-
是否创建远程目录。(布尔值,默认值:
true
) - filename-expression 文件名
-
用于生成远程文件名的 SPEL 表达式。(字符串,默认值:
<none>
) - 模式
-
如果远程文件已存在,则要执行的操作。(FileExistsMode,默认值:
<none>
,可能的值:APPEND
APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE,REPLACE_IF_MODIFIED
) - 远程目录
-
远程 FTP 目录。(字符串,默认值:
/
) - 远程文件分隔符
-
远程文件分隔符。(字符串,默认值:
/
) - 临时远程目录
-
如果 '#isUseTemporaryFilename()' 为 true,则将写入文件的临时目录。(字符串,默认值:
/
) - tmp-file-suffix 文件后缀
-
传输过程中要使用的后缀。(字符串,默认值:
.tmp
) - 使用临时文件名
-
是否写入临时文件并重命名。(布尔值,默认值:
true
)
ftp.factory
- 缓存会话
-
缓存会话。(布尔值,默认值:
<none>
) - 客户端模式
-
用于 FTP 会话的客户端模式。(ClientMode,默认值:
<none>
,可能的值:ACTIVE,PASSIVE
)
- 主机
-
服务器的主机名。(字符串,默认值:
localhost
) - 密码
-
用于连接到服务器的密码。(字符串,默认值:
<none>
) - 港口
-
服务器的端口。(整数,默认值:
21
) - 用户名
-
用于连接到服务器的用户名。(字符串,默认值:
<none>
)
7.6. JDBC 接收器
JDBC sink 允许您将传入的有效负载持久化到 RDBMS 数据库中。
该属性表示 where (连同冒号) 是可选的成对。
在这种情况下,值是通过生成的表达式(如 )计算的,因此这样我们就可以从对象属性直接映射到表列。
例如,我们有一个 JSON 有效负载,如下所示:jdbc.consumer.columns
COLUMN_NAME[:EXPRESSION_FOR_VALUE]
EXPRESSION_FOR_VALUE
payload.COLUMN_NAME
{
"name": "My Name",
"address": {
"city": "Big City",
"street": "Narrow Alley"
}
}
因此,我们可以使用 ,并使用配置将其插入到表中:name
city
street
--jdbc.consumer.columns=name,city:address.city,street:address.street
只要底层 JDBC 驱动程序支持,此接收器就支持批量插入。
批量插入通过 和 属性进行配置:
传入消息将聚合,直到消息出现,然后作为批处理插入。
如果毫秒过去了,没有新消息,则即使聚合批处理小于 ,也会插入聚合批处理,从而限制最大延迟。batch-size
idle-timeout
batch-size
idle-timeout
batch-size
该模块还使用 Spring Boot 的DataSource支持来配置数据库连接,因此诸如etc.之类的属性适用。spring.datasource.url |
7.6.1. 示例
java -jar jdbc-sink.jar --jdbc.consumer.tableName=names \
--jdbc.consumer.columns=name \
--spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test
7.6.2. 负载
7.6.3. 选项
jdbc sink 具有以下选项:
按前缀分组的属性:
jdbc.consumer
- 批量大小
-
将数据刷新到数据库表的消息数阈值。(整数,默认值:
1
) - 列
-
逗号分隔基于冒号的列名对和要插入/更新的值的 SPEL 表达式。在初始化时使用名称来颁发 DDL。(字符串,默认值:
payload:payload.toString()
) - 空闲超时
-
数据自动刷新到数据库表时的空闲超时(以毫秒为单位)。(长整型,默认值:
-1
) - 初始化
-
'true'、'false' 或表的自定义初始化脚本的位置。(字符串,默认值:
false
) - 表名
-
要写入的表的名称。(字符串,默认值:
messages
)
spring.datasource
- 驱动程序类名称
-
JDBC 驱动程序的完全限定名称。默认情况下,根据 URL 自动检测。(字符串,默认值:
<none>
) - 密码
-
数据库的登录密码。(字符串,默认值:
<none>
) - 网址
-
数据库的 JDBC URL。(字符串,默认值:
<none>
) - 用户名
-
数据库的登录用户名。(字符串,默认值:
<none>
)
7.7. Apache Kafka 接收器
此模块将消息发布到 Apache Kafka。
7.7.1. 选项
kafka sink 具有以下选项:
(请参阅 Spring for Apache Kafka 配置属性的 Spring Boot 文档)
按前缀分组的属性:
kafka.publisher
- 钥匙
-
Kafka 记录键 - 如果提供,则由 keyExpression 覆盖。(字符串,默认值:
<none>
) - 键表达式
-
计算结果为 Kafka 记录键的 SPEL 表达式。(表达式,默认值:
<none>
) - 映射标头
-
将要映射的标头。(String[],默认值:
[*]
) - 分区
-
Kafka 主题分区 - 如果提供,则由 partitionExpression 覆盖。(整数,默认值:
<none>
) - 分区表达式
-
计算结果为 Kafka 主题分区的 SpEL 表达式。(表达式,默认值:
<none>
) - 发送超时
-
Kafka 生产者处理程序应等待发送操作结果的时间。默认为 10 秒。(持续时间,默认:
10 秒
) - 同步
-
如果 Kafka 生产者处理程序应在同步模式下运行,则为 True。(布尔值,默认值:
false
) - 时间戳
-
Kafka 记录时间戳 - 如果提供,则由 timestampExpression 覆盖。(长,默认值:
<none>
) - 时间戳表达式
-
一个计算结果为 Kafka 记录时间戳的 SpEL 表达式。(表达式,默认值:
<none>
) - 主题
-
Kafka 主题 - 如果提供,则由 topicExpression 覆盖。默认为 KafkaTemplate.getDefaultTopic() (String, default:
<none>
) - 主题表达式
-
计算结果为 Kafka 主题的 SPEL 表达式。(表达式,默认值:
<none>
) - 使用模板转换器
-
是否使用模板的消息转换器创建 Kafka 记录。(布尔值,默认值:
false
)
spring.kafka
- 引导服务器
-
用于建立与 Kafka 集群的初始连接的 host:port 对的逗号分隔列表。除非被覆盖,否则适用于所有零部件。(List<String>,默认值:
<none>
) - 客户端 ID
-
发出请求时传递给服务器的 ID。用于服务器端日志记录。(字符串,默认值:
<none>
) - 性能
-
其他属性,生产者和使用者通用,用于配置客户端。(Map<String, String>,默认值:
<none>
)
spring.kafka.producer 中
- ACK 系列
-
生产者要求领导者在将请求视为完成之前收到的确认数。(字符串,默认值:
<none>
) - 批量大小
-
默认批处理大小。较小的批处理大小将使批处理不太常见,并且可能会降低吞吐量(批处理大小为零将完全禁用批处理)。(DataSize,默认值:
<none>
) - 引导服务器
-
用于建立与 Kafka 集群的初始连接的 host:port 对的逗号分隔列表。覆盖生产者的 global 属性。(List<String>,默认值:
<none>
) - 缓冲区内存
-
创建者可用于缓冲等待发送到服务器的记录的总内存大小。(DataSize,默认值:
<none>
) - 客户端 ID
-
发出请求时传递给服务器的 ID。用于服务器端日志记录。(字符串,默认值:
<none>
) - 压缩型
-
创建者生成的所有数据的压缩类型。(字符串,默认值:
<none>
) - 键串码器
-
键的 Serializer 类。(类<?>,默认值:
<无>
) - 性能
-
用于配置客户端的其他特定于创建者的属性。(Map<String, String>,默认值:
<none>
) - 重试
-
当大于零时,启用失败发送的重试。(整数,默认值:
<none>
) - 交易 ID 前缀
-
当非空时,启用对 producer 的事务支持。(字符串,默认值:
<none>
) - 值序列化器
-
值的 Serializer 类。(类<?>,默认值:
<无>
)
spring.kafka.template
- 默认主题
-
消息发送到的默认主题。(字符串,默认值:
<none>
) - 交易 ID 前缀
-
Transaction id prefix (事务 ID 前缀),覆盖创建者工厂中的事务 ID 前缀。(字符串,默认值:
<none>
)
7.8. 日志接收器
sink 使用应用程序记录器输出数据以供检查。log
请理解 sink 使用无类型的处理程序,这会影响实际日志记录的执行方式。
这意味着,如果 content-type 是 textual,则原始有效负载字节将转换为 String,否则将记录原始字节。
请参阅用户指南中的更多信息。log
7.8.1. 选项
日志接收器具有以下选项:
- log.expression
-
一个 SPEL 表达式(针对传入消息),用于评估为记录的消息。(字符串,默认值:
payload
) - log.level 级别
-
记录消息的级别。(级别,默认值:
<none>
,可能的值:FATAL,ERROR,WARN,INFO,DEBUG,TRACE
)
- log.name
-
要使用的 Logger 的名称。(字符串,默认值:
<none>
)
7.9. MongoDB 接收器
此接收器应用程序将传入数据提取到 MongoDB 中。
此应用程序完全基于 ,因此请参阅 Spring Boot MongoDB 支持以获取更多信息。MongoDataAutoConfiguration
7.9.1. 输入
有效载荷
-
任何 POJO
-
String
-
byte[]
7.9.2. 选项
mongodb sink 具有以下选项:
按前缀分组的属性:
mongodb.consumer
- 收集
-
用于存储数据的 MongoDB 集合。(字符串,默认值:
<none>
) - 集合表达式
-
用于评估 MongoDB 集合的 SPEL 表达式。(表达式,默认值:
<none>
)
spring.data.mongodb
- 附加主机
-
其他服务器主机。不能使用 URI 设置,或者如果未指定 'host' 。其他主机将使用默认的 mongo 端口 27017。如果您想使用不同的端口,可以使用 “host:port” 语法。(List<String>,默认值:
<none>
) - 身份验证数据库
-
身份验证数据库名称。(字符串,默认值:
<none>
) - 自动索引创建
-
是否启用自动索引创建。(布尔值,默认值:
<none>
) - 数据库
-
数据库名称。覆盖 URI 中的数据库。(字符串,默认值:
<none>
) - 字段命名策略
-
要使用的 FieldNamingStrategy 的完全限定名称。(类<?>,默认值:
<无>
) - 主机
-
Mongo 服务器主机。不能使用 URI 设置。(字符串,默认值:
<none>
) - 密码
-
mongo 服务器的登录密码。不能使用 URI 设置。(字符[],默认值:
<无>
) - 港口
-
Mongo 服务器端口。不能使用 URI 设置。(整数,默认值:
<none>
) - 副本集名称
-
集群所需的副本集名称。不能使用 URI 设置。(字符串,默认值:
<none>
) - URI
-
Mongo 数据库 URI。覆盖主机、端口、用户名和密码。(字符串,默认值:
mongodb://localhost/test
) - 用户名
-
mongo 服务器的登录用户。不能使用 URI 设置。(字符串,默认值:
<none>
) - uuid 表示
-
将 UUID 转换为 BSON 二进制值时使用的表示形式。(UuidRepresentation,默认值:
java-legacy
,可能的值:UNSPECIFIED,STANDARD,C_SHARP_LEGACY,JAVA_LEGACY,PYTHON_LEGACY)
spring.data.mongodb.gridfs
- 桶
-
GridFS 存储桶名称。(字符串,默认值:
<none>
) - 数据库
-
GridFS 数据库名称。(字符串,默认值:
<none>
)
spring.data.mongodb.ssl
- 捆
-
SSL 捆绑包名称。(字符串,默认值:
<none>
) - 启用
-
是否启用 SSL 支持。如果提供了 “bundle”,则自动启用,除非另有说明。(布尔值,默认值:
<none>
)
7.10. MQTT 接收器
此模块将消息发送到 MQTT。
7.10.1. 负载:
-
byte[]
-
String
7.10.2. 选项
mqtt sink 有以下选项:
按前缀分组的属性:
MQTT 协议
- clean-session
-
客户端和服务器是否应在重启和重新连接时记住状态。(布尔值,默认值:
true
) - 连接超时
-
连接超时(以秒为单位)。(整数,默认值:
30
) - 保持活动间隔
-
ping 间隔(以秒为单位)。(整数,默认值:
60
) - 密码
-
连接到代理时使用的密码。(字符串,默认值:
guest
) - 坚持
-
'memory' 或 'file'。(字符串,默认值:
memory
) - 持久性目录
-
Persistence 目录。(字符串,默认值:
/tmp/paho
) - SSL 属性
-
MQTT 客户端 SSL 属性。(Map<String, String>,默认值:
<none>
) - 网址
-
MQTT 代理的位置(逗号分隔的列表)。(String[],默认值:
[tcp://localhost:1883]
) - 用户名
-
连接到 broker 时要使用的用户名。(字符串,默认值:
guest
)
mqtt.consumer
- 异步
-
是否使用 async sends。(布尔值,默认值:
false
) - 字符集
-
用于将 String 有效负载转换为 byte[] 的字符集。(字符串,默认值:
UTF-8
) - 客户端 ID
-
标识客户端。(字符串,默认:
stream.client.id.sink
) - QoS
-
要使用的服务质量。(整数,默认值:
1
) - 保留
-
是否设置 'retained' 标志。(布尔值,默认值:
false
) - 主题
-
接收器将发布到的主题。(字符串,默认:
stream.mqtt
)
7.11. pgcopy 接收器
使用 PostgreSQL COPY 命令将其传入负载写入 RDBMS 的模块。
7.11.1. 输入
头
有效载荷
-
任何
Column expression 将根据消息进行评估,并且表达式通常只与一种类型(例如 Map 或 bean 等)兼容。
7.11.2. 输出
不适用
7.11.3. 选项
jdbc sink 具有以下选项:
- spring.datasource.driver-class-name
-
JDBC 驱动程序的完全限定名称。默认情况下,根据 URL 自动检测。(字符串,默认值:
<none>
) - spring.datasource.password
-
数据库的登录密码。(字符串,默认值:
<none>
) - spring.datasource.url
-
数据库的 JDBC URL。(字符串,默认值:
<none>
) - spring.datasource.username
-
数据库的登录用户名。(字符串,默认值:
<none>
)
该模块还使用 Spring Boot 的DataSource支持来配置数据库连接,因此诸如etc.之类的属性适用。spring.datasource.url |
7.11.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
您可以在此处找到相应的基于 Binder 的项目。 然后,你可以 cd 到其中一个文件夹并构建它:
$ ./mvnw clean package
要运行集成测试,请在 localhost 上启动 PostgreSQL 数据库:
docker run -e POSTGRES_PASSWORD=spring -e POSTGRES_DB=test -p 5432:5432 -d postgres:latest
7.11.5. 示例
java -jar pgcopy-sink.jar --tableName=names --columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test
7.12. RabbitMQ 接收器
该模块向 RabbitMQ 发送消息。
7.12.1. 选项
rabbit sink 具有以下选项:
(有关 RabbitMQ 连接属性,请参阅 Spring Boot 文档)
按前缀分组的属性:
兔
- 转换器 bean 名称
-
自定义消息转换器的 Bean 名称;如果省略,则使用SimpleMessageConverter。如果为 'jsonConverter',将为您创建一个 Jackson2JsonMessageConverter bean。(字符串,默认值:
<none>
) - 交换
-
Exchange name - 如果提供,则由 exchangeNameExpression 覆盖。(字符串,默认值:
<空字符串>
) - 交换表达式
-
计算结果为 exchange 名称的 SPEL 表达式。(表达式,默认值:
<none>
) - 标头映射的最后一个
-
在映射出站消息的报头时,请确定报头是在转换邮件之前还是之后映射的。(布尔值,默认值:
true
) - mapped-request-headers 请求标头
-
将要映射的标头。(String[],默认值:
[*]
) - own-connection
-
如果为 true,则根据引导属性使用单独的连接。(布尔值,默认值:
false
) - 持久传递模式
-
当 'amqp_deliveryMode' 标头不存在时的默认传递模式,对于 PERSISTENT,则为 true。(布尔值,默认值:
false
) - 路由密钥
-
路由密钥 - 如果提供,则由 routingKeyExpression 覆盖。(字符串,默认值:
<none>
) - 路由密钥表达式
-
计算结果为路由密钥的 SPEL 表达式。(表达式,默认值:
<none>
)
spring.rabbitmq 的
- address-shuffle-mode 地址随机模式
-
用于对配置的地址进行随机排序的模式。(AddressShuffleMode,默认值:
none
,可能的值:NONE,RANDOM,INORDER) - 地址
-
客户端应连接到的地址的逗号分隔列表。设置后,将忽略 host 和 port。(字符串,默认值:
<none>
) - 通道 rpc-timeout
-
通道中 RPC 调用的继续超时。将其设为零可永久等待。(持续时间,默认:
10 分钟
) - 连接超时
-
连接超时。将其设为零可永久等待。(持续时间,默认值:
<无>
) - 主机
-
RabbitMQ 主机。如果设置了地址,则忽略。(字符串,默认值:
localhost
) - 密码
-
登录以对代理进行身份验证。(字符串,默认值:
guest
) - 港口
-
RabbitMQ 端口。如果设置了地址,则忽略。默认为 5672,如果启用了 SSL,则为 5671。(整数,默认值:
<none>
) - publisher-confirm-type
-
确认使用的发布者类型。(ConfirmType,默认值:
<none>
,可能的值:SIMPLE,CORRELATED,NONE) - publisher-returns
-
是否启用发布者退货。(布尔值,默认值:
false
) - 请求通道最大值
-
客户端请求的每个连接的通道数。使用 0 表示无限制。(整数,默认值:
2047
) - 请求的心跳
-
请求的检测信号超时;零表示无。如果未指定 duration 后缀,则将使用秒。(持续时间,默认值:
<无>
) - 用户名
-
登录用户以向 broker 进行身份验证。(字符串,默认值:
guest
) - 虚拟主机
-
连接到 broker 时使用的虚拟主机。(字符串,默认值:
<none>
)
spring.rabbitmq.template
- 默认接收队列
-
当 NONE 显式指定时,要从中接收消息的默认队列的名称。(字符串,默认值:
<none>
) - 交换
-
用于发送操作的默认 exchange 的名称。(字符串,默认值:
<空字符串>
) - 命令的
-
是否启用强制消息。(布尔值,默认值:
<none>
) - 接收超时
-
receive() 操作超时。(持续时间,默认值:
<无>
) - 回复超时
-
sendAndReceive() 操作超时。(持续时间,默认值:
<无>
) - 路由密钥
-
用于发送操作的默认路由键的值。(字符串,默认值:
<空字符串>
)
7.13. Redis 接收器
向 Redis 发送消息。
7.13.1. 选项
redis sink 具有以下选项:
按前缀分组的属性:
redis.consumer
- 钥匙
-
存储到键时使用的文本键名称。(字符串,默认值:
<none>
) - 键表达式
-
用于存储到键的 SPEL 表达式。(字符串,默认值:
<none>
) - 队列
-
在队列中存储时使用的文本队列名称。(字符串,默认值:
<none>
) - 队列表达式
-
用于队列的 SpEL 表达式。(字符串,默认值:
<none>
) - 主题
-
发布到主题时要使用的文本主题名称。(字符串,默认值:
<none>
) - 主题表达式
-
用于主题的 SPEL 表达式。(字符串,默认值:
<none>
)
spring.data.redis 的
- 客户端名称
-
要在使用 CLIENT SETNAME 的连接上设置的客户端名称。(字符串,默认值:
<none>
) - 客户端类型
-
要使用的客户端类型。默认情况下,根据 Classpath 自动检测。(ClientType,默认值:
<none>
,可能的值:LETTUCE,JEDIS
)
- 连接超时
-
连接超时。(持续时间,默认值:
<无>
) - 数据库
-
连接工厂使用的数据库索引。(整数,默认值:
0
) - 主机
-
Redis 服务器主机。(字符串,默认值:
localhost
) - 密码
-
redis 服务器的登录密码。(字符串,默认值:
<none>
) - 港口
-
Redis 服务器端口。(整数,默认值:
6379
) - 超时
-
读取超时。(持续时间,默认值:
<无>
) - 网址
-
连接 URL。覆盖主机、端口、用户名和密码。示例:redis://user:[email protected]:6379 (字符串,默认值:
<none>
) - 用户名
-
redis 服务器的登录用户名。(字符串,默认值:
<none>
)
spring.data.redis.cluster
- 最大重定向数
-
在整个集群中执行命令时要遵循的最大重定向数。(整数,默认值:
<none>
) - 节点
-
要从中引导的 “host:port” 对的逗号分隔列表。这表示群集节点的“初始”列表,并且至少需要一个条目。(List<String>,默认值:
<none>
)
spring.data.redis.jedis.pool
- 启用
-
是否启用池。如果“commons-pool2”可用,则自动启用。使用 Jedis,在 Sentinel 模式下隐式启用池化,此设置仅适用于单节点设置。(布尔值,默认值:
<none>
) - 最大活动
-
池在给定时间可分配的最大连接数。使用负值表示无限制。(整数,默认值:
8
) - max-idle (最大空闲)
-
池中 “空闲” 连接的最大数量。使用负值表示无限数量的空闲连接。(整数,默认值:
8
) - 最大等待
-
当池耗尽时,连接分配在引发异常之前应阻止的最长时间。使用负值可无限期阻止。(持续时间,默认值:
-1ms
) - 最小空闲
-
目标是要在池中维护的最小空闲连接数。仅当 it 和驱逐运行之间的时间均为正数时,此设置才有效。(整数,默认值:
0
) - 驱逐运行之间的时间
-
空闲对象 evictor 线程运行之间的时间。当为正数时,空闲对象驱逐线程将启动,否则不执行空闲对象驱逐。(持续时间,默认值:
<无>
)
spring.data.redis.lettuce.pool
- 启用
-
是否启用池。如果“commons-pool2”可用,则自动启用。使用 Jedis,在 Sentinel 模式下隐式启用池化,此设置仅适用于单节点设置。(布尔值,默认值:
<none>
) - 最大活动
-
池在给定时间可分配的最大连接数。使用负值表示无限制。(整数,默认值:
8
) - max-idle (最大空闲)
-
池中 “空闲” 连接的最大数量。使用负值表示无限数量的空闲连接。(整数,默认值:
8
) - 最大等待
-
当池耗尽时,连接分配在引发异常之前应阻止的最长时间。使用负值可无限期阻止。(持续时间,默认值:
-1ms
) - 最小空闲
-
目标是要在池中维护的最小空闲连接数。仅当 it 和驱逐运行之间的时间均为正数时,此设置才有效。(整数,默认值:
0
) - 驱逐运行之间的时间
-
空闲对象 evictor 线程运行之间的时间。当为正数时,空闲对象驱逐线程将启动,否则不执行空闲对象驱逐。(持续时间,默认值:
<无>
)
spring.data.redis.lettuce
- shutdown-timeout (关闭超时)
-
关闭超时。(持续时间,默认值:
100 毫秒
)
spring.data.redis.sentinel
- 主人
-
Redis 服务器的名称。(字符串,默认值:
<none>
) - 节点
-
以逗号分隔的 “host:port” 对列表。(List<String>,默认值:
<none>
) - 密码
-
用于使用 Sentinel 进行身份验证的密码。(字符串,默认值:
<none>
) - 用户名
-
用于向 Sentinel 进行身份验证的登录用户名。(字符串,默认值:
<none>
)
spring.data.redis.ssl
- 捆
-
SSL 捆绑包名称。(字符串,默认值:
<none>
) - 启用
-
是否启用 SSL 支持。如果提供了 “bundle”,则自动启用,除非另有说明。(布尔值,默认值:
<none>
)
7.14. 路由器接收器
此应用程序将消息路由到命名通道。
7.14.1. 选项
router sink 有以下选项:
- router.default-output-binding 文件
-
将不可路由的消息发送到何处。(字符串,默认值:
<none>
) - router.destination 映射
-
目标映射为名称-值对的新行分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:
<无>
) - router.expression 的
-
要应用于消息的表达式,以确定要路由到的通道。请注意,文本、json 或 xml 等内容类型的负载连线格式是 byte[] 而不是 String!。有关如何处理字节数组有效负载内容的文档。(表达式,默认值:
<none>
) - router.refresh-delay
-
检查脚本更改的频率(以毫秒为单位)(如果存在);< 0 表示不刷新。(整数,默认值:
60000
) - router.resolution-required
-
是否需要通道解析。(布尔值,默认值:
false
) - router.script 的
-
返回通道或通道映射解析键的 groovy 脚本的位置。(资源,默认值:
<无>
) - router.variables 的
-
变量绑定作为名称-值对的新行分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:
<无>
) - router.variables-位置
-
包含自定义脚本变量绑定的属性文件的位置。(资源,默认值:
<无>
)
此路由器接收器基于 Spring Cloud Stream 的 API,因此可以根据需要创建目标。
在这种情况下,只有当 key 未包含在 中时,才能到达 a 。
如果没有 map 并且没有声明相应的绑定,则 this 会忽略并引发异常。StreamBridge defaultOutputBinding destinationMappings resolutionRequired = true defaultOutputBinding |
您可以使用 该属性限制动态绑定的创建。
默认情况下,所有解析的目标都将动态绑定;如果此属性具有以逗号分隔的目标名称列表,则只会绑定这些名称。
解析到不在此列表中的目标的邮件将被路由到 ,该目标也必须出现在列表中。spring.cloud.stream.dynamicDestinations
defaultOutputBinding
用于将评估结果映射到实际目标名称。destinationMappings
7.14.2. 基于 SPEL 的路由
该表达式根据消息进行计算,并返回通道名称或通道名称映射的键。
有关更多信息,请参见 Spring 集成参考手册配置通用路由器部分中的“路由器和 Spring 表达式语言(SPEL)”小节。
7.14.3. 基于 Groovy 的路由
还可以使用 Groovy 脚本代替 SpEL 表达式。让我们在文件系统的 “file:/my/path/router.groovy” 或 “classpath:/my/path/router.groovy” 处创建一个 Groovy 脚本:
println("Groovy processing payload '" + payload + "'")
if (payload.contains('a')) {
return "foo"
}
else {
return "bar"
}
如果要将变量值传递给脚本,可以使用 variables 选项静态绑定值,或者使用 propertiesLocation 选项选择性地将路径传递给包含绑定的属性文件。 文件中的所有属性都将作为变量提供给脚本。您可以同时指定 variables 和 propertiesLocation,在这种情况下,作为变量提供的任何重复值都会覆盖 propertiesLocation 中提供的值。 请注意,payload 和 headers 是隐式绑定的,以允许您访问消息中包含的数据。
有关更多信息,请参阅 Spring 集成参考手册 Groovy 支持。
7.15. RSocket 接收器
RSocket sink 使用 RSocket 协议的 fire and forget 策略发送数据。
7.15.1. 选项
rsocket sink 有以下选项:
- rsocket.consumer.host
-
RSocket 主机。(字符串,默认值:
localhost
) - rsocket.consumer.port
-
RSocket 端口。(整数,默认值:
7000
) - rsocket.consumer.route
-
用于 RSocket 的路由。(字符串,默认值:
<none>
) - rsocket.consumer.uri 中
-
URI 的 URI 中,该 URI 可用于基于 websocket 的传输。(URI,默认值:
<none>
)
7.16. Amazon S3 接收器
此接收器应用程序支持将对象传输到 Amazon S3 存储桶。
文件负载(和递归目录)将传输到部署应用程序的目录(S3 存储桶)中。remote
local
此接收器接受的消息必须包含以下 as 之一:payload
-
File
,包括用于递归上传的目录; -
InputStream
; -
byte[]
7.16.1. 选项
s3 接收器具有以下选项:
按前缀分组的属性:
s3.consumer
- ACL
-
S3 对象访问控制列表。(ObjectCannedACL,默认值:
<none>
,可能的值:private,public-read,public-read-write,authenticated-read,aws-exec-read,bucket-owner-read,bucket-owner-full-control,null)
- ACL 表达式
-
用于评估 S3 对象访问控制列表的表达式。(表达式,默认值:
<none>
) - 桶
-
用于存储目标文件的 AWS 存储桶。(字符串,默认值:
<none>
) - 存储桶表达式
-
用于评估 AWS 存储桶名称的表达式。(表达式,默认值:
<none>
) - 键表达式
-
用于评估 S3 对象键的表达式。(表达式,默认值:
<none>
)
spring.cloud.aws.credentials
- 访问密钥
-
要用于静态提供程序的访问密钥。(字符串,默认值:
<none>
) - 实例配置文件
-
配置实例配置文件凭证提供程序,无需进一步配置。(布尔值,默认值:
false
) - 轮廓
-
AWS 配置文件。(配置文件,默认值:
<无>
) - 密钥
-
要用于静态提供程序的密钥。(字符串,默认值:
<none>
)
spring.cloud.aws.region
- 实例配置文件
-
配置实例配置文件区域提供程序,无需进一步配置。(布尔值,默认值:
false
) - 轮廓
-
AWS 配置文件。(配置文件,默认值:
<无>
) - 静态的
-
<缺少文档>(字符串,默认值:
<none>
)
spring.cloud.aws.s3 的
- 已启用加速模式
-
在访问 S3 时启用使用加速终端节点的选项。Accelerate 终端节点允许使用 Amazon CloudFront 的全球分布式边缘站点更快地传输对象。(布尔值,默认值:
<none>
) - 已启用校验和验证
-
用于禁用对 S3 中存储的对象的校验和进行验证的选项。(布尔值,默认值:
<none>
) - 已启用 chunked-encoding
-
在对 {@link software.amazon.awssdk.services.s3.model.PutObjectRequest} 和 {@link software.amazon.awssdk.services.s3.model.UploadPartRequest} 的请求负载进行签名时启用使用分块编码的选项。(布尔值,默认值:
<none>
) - 已启用跨区域
-
启用跨区域存储桶访问。(布尔值,默认值:
<none>
) - 端点
-
覆盖默认终端节点。(URI,默认值:
<none>
) - 启用路径样式访问
-
启用使用路径样式访问而不是 DNS 样式访问来访问 S3 对象的选项。DNS 样式访问是首选,因为它将在访问 S3 时实现更好的负载平衡。(布尔值,默认值:
<none>
) - 地区
-
覆盖默认区域。(字符串,默认值:
<none>
) - use-arn-region-enabled 已启用
-
如果 S3 资源 ARN 作为 S3 操作的目标传入,而该操作的区域与客户端配置的区域不同,则必须将此标志设置为“true”,以允许客户端对 ARN 中指定的区域进行跨区域调用,否则将引发异常。(布尔值,默认值:
<none>
)
spring.cloud.aws.s3.crt
- 初始读取缓冲区大小(以字节为单位)
-
配置客户端将用于缓冲从 S3 下载的段的起始缓冲区大小。保持更大的窗口以保持较高的下载吞吐量;除非窗口足够大以容纳多个 Part,否则 Parts 无法并行下载。保持较小的窗口以限制内存中缓冲的数据量。(长,默认值:
<none>
) - 最大并发
-
指定在传输期间应建立的 S3 连接的最大数量。(整数,默认值:
<none>
) - 最小部分大小(字节)
-
设置传输零件的最小零件尺寸。减小最小部分大小会导致多部分传输拆分为大量较小的部分。将此值设置得太低会对传输速度产生负面影响,导致每个部分的额外延迟和网络通信。(长,默认值:
<无>
) - 目标吞吐量 (GBPS)
-
传输请求的目标吞吐量。值越高,意味着将打开的 S3 连接越多。Transfer Manager 能否实现配置的目标吞吐量取决于各种因素,例如环境的网络带宽和配置的“最大并发”。(双精度,默认值:
<无>
)
基于 的目标生成的应用程序可以通过注入到 bean 中的 and/或 进行增强。
有关更多详细信息,请参阅 Spring 集成 AWS 支持。AmazonS3SinkConfiguration
S3MessageHandler.UploadMetadataProvider
S3ProgressListener
S3MessageHandler
7.16.2. Amazon AWS 通用选项
Amazon S3 Sink(与所有其他 Amazon AWS 应用程序一样)基于 Spring Cloud AWS 项目作为其基础,其自动配置 类由 Spring Boot 自动使用。 请查阅其文档,了解必需和有用的自动配置属性。
其中一些与 AWS 凭证有关:
-
spring.cloud.aws.credentials.accessKey
-
spring.cloud.aws.credentials.secretKey
-
spring.cloud.aws.credentials.instanceProfile
-
spring.cloud.aws.credentials.profileName
-
spring.cloud.aws.credentials.profilePath
其他用于 AWS 定义:Region
-
cloud.aws.region.auto
-
cloud.aws.region.static
例子
java -jar s3-sink.jar --s3.bucket=/tmp/bar
7.17. SFTP 接收器
SFTP 接收器是将文件从传入邮件推送到 SFTP 服务器的简单选项。
它使用 ,因此传入消息可以是对象、(文件内容)
或 (file content 的数组)。sftp-outbound-adapter
java.io.File
String
bytes
要使用此接收器,您需要用户名和密码才能登录。
默认情况下,如果未指定任何名称,则 Spring Integration 将使用。 将确定文件名
根据 中的标头值(如果存在),或者如果 的有效负载已经是 ,则它将
使用该文件的原始名称。o.s.i.file.DefaultFileNameGenerator DefaultFileNameGenerator file_name MessageHeaders Message java.io.File |
配置选项时,评估的根对象是应用程序上下文,例如。sftp.factory.known-hosts-expression
sftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'
7.17.1. 输入
头
-
file_name
(见上面的注释)
有效载荷
-
java.io.File
-
java.io.InputStream
-
byte[]
-
String
7.17.2. 输出
N/A(写入 SFTP 服务器)。
7.17.3. 选项
sftp 接收器具有以下选项:
按前缀分组的属性:
sftp.consumer
- 自动创建目录
-
是否创建远程目录。(布尔值,默认值:
true
) - filename-expression 文件名
-
用于生成远程文件名的 SPEL 表达式。(字符串,默认值:
<none>
) - 模式
-
如果远程文件已存在,则要执行的操作。(FileExistsMode,默认值:
<none>
,可能的值:APPEND
APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE,REPLACE_IF_MODIFIED
) - 远程目录
-
远程 FTP 目录。(字符串,默认值:
/
) - 远程文件分隔符
-
远程文件分隔符。(字符串,默认值:
/
) - 临时远程目录
-
如果 'isUseTemporaryFilename()' 为 true,则将写入文件的临时目录。(字符串,默认值:
/
) - tmp-file-suffix 文件后缀
-
传输过程中要使用的后缀。(字符串,默认值:
.tmp
) - 使用临时文件名
-
是否写入临时文件并重命名。(布尔值,默认值:
true
)
sftp.consumer.factory
- 允许未知密钥
-
如果为 True,则允许未知或更改的键。(布尔值,默认值:
false
) - 缓存会话
-
缓存会话。(布尔值,默认值:
<none>
) - 主机
-
服务器的主机名。(字符串,默认值:
localhost
) - 已知主机表达式
-
解析为已知主机文件位置的 SpEL 表达式。(表达式,默认值:
<none>
) - 密码短语
-
用户私钥的密码。(字符串,默认值:
<空字符串>
) - 密码
-
用于连接到服务器的密码。(字符串,默认值:
<none>
) - 港口
-
服务器的端口。(整数,默认值:
22
) - 私钥
-
用户私钥的资源位置。(资源,默认值:
<无>
) - 用户名
-
用于连接到服务器的用户名。(字符串,默认值:
<none>
)
7.18. TCP 接收器
此模块使用 Encoder 将消息写入 TCP。
TCP 是一种流协议,需要一些机制来在网络上构建消息。许多编码器是 available,默认值为 'CRLF'。
7.18.1. 选项
tcp sink 具有以下选项:
按前缀分组的属性:
tcp.consumer
- 字符集
-
从 bytes 转换为 String 时使用的字符集。(字符串,默认值:
UTF-8
) - 关闭
-
是否在每条消息后关闭套接字。(布尔值,默认值:
false
) - 编码器
-
发送消息时使用的编码器。(编码,默认值:
<none>
,可能的值:CRLF,LF,NULL,STXETX,RAW,L1,L2,L4)
- 主机
-
此接收器将连接到的主机。(字符串,默认值:
<none>
)
TCP 协议
- 蔚来
-
是否使用 NIO。(布尔值,默认值:
false
) - 港口
-
要侦听的端口;0 让操作系统选择一个端口。(整数,默认值:
1234
) - 反向查找
-
对远程 IP 地址执行反向 DNS 查找;如果为 false,则邮件报头中仅包含 IP 地址。(布尔值,默认值:
false
) - 套接字超时
-
未收到数据时关闭套接字之前的超时 (ms)。(整数,默认值:
120000
) - 使用直接缓冲区
-
是否使用直接缓冲区。(布尔值,默认值:
false
)
7.18.2. 可用的编码器
- CRLF (默认)
-
以回车 (0x0d) 后跟换行符 (0x0a) 结尾的文本
- 如果
-
由换行符终止的文本 (0x0a)
- 零
-
以 null 字节 (0x00) 结尾的文本
- STXETX
-
文本前面有 STX (0x02) 并以 ETX (0x03) 结尾
- 生
-
no structure - 客户端通过关闭套接字来指示完整的消息
- L1 系列
-
数据前面有一个单字节(无符号)长度字段(最多支持 255 个字节)
- L2 (二层)
-
数据前面有一个两字节(无符号)长度的字段(最多 2 个16-1 字节)
- L4 系列
-
数据前面有一个四字节(带符号)长度的字段(最多 2 个31-1 字节)
7.19. 吞吐量接收器
Sink,它将对消息进行计数,并按选定的时间间隔记录观察到的吞吐量。
7.19.1. 选项
吞吐量接收器具有以下选项:
- 吞吐量.报告每毫秒
-
报告的频率。(整数,默认值:
1000
)
7.20. Twitter 消息接收器
从身份验证用户向指定用户发送私信。
需要将 JSON POST 正文和标头设置为 。Content-Type
application/json
收到用户的消息后,您可以在 24 小时内发送最多 5 条消息作为响应。 收到的每条消息都会重置 24 小时窗口和分配的 5 条消息。 在 24 小时内发送第 6 条消息或在 24 小时时段外发送消息将计入速率限制。 此行为仅在使用 POST direct_messages/events/new 端点时适用。 |
SPEL 表达式用于计算输入消息中的请求参数。
7.20.1. 选项
使用单引号 () 将表达式属性的文字值括起来。
例如,要设置固定的消息文本,请使用 。
对于固定目标 userId,请使用 。' SpEL text='Fixed Text' userId='666' |
- twitter.message.update.media-id
-
要与消息关联的媒体 ID。私信只能引用一个媒体 ID。(表达式,默认值:
<none>
) - twitter.message.update.screen-name
-
向其发送私信的用户的屏幕名称。(表达式,默认值:
<none>
) - 推特.message.update.text
-
私信文本。URL 编码。最大长度为 10,000 个字符。(表达式,默认值:
payload
) - twitter.message.update.user-id
-
向其发送私信的用户的用户 ID。(表达式,默认值:
<none>
)
7.21. Twitter 更新接收器
更新身份验证用户的当前文本(例如 Tweeting)。
对于每次更新尝试,都会将更新文本与进行身份验证的用户最近的推文进行比较。 任何会导致重复的尝试都将被阻止,从而导致 403 错误。 用户不能连续两次提交相同的文本。 |
虽然不受 API 的速率限制,但用户一次可以创建的推文数量受到限制。 标准 API 的更新限制为 3 小时内 300 个。 如果用户发布的更新数量达到当前允许的限制,此方法将返回 HTTP 403 错误。
您可以在此处找到 Update API 的详细信息:developer.twitter.com/en/docs/tweets/post-and-engage/api-reference/post-statuses-update
7.21.1. 选项
按前缀分组的属性:
推特
- 附件 URL
-
(SPEL 表达式)为了使 URL 不计入扩展推文的文本正文中,请提供 URL 作为推文附件。此 URL 必须是推文永久链接或私信深层链接。任意的非 Twitter URL 必须保留在文本文本中。传递给 attachment_url 参数的 URL 与推文永久链接或私信深层链接不匹配,将在创建推文时失败并导致异常。(表达式,默认值:
<none>
) - 显示坐标
-
(SPEL 表达式)是否在发送 Tweet 的确切坐标上放置一个固定点。(表达式,默认值:
<none>
) - in-reply-to-status-id (回复状态 ID)
-
(SPEL 表达式)更新要回复的现有文本的 ID。注意:除非在文本文本中提及此参数引用的推文的作者,否则将忽略此参数。因此,你必须在更新中包含 @username,其中 username 是引用推文的作者。设置 inReplyToStatusId 时,auto_populate_reply_metadata也会自动设置。稍后 确保从原始推文中查找领先@mentions,并从那里添加到新推文中。随着回复链的增长,这会将@mentions附加到扩展推文的元数据中,直到达到@mentions限制。如果原始推文已被删除,则回复将失败。(表达式,默认值:
<none>
) - 媒体 ID
-
(SPEL 表达式)要与推文关联的media_ids的逗号分隔列表。你可以在一条推文中包含最多 4 张照片、1 张动画 GIF 或 1 个视频。有关上传媒体的更多详细信息,请参阅上传媒体。(表达式,默认值:
<none>
) - 地点 ID
-
(SPEL 表达式)世界上的一个地方。(表达式,默认值:
<none>
) - 发短信
-
(SPEL 表达式)文本的文本将更新。URL 编码。t.co 链接换行都会影响字符数。默认为消息的有效负载(表达式,默认值:
payload
)
推特.update.location
- 纬度
-
此推文所指位置的纬度。除非此参数在 -90.0 到 +90.0(北为正)范围内(包括 -90.0 到 +90.0 为正)内,否则将忽略此参数。如果没有相应的 long 参数,它也将被忽略。(表达式,默认值:
<none>
) - 离子
-
此推文所指位置的经度。经度的有效范围为 -180.0 到 +180.0(东为正),包括正数。如果超出该范围、不是数字、禁用geo_enabled或没有相应的 lat 参数,则此参数将被忽略。(表达式,默认值:
<none>
)
7.22. 波前沉
Wavefront 接收器使用 Messages<?>,将其转换为 Wavefront 数据格式的指标,并将指标直接发送到 Wavefront 或 Wavefront 代理。
支持常见的 ETL 使用案例,其中现有(历史)指标数据必须清理、转换并存储在 Wavefront 中以供进一步分析。
7.22.1. 选项
Wavefront sink 具有以下选项:
- wavefront.api-token
-
Wavefront API 访问令牌。(字符串,默认值:
<none>
) - wavefront.metric-表达式
-
计算结果为度量值的 SPEL 表达式。(表达式,默认值:
<none>
) - wavefront.metric-名称
-
指标的名称。默认为应用程序名称。(字符串,默认值:
<none>
) - wavefront.proxy-uri
-
Wavefront 代理的 URL。(字符串,默认值:
<none>
) - wavefront.source
-
发出指标的唯一应用程序、主机、容器或实例。(字符串,默认值:
<none>
) - wavefront.tag-表达式
-
与指标关联的自定义元数据的集合。点标签不能为空。键的有效字符:字母数字、连字符 ('-')、下划线('_')、点 ('.')。对于值,允许使用任何字符,包括空格。要包含双引号,请使用反斜杠对其进行转义,反斜杠不能是标签值中的最后一个字符。点标签键和值组合的最大允许长度为 254 个字符(255 个字符,包括分隔键和值的“=”字符)。如果值较长,则拒绝并记录该点(Map<String、Expression>,默认值:
<none>
) - wavefront.timestamp 表达式
-
一个 SPEL 表达式,计算结果为指标的时间戳(可选)。(表达式,默认值:
<none>
) - wavefront.uri 中
-
Wavefront 环境的 URL。(字符串,默认值:
<none>
)
7.23. Websocket 接收器
一个简单的 Websocket Sink 实现。
7.23.1. 选项
支持以下选项:
- websocket.consumer.log 级
-
netty 通道的 logLevel 。默认值为 <tt>WARN</tt> (String, default:
<none>
) - websocket.consumer.path
-
WebsocketSink 使用者需要连接的路径。默认值为 <tt>/websocket</tt> (String,默认值:
/websocket
) - websocket.consumer.port
-
Netty 服务器侦听的端口。默认值为 <tt>9292</tt> (整数,默认值:
9292
) - websocket.consumer.ssl
-
是否创建一个 {@link io.netty.handler.ssl.SslContext}。(布尔值,默认值:
false
) - websocket.consumer.threads
-
Netty 的线程数 {@link io.netty.channel.EventLoopGroup}。默认值为 <tt>1</tt> (整数,默认值:
1
)
7.23.2. 示例
要验证 websocket-sink 是否接收来自其他 spring-cloud-stream 应用程序的消息,你可以使用 遵循简单的端到端设置。
第 1 步:启动 Rabbitmq
第 2 步:部署time-source
第 3 步:部署websocket-sink
最后,在 mode 中启动 websocket-sink,以便您可以在日志中看到 the 产生的消息:trace
time-source
java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
--logging.level.org.springframework.cloud.fn.consumer.websocket=TRACE
您应该开始在启动 WebsocketSink 的控制台中看到日志消息,如下所示:
Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]
7.23.3. 执行器
您可以使用它来访问发送和接收的最后一条消息。您必须
通过提供 来启用它。默认情况下,它通过 .下面是一个示例输出:Endpoint
n
--endpoints.websocketconsumertrace.enabled=true
host:port/websocketconsumertrace
[
{
"timestamp": 1445453703508,
"info": {
"type": "text",
"direction": "out",
"id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
"payload": "2015-10-21 20:54:33"
}
},
...
{
"timestamp": 1445453703506,
"info": {
"type": "text",
"direction": "out",
"id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
"payload": "2015-10-21 20:54:32"
}
}
]
还有一个简单的 HTML 页面,您可以在其中的文本区域看到转发的消息。您可以访问
它直接通过您的浏览器。host:port
7.24. XMPP 接收器
“xmpp” 接收器允许向 XMPP 服务器发送消息。
7.24.1. 输入
-
byte[]
7.24.2. 输出
有效载荷
不适用
7.24.3. 选项
zeromq sink 有以下选项:
按前缀分组的属性:
xmpp.consumer
- 聊天对象
-
要向其发送消息的 XMPP 句柄。(字符串,默认值:
<none>
)
xmpp.工厂
- 主机
-
要连接的 XMPP 主机服务器。(字符串,默认值:
<none>
) - 密码
-
已连接用户的密码。(字符串,默认值:
<none>
) - 港口
-
用于连接到主机的端口。- 默认客户端端口:5222(整数,默认值:
5222
) - 资源
-
要在 XMPP 主机上绑定到的资源。- 可以为空,如果未设置,服务器将生成一个(字符串,默认值:
<none>
) - 安全模式
-
<缺少文档> (SecurityMode,默认值:
<none>
,可能的值:required,ifpossible,disabled
)
- 服务名称
-
要为 XMPP 域设置的服务名称。(字符串,默认值:
<none>
) - 订阅模式
-
<缺少文档>(SubscriptionMode,默认值:
<none>
,可能的值:accept_all,reject_all,manual)
- 用户
-
连接应连接的 User。(字符串,默认值:
<none>
)
7.24.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
您可以在此处找到相应的基于 Binder 的项目。 然后你可以 cd 到其中一个文件夹并构建它:
$ ./mvnw clean package
7.25. ZeroMQ 接收器
“zeromq” sink 支持将消息发送到 ZeroMQ 套接字。
7.25.1. 输入
-
byte[]
7.25.2. 输出
有效载荷
不适用
7.25.3. 选项
zeromq sink 有以下选项:
- zeromq.consumer.connect-url
-
用于连接到 ZeroMQ 套接字的连接 URL。(字符串,默认值:
<none>
) - zeromq.consumer.socket类型
-
连接应建立的 Socket Type。(SocketType,默认值:
<none>
,可能的值:PAIR,PUBSUB
,
REQ,REP,DEALER,ROUTER,PULL,PUSH,XPUB,XSUB,STREAM,CLIENT,SERVER,RADIO,DISH,CHANNEL,PEER,RAW,SCATTER,GATHER
)
- zeromq.consumer.topic
-
一个 Topic SpEL 表达式,用于在向订阅者发送消息之前评估主题。(表达式,默认值:
<none>
)
7.25.4. 构建
$ ./mvnw clean install -PgenerateApps
$ cd apps
您可以在此处找到相应的基于 Binder 的项目。 然后你可以 cd 到其中一个文件夹并构建它:
$ ./mvnw clean package
7.25.5. 示例
java -jar zeromq-sink.jar --zeromq.consumer.connectUrl=tcp://server:port --zeromq.consumer.topic=