应用
5. 来源
5.1. CDC 源
变更数据捕获 (CDC),用于从各种数据库捕获和流式传输变更事件。
目前支持 、 、 和 数据库。source
MySQL
PostgreSQL
MongoDB
Oracle
SQL Server
它基于 Debezium Embedded Connector 构建,允许通过不同的消息绑定器(如 Apache Kafka、RabbitMQ 和所有 Spring Cloud Stream 支持代理)捕获和流式传输数据库更改。CDC Source
它支持所有 Debezium 配置属性。只需将前缀添加到现有的 Debezium 属性中即可。例如,要设置 Debezium 的属性,请改用 source 属性。cdc.config.
connector.class
cdc.config.connector.class
我们为最常用的 Debezium 属性提供了方便的快捷方式。例如,您可以使用我们的快捷方式来代替长 Debezium 属性。下表列出了所有可用的快捷键以及它们所代表的 Debezium 属性。
Debezium 属性(例如 )总是优先于快捷键!cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnector
cdc.connector=mysql
cdc.config.XXX
CDC 源引入了基于 MetadataStore 服务的新默认配置。Later 提供了各种微服务友好的方式来存储偏移元数据。BackingOffsetStore
5.1.1. 选项
按前缀分组的属性:
美国疾病控制与预防中心
- 配置
-
用于 debezium 配置属性的 Spring pass-trough 包装器。所有带有 'cdc.config.' 前缀的属性都是本机 Debezium 属性。前缀被删除,将它们转换为 Debezium io.debezium.config.Configuration。(Map<String, String>,默认值:
<none>
) - 连接器
-
cdc.config.connector.class 属性的快捷方式。只要它们彼此不矛盾,它们中的任何一个都可以使用。(ConnectorType,默认值:
<none>
,可能的值:mysql,postgres,mongodb,oracle,sqlserver)
- 名字
-
此 sourceConnector 实例的唯一名称。(字符串,默认值:
<none>
) - 图式
-
将架构作为出站消息的一部分。(布尔值,默认值:
false
)
cdc.拼合
- 添加字段
-
要添加到拼合消息的元数据字段的逗号分隔列表。这些字段将以 “__” 或 “__[<]struct]__” 为前缀,具体取决于结构的规范。(字符串,默认值:
<none>
) - 添加标头
-
逗号分隔列表指定要添加到拼合消息标头的元数据字段列表。字段将以 “__” 或 “__[struct]__” 为前缀。(字符串,默认值:
<none>
) - 删除处理模式
-
处理已删除记录的选项:(1) 无 - 传递记录,(2) 删除记录,以及 (3) 重写 - 向记录添加“__deleted”字段。(DeleteHandlingMode,默认值:
<none>
,可能的值:drop,rewrite,none
) - 放置式逻辑删除
-
默认情况下,Debezium 会生成逻辑删除记录,以对已删除的记录启用 Kafka 压缩。dropTombstone 可以隐藏 Tombstone 记录。(布尔值,默认值:
true
) - 启用
-
启用拼合源记录事件 (https://debezium.io/docs/configuration/event-flattening)。(布尔值,默认值:
true
)
cdc.offset
- 提交超时
-
在取消进程并恢复要在将来尝试提交的偏移数据之前,等待刷新记录并将分区偏移数据提交到偏移存储的最大毫秒数。(持续时间,默认:
5000 毫秒
) - flush-interval 刷新间隔
-
尝试提交偏移量的时间间隔。默认值为 1 分钟。(持续时间,默认:
60000 毫秒
) - 政策
-
偏移存储提交策略。(OffsetPolicy,默认值:
<none>
) - 存储
-
Kafka 连接器跟踪已处理记录的数量,并定期将计数(作为“偏移量”)存储在预配置的元数据存储中。重新启动时,连接器会从最后记录的源偏移量恢复读数。(OffsetStorageType,默认值:
<none>
,可能的值:memory,file,kafka,metadata
)
cdc.stream.header
- convert-connect-headers (转换连接标头)
-
当 true 时,{@link org.apache.kafka.connect.header.Header} 将转换为消息头,其中 {@link org.apache.kafka.connect.header.Header#key()} 作为名称,{@link org.apache.kafka.connect.header.Header#value()}。(布尔值,默认值:
true
) - 抵消
-
将源记录的偏移元数据序列化到 cdc.offset 下的出站消息标头中。(布尔值,默认值:
false
)
元数据.store.dynamo-db
- 创建延迟
-
创建表重试之间的延迟。(整数,默认值:
1
) - 创建重试
-
创建表请求的重试次数。(整数,默认值:
25
) - read-capacity (读取容量)
-
读取表上的 capacity。(长,默认值:
1
) - 桌子
-
元数据的表名称。(字符串,默认值:
<none>
) - 生存时间
-
表条目的 TTL。(整数,默认值:
<none>
) - 写入容量
-
表上的写入容量。(长,默认值:
1
)
metadata.store
- 类型
-
指示要配置的元数据存储的类型(默认为 'memory')。您必须包含相应的 Spring 集成依赖项才能使用持久存储。(StoreType,默认值:
<none>
,可能的值:mongodb,redis,dynamodb,jdbc,zookeeper,hazelcast,memory)
元数据.store.zookeeper
- 连接字符串
-
HOST:PORT 形式的 Zookeeper 连接字符串。(字符串,默认值:
127.0.0.1:2181
) - 编码
-
在 Zookeeper 中存储数据时使用的编码。(字符集,默认值:
UTF-8
) - 重试间隔
-
Zookeeper 操作的重试间隔(以毫秒为单位)。(整数,默认值:
1000
) - 根
-
根节点 - 存储条目是此节点的子项。(字符串,默认值:
/SpringIntegration-MetadataStore
)
Debezium 属性快捷方式映射
下表列出了所有可用的快捷键以及它们所代表的 Debezium 属性。
捷径 | 源语言 | 描述 |
---|---|---|
cdc.连接器 |
cdc.config.connector.class |
|
cdc.name |
cdc.config.name |
|
cdc.offset.flush-interval |
cdc.config.offset.flush.interval.ms |
|
cdc.offset.commit-timeout |
cdc.config.offset.flush.timeout.ms |
|
cdc.offset.policy |
cdc.config.offset.commit.policy |
|
cdc.offset.storage |
cdc.config.offset.storage |
|
cdc.flattening.drop-tombstones |
cdc.config.drop.tombstones |
|
cdc.flattening.delete-handling-mode |
cdc.config.delete.handling.mode |
|
5.1.2. 数据库支持
它使用 Debezium 实用程序,目前支持五个数据存储的 CDC:、、 和 数据库。CDC Source
MySQL
PostgreSQL
MongoDB
Oracle
SQL Server
5.1.3. 示例和测试
[CdcSourceIntegrationTest]()、[CdcDeleteHandlingIntegrationTest]() 和 [CdcFlatteningIntegrationTest]() 集成测试使用在本地计算机上运行的测试数据库夹具。
我们使用预构建的 debezium docker 数据库镜像。
Maven 构建在 .docker-maven-plugin
要从 IDE 运行和调试测试,您需要从命令行部署所需的数据库映像。 以下说明介绍了如何从 Docker 映像运行预配置的测试数据库。
MySQL (MySQL的
在 docker 中启动 :debezium/example-mysql
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.0
(可选)使用 client 连接到数据库并创建具有所需凭据的用户:
|
使用以下属性将 CDC 源连接到 MySQL DB:
cdc.connector=mysql (1)
cdc.name=my-sql-connector (2)
cdc.config.database.server.id=85744 (2)
cdc.config.database.server.name=my-app-connector (2)
cdc.config.database.user=debezium (3)
cdc.config.database.password=dbz (3)
cdc.config.database.hostname=localhost (3)
cdc.config.database.port=3306 (3)
cdc.schema=true (4)
cdc.flattening.enabled=true (5)
1 | 将 CDC 源配置为使用 MySqlConnector。(相当于 设置 )。cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnector |
2 | 用于标识和调度传入事件的元数据。 |
3 | 连接到以 用户身份运行的 MySQL 服务器。localhost:3306 debezium |
4 | 在事件中包含 Change Event Value 架构。SourceRecord |
5 | 启用 CDC 事件平展。 |
您也可以使用此 mysql 配置运行。CdcSourceIntegrationTests#CdcMysqlTests
PostgreSQL 数据库
从 Docker 镜像启动预配置的 postgres 服务器:debezium/example-postgres:1.0
docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:1.0
您可以像这样连接到此服务器:
psql -U postgres -h localhost -p 5432
使用以下属性将 CDC 源连接到 PostgreSQL:
cdc.connector=postgres (1)
cdc.offset.storage=memory (2)
cdc.name=my-sql-connector (3)
cdc.config.database.server.id=85744 (3)
cdc.config.database.server.name=my-app-connector (3)
cdc.config.database.user=postgres (4)
cdc.config.database.password=postgres (4)
cdc.config.database..dbname=postgres (4)
cdc.config.database.hostname=localhost (4)
cdc.config.database.port=5432 (4)
cdc.schema=true (5)
cdc.flattening.enabled=true (6)
1 | 配置为使用 PostgresConnector。等效于设置 。CDC Source cdc.config.connector.class=io.debezium.connector.postgresql.PostgresConnector |
2 | 配置 Debezium 引擎以使用(例如 'cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore)后备偏移存储。memory |
3 | 用于标识和调度传入事件的元数据。 |
4 | 连接到以 用户身份运行的 PostgreSQL 服务器。localhost:5432 postgres |
5 | 在事件中包含 Change Event Value 架构。SourceRecord |
6 | 启用 CDC 事件平展。 |
您也可以使用此 mysql 配置运行。CdcSourceIntegrationTests#CdcPostgresTests
MongoDB 数据库
从 Docker 镜像启动预配置的 mongodb:debezium/example-mongodb:0.10
docker run -it --rm --name mongodb -p 27017:27017 -e MONGODB_USER=debezium -e MONGODB_PASSWORD=dbz debezium/example-mongodb:0.10
初始化清单集合
docker exec -it mongodb sh -c 'bash -c /usr/local/bin/init-inventory.sh'
在终端输出中,搜索类似 :mongodb
host: "3f95a8a6516e:27017"
2019-01-10T13:46:10.004+0000 I COMMAND [conn1] command local.oplog.rs appName: "MongoDB Shell" command: replSetInitiate { replSetInitiate: { _id: "rs0", members: [ { _id: 0.0, host: "3f95a8a6516e:27017" } ] }, lsid: { id: UUID("5f477a16-d80d-41f2-9ab4-4ebecea46773") }, $db: "admin" } numYields:0 reslen:22 locks:{ Global: { acquireCount: { r: 36, w: 20, W: 2 }, acquireWaitCount: { W: 1 }, timeAcquiringMicros: { W: 312 } }, Database: { acquireCount: { r: 6, w: 4, W: 16 } }, Collection: { acquireCount: { r: 4, w: 2 } }, oplog: { acquireCount: { r: 2, w: 3 } } } protocol:op_msg 988ms
将条目添加到您的127.0.0.1 3f95a8a6516e
/etc/hosts
使用以下属性将 CDC 源连接到 MongoDB:
cdc.connector=mongodb (1)
cdc.offset.storage=memory (2)
cdc.config.mongodb.hosts=rs0/localhost:27017 (3)
cdc.config.mongodb.name=dbserver1 (3)
cdc.config.mongodb.user=debezium (3)
cdc.config.mongodb.password=dbz (3)
cdc.config.database.whitelist=inventory (3)
cdc.config.tasks.max=1 (4)
cdc.schema=true (5)
cdc.flattening.enabled=true (6)
1 | 配置为使用 MongoDB Connector。这会映射到 .CDC Source cdc.config.connector.class=io.debezium.connector.mongodb.MongodbSourceConnector |
2 | 配置 Debezium 引擎以使用(例如 'cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore)后备偏移存储。memory |
3 | 连接到以 user.localhost:27017 debezium |
4 | debezium.io/docs/connectors/mongodb/#tasks |
5 | 在事件中包含 Change Event Value 架构。SourceRecord |
6 | 启用 CDC 事件平展。 |
您也可以使用此 mysql 配置运行。CdcSourceIntegrationTests#CdcPostgresTests
SQL 服务器
从 Docker 镜像启动 a:sqlserver
debezium/example-postgres:1.0
docker run -it --rm --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_PID=Standard -e SA_PASSWORD=Password! -e MSSQL_AGENT_ENABLED=true microsoft/mssql-server-linux:2017-CU9-GDR2
使用 debezium 的 sqlserver 教程中的示例数据进行填充:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
cat ./inventory.sql | docker exec -i sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
使用以下属性将 CDC 源连接到 SQLServer:
cdc.connector=sqlserver (1)
cdc.offset.storage=memory (2)
cdc.name=my-sql-connector (3)
cdc.config.database.server.id=85744 (3)
cdc.config.database.server.name=my-app-connector (3)
cdc.config.database.user=sa (4)
cdc.config.database.password=Password! (4)
cdc.config.database..dbname=testDB (4)
cdc.config.database.hostname=localhost (4)
cdc.config.database.port=1433 (4)
1 | 配置为使用 SqlServerConnector。等效于设置 。CDC Source cdc.config.connector.class=io.debezium.connector.sqlserver.SqlServerConnector |
2 | 配置 Debezium 引擎以使用(例如 'cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore)后备偏移存储。memory |
3 | 用于标识和调度传入事件的元数据。 |
4 | 连接到以用户身份运行的 SQL Server。localhost:1433 sa |
您也可以使用此 mysql 配置运行。CdcSourceIntegrationTests#CdcSqlServerTests
神谕
从 localhost 启动 Oracle reachable,并使用 Debezium Vagrant 设置中描述的配置、用户和授权进行设置
使用 Debezium 的 Oracle 教程中的示例数据进行填充:
wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-with-oracle-jdbc/init/inventory.sql
cat ./inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1
5.2. 文件源
此应用程序轮询目录并将新文件或其内容发送到 output 通道。 默认情况下,file 源以字节数组的形式提供 File 的内容。 但是,这可以使用 --file.supplier.mode 选项进行自定义:
-
ref 提供 java.io.File 引用
-
lines 将逐行拆分文件并为每行发出一条新消息
-
contents 默认值。以字节数组的形式提供文件的内容
使用 时,您还可以提供附加选项 。
如果设置为 true,则底层 FileSplitter 将在实际数据之前和之后发出额外的文件开始和文件结束标记消息。
这 2 个附加标记消息的有效负载为 类型 。如果未明确设置,则选项 withMarkers 默认为 false。--file.supplier.mode=lines
--file.supplier.withMarkers=true
FileSplitter.FileMarker
5.2.1. 选项
文件源具有以下选项:
按前缀分组的属性:
file.consumer 文件
- 标记 JSON
-
当 'fileMarkers == true' 时,指定它们是应生成为 FileSplitter.FileMarker 对象还是 JSON。(布尔值,默认值:
true
) - 模式
-
用于文件读取源的 FileReadingMode。值包括 'ref' - 文件对象,'lines' - 每行一条消息,或 'contents' - 以字节为单位的内容。(FileReadingMode,默认值:
<none>
,可能的值:ref,lines,contents
)
- with-markers
-
设置为 true 可在数据之前/之后发出文件开头/文件结尾标记消息。仅对 FileReadingMode 'lines' 有效。(布尔值,默认值:
<none>
)
文件.supplier
- empty-时延迟
-
未检测到新文件时的延迟持续时间。(持续时间,默认:
1 秒
) - 目录
-
用于轮询新文件的目录。(文件,默认值:
<无>
) - 文件名模式
-
用于匹配文件的简单 ant 模式。(字符串,默认值:
<none>
) - 文件名-regex
-
用于匹配文件的正则表达式模式。(模式,默认值:
<无>
) - 防止重复
-
设置为 true 以包含防止重复的 AcceptOnceFileListFilter。(布尔值,默认值:
true
)
元数据.store.dynamo-db
- 创建延迟
-
创建表重试之间的延迟。(整数,默认值:
1
) - 创建重试
-
创建表请求的重试次数。(整数,默认值:
25
) - read-capacity (读取容量)
-
读取表上的 capacity。(长,默认值:
1
) - 桌子
-
元数据的表名称。(字符串,默认值:
<none>
) - 生存时间
-
表条目的 TTL。(整数,默认值:
<none>
) - 写入容量
-
表上的写入容量。(长,默认值:
1
)
5.3. FTP 源
此源应用程序支持使用 FTP 协议传输文件。
文件将从目录传输到部署应用程序的目录。
默认情况下,源发出的消息以字节数组的形式提供。但是,这可能是
使用选项进行自定义:remote
local
--mode
-
裁判提供参考
java.io.File
-
线将逐行拆分文件,并为每行发出一条新消息
-
内容默认值。以字节数组的形式提供文件的内容
使用 时,您还可以提供附加选项 。
如果设置为 ,则底层将在实际数据之前和之后发出额外的文件开头和文件结尾标记消息。
这 2 个附加标记消息的有效负载为 类型 。如果未明确设置,则选项默认为 。--mode=lines
--withMarkers=true
true
FileSplitter
FileSplitter.FileMarker
withMarkers
false
另请参阅 MetadataStore 选项,了解用于防止重新启动时出现重复消息的可能共享持久性存储配置。
5.3.2. 输出
5.3.3. 选项
ftp 源具有以下选项:
按前缀分组的属性:
file.consumer 文件
- 标记 JSON
-
当 'fileMarkers == true' 时,指定它们是应生成为 FileSplitter.FileMarker 对象还是 JSON。(布尔值,默认值:
true
) - 模式
-
用于文件读取源的 FileReadingMode。值包括 'ref' - 文件对象,'lines' - 每行一条消息,或 'contents' - 以字节为单位的内容。(FileReadingMode,默认值:
<none>
,可能的值:ref,lines,contents
)
- with-markers
-
设置为 true 可在数据之前/之后发出文件开头/文件结尾标记消息。仅对 FileReadingMode 'lines' 有效。(布尔值,默认值:
<none>
)
ftp.factory
- 缓存会话
-
缓存会话。(布尔值,默认值:
<none>
) - 客户端模式
-
用于 FTP 会话的客户端模式。(ClientMode,默认值:
<none>
,可能的值:ACTIVE,PASSIVE
)
- 主机
-
服务器的主机名。(字符串,默认值:
localhost
) - 密码
-
用于连接到服务器的密码。(字符串,默认值:
<none>
) - 港口
-
服务器的端口。(整数,默认值:
21
) - 用户名
-
用于连接到服务器的用户名。(字符串,默认值:
<none>
)
ftp.supplier
- 自动创建本地目录
-
如果本地目录不存在,则设置为 true 以创建本地目录。(布尔值,默认值:
true
) - empty-时延迟
-
未检测到新文件时的延迟持续时间。(持续时间,默认:
1 秒
) - 删除远程文件
-
设置为 true 可在成功传输后删除远程文件。(布尔值,默认值:
false
) - 文件名模式
-
用于匹配要传输的文件的名称的筛选模式。(字符串,默认值:
<none>
) - 文件名-regex
-
用于匹配要传输的文件的名称的筛选器正则表达式模式。(模式,默认值:
<无>
) - 本地目录
-
用于文件传输的本地目录。(文件,默认值:
<无>
) - preserve-timestamp
-
设置为 true 可保留原始时间戳。(布尔值,默认值:
true
) - 远程目录
-
远程 FTP 目录。(字符串,默认值:
/
) - 远程文件分隔符
-
远程文件分隔符。(字符串,默认值:
/
) - tmp-file-suffix 文件后缀
-
传输过程中要使用的后缀。(字符串,默认值:
.tmp
)
元数据.store.dynamo-db
- 创建延迟
-
创建表重试之间的延迟。(整数,默认值:
1
) - 创建重试
-
创建表请求的重试次数。(整数,默认值:
25
) - read-capacity (读取容量)
-
读取表上的 capacity。(长,默认值:
1
) - 桌子
-
元数据的表名称。(字符串,默认值:
<none>
) - 生存时间
-
表条目的 TTL。(整数,默认值:
<none>
) - 写入容量
-
表上的写入容量。(长,默认值:
1
)
5.3.4. 示例
java -jar ftp_source.jar --ftp.supplier.remote-dir=foo --file.consumer.mode=lines --ftp.factory.host=ftpserver \
--ftp.factory.username=user --ftp.factory.password=pw --ftp.local-dir=/foo
sources.adoc 中未解析的指令 - include::https://raw.githubusercontent.com/spring-cloud/stream-applications/main/applications/source/geode-source/README.adoc[tags=ref-doc]
5.4. Http 源
一个源应用程序,用于侦听 HTTP 请求并将正文作为消息负载发出。
如果 Content-Type 与 或 匹配,则有效负载将为 String,
否则,有效负载将是一个字节数组。text/*
application/json
5.5. JDBC 源码
此源从 RDBMS 轮询数据。
此源代码完全基于 ,因此请参阅 Spring Boot JDBC 支持以获取更多信息。DataSourceAutoConfiguration
5.5.2. 选项
jdbc 源具有以下选项:
按前缀分组的属性:
jdbc.supplier
- 最大行数
-
要为查询处理的最大行数。(整数,默认值:
0
) - 查询
-
用于选择数据的查询。(字符串,默认值:
<none>
) - 分裂
-
是否将 SQL 结果拆分为单个消息。(布尔值,默认值:
true
) - 更新
-
一个 SQL 更新语句,用于将轮询的消息标记为 “seen”。(字符串,默认值:
<none>
)
spring.datasource
- 驱动程序类名称
-
JDBC 驱动程序的完全限定名称。默认情况下,根据 URL 自动检测。(字符串,默认值:
<none>
) - 密码
-
数据库的登录密码。(字符串,默认值:
<none>
) - 网址
-
数据库的 JDBC URL。(字符串,默认值:
<none>
) - 用户名
-
数据库的登录用户名。(字符串,默认值:
<none>
)
spring.integration.poller
- cron (定时)
-
Cron 表达式进行轮询。与 'fixedDelay' 和 'fixedRate' 互斥。(字符串,默认值:
<none>
) - 固定延迟
-
轮询延迟期。与 'cron' 和 'fixedRate' 互斥。(持续时间,默认值:
<无>
) - 固定利率
-
轮询率周期。与 'fixedDelay' 和 'cron' 互斥。(持续时间,默认值:
<无>
) - 初始延迟
-
轮询初始延迟。申请了 'fixedDelay' 和 'fixedRate';对于 'cron' 而被忽略。(持续时间,默认值:
<无>
) - 最大每次轮询消息数
-
每个轮询周期要轮询的最大消息数。(整数,默认值:
<none>
) - 接收超时
-
轮询消息等待多长时间。(持续时间,默认:
1 秒
)
另请参阅 Spring Boot 文档,了解其他属性和轮询选项。DataSource
TriggerProperties
MaxMessagesProperties
5.6. JMS 源
JMS 源允许从 JMS 接收消息。
5.6.1. 选项
JMS 源具有以下选项:
按前缀分组的属性:
jms.supplier
- 客户端 ID
-
长期订阅的客户端 ID。(字符串,默认值:
<none>
) - 目的地
-
接收消息的目标 (队列或主题)。(字符串,默认值:
<none>
) - 消息选择器
-
消息的选择器。(字符串,默认值:
<none>
) - 会话事务处理
-
如果为true,则启用事务并选择DefaultMessageListenerContainer,如果为false,则选择SimpleMessageListenerContainer。(布尔值,默认值:
true
) - subscription-durable
-
对于长期订阅,为 True。(布尔值,默认值:
<none>
) - 订阅名称
-
长期订阅或共享订阅的名称。(字符串,默认值:
<none>
) - 订阅共享
-
对于共享订阅,则为 True。(布尔值,默认值:
<none>
)
5.8. 邮件源
一个源应用程序,用于侦听 Emails 并将消息正文作为消息负载发出。
5.8.1. 选项
邮件源具有以下选项:
- mail.supplier.charset
-
用于 byte[] mail-to-string 转换的 charset。(字符串,默认值:
UTF-8
) - 邮件.supplier.delete
-
设置为 true 可在下载后删除电子邮件。(布尔值,默认值:
false
) - mail.supplier.expression
-
配置 SpEL 表达式以选择消息。(字符串,默认值:
true
) - 邮件.supplier.idle-IMAP
-
设置为 true 以使用 IdleImap 配置。(布尔值,默认值:
false
) - mail.supplier.java 邮件属性
-
JavaMail 属性作为名称-值对的新行分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:
<无>
) - mail.supplier.mark-as-read
-
设置为 true 可将电子邮件标记为已读。(布尔值,默认值:
false
) - 邮件.supplier.url
-
用于连接到邮件服务器的邮件连接 URL,例如 'imaps://username:[email protected]:993/Inbox'。(URLName,默认值:
<none>
) - mail.supplier.user-flag
-
当服务器不支持 \Recently 时标记邮件的标志。(字符串,默认值:
<none>
)
5.9. MongoDB 源
此源轮询来自 MongoDB 的数据。
此源完全基于 ,因此请参阅 Spring Boot MongoDB 支持以获取更多信息。MongoDataAutoConfiguration
5.9.1. 选项
mongodb 源具有以下选项:
按前缀分组的属性:
mongodb.supplier
- 收集
-
要查询的 MongoDB 集合。(字符串,默认值:
<none>
) - 查询
-
MongoDB 查询。(字符串,默认值:
{ }
) - 查询表达式
-
MongoDB 中的 SpEL 表达式查询 DSL 样式。(表达式,默认值:
<none>
) - 分裂
-
是否将查询结果拆分为单个消息。(布尔值,默认值:
true
) - update-expression (更新表达式)
-
MongoDB 中的 SpEL 表达式更新 DSL 样式。(表达式,默认值:
<none>
)
spring.data.mongodb
- 附加主机
-
其他服务器主机。不能使用 URI 设置,或者如果未指定 'host' 。其他主机将使用默认的 mongo 端口 27017,如果您想使用不同的端口,可以使用 “host:port” 语法。(List<String>,默认值:
<none>
) - 身份验证数据库
-
身份验证数据库名称。(字符串,默认值:
<none>
) - 自动索引创建
-
是否启用自动索引创建。(布尔值,默认值:
<none>
) - 数据库
-
数据库名称。(字符串,默认值:
<none>
) - 字段命名策略
-
要使用的 FieldNamingStrategy 的完全限定名称。(类<?>,默认值:
<无>
) - 主机
-
Mongo 服务器主机。不能使用 URI 设置。(字符串,默认值:
<none>
) - 密码
-
mongo 服务器的登录密码。不能使用 URI 设置。(字符[],默认值:
<无>
) - 港口
-
Mongo 服务器端口。不能使用 URI 设置。(整数,默认值:
<none>
) - 副本集名称
-
集群所需的副本集名称。不能使用 URI 设置。(字符串,默认值:
<none>
) - URI
-
Mongo 数据库 URI。覆盖主机、端口、用户名、密码和数据库。(字符串,默认值:
mongodb://localhost/test
) - 用户名
-
mongo 服务器的登录用户。不能使用 URI 设置。(字符串,默认值:
<none>
) - uuid 表示
-
将 UUID 转换为 BSON 二进制值时使用的表示形式。(UuidRepresentation,默认值:
java-legacy
,可能的值:UNSPECIFIED,STANDARD,C_SHARP_LEGACY,JAVA_LEGACY,PYTHON_LEGACY)
另请参阅 Spring Boot 文档以获取其他属性。
请参阅 和 了解轮询选项。MongoProperties
TriggerProperties
5.10. MQTT 源码
允许从 MQTT 接收消息的 Source。
5.10.2. 选项
mqtt 源具有以下选项:
按前缀分组的属性:
MQTT 协议
- clean-session
-
客户端和服务器是否应在重启和重新连接时记住状态。(布尔值,默认值:
true
) - 连接超时
-
连接超时(以秒为单位)。(整数,默认值:
30
) - 保持活动间隔
-
ping 间隔(以秒为单位)。(整数,默认值:
60
) - 密码
-
连接到代理时使用的密码。(字符串,默认值:
guest
) - 坚持
-
'memory' 或 'file'。(字符串,默认值:
memory
) - 持久性目录
-
Persistence 目录。(字符串,默认值:
/tmp/paho
) - SSL 属性
-
MQTT 客户端 SSL 属性。(Map<String, String>,默认值:
<none>
) - 网址
-
MQTT 代理的位置(逗号分隔的列表)。(String[],默认值:
[tcp://localhost:1883]
) - 用户名
-
连接到 broker 时要使用的用户名。(字符串,默认值:
guest
)
5.11. RabbitMQ 源码
“rabbit” 源允许从 RabbitMQ 接收消息。
在部署流之前,队列必须存在;它们不是自动创建的。 您可以使用 RabbitMQ Web UI 轻松创建队列。
5.11.3. 选项
rabbit 源具有以下选项:
按前缀分组的属性:
rabbit.supplier 的
- enable-retry (启用重试)
-
如果为 true,则启用重试。(布尔值,默认值:
false
) - 初始重试间隔
-
启用重试时的初始重试间隔。(整数,默认值:
1000
) - mapped-request-headers 请求标头
-
将要映射的标头。(String[],默认值:
[STANDARD_REQUEST_HEADERS]
) - 最大尝试次数
-
启用重试时的最大传递尝试次数。(整数,默认值:
3
) - 最大重试间隔
-
启用重试时的最大重试间隔。(整数,默认值:
30000
) - own-connection
-
如果为 true,则根据引导属性使用单独的连接。(布尔值,默认值:
false
) - 队列
-
源将侦听消息的队列。(String[],默认值:
<none>
) - 重新排队
-
是否应将被拒绝的邮件重新排队。(布尔值,默认值:
true
) - 重试乘数
-
启用 retry 时重试回退乘数。(双精度,默认值:
2
) - 交易
-
通道是否事务处理。(布尔值,默认值:
false
)
spring.rabbitmq 的
- address-shuffle-mode 地址随机模式
-
用于对配置的地址进行随机排序的模式。(AddressShuffleMode,默认值:
none
,可能的值:NONE,RANDOM,INORDER) - 地址
-
客户端应连接到的地址的逗号分隔列表。设置后,将忽略 host 和 port。(字符串,默认值:
<none>
) - 通道 rpc-timeout
-
通道中 RPC 调用的继续超时。将其设为零可永久等待。(持续时间,默认:
10 分钟
) - 连接超时
-
连接超时。将其设为零可永久等待。(持续时间,默认值:
<无>
) - 主机
-
RabbitMQ 主机。如果设置了地址,则忽略。(字符串,默认值:
localhost
) - 密码
-
登录以对代理进行身份验证。(字符串,默认值:
guest
) - 港口
-
RabbitMQ 端口。如果设置了地址,则忽略。默认为 5672,如果启用了 SSL,则为 5671。(整数,默认值:
<none>
) - publisher-confirm-type
-
确认使用的发布者类型。(ConfirmType,默认值:
<none>
,可能的值:SIMPLE,CORRELATED,NONE) - publisher-returns
-
是否启用发布者退货。(布尔值,默认值:
false
) - 请求通道最大值
-
客户端请求的每个连接的通道数。使用 0 表示无限制。(整数,默认值:
2047
) - 请求的心跳
-
请求的检测信号超时;零表示无。如果未指定 duration 后缀,则将使用秒。(持续时间,默认值:
<无>
) - 用户名
-
登录用户以向 broker 进行身份验证。(字符串,默认值:
guest
) - 虚拟主机
-
连接到 broker 时使用的虚拟主机。(字符串,默认值:
<none>
)
另请参阅 Spring Boot 文档,了解代理连接和侦听器属性的其他属性。
关于重试的说明
使用默认的 ackMode (AUTO) 和 requeue (true) 选项,将重试失败的消息传送
无限期。
由于 rabbit source 中没有太多的处理,因此 source 本身失败的风险很小,除非
由于某种原因,下游未连接。
将 requeue 设置为 false 将导致邮件在第一次尝试时被拒绝(并可能发送到 Dead Letter
Exchange/Queue(如果代理是这样配置的)。
的 enableRetry 选项允许配置重试参数,以便可以重试失败的消息投放,并且
最终在重试用尽时丢弃(或死信)。
传递线程在重试间隔期间暂停。
重试选项包括 enableRetry、maxAttempts、initialRetryInterval、retryMultiplier 和 maxRetryInterval。
消息传送失败并显示 MessageConversionException 永远不会重试;假设是,如果消息
无法转换,则后续尝试也将失败。
此类消息将被丢弃(或死信)。Binder |
5.12. Amazon S3 源
此源应用程序支持使用 Amazon S3 协议传输文件。
文件从目录 (S3 存储桶) 传输到部署应用程序的目录。remote
local
默认情况下,源发出的消息以字节数组的形式提供。但是,这可能是
使用选项进行自定义:--mode
-
裁判提供参考
java.io.File
-
线将逐行拆分文件,并为每行发出一条新消息
-
内容默认值。以字节数组的形式提供文件的内容
使用 时,您还可以提供附加选项 。
如果设置为 ,则底层将在实际数据之前和之后发出额外的文件开头和文件结尾标记消息。
这 2 个附加标记消息的有效负载为 类型 。如果未明确设置,则选项默认为 。--mode=lines
--withMarkers=true
true
FileSplitter
FileSplitter.FileMarker
withMarkers
false
另请参阅 MetadataStore 选项,了解用于防止重新启动时出现重复消息的可能共享持久性存储配置。
模式 = 行
5.12.3. 选项
s3 源具有以下选项:
按前缀分组的属性:
file.consumer 文件
- 标记 JSON
-
当 'fileMarkers == true' 时,指定它们是应生成为 FileSplitter.FileMarker 对象还是 JSON。(布尔值,默认值:
true
) - 模式
-
用于文件读取源的 FileReadingMode。值包括 'ref' - 文件对象,'lines' - 每行一条消息,或 'contents' - 以字节为单位的内容。(FileReadingMode,默认值:
<none>
,可能的值:ref,lines,contents
)
- with-markers
-
设置为 true 可在数据之前/之后发出文件开头/文件结尾标记消息。仅对 FileReadingMode 'lines' 有效。(布尔值,默认值:
<none>
)
元数据.store.dynamo-db
- 创建延迟
-
创建表重试之间的延迟。(整数,默认值:
1
) - 创建重试
-
创建表请求的重试次数。(整数,默认值:
25
) - read-capacity (读取容量)
-
读取表上的 capacity。(长,默认值:
1
) - 桌子
-
元数据的表名称。(字符串,默认值:
<none>
) - 生存时间
-
表条目的 TTL。(整数,默认值:
<none>
) - 写入容量
-
表上的写入容量。(长,默认值:
1
)
metadata.store
- 类型
-
指示要配置的元数据存储的类型(默认为 'memory')。您必须包含相应的 Spring 集成依赖项才能使用持久存储。(StoreType,默认值:
<none>
,可能的值:mongodb,redis,dynamodb,jdbc,zookeeper,hazelcast,memory)
元数据.store.zookeeper
- 连接字符串
-
HOST:PORT 形式的 Zookeeper 连接字符串。(字符串,默认值:
127.0.0.1:2181
) - 编码
-
在 Zookeeper 中存储数据时使用的编码。(字符集,默认值:
UTF-8
) - 重试间隔
-
Zookeeper 操作的重试间隔(以毫秒为单位)。(整数,默认值:
1000
) - 根
-
根节点 - 存储条目是此节点的子项。(字符串,默认值:
/SpringIntegration-MetadataStore
)
s3.common
- 端点 URL
-
用于连接到 s3 兼容存储的可选终端节点 URL。(字符串,默认值:
<none>
) - path-style-access (路径样式访问)
-
使用路径样式访问。(布尔值,默认值:
false
)
s3.supplier
- 自动创建本地目录
-
创建或不创建本地目录。(布尔值,默认值:
true
) - 删除远程文件
-
处理后删除或不删除远程文件。(布尔值,默认值:
false
) - 文件名模式
-
用于筛选远程文件的模式。(字符串,默认值:
<none>
) - 文件名-regex
-
用于过滤远程文件的 regexp。(模式,默认值:
<无>
) - 仅列表
-
设置为 true 可返回 s3 对象元数据,而不将文件复制到本地目录。(布尔值,默认值:
false
) - 本地目录
-
用于存储文件的本地目录。(文件,默认值:
<无>
) - preserve-timestamp
-
将远程文件的时间戳传输到本地文件。(布尔值,默认值:
true
) - 远程目录
-
AWS S3 存储桶资源。(字符串,默认值:
bucket
) - 远程文件分隔符
-
远程文件分隔符。(字符串,默认值:
/
) - tmp-file-suffix 文件后缀
-
临时文件后缀。(字符串,默认值:
.tmp
)
5.12.4. Amazon AWS 通用选项
Amazon S3 源(与所有其他 Amazon AWS 应用程序一样)基于 Spring Cloud AWS 项目作为其基础,其自动配置 类由 Spring Boot 自动使用。 请查阅其文档,了解必需和有用的自动配置属性。
其中一些与 AWS 凭证有关:
-
cloud.aws.credentials.accessKey
-
cloud.aws.credentials.secretKey
-
cloud.aws.credentials.instance配置文件
-
cloud.aws.credentials.profileName
-
cloud.aws.credentials.profilePath
其他用于 AWS 定义:Region
-
cloud.aws.region.auto
-
cloud.aws.region.static
对于 AWS:Stack
-
cloud.aws.stack.auto
-
cloud.aws.stack.name
5.13. SFTP 源
此源应用程序支持使用 SFTP 协议传输文件。
文件将从目录传输到部署应用程序的目录。
默认情况下,源发出的消息以字节数组的形式提供。但是,这可能是
使用选项进行自定义:remote
local
--mode
-
裁判提供参考
java.io.File
-
线将逐行拆分文件,并为每行发出一条新消息
-
内容默认值。以字节数组的形式提供文件的内容
使用 时,您还可以提供附加选项 。
如果设置为 ,则底层将在实际数据之前和之后发出额外的文件开头和文件结尾标记消息。
这 2 个附加标记消息的有效负载为 类型 。如果未明确设置,则选项默认为 。--mode=lines
--withMarkers=true
true
FileSplitter
FileSplitter.FileMarker
withMarkers
false
有关高级配置选项,请参阅 sftp-supplier
。
另请参阅 MetadataStore 选项,了解用于防止重新启动时出现重复消息的可能共享持久性存储配置。
5.13.2. 输出
mode = 内容
模式 = 行
5.13.3. 选项
ftp 源具有以下选项:
按前缀分组的属性:
file.consumer 文件
- 标记 JSON
-
当 'fileMarkers == true' 时,指定它们是应生成为 FileSplitter.FileMarker 对象还是 JSON。(布尔值,默认值:
true
) - 模式
-
用于文件读取源的 FileReadingMode。值包括 'ref' - 文件对象,'lines' - 每行一条消息,或 'contents' - 以字节为单位的内容。(FileReadingMode,默认值:
<none>
,可能的值:ref,lines,contents
)
- with-markers
-
设置为 true 可在数据之前/之后发出文件开头/文件结尾标记消息。仅对 FileReadingMode 'lines' 有效。(布尔值,默认值:
<none>
)
元数据.store.dynamo-db
- 创建延迟
-
创建表重试之间的延迟。(整数,默认值:
1
) - 创建重试
-
创建表请求的重试次数。(整数,默认值:
25
) - read-capacity (读取容量)
-
读取表上的 capacity。(长,默认值:
1
) - 桌子
-
元数据的表名称。(字符串,默认值:
<none>
) - 生存时间
-
表条目的 TTL。(整数,默认值:
<none>
) - 写入容量
-
表上的写入容量。(长,默认值:
1
)
metadata.store
- 类型
-
指示要配置的元数据存储的类型(默认为 'memory')。您必须包含相应的 Spring 集成依赖项才能使用持久存储。(StoreType,默认值:
<none>
,可能的值:mongodb,redis,dynamodb,jdbc,zookeeper,hazelcast,memory)
元数据.store.zookeeper
- 连接字符串
-
HOST:PORT 形式的 Zookeeper 连接字符串。(字符串,默认值:
127.0.0.1:2181
) - 编码
-
在 Zookeeper 中存储数据时使用的编码。(字符集,默认值:
UTF-8
) - 重试间隔
-
Zookeeper 操作的重试间隔(以毫秒为单位)。(整数,默认值:
1000
) - 根
-
根节点 - 存储条目是此节点的子项。(字符串,默认值:
/SpringIntegration-MetadataStore
)
sftp.supplier
- 自动创建本地目录
-
如果本地目录不存在,则设置为 true 以创建本地目录。(布尔值,默认值:
true
) - empty-时延迟
-
未检测到新文件时的延迟持续时间。(持续时间,默认:
1 秒
) - 删除远程文件
-
设置为 true 可在成功传输后删除远程文件。(布尔值,默认值:
false
) - 目录
-
工厂 “name.directory” 对的列表。(String[],默认值:
<none>
) - 工厂
-
工厂名称到工厂的映射。(Map<String,Factory>,默认值:
<none>
) - 公平
-
True 表示多个服务器/目录的公平轮换。默认情况下,这是 false,因此如果源具有多个条目,则将在访问其他源之前收到这些条目。(布尔值,默认值:
false
) - 文件名模式
-
用于匹配要传输的文件的名称的筛选模式。(字符串,默认值:
<none>
) - 文件名-regex
-
用于匹配要传输的文件的名称的筛选器正则表达式模式。(模式,默认值:
<无>
) - 仅列表
-
设置为 true 可返回文件元数据,而不返回整个负载。(布尔值,默认值:
false
) - 本地目录
-
用于文件传输的本地目录。(文件,默认值:
<无>
) - max-fetch (最大获取)
-
每次轮询要获取的最大远程文件数;默认 unlimited 。在列出文件或构建任务启动请求时不适用。(整数,默认值:
<none>
) - preserve-timestamp
-
设置为 true 可保留原始时间戳。(布尔值,默认值:
true
) - 远程目录
-
远程 FTP 目录。(字符串,默认值:
/
) - 远程文件分隔符
-
远程文件分隔符。(字符串,默认值:
/
) - 重命名远程文件
-
成功传输后,必须将解析为新名称 remote files 的 SPEL 表达式重命名为该表达式。(表达式,默认值:
<none>
) - 流
-
设置为 true 可流式传输文件,而不是复制到本地目录。(布尔值,默认值:
false
) - tmp-file-suffix 文件后缀
-
传输过程中要使用的后缀。(字符串,默认值:
.tmp
)
5.14. 系统日志
syslog 源通过 UDP 和/或 TCP 接收 SYSLOG 数据包。支持 RFC3164 (BSD) 和 RFC5424 格式。
5.14.1. 选项
- syslog.supplier.buffer-size
-
解码消息时使用的缓冲区大小;较大的邮件将被拒绝。(整数,默认值:
2048
) - syslog.supplier.nio
-
是否使用 NIO(当支持大量连接时)。(布尔值,默认值:
false
) - syslog.supplier.port
-
要侦听的端口。(整数,默认值:
1514
) - syslog.supplier.protocol
-
用于 SYSLOG 的协议(tcp 或 udp)。(协议,默认值:
<none>
,可能的值:tcp,udp,both
)
- syslog.supplier.reverse-lookup
-
是否对 incoming socket 执行反向查找。(布尔值,默认值:
false
) - syslog.supplier.rfc
-
'5424' 或 '3164' - 根据 RFC 的 syslog 格式;3164 又名“BSD”格式。(字符串,默认值:
3164
) - syslog.supplier.socket-超时
-
套接字超时。(整数,默认值:
0
)
5.15. TCP 协议
源充当服务器,并允许远程方连接到它并通过原始 tcp 套接字提交数据。tcp
TCP 是一种流协议,需要一些机制来在网络上构建消息。许多解码器是 available,默认值为 'CRLF',它与 Telnet 兼容。
TCP 源应用程序生成的消息具有有效负载。byte[]
5.15.1. 选项
按前缀分组的属性:
5.15.2. 可用的解码器
- CRLF (默认)
-
以回车 (0x0d) 后跟换行符 (0x0a) 结尾的文本
- 如果
-
由换行符终止的文本 (0x0a)
- 零
-
以 null 字节 (0x00) 结尾的文本
- STXETX
-
文本前面有 STX (0x02) 并以 ETX (0x03) 结尾
- 生
-
no structure - 客户端通过关闭套接字来指示完整的消息
- L1 系列
-
数据前面有一个单字节(无符号)长度字段(最多支持 255 个字节)
- L2 (二层)
-
数据前面有一个两字节(无符号)长度的字段(最多 2 个16-1 字节)
- L4 系列
-
数据前面有一个四字节(带符号)长度的字段(最多 2 个31-1 字节)
5.16. 时间源
时间源将每隔一段时间简单地发出一个包含当前时间的 String。
5.16.1. 选项
时间源具有以下选项:
- spring.integration.poller.cron
-
Cron 表达式进行轮询。与 'fixedDelay' 和 'fixedRate' 互斥。(字符串,默认值:
<none>
) - spring.integration.poller.fixed-delay
-
轮询延迟期。与 'cron' 和 'fixedRate' 互斥。(持续时间,默认值:
<无>
) - spring.integration.poller.fixed-rate
-
轮询率周期。与 'fixedDelay' 和 'cron' 互斥。(持续时间,默认值:
<无>
) - spring.integration.poller.initial-delay
-
轮询初始延迟。申请了 'fixedDelay' 和 'fixedRate';对于 'cron' 而被忽略。(持续时间,默认值:
<无>
) - 每次轮询 spring.integration.poller.max 条消息数
-
每个轮询周期要轮询的最大消息数。(整数,默认值:
<none>
) - spring.integration.poller.receive-timeout
-
轮询消息等待多长时间。(持续时间,默认:
1 秒
) - time.date-格式
-
日期值的格式。(字符串,默认值:
MM/dd/yy HH:mm:ss
)
5.17. Twitter 消息源
重复检索过去 30 天内的直接消息(发送和接收),按时间倒序排序。
已释放的消息将缓存(在缓存中)以防止重复。
默认情况下,使用 in-memory。MetadataStore
SimpleMetadataStore
控制消息的数量或返回的消息。twitter.message.source.count
这些属性控制消息轮询间隔。
必须与使用的 API 速率限制保持一致spring.cloud.stream.poller
5.17.1. 选项
按前缀分组的属性:
spring.integration.poller
- cron (定时)
-
Cron 表达式进行轮询。与 'fixedDelay' 和 'fixedRate' 互斥。(字符串,默认值:
<none>
) - 固定延迟
-
轮询延迟期。与 'cron' 和 'fixedRate' 互斥。(持续时间,默认值:
<无>
) - 固定利率
-
轮询率周期。与 'fixedDelay' 和 'cron' 互斥。(持续时间,默认值:
<无>
) - 初始延迟
-
轮询初始延迟。申请了 'fixedDelay' 和 'fixedRate';对于 'cron' 而被忽略。(持续时间,默认值:
<无>
) - 最大每次轮询消息数
-
每个轮询周期要轮询的最大消息数。(整数,默认值:
<none>
) - 接收超时
-
轮询消息等待多长时间。(持续时间,默认:
1 秒
)
推特连接
- 访问令牌
-
您的 Twitter 令牌。(字符串,默认值:
<none>
) - 访问令牌密钥
-
您的 Twitter 令牌密钥。(字符串,默认值:
<none>
) - consumer-key (使用者密钥)
-
您的 Twitter 密钥。(字符串,默认值:
<none>
) - 消费者密钥
-
您的 Twitter 秘密。(字符串,默认值:
<none>
) - 已启用调试
-
启用 Twitter4J 调试模式。(布尔值,默认值:
false
) - raw-json 格式
-
启用缓存 Twitter API 返回的原始(原始)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 json 表示形式。当设置为 True 时,结果将使用原始的 Twitter APISs json 表示形式。(布尔值,默认值:
true
)
5.18. Twitter 搜索源
Twitter 的标准搜索 API(搜索/推文)允许对最近或热门推文的索引进行简单查询。这提供了对过去 7 天内发布的最近推文样本的连续搜索。“公共”API 集的一部分。Source
返回与指定查询匹配的相关推文的集合。
使用属性可控制连续搜索请求之间的间隔。速率限制 - 每 30 分钟窗口 180 个请求(例如 ~6 r/m,~ 1 个请求/10 秒)spring.cloud.stream.poller
查询属性允许按关键字进行查询,并按时间和地理位置筛选结果。twitter.search
和 根据搜索 API 控制结果分页。twitter.search.count
twitter.search.page
注意:Twitter 的搜索服务以及 Search API 并不是详尽的推文来源。并非所有推文都会被编入索引或通过搜索界面提供。
5.18.1. 选项
按前缀分组的属性:
spring.integration.poller
- cron (定时)
-
Cron 表达式进行轮询。与 'fixedDelay' 和 'fixedRate' 互斥。(字符串,默认值:
<none>
) - 固定延迟
-
轮询延迟期。与 'cron' 和 'fixedRate' 互斥。(持续时间,默认值:
<无>
) - 固定利率
-
轮询率周期。与 'fixedDelay' 和 'cron' 互斥。(持续时间,默认值:
<无>
) - 初始延迟
-
轮询初始延迟。申请了 'fixedDelay' 和 'fixedRate';对于 'cron' 而被忽略。(持续时间,默认值:
<无>
) - 最大每次轮询消息数
-
每个轮询周期要轮询的最大消息数。(整数,默认值:
<none>
) - 接收超时
-
轮询消息等待多长时间。(持续时间,默认:
1 秒
)
推特连接
- 访问令牌
-
您的 Twitter 令牌。(字符串,默认值:
<none>
) - 访问令牌密钥
-
您的 Twitter 令牌密钥。(字符串,默认值:
<none>
) - consumer-key (使用者密钥)
-
您的 Twitter 密钥。(字符串,默认值:
<none>
) - 消费者密钥
-
您的 Twitter 秘密。(字符串,默认值:
<none>
) - 已启用调试
-
启用 Twitter4J 调试模式。(布尔值,默认值:
false
) - raw-json 格式
-
启用缓存 Twitter API 返回的原始(原始)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 json 表示形式。当设置为 True 时,结果将使用原始的 Twitter APISs json 表示形式。(布尔值,默认值:
true
)
推特搜索
- 计数
-
每个页面要返回的推文数(例如,每个请求),最多 100 条。(整数,默认值:
100
) - 朗
-
将搜索的推文限制为给定的语言,由 http://en.wikipedia.org/wiki/ISO_639-1 指定。(字符串,默认值:
<none>
) - 页
-
在再次从最近的推文开始搜索之前,要向后搜索(从最新到最早的推文)的页面数(例如请求)。向后搜索的推文总数为 (page * count) (整数,默认值:
3
) - 查询
-
按搜索查询字符串搜索推文。(字符串,默认值:
<none>
) - 从最近空响应重新启动
-
从最新的 tweets 重新开始搜索 empty response。仅在第一次重启后应用(例如,当 since_id != UNBOUNDED 时)(布尔值,默认值:
false
) - result-type
-
指定您希望接收的搜索结果类型。当前默认值为 “mixed”。有效值包括:mixed :在响应中同时包含常用结果和实时结果。recent :仅返回响应中的最新结果 popular :仅返回响应中最受欢迎的结果(ResultType,默认值:
<none>
,可能的值:popular,mixed,recent)
- 因为
-
如果指定,则返回自给定日期以来的推文。日期的格式应为 YYYY-MM-DD。(字符串,默认值:
<none>
)
5.19. Twitter 流源
-
返回与一个或多个筛选条件谓词匹配的公有状态。 多个参数允许使用与 Streaming API 的单个连接。 提示:、 、 和 字段与 OR 运算符组合在一起! 查询并返回与 OR 匹配的推文 由 用户 创建。
Filter API
track
follow
locations
track=foo
follow=1234
test
1234
-
这将返回所有公共状态的小型随机样本。 默认访问级别返回的推文是相同的,因此,如果两个不同的客户端连接到此终端节点,它们将看到相同的推文。
Sample API
默认访问级别最多允许 400 个跟踪关键词、5,000 个关注用户 ID 和 25 个 0.1-360 度位置框。
5.19.1. 选项
按前缀分组的属性:
推特连接
- 访问令牌
-
您的 Twitter 令牌。(字符串,默认值:
<none>
) - 访问令牌密钥
-
您的 Twitter 令牌密钥。(字符串,默认值:
<none>
) - consumer-key (使用者密钥)
-
您的 Twitter 密钥。(字符串,默认值:
<none>
) - 消费者密钥
-
您的 Twitter 秘密。(字符串,默认值:
<none>
) - 已启用调试
-
启用 Twitter4J 调试模式。(布尔值,默认值:
false
) - raw-json 格式
-
启用缓存 Twitter API 返回的原始(原始)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 json 表示形式。当设置为 True 时,结果将使用原始的 Twitter APISs json 表示形式。(布尔值,默认值:
true
)
推特.stream.filter
- 计数
-
指示在过渡到实时流之前要流式传输的先前状态的数量。(整数,默认值:
0
) - filter-level (过滤器级别)
-
筛选条件级别将流中显示的推文限制为具有最小 filterLevel 属性值的推文。none (无)、low (低) 或 medium (中等) 之一。(FilterLevel,默认值:
<none>
) - 跟随
-
按 ID 指定要从中接收公共推文的用户。(List<Long>,默认值:
<none>
) - 语言
-
指定流的推文语言。(List<String>,默认值:
<none>
) - 地点
-
要跟踪的位置。内部表示为 2D 数组。边界框无效:52.38、4.90、51.51、-0.12。第一对必须是框的 SW 角(List<BoundingBox>,默认值:
<none>
) - 跟踪
-
指定要跟踪的关键字。(List<String>,默认值:
<none>
)
5.21. ZeroMQ 源码
“zeromq” 源允许从 ZeroMQ 接收消息。
5.21.3. 选项
zeromq 源具有以下选项:
- zeromq.supplier.bind-port
-
Bind Port 用于创建 ZeroMQ Socket;0 选择随机端口。(整数,默认值:
0
) - zeromq.supplier.connect-url
-
ZeroMQ 套接字的连接 URL。(字符串,默认值:
<none>
) - zeromq.supplier.consume-delay
-
未收到数据时从 ZeroMQ Socket 消耗的延迟。(持续时间,默认:
1 秒
) - zeromq.supplier.socket类型
-
连接应进行的 Socket Type。(SocketType,默认值:
<none>
,可能的值:PAIR,PUBSUBREQ,REP,DEALER,ROUTER,PULL,PUSH,XPUB,XSUB,STREAM) - zeromq.supplier.topics
-
要订阅的主题。(String[],默认值:
[]
)
另请参阅 Spring Boot 文档,了解代理连接和侦听器属性的其他属性。
6. 处理器
6.1. 聚合器处理器
聚合器处理器使应用程序能够将传入消息聚合到组中,并将其发布到输出目标中。
java -jar aggregator-processor-kafka-<version>.jar --aggregator.message-store-type=jdbc
如果要对 RabbitMQ 运行 kafka,请将 kafka 更改为 rabbit。
6.1.2. 选项
按前缀分组的属性:
聚合
- 集合体
-
聚合策略的 SpEL 表达式。默认值是有效负载的集合。(表达式,默认值:
<none>
) - 相关
-
相关键的 SPEL 表达式。默认为 correlationId 标头。(表达式,默认值:
<none>
) - 组超时
-
超时到过期未完成的组的 SPEL 表达式。(表达式,默认值:
<none>
) - 消息存储实体
-
持久性消息存储实体:RDBMS 中的表前缀、MongoDB 中的集合名称等(字符串,默认值:
<none>
) - 消息存储类型
-
消息存储类型。(字符串,默认值:
<none>
) - 释放
-
用于发布策略的 SPEL 表达式。默认基于 sequenceSize 标头。(表达式,默认值:
<none>
)
spring.data.mongodb
- 身份验证数据库
-
身份验证数据库名称。(字符串,默认值:
<none>
) - 自动索引创建
-
是否启用自动索引创建。(布尔值,默认值:
<none>
) - 数据库
-
数据库名称。(字符串,默认值:
<none>
) - 字段命名策略
-
要使用的 FieldNamingStrategy 的完全限定名称。(类<?>,默认值:
<无>
) - 网格 fs-数据库
-
<缺少文档>(字符串,默认值:
<none>
) - 主机
-
Mongo 服务器主机。不能使用 URI 设置。(字符串,默认值:
<none>
) - 密码
-
mongo 服务器的登录密码。不能使用 URI 设置。(字符[],默认值:
<无>
) - 港口
-
Mongo 服务器端口。不能使用 URI 设置。(整数,默认值:
<none>
) - 副本集名称
-
集群所需的副本集名称。不能使用 URI 设置。(字符串,默认值:
<none>
) - URI
-
Mongo 数据库 URI。不能使用 host、port、credentials 和 replica set name 进行设置。(字符串,默认值:
mongodb://localhost/test
) - 用户名
-
mongo 服务器的登录用户。不能使用 URI 设置。(字符串,默认值:
<none>
) - uuid 表示
-
将 UUID 转换为 BSON 二进制值时使用的表示形式。(UuidRepresentation,默认值:
java-legacy
,可能的值:UNSPECIFIED,STANDARD,C_SHARP_LEGACY,JAVA_LEGACY,PYTHON_LEGACY)
spring.datasource
- 出错时继续
-
初始化数据库时发生错误时是否停止。(布尔值,默认值:
false
) - 数据
-
数据 (DML) 脚本资源引用。(List<String>,默认值:
<none>
) - 数据密码
-
用于执行 DML 脚本的数据库密码(如果不同)。(字符串,默认值:
<none>
) - 数据用户名
-
执行 DML 脚本的数据库的用户名(如果不同)。(字符串,默认值:
<none>
) - 驱动程序类名称
-
JDBC 驱动程序的完全限定名称。默认情况下,根据 URL 自动检测。(字符串,默认值:
<none>
) - 嵌入式数据库连接
-
嵌入式数据库的连接详细信息。默认为 Classpath 上可用的最合适的嵌入式数据库。(EmbeddedDatabaseConnection,默认值:
<none>
,可能的值:NONE,H2,DERBY,HSQL,HSQLDB) - 生成唯一名称
-
是否生成随机数据源名称。(布尔值,默认值:
true
) - 初始化模式
-
在确定是否应使用可用的 DDL 和 DML 脚本执行 DataSource 初始化时应用的模式。(DataSourceInitializationMode,默认值:
embedded
,可能的值:ALWAYS,EMBEDDED,NEVER
) - jndi-名称
-
数据源的 JNDI 位置。设置时,将忽略 Class、url、username 和 password。(字符串,默认值:
<none>
) - 名字
-
如果 “generate-unique-name” 为 false,则要使用的数据源名称。使用嵌入式数据库时默认为 “testdb”,否则为 null。(字符串,默认值:
<none>
) - 密码
-
数据库的登录密码。(字符串,默认值:
<none>
) - 平台
-
在 DDL 或 DML 脚本中使用的平台(例如 schema-${platform}.sql 或 data-${platform}.sql)。(字符串,默认值:
all
) - 图式
-
架构 (DDL) 脚本资源引用。(List<String>,默认值:
<none>
) - 架构密码
-
用于执行 DDL 脚本的数据库密码(如果不同)。(字符串,默认值:
<none>
) - 架构用户名
-
执行 DDL 脚本的数据库的用户名(如果不同)。(字符串,默认值:
<none>
) - 分隔符
-
SQL 初始化脚本中的语句分隔符。(字符串,默认值:
;
) - sql-script-encoding
-
SQL 脚本编码。(字符集,默认值:
<无>
) - 类型
-
要使用的连接池实现的完全限定名称。默认情况下,它是从 Classpath 中自动检测到的。(类<DataSource>,默认值:
<none>
) - 网址
-
数据库的 JDBC URL。(字符串,默认值:
<none>
) - 用户名
-
数据库的登录用户名。(字符串,默认值:
<none>
)
spring.mongodb.embedded
- 特征
-
要启用的功能的逗号分隔列表。默认情况下,使用已配置版本的默认值。(Set<Feature>,默认值:
[sync_delay]
) - 版本
-
要使用的 Mongo 版本。(字符串,默认值:
3.5.5
)
spring.redis 的
- 客户端名称
-
要在使用 CLIENT SETNAME 的连接上设置的客户端名称。(字符串,默认值:
<none>
) - 客户端类型
-
要使用的客户端类型。默认情况下,根据 Classpath 自动检测。(ClientType,默认值:
<none>
,可能的值:LETTUCE,JEDIS
)
- 连接超时
-
连接超时。(持续时间,默认值:
<无>
) - 数据库
-
连接工厂使用的数据库索引。(整数,默认值:
0
) - 主机
-
Redis 服务器主机。(字符串,默认值:
localhost
) - 密码
-
redis 服务器的登录密码。(字符串,默认值:
<none>
) - 港口
-
Redis 服务器端口。(整数,默认值:
6379
) - SSL协议
-
是否启用 SSL 支持。(布尔值,默认值:
false
) - 超时
-
读取超时。(持续时间,默认值:
<无>
) - 网址
-
连接 URL。覆盖 host、port 和 password。忽略 User。示例:redis://user:[email protected]:6379 (字符串,默认值:
<none>
) - 用户名
-
redis 服务器的登录用户名。(字符串,默认值:
<none>
)
6.3. Filter处理器
筛选器处理器使应用程序能够检查传入的有效负载,然后对其应用谓词,以决定是否需要继续记录。
例如,如果传入的有效负载是 type 并且您想要过滤掉少于 5 个字符的任何内容,则可以运行过滤器处理器,如下所示。String
java -jar filter-processor-kafka-<version>.jar --filter.function.expression=payload.length() > 4
如果要对 RabbitMQ 运行 kafka,请将 kafka 更改为 rabbit。
6.5. Header Enricher 处理器
使用 header-enricher 应用程序添加消息标头。
标头以换行分隔的键值对的形式提供,其中键是标头名称,值是 SpEL 表达式。
例如。--headers='foo=payload.someProperty \n bar=payload.otherProperty'
6.6. Http 请求处理器
一个处理器应用程序,它向 HTTP 资源发出请求并将响应正文作为消息负载发出。
6.6.1. 输入
头
任何必需的 HTTP 标头都必须通过 or 属性显式设置。请参阅下面的示例。
标头值也可用于构造:headers
headers-expression
-
在属性中引用时的请求正文。
body-expression
-
在属性中引用的 HTTP 方法。
http-method-expression
-
在属性中引用时的 URL。
url-expression
有效载荷
默认情况下,有效负载用作 POST 请求的请求正文,可以是任何 Java 类型。 对于 GET 请求,它应为空 String。 payload 还可用于构造:
-
在属性中引用时的请求正文。
body-expression
-
在属性中引用的 HTTP 方法。
http-method-expression
-
在属性中引用时的 URL。
url-expression
底层 WebClient 支持 Jackson JSON 序列化,以在必要时支持任何请求和响应类型。
默认情况下,该属性可以设置为应用程序类路径中的任何类。
请注意,用户定义的有效负载类型需要向 pom 文件添加所需的依赖项。expected-response-type
String.class
6.6.2. 输出
有效载荷
原始输出对象是 ResponseEntity<?>它的任何字段(例如、、)或访问器方法 () 都可以作为 .
默认情况下,出站 Message 负载是响应正文。
请注意, ResponseEntity (由表达式引用) 默认情况下不能由 Jackson 反序列化,但可以呈现为 .body
headers
statusCode
reply-expression
#root
HashMap
6.6.4. 选项
按前缀分组的属性:
http.request 请求
- 身体表情
-
一个 SPEL 表达式,用于从传入消息派生请求正文。(表达式,默认值:
<none>
) - 预期响应类型
-
用于解释响应的类型。(类<?>,默认值:
<无>
) - headers-表达式
-
用于派生要使用的 http 标头映射的 SPEL 表达式。(表达式,默认值:
<none>
) - http-method-expression 表达式
-
一个 SpEL 表达式,用于从传入消息中派生请求方法。(表达式,默认值:
<none>
) - 回复表达式
-
用于计算最终结果的 SPEL 表达式,应用于整个 http {@link org.springframework.http.ResponseEntity}。(表达式,默认值:
<none>
) - 超时
-
请求超时(以毫秒为单位)。(长,默认值:
30000
) - url 表达式
-
针对传入消息的 SPEL 表达式,用于确定要使用的 URL。(表达式,默认值:
<none>
)
6.7. 图像识别处理器
使用 Inception 模型进行分类的处理器 在实时图像中分为不同的类别(例如标签)。
模型的输入是二进制数组的图像。
输出是以下格式的 JSON 消息:
{
"labels" : [
{"giant panda":0.98649305}
]
}
Result 包含已识别类别的名称(例如 label)以及图像表示此类别的置信度(例如 confidence)。
如果 the 设置为大于 1 的值,则结果将包含最可能的标签。例如,将返回:response-seize
response-seize
response-size=3
{
"labels": [
{"giant panda":0.98649305},
{"badger":0.010562794},
{"ice bear":0.001130851}
]
}
有效载荷
如果传入类型为 且内容类型设置为 ,则应用程序会将输入图像处理为并输出增强图像有效负载和 json 标头。byte[]
application/octet-stream
byte[]
byte[]
6.7.2. 选项
- image.recognition.cache-model
-
缓存预先训练的 TensorFlow 模型。(布尔值,默认值:
true
) - image.recognition.debug-output
-
<缺少文档> (布尔值,默认值:
false
) - image.recognition.debug-output-path
-
<缺少文档> (字符串,默认值:
image-recognition-result.png
) - image.recognition.model
-
预先训练的 TensorFlow 图像识别模型。请注意,模型必须与所选模型类型匹配!(字符串,默认值:
https://storage.googleapis.com/mobilenet_v2/checkpoints/mobilenet_v2_1.4_224.tgz#mobilenet_v2_1.4_224_frozen.pb
) - image.recognition.model-type
-
支持三种不同的预训练 tensorflow 图像识别模型:Inception、MobileNetV1 和 MobileNetV2 1。Inception Graph 使用 'input' 作为输入,使用 'output' 作为输出。2. MobileNetV2 预训练模型:https://github.com/tensorflow/models/tree/master/research/slim/nets/mobilenet#pretrained-models - 标准化图像大小始终为正方形(例如 H=W) - 图形使用“input”作为输入,“MobilenetV2/Predictions/Reshape_1”作为输出。3. MobileNetV1 预训练模型:https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet_v1.md#pre-trained-models - 图形使用“input”作为输入,“MobilenetV1/Predictions/Reshape_1”作为输出。(ModelType,默认值:
<none>
,可能的值:inception,mobilenetv1,mobilenetv2
) - image.recognition.normalized-image-size
-
标准化图像大小。(整数,默认值:
224
) - image.recognition.response-size
-
已识别图像的数量。(整数,默认值:
5
)
6.8. 对象检测处理器
Object Detection 处理器为 TensorFlow Object Detection API 提供开箱即用的支持。它允许在单个图像或图像流中实时定位和识别多个对象。Object Detection 处理器构建在对象检测功能之上。
以下是一些合理的配置默认值:
-
object.detection.model
:storage.googleapis.com/scdf-tensorflow-models/object-detection/faster_rcnn_resnet101_coco_2018_01_28_frozen_inference_graph.pb
-
object.detection.labels
:storage.googleapis.com/scdf-tensorflow-models/object-detection/mscoco_label_map.pbtxt
-
object.detection.with-masks
:false
下图显示了一个 Spring Cloud Data Flow,即流式管道,它实时预测输入图像流中的对象类型。
处理器的输入是一个图像字节数组,输出是一个增强的图像,以及一个名为 的标头,它提供检测到的对象的文本描述:detected_objects
{
"labels" : [
{"name":"person", "confidence":0.9996774,"x1":0.0,"y1":0.3940161,"x2":0.9465165,"y2":0.5592592,"cid":1},
{"name":"person", "confidence":0.9996604,"x1":0.047891676,"y1":0.03169123,"x2":0.941098,"y2":0.2085562,"cid":1},
{"name":"backpack", "confidence":0.96534747,"x1":0.15588468,"y1":0.85957795,"x2":0.5091308,"y2":0.9908878,"cid":23},
{"name":"backpack", "confidence":0.963343,"x1":0.1273736,"y1":0.57658505,"x2":0.47765,"y2":0.6986431,"cid":23}
]
}
标头格式为:detected_objects
-
object-name:confidence - 检测到的对象(例如标签)的人类可读名称,其置信度为 [0-1] 之间的浮点数
-
x1, y1, x2, y2 - 响应还提供表示为 的检测到的对象的边界框。坐标是相对于图像大小的大小的。
(x1, y1, x2, y2)
-
cid - 在提供的标签配置文件中定义的分类标识符。
有效载荷
传入类型为 ,内容类型为 。处理器处理输入图像并输出增强图像有效负载和 JSON 标头 ()。byte[]
application/octet-stream
byte[]
byte[]
detected_objects
6.8.2. 选项
- 对象.detection.cache-model
-
<缺少文档> (布尔值,默认值:
true
) - 对象.detection.confidence
-
<缺少文档> (浮点数,默认值:
0.4
) - object.detection.debug-output
-
<缺少文档> (布尔值,默认值:
false
) - object.detection.debug-output-path
-
<缺少文档> (字符串,默认值:
object-detection-result.png
) - 对象.detection.labels
-
标签 URI。(字符串,默认值:
https://storage.googleapis.com/scdf-tensorflow-models/object-detection/mscoco_label_map.pbtxt
) - 对象.detection.model
-
预先训练的 TensorFlow 对象检测模型。(字符串,默认值:
https://download.tensorflow.org/models/object_detection/ssdlite_mobilenet_v2_coco_2018_05_09.tar.gz#frozen_inference_graph.pb
) - object.detection.response-size 对象
-
<缺少文档> (整数,默认值:
<none>
) - 对象.detection.with-masks
-
<缺少文档> (布尔值,默认值:
false
)
6.9. 语义分割处理器
基于最先进的 DeepLab Tensorflow 模型的图像语义分割。
这是将图像的每个像素与类标签(如花朵、人物、道路、天空、海洋或汽车)相关联的过程。
与生成实例感知区域掩码的 不同,生成类感知掩码的 。
有关实施的信息,请改用对象检测服务。Semantic Segmentation
Instance Segmentation
Semantic Segmentation
Instance Segmentation
它使用语义分割函数库和 TensorFlow 服务。Semantic Segmentation Processor
有效载荷
传入类型为 ,内容类型为 。处理器处理输入图像并输出增强图像有效负载和 json 标头。byte[]
application/octet-stream
byte[]
byte[]
处理器的输入是一个图像字节数组,输出是一个增强的图像字节数组,以及一个格式为 JSON 标头:semantic_segmentation
[
[ 0, 0, 0 ],
[ 127, 127, 127 ],
[ 255, 255, 255 ]
...
]
输出标头 json 格式表示从输入图像计算的彩色像素图。
6.9.2. 选项
- 语义.segmentation.color-map-uri
-
每个预训练模型都基于特定的对象颜色映射。预定义的选项包括: - classpath:/colormap/citymap_colormap.json - classpath:/colormap/ade20k_colormap.json - classpath:/colormap/black_white_colormap.json - classpath:/colormap/mapillary_colormap.json (字符串,默认值:
classpath:/colormap/citymap_colormap.json
) - 语义.segmentation.debug-output
-
save output image 在本地 debugOutputPath 路径中。(布尔值,默认值:
false
) - semantic.segmentation.debug-output-path
-
<缺少文档> (字符串,默认值:
semantic-segmentation-result.png
) - semantic.segmentation.mask-transparency
-
计算的分段蒙版图像的 Alpha 颜色。(浮点数,默认值:
0.45
) - 语义.segmentation.model
-
预先训练的 TensorFlow 语义分割模型。(字符串,默认值:
https://download.tensorflow.org/models/deeplabv3_mnv2_cityscapes_train_2018_02_05.tar.gz#frozen_inference_graph.pb
) - semantic.segmentation.output-type
-
指定输出图像类型。您可以返回带有计算的蒙版叠加的输入图像,也可以单独返回蒙版。(OutputType,默认值:
<none>
,可能的值:blended,mask
)
6.10. 脚本处理器
使用脚本转换消息的处理器。脚本正文是直接提供的 作为属性值。可以指定脚本的语言 (groovy/javascript/ruby/python)。
6.10.1. 选项
script-processor 处理器具有以下选项:
- script-processor.language 语言
-
script 属性中文本的语言。支持:groovy、javascript、ruby、python。(字符串,默认值:
<none>
) - script-processor.脚本
-
脚本的文本。(字符串,默认值:
<none>
) - script-processor.variables
-
变量绑定作为名称-值对的新行分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:
<无>
) - 脚本处理器.变量位置
-
包含自定义脚本变量绑定的属性文件的位置。(资源,默认值:
<无>
)
6.11. 分路器处理器
splitter 应用程序构建在 Spring Integration 中的同名概念之上,并允许将单个消息拆分为多个不同的消息。
处理器使用一个函数,该函数将 a 作为输入,然后根据各种属性生成 as 输出(见下文)。
您可以使用 SPEL 表达式或分隔符来指定要如何拆分传入消息。Message<?>
List<Message<?>
有效载荷
-
传入有效负载 -
Message<?
>
如果传入类型为 且内容类型设置为 或 ,则应用程序会将 转换为 。byte[]
text/plain
application/json
byte[]
String
-
传出有效负载 -
List<Message<?>
6.11.2. 选项
- splitter.apply-sequence
-
在 header 中添加关联/序列信息,以方便以后的聚合。(布尔值,默认值:
true
) - splitter.charset
-
将基于文本的文件中的字节转换为 String 时使用的字符集。(字符串,默认值:
<none>
) - splitter.delimiters
-
当 expression 为 null 时,在对 {@link String} 有效负载进行标记时使用的分隔符。(字符串,默认值:
<none>
) - splitter.expression
-
用于拆分有效负载的 SPEL 表达式。(字符串,默认值:
<none>
) - splitter.file-markers 文件标记
-
设置为 true 或 false 以使用包含(或不包含)文件开头/结尾标记的 {@code FileSplitter}(按行拆分基于文本的文件)。(布尔值,默认值:
<none>
) - splitter.markers-json 文件
-
当 'fileMarkers == true' 时,指定它们是应生成为 FileSplitter.FileMarker 对象还是 JSON。(布尔值,默认值:
true
)
6.12. 转换处理器
Transformer 处理器允许您根据 SPEL 表达式转换消息有效负载结构。
下面是如何运行此应用程序的示例。
java -jar transform-processor-kafka-<version>.jar --spel.function.expression=payload.toUpperCase()
如果要对 RabbitMQ 运行 kafka,请将 kafka 更改为 rabbit。
6.13. Twitter 趋势和趋势位置处理器
可以返回热门主题或热门主题的 Locations 的处理器。
该属性 允许 选择查询类型。twitter.trend.trend-query-type
6.13.1. 在某个位置检索热门话题(可选)
对于此模式,设置为 。twitter.trend.trend-query-type
trend
基于 Trends API 的处理器。 返回特定纬度、经度位置附近的热门主题。
6.13.2. 获取趋势位置
对于此模式,设置为 。twitter.trend.trend-query-type
trendLocation
按位置检索热门主题的完整列表或附近的位置列表。
如果未提供 , 参数,则处理器将执行 Trends Available API 并返回 Twitter 具有其热门主题信息的位置。latitude
longitude
如果提供了 , 参数,则处理器将执行 Trends Closest API 并返回 Twitter 具有其热门主题信息、最接近指定位置的位置。latitude
longitude
响应是一个数组,用于编码位置的 WOEID 和一些其他人类可读的信息,例如位置所属的规范名称和国家/地区。locations
7. 水槽
7.1. Cassandra 接收器
此接收器应用程序将其收到的每条消息的内容写入 Cassandra。
它需要 JSON String 的有效负载,并使用其属性映射到表列。
7.1.2. 选项
cassandra sink 具有以下选项:
- spring.cassandra.compression
-
Cassandra 二进制协议支持的压缩。(压缩,默认值:
none
,可能的值:LZ4,SNAPPY,NONE) - spring.cassandra.config
-
要使用的配置文件的位置。(资源,默认值:
<无>
) - spring.cassandra.contact-points
-
群集节点地址采用 'host:port' 格式,或简单的 'host' 以使用配置的端口。(List<String>,默认值:
[127.0.0.1:9042]
) - spring.cassandra.keyspace-名称
-
要使用的 Keyspace 名称。(字符串,默认值:
<none>
) - spring.cassandra.local-datacenter 的
-
被视为 “local” 的数据中心。联系点应来自此数据中心。(字符串,默认值:
<none>
) - spring.cassandra.password
-
服务器的登录密码。(字符串,默认值:
<none>
) - spring.cassandra.port
-
如果联系点未指定端口,则要使用的端口。(整数,默认值:
9042
) - spring.cassandra.schema-action
-
启动时要采取的架构操作。(字符串,默认值:
无
) - spring.cassandra.session-name
-
Cassandra 会话的名称。(字符串,默认值:
<none>
) - spring.cassandra.ssl 中
-
启用 SSL 支持。(布尔值,默认值:
false
) - spring.cassandra.username
-
服务器的登录用户。(字符串,默认值:
<none>
)
7.2. Analytics 接收器
Sink 应用程序,构建在 Analytics Consumer 之上,用于计算输入消息的分析,并将分析作为指标发布到各种监控系统。它利用千分尺库在最流行的监控系统中提供统一的编程体验,并公开 Spring 表达式语言 (SpEL) 属性,用于定义如何从输入数据计算指标 Name、Values 和 Tags。
分析接收器可以生成两种指标类型:
仪表(例如 Counter 或 Gauge)由其 和 唯一标识(术语 dimensions 和 tags 可互换使用)。维度允许对特定的命名量度进行切片,以便向下钻取和推理数据。name
dimensions
由于量度由其 和 唯一标识,因此您可以为每个量度分配多个标签(e.g. key/值对),但之后无法随机更改这些标签!如果具有相同名称的指标具有不同的标签集,Prometheus 等监控系统将会抱怨。name dimensions |
使用 or 属性设置输出分析指标的名称。如果未设置,则指标名称默认为应用程序名称。analytics.name
analytics.name-expression
使用 , 属性向量度添加一个或多个标签。在属性定义中使用的将在量度中显示为标记名称。TAG_VALUE 是一个表达式,用于动态计算传入消息的标签值。analytics.tag.expression.<TAG_NAME>=<TAG_VALUE>
TAG_NAME
SpEL
表达式使用 and 关键字来访问消息的标头和有效负载值。SpEL
headers
payload
你可以使用 literals (例如 ) 来设置具有固定值的标签。'fixed value' |
所有 Stream 应用程序都支持三种最流行的监控系统,并且您可以通过声明方式启用它们。
您只需将 micrometer meter-registry 依赖项添加到应用程序中,即可添加对其他监控系统的支持。Wavefront
Prometheus
InfluxDB
Analytics Sink
请访问 Spring Cloud 数据流流监控,了解有关配置监控系统的详细说明。以下快速代码段可以帮助您开始。
-
要启用 Prometheus 计量注册表,请设置以下属性。
management.metrics.export.prometheus.enabled=true
management.metrics.export.prometheus.rsocket.enabled=true
management.metrics.export.prometheus.rsocket.host=<YOUR PROMETHEUS-RSOKET PROXI URI
management.metrics.export.prometheus.rsocket.port=7001
-
要启用 Wavefront 仪表注册表,请设置以下属性。
management.metrics.export.wavefront.enabled=true
management.metrics.export.wavefront.api-token=YOUR WAVEFRONT KEY
management.metrics.export.wavefront.uri=YOUR WAVEFRONT URI
management.metrics.export.wavefront.source=UNIQUE NAME TO IDENTIFY YOUR APP
-
要启用 InfluxDB 计量注册表,请设置以下属性。
management.metrics.export.influx.enabled=true
management.metrics.export.influx.uri={influxdb-server-url}
如果启用了 Data Flow Server Monitoring,则将重复使用提供的指标配置。Analytics Sink |
下图说明了如何帮助收集股票交易业务内部的实时管道。Analytics Sink
7.2.2. 选项
按前缀分组的属性:
分析学
- 金额表达式
-
用于计算输出度量值(例如 amount)的 SPEL 表达式。它默认为 1.0(表达式,默认值:
<none>
) - 米型
-
Micrometer meter 类型,用于向后端报告指标。(MeterType,默认值:
<none>
,可能的值:counter,gauge
) - 名字
-
输出指标的名称。'name' 和 'nameExpression' 是互斥的。只能设置其中一个。(字符串,默认值:
<none>
) - 名称表达式
-
一个 SPEL 表达式,用于计算输入消息的输出指标名称。'name' 和 'nameExpression' 是互斥的。只能设置其中一个。(表达式,默认值:
<none>
)
analytics.tag
- 表达
-
从 SpEL 表达式计算标签。单个 SPEL 表达式可以生成一个值数组,这反过来意味着不同的名称/值标签。每个 name/value 标签都会产生一个单独的计量增量。标签表达式格式为:analytics.tag.expression。[tag-name]=[SPEL 表达式](Map<String,表达式>,默认值:
<none>
) - 固定
-
已弃用:请将 analytics.tag.expression 与文本 SPEL 表达式一起使用。自定义、固定的 Tags。这些标签具有常量值,创建一次,然后与每个发布的指标一起发送。定义固定标记的约定为:<code> analytics.tag.fixed。[标签名称]=[标签值] </code> (Map<String, String>,默认值:
<无>
)
7.3. Elasticsearch 接收器
Sink,用于将文档索引到 Elasticsearch 中。
此 Elasticsearch 接收器仅支持为 JSON 文档编制索引。
它使用来自输入目标的数据,然后将其索引到 Elasticsearch。
输入数据可以是纯 json 字符串,也可以是表示 JSON 的 a。
它还接受 Elasticsearch 提供的数据。
但是,这种情况很少见,因为中间件不太可能将记录保存为 。
这主要用于消费者的直接调用。java.util.Map
XContentBuilder
XContentBuilder
7.3.1. 选项
Elasticsearch 接收器具有以下选项:
- elasticsearch.consumer.async
-
指示索引操作是否为异步操作。默认情况下,索引是同步完成的。(布尔值,默认值:
false
) - elasticsearch.consumer.batch-size
-
每个请求要编制索引的项目数。它默认为 1。对于大于 1 的值,将使用批量索引 API。(整数,默认值:
1
) - elasticsearch.consumer.group-timeout
-
超时(以毫秒为单位),当批量索引处于活动状态时,将刷新消息组。它默认为 -1,这意味着不会自动刷新空闲消息组。(长整型,默认值:
-1
) - elasticsearch.consumer.id
-
要编制索引的文档的 ID。如果设置,则 INDEX_ID Headers 值将基于每条消息覆盖此属性。(表达式,默认值:
<none>
) - elasticsearch.consumer.index
-
索引的名称。如果设置,则 INDEX_NAME Headers 值将基于每条消息覆盖此属性。(字符串,默认值:
<none>
) - elasticsearch.consumer.routing
-
指示要路由到的分片。如果未提供,Elasticsearch 将默认使用文档 ID 的哈希值。(字符串,默认值:
<none>
) - elasticsearch.consumer.timeout-seconds
-
分片可用的超时时间。如果未设置,则默认为 Elasticsearch 客户端设置的 1 分钟。(长整型,默认值:
0
)
7.3.2. 运行此 sink 的示例
-
从文件夹 :
elasticsearch-sink
./mvnw clean package
-
CD 应用程序
-
cd 复制到适当的 Binder 生成的应用程序(Kafka 或 RabbitMQ)
-
./mvnw clean package
-
确保您正在运行 Elasticsearch。例如,您可以使用以下命令将其作为 docker 容器运行。
docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2
-
如果中间件(Kafka 或 RabbitMQ)尚未运行,请启动它。
-
java -jar target/elasticsearch-sink-<kafka|rabbit>-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.input.destination=els-in --elasticsearch.consumer.index=testing
-
将一些 JSON 数据发送到中间件目标。例如:
{"foo":"bar"}
-
验证数据是否已编制索引:
curl localhost:9200/testing/_search
7.4. 文件接收器
文件接收器应用将其收到的每条消息写入文件。
7.4.2. 选项
有以下选项:file-sink
- 文件.consumer.binary
-
一个标志,用于指示是否应禁止在写入后添加换行符。(布尔值,默认值:
false
) - 文件.consumer.charset
-
编写文本内容时使用的字符集。(字符串,默认值:
UTF-8
) - 文件.consumer.directory
-
目标文件的父目录。(文件,默认值:
<无>
) - 文件.consumer.directory-expression
-
要为目标文件的父目录计算的表达式。(字符串,默认值:
<none>
) - 文件.consumer.mode
-
如果目标文件已存在,则要使用的 FileExistsMode。(FileExistsMode,默认值:
<none>
,可能的值:APPEND
APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE,REPLACE_IF_MODIFIED
) - file.consumer.name
-
目标文件的名称。(字符串,默认值:
file-consumer
) - file.consumer.name-表达式
-
要计算目标文件名称的表达式。(字符串,默认值:
<none>
) - file.consumer.suffix
-
要附加到文件名的后缀。(字符串,默认值:
<空字符串>
)
7.5. FTP 接收器
FTP 接收器是将文件从传入邮件推送到 FTP 服务器的简单选项。
它使用 ,因此传入消息可以是对象、(文件内容)
或 (file content 的数组)。ftp-outbound-adapter
java.io.File
String
bytes
要使用此接收器,您需要用户名和密码才能登录。
默认情况下,如果未指定任何名称,则 Spring Integration 将使用。 将确定文件名
根据 中的标头值(如果存在),或者如果 的有效负载已经是 ,则它将
使用该文件的原始名称。o.s.i.file.DefaultFileNameGenerator DefaultFileNameGenerator file_name MessageHeaders Message java.io.File |
7.5.4. 选项
ftp 接收器有以下选项:
按前缀分组的属性:
ftp.consumer 的
- 自动创建目录
-
是否创建远程目录。(布尔值,默认值:
true
) - filename-expression 文件名
-
用于生成远程文件名的 SPEL 表达式。(字符串,默认值:
<none>
) - 模式
-
如果远程文件已存在,则要执行的操作。(FileExistsMode,默认值:
<none>
,可能的值:APPEND
APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE,REPLACE_IF_MODIFIED
) - 远程目录
-
远程 FTP 目录。(字符串,默认值:
/
) - 远程文件分隔符
-
远程文件分隔符。(字符串,默认值:
/
) - 临时远程目录
-
如果 '#isUseTemporaryFilename()' 为 true,则将写入文件的临时目录。(字符串,默认值:
/
) - tmp-file-suffix 文件后缀
-
传输过程中要使用的后缀。(字符串,默认值:
.tmp
) - 使用临时文件名
-
是否写入临时文件并重命名。(布尔值,默认值:
true
)
ftp.factory
- 缓存会话
-
缓存会话。(布尔值,默认值:
<none>
) - 客户端模式
-
用于 FTP 会话的客户端模式。(ClientMode,默认值:
<none>
,可能的值:ACTIVE,PASSIVE
)
- 主机
-
服务器的主机名。(字符串,默认值:
localhost
) - 密码
-
用于连接到服务器的密码。(字符串,默认值:
<none>
) - 港口
-
服务器的端口。(整数,默认值:
21
) - 用户名
-
用于连接到服务器的用户名。(字符串,默认值:
<none>
)
sinks.adoc 中未解析的指令 - include::https://raw.githubusercontent.com/spring-cloud/stream-applications/main/applications/sink/geode-sink/README.adoc[tags=ref-doc]
7.6. JDBC 接收器
JDBC sink 允许您将传入的有效负载持久化到 RDBMS 数据库中。
该属性表示 where (连同冒号) 是可选的成对。
在这种情况下,值是通过生成的表达式(如 )计算的,因此这样我们就可以从对象属性直接映射到表列。
例如,我们有一个 JSON 有效负载,如下所示:jdbc.consumer.columns
COLUMN_NAME[:EXPRESSION_FOR_VALUE]
EXPRESSION_FOR_VALUE
payload.COLUMN_NAME
{
"name": "My Name",
"address": {
"city": "Big City",
"street": "Narrow Alley"
}
}
因此,我们可以使用 ,并使用配置将其插入到表中:name
city
street
--jdbc.consumer.columns=name,city:address.city,street:address.street
只要底层 JDBC 驱动程序支持,此接收器就支持批量插入。
批量插入通过 和 属性进行配置:
传入消息将聚合,直到消息出现,然后作为批处理插入。
如果毫秒过去了,没有新消息,则即使聚合批处理小于 ,也会插入聚合批处理,从而限制最大延迟。batch-size
idle-timeout
batch-size
idle-timeout
batch-size
该模块还使用 Spring Boot 的DataSource支持来配置数据库连接,因此诸如etc.之类的属性适用。spring.datasource.url |
7.6.1. 示例
java -jar jdbc-sink.jar --jdbc.consumer.tableName=names --jdbc.consumer.columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test
7.7. 日志接收器
sink 使用应用程序记录器输出数据以供检查。log
请理解 sink 使用无类型的处理程序,这会影响实际日志记录的执行方式。
这意味着,如果 content-type 是 textual,则原始有效负载字节将转换为 String,否则将记录原始字节。
请参阅用户指南中的更多信息。log
7.8. MongoDB 接收器
此接收器应用程序将传入数据提取到 MongoDB 中。
此应用程序完全基于 ,因此请参阅 Spring Boot MongoDB 支持以获取更多信息。MongoDataAutoConfiguration
7.8.2. 选项
mongodb sink 具有以下选项:
按前缀分组的属性:
mongodb.consumer
- 收集
-
用于存储数据的 MongoDB 集合。(字符串,默认值:
<none>
) - 集合表达式
-
用于评估 MongoDB 集合的 SPEL 表达式。(表达式,默认值:
<none>
)
spring.data.mongodb
- 附加主机
-
其他服务器主机。不能使用 URI 设置,或者如果未指定 'host' 。其他主机将使用默认的 mongo 端口 27017,如果您想使用不同的端口,可以使用 “host:port” 语法。(List<String>,默认值:
<none>
) - 身份验证数据库
-
身份验证数据库名称。(字符串,默认值:
<none>
) - 自动索引创建
-
是否启用自动索引创建。(布尔值,默认值:
<none>
) - 数据库
-
数据库名称。(字符串,默认值:
<none>
) - 字段命名策略
-
要使用的 FieldNamingStrategy 的完全限定名称。(类<?>,默认值:
<无>
) - 主机
-
Mongo 服务器主机。不能使用 URI 设置。(字符串,默认值:
<none>
) - 密码
-
mongo 服务器的登录密码。不能使用 URI 设置。(字符[],默认值:
<无>
) - 港口
-
Mongo 服务器端口。不能使用 URI 设置。(整数,默认值:
<none>
) - 副本集名称
-
集群所需的副本集名称。不能使用 URI 设置。(字符串,默认值:
<none>
) - URI
-
Mongo 数据库 URI。覆盖主机、端口、用户名、密码和数据库。(字符串,默认值:
mongodb://localhost/test
) - 用户名
-
mongo 服务器的登录用户。不能使用 URI 设置。(字符串,默认值:
<none>
) - uuid 表示
-
将 UUID 转换为 BSON 二进制值时使用的表示形式。(UuidRepresentation,默认值:
java-legacy
,可能的值:UNSPECIFIED,STANDARD,C_SHARP_LEGACY,JAVA_LEGACY,PYTHON_LEGACY)
7.9. MQTT 接收器
此模块将消息发送到 MQTT。
7.9.2. 选项
mqtt sink 有以下选项:
按前缀分组的属性:
MQTT 协议
- clean-session
-
客户端和服务器是否应在重启和重新连接时记住状态。(布尔值,默认值:
true
) - 连接超时
-
连接超时(以秒为单位)。(整数,默认值:
30
) - 保持活动间隔
-
ping 间隔(以秒为单位)。(整数,默认值:
60
) - 密码
-
连接到代理时使用的密码。(字符串,默认值:
guest
) - 坚持
-
'memory' 或 'file'。(字符串,默认值:
memory
) - 持久性目录
-
Persistence 目录。(字符串,默认值:
/tmp/paho
) - SSL 属性
-
MQTT 客户端 SSL 属性。(Map<String, String>,默认值:
<none>
) - 网址
-
MQTT 代理的位置(逗号分隔的列表)。(String[],默认值:
[tcp://localhost:1883]
) - 用户名
-
连接到 broker 时要使用的用户名。(字符串,默认值:
guest
)
7.10. pgcopy 接收器
使用 PostgreSQL COPY 命令将其传入负载写入 RDBMS 的模块。
7.10.3. 选项
jdbc sink 具有以下选项:
- spring.datasource.driver-class-name
-
JDBC 驱动程序的完全限定名称。默认情况下,根据 URL 自动检测。(字符串,默认值:
<none>
) - spring.datasource.password
-
数据库的登录密码。(字符串,默认值:
<none>
) - spring.datasource.url
-
数据库的 JDBC URL。(字符串,默认值:
<none>
) - spring.datasource.username
-
数据库的登录用户名。(字符串,默认值:
<none>
)
该模块还使用 Spring Boot 的DataSource支持来配置数据库连接,因此诸如etc.之类的属性适用。spring.datasource.url |
7.11. RabbitMQ 接收器
该模块向 RabbitMQ 发送消息。
7.11.1. 选项
rabbit sink 具有以下选项:
(有关 RabbitMQ 连接属性,请参阅 Spring Boot 文档)
按前缀分组的属性:
兔
- 转换器 bean 名称
-
自定义消息转换器的 Bean 名称;如果省略,则使用SimpleMessageConverter。如果为 'jsonConverter',将为您创建一个 Jackson2JsonMessageConverter bean。(字符串,默认值:
<none>
) - 交换
-
Exchange name - 如果提供,则由 exchangeNameExpression 覆盖。(字符串,默认值:
<空字符串>
) - 交换表达式
-
计算结果为 exchange 名称的 SPEL 表达式。(表达式,默认值:
<none>
) - 标头映射的最后一个
-
在映射出站消息的报头时,请确定报头是在转换邮件之前还是之后映射的。(布尔值,默认值:
true
) - mapped-request-headers 请求标头
-
将要映射的标头。(String[],默认值:
[*]
) - own-connection
-
如果为 true,则根据引导属性使用单独的连接。(布尔值,默认值:
false
) - 持久传递模式
-
当 'amqp_deliveryMode' 标头不存在时的默认传递模式,对于 PERSISTENT,则为 true。(布尔值,默认值:
false
) - 路由密钥
-
路由密钥 - 如果提供,则由 routingKeyExpression 覆盖。(字符串,默认值:
<none>
) - 路由密钥表达式
-
计算结果为路由密钥的 SPEL 表达式。(表达式,默认值:
<none>
)
spring.rabbitmq 的
- address-shuffle-mode 地址随机模式
-
用于对配置的地址进行随机排序的模式。(AddressShuffleMode,默认值:
none
,可能的值:NONE,RANDOM,INORDER) - 地址
-
客户端应连接到的地址的逗号分隔列表。设置后,将忽略 host 和 port。(字符串,默认值:
<none>
) - 通道 rpc-timeout
-
通道中 RPC 调用的继续超时。将其设为零可永久等待。(持续时间,默认:
10 分钟
) - 连接超时
-
连接超时。将其设为零可永久等待。(持续时间,默认值:
<无>
) - 主机
-
RabbitMQ 主机。如果设置了地址,则忽略。(字符串,默认值:
localhost
) - 密码
-
登录以对代理进行身份验证。(字符串,默认值:
guest
) - 港口
-
RabbitMQ 端口。如果设置了地址,则忽略。默认为 5672,如果启用了 SSL,则为 5671。(整数,默认值:
<none>
) - publisher-confirm-type
-
确认使用的发布者类型。(ConfirmType,默认值:
<none>
,可能的值:SIMPLE,CORRELATED,NONE) - publisher-returns
-
是否启用发布者退货。(布尔值,默认值:
false
) - 请求通道最大值
-
客户端请求的每个连接的通道数。使用 0 表示无限制。(整数,默认值:
2047
) - 请求的心跳
-
请求的检测信号超时;零表示无。如果未指定 duration 后缀,则将使用秒。(持续时间,默认值:
<无>
) - 用户名
-
登录用户以向 broker 进行身份验证。(字符串,默认值:
guest
) - 虚拟主机
-
连接到 broker 时使用的虚拟主机。(字符串,默认值:
<none>
)
7.12. Redis 接收器
向 Redis 发送消息。
7.12.1. 选项
redis sink 具有以下选项:
按前缀分组的属性:
redis.consumer
- 钥匙
-
存储到键时使用的文本键名称。(字符串,默认值:
<none>
) - 键表达式
-
用于存储到键的 SPEL 表达式。(字符串,默认值:
<none>
) - 队列
-
在队列中存储时使用的文本队列名称。(字符串,默认值:
<none>
) - 队列表达式
-
用于队列的 SpEL 表达式。(字符串,默认值:
<none>
) - 主题
-
发布到主题时要使用的文本主题名称。(字符串,默认值:
<none>
) - 主题表达式
-
用于主题的 SPEL 表达式。(字符串,默认值:
<none>
)
spring.data.redis 的
- 客户端名称
-
要在使用 CLIENT SETNAME 的连接上设置的客户端名称。(字符串,默认值:
<none>
) - 客户端类型
-
要使用的客户端类型。默认情况下,根据 Classpath 自动检测。(ClientType,默认值:
<none>
,可能的值:LETTUCE,JEDIS
)
- 连接超时
-
连接超时。(持续时间,默认值:
<无>
) - 数据库
-
连接工厂使用的数据库索引。(整数,默认值:
0
) - 主机
-
Redis 服务器主机。(字符串,默认值:
localhost
) - 密码
-
redis 服务器的登录密码。(字符串,默认值:
<none>
) - 港口
-
Redis 服务器端口。(整数,默认值:
6379
) - SSL协议
-
是否启用 SSL 支持。(布尔值,默认值:
false
) - 超时
-
读取超时。(持续时间,默认值:
<无>
) - 网址
-
连接 URL。覆盖 host、port 和 password。忽略 User。示例:redis://user:[email protected]:6379 (字符串,默认值:
<none>
) - 用户名
-
redis 服务器的登录用户名。(字符串,默认值:
<none>
)
spring.data.redis.jedis.pool
- 启用
-
是否启用池。如果“commons-pool2”可用,则自动启用。使用 Jedis,在 Sentinel 模式下隐式启用池化,此设置仅适用于单节点设置。(布尔值,默认值:
<none>
) - 最大活动
-
池在给定时间可分配的最大连接数。使用负值表示无限制。(整数,默认值:
8
) - max-idle (最大空闲)
-
池中 “空闲” 连接的最大数量。使用负值表示无限数量的空闲连接。(整数,默认值:
8
) - 最大等待
-
当池耗尽时,连接分配在引发异常之前应阻止的最长时间。使用负值可无限期阻止。(持续时间,默认值:
-1ms
) - 最小空闲
-
目标是要在池中维护的最小空闲连接数。仅当 it 和驱逐运行之间的时间均为正数时,此设置才有效。(整数,默认值:
0
) - 驱逐运行之间的时间
-
空闲对象 evictor 线程运行之间的时间。当为正数时,空闲对象驱逐线程将启动,否则不执行空闲对象驱逐。(持续时间,默认值:
<无>
)
spring.data.redis.lettuce.pool
- 启用
-
是否启用池。如果“commons-pool2”可用,则自动启用。使用 Jedis,在 Sentinel 模式下隐式启用池化,此设置仅适用于单节点设置。(布尔值,默认值:
<none>
) - 最大活动
-
池在给定时间可分配的最大连接数。使用负值表示无限制。(整数,默认值:
8
) - max-idle (最大空闲)
-
池中 “空闲” 连接的最大数量。使用负值表示无限数量的空闲连接。(整数,默认值:
8
) - 最大等待
-
当池耗尽时,连接分配在引发异常之前应阻止的最长时间。使用负值可无限期阻止。(持续时间,默认值:
-1ms
) - 最小空闲
-
目标是要在池中维护的最小空闲连接数。仅当 it 和驱逐运行之间的时间均为正数时,此设置才有效。(整数,默认值:
0
) - 驱逐运行之间的时间
-
空闲对象 evictor 线程运行之间的时间。当为正数时,空闲对象驱逐线程将启动,否则不执行空闲对象驱逐。(持续时间,默认值:
<无>
)
7.13. 路由器接收器
此应用程序将消息路由到命名通道。
7.13.1. 选项
router sink 有以下选项:
- router.default-output-channel
-
将不可路由的消息发送到何处。(字符串,默认值:
nullChannel
) - router.destination 映射
-
目标映射为名称-值对的新行分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:
<无>
) - router.expression 的
-
要应用于消息的表达式,以确定要路由到的通道。请注意,文本、json 或 xml 等内容类型的负载连线格式是 byte[] 而不是 String!。有关如何处理字节数组有效负载内容的文档。(表达式,默认值:
<none>
) - router.refresh-delay
-
检查脚本更改的频率(以毫秒为单位)(如果存在);< 0 表示不刷新。(整数,默认值:
60000
) - router.resolution-required
-
是否需要通道分辨率。(布尔值,默认值:
false
) - router.script 的
-
返回通道或通道映射解析键的 groovy 脚本的位置。(资源,默认值:
<无>
) - router.variables 的
-
变量绑定作为名称-值对的新行分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:
<无>
) - router.variables-位置
-
包含自定义脚本变量绑定的属性文件的位置。(资源,默认值:
<无>
)
由于这是动态路由器,因此会根据需要创建目标;因此,默认情况下,仅当 绑定到目标时出现问题时,才会使用 and。defaultOutputChannel resolutionRequired Binder |
您可以使用 该属性限制动态绑定的创建。
默认情况下,所有解析的目标都将动态绑定;如果此属性具有逗号分隔的
destination names,则只会绑定这些名称。
解析到不在此列表中的目标的邮件将被路由到 ,该
也必须显示在列表中。spring.cloud.stream.dynamicDestinations
defaultOutputChannel
destinationMappings
用于将评估结果映射到实际目标名称。
7.13.2. 基于 SPEL 的路由
该表达式根据消息进行计算,并返回通道名称或通道名称映射的键。
有关更多信息,请参阅 Spring 中的“路由器和 Spring 表达式语言 (SpEL)”小节 集成参考手册 配置通用路由器 部分。
从 Spring Cloud Stream 2.0 开始,和内容类型的消息线格式不是!
这是对 SCSt 1.x 的更改,它将这些类型视为字符串!
根据内容类型,可以使用不同的技术来处理有效负载。对于纯内容类型,可以使用 SpEL 表达式将八位字节有效负载转换为字符串。对于类型,jsonPath() SpEL 实用程序
已经支持可互换的字符串和字节数组内容。这同样适用于内容类型和 #xpath() SPEL 实用程序。json text xml byte[] String byte[] text new String(payload) json xml |
例如,对于内容类型,应使用:text
new String(payload).contains('a');
对于内容类型 SpEL 表达式,如下所示:json
#jsonPath(payload, '$.person.name')
7.13.3. 基于 Groovy 的路由
还可以使用 Groovy 脚本代替 SpEL 表达式。让我们在文件系统中创建一个 Groovy 脚本,网址为 “file:/my/path/router.groovy”,或 “classpath:/my/path/router.groovy”:
println("Groovy processing payload '" + payload + "'")
if (payload.contains('a')) {
return "foo"
}
else {
return "bar"
}
如果要将变量值传递给脚本,可以使用 variables 选项或 (可选)使用 propertiesLocation 选项将路径传递给包含绑定的 properties 文件。 文件中的所有属性都将作为变量提供给脚本。您可以同时指定 variables 和 propertiesLocation,在这种情况下,作为变量提供的任何重复值都会覆盖 propertiesLocation 中提供的值。 请注意,payload 和 headers 是隐式绑定的,以允许您访问消息中包含的数据。
有关更多信息,请参阅 Spring 集成参考手册 Groovy 支持。
7.15. Amazon S3 接收器
此接收器应用程序支持将对象传输到 Amazon S3 存储桶。
文件负载(和递归目录)将传输到部署应用程序的目录(S3 存储桶)中。remote
local
此接收器接受的消息必须包含以下 as 之一:payload
-
File
,包括用于递归上传的目录; -
InputStream
; -
byte[]
7.15.1. 选项
s3 接收器具有以下选项:
按前缀分组的属性:
s3.common
- 端点 URL
-
用于连接到 s3 兼容存储的可选终端节点 URL。(字符串,默认值:
<none>
) - path-style-access (路径样式访问)
-
使用路径样式访问。(布尔值,默认值:
false
)
s3.consumer
- ACL
-
S3 对象访问控制列表。(CannedAccessControlList,默认值:
<none>
,可能的值:private,public-read,public-read-write,authenticated-read,log-delivery-write,bucket-owner-read,bucket-owner-full-control,aws-exec-read)
- ACL 表达式
-
用于评估 S3 对象访问控制列表的表达式。(表达式,默认值:
<none>
) - 桶
-
用于存储目标文件的 AWS 存储桶。(字符串,默认值:
<none>
) - 存储桶表达式
-
用于评估 AWS 存储桶名称的表达式。(表达式,默认值:
<none>
) - 键表达式
-
用于评估 S3 对象键的表达式。(表达式,默认值:
<none>
)
基于 的目标生成的应用程序可以通过注入到 bean 中的 and/或 进行增强。
有关更多详细信息,请参阅 Spring 集成 AWS 支持。AmazonS3SinkConfiguration
S3MessageHandler.UploadMetadataProvider
S3ProgressListener
S3MessageHandler
7.15.2. Amazon AWS 通用选项
Amazon S3 Sink(与所有其他 Amazon AWS 应用程序一样)基于 Spring Cloud AWS 项目作为其基础,其自动配置 类由 Spring Boot 自动使用。 请查阅其文档,了解必需和有用的自动配置属性。
其中一些与 AWS 凭证有关:
-
cloud.aws.credentials.accessKey
-
cloud.aws.credentials.secretKey
-
cloud.aws.credentials.instance配置文件
-
cloud.aws.credentials.profileName
-
cloud.aws.credentials.profilePath
其他用于 AWS 定义:Region
-
cloud.aws.region.auto
-
cloud.aws.region.static
对于 AWS:Stack
-
cloud.aws.stack.auto
-
cloud.aws.stack.name
7.16. SFTP Sink 接收器
SFTP 接收器是将文件从传入邮件推送到 SFTP 服务器的简单选项。
它使用 ,因此传入消息可以是对象、(文件内容)
或 (file content 的数组)。sftp-outbound-adapter
java.io.File
String
bytes
要使用此接收器,您需要用户名和密码才能登录。
默认情况下,如果未指定任何名称,则 Spring Integration 将使用。 将确定文件名
根据 中的标头值(如果存在),或者如果 的有效负载已经是 ,则它将
使用该文件的原始名称。o.s.i.file.DefaultFileNameGenerator DefaultFileNameGenerator file_name MessageHeaders Message java.io.File |
配置选项时,评估的根对象是应用程序上下文,例如。sftp.factory.known-hosts-expression
sftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'
7.16.3. 选项
sftp 接收器具有以下选项:
按前缀分组的属性:
sftp.consumer
- 自动创建目录
-
是否创建远程目录。(布尔值,默认值:
true
) - filename-expression 文件名
-
用于生成远程文件名的 SPEL 表达式。(字符串,默认值:
<none>
) - 模式
-
如果远程文件已存在,则要执行的操作。(FileExistsMode,默认值:
<none>
,可能的值:APPEND
APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE,REPLACE_IF_MODIFIED
) - 远程目录
-
远程 FTP 目录。(字符串,默认值:
/
) - 远程文件分隔符
-
远程文件分隔符。(字符串,默认值:
/
) - 临时远程目录
-
如果 'isUseTemporaryFilename()' 为 true,则将写入文件的临时目录。(字符串,默认值:
/
) - tmp-file-suffix 文件后缀
-
传输过程中要使用的后缀。(字符串,默认值:
.tmp
) - 使用临时文件名
-
是否写入临时文件并重命名。(布尔值,默认值:
true
)
sftp.consumer.factory
- 允许未知密钥
-
如果为 True,则允许未知或更改的键。(布尔值,默认值:
false
) - 缓存会话
-
缓存会话。(布尔值,默认值:
<none>
) - 主机
-
服务器的主机名。(字符串,默认值:
localhost
) - 已知主机表达式
-
解析为已知主机文件位置的 SpEL 表达式。(表达式,默认值:
<none>
) - 密码短语
-
用户私钥的密码。(字符串,默认值:
<空字符串>
) - 密码
-
用于连接到服务器的密码。(字符串,默认值:
<none>
) - 港口
-
服务器的端口。(整数,默认值:
22
) - 私钥
-
用户私钥的资源位置。(资源,默认值:
<无>
) - 用户名
-
用于连接到服务器的用户名。(字符串,默认值:
<none>
)
7.17. TCP 接收器
此模块使用 Encoder 将消息写入 TCP。
TCP 是一种流协议,需要一些机制来在网络上构建消息。许多编码器是 available,默认值为 'CRLF'。
7.17.1. 选项
tcp sink 具有以下选项:
按前缀分组的属性:
7.17.2. 可用的编码器
- CRLF (默认)
-
以回车 (0x0d) 后跟换行符 (0x0a) 结尾的文本
- 如果
-
由换行符终止的文本 (0x0a)
- 零
-
以 null 字节 (0x00) 结尾的文本
- STXETX
-
文本前面有 STX (0x02) 并以 ETX (0x03) 结尾
- 生
-
no structure - 客户端通过关闭套接字来指示完整的消息
- L1 系列
-
数据前面有一个单字节(无符号)长度字段(最多支持 255 个字节)
- L2 (二层)
-
数据前面有一个两字节(无符号)长度的字段(最多 2 个16-1 字节)
- L4 系列
-
数据前面有一个四字节(带符号)长度的字段(最多 2 个31-1 字节)
7.19. Twitter 消息接收器
从身份验证用户向指定用户发送私信。
需要将 JSON POST 正文和标头设置为 。Content-Type
application/json
收到用户的消息后,您可以在 24 小时内发送最多 5 条消息作为响应。 收到的每条消息都会重置 24 小时窗口和分配的 5 条消息。 在 24 小时内发送第 6 条消息或在 24 小时时段外发送消息将计入速率限制。 此行为仅在使用 POST direct_messages/events/new 端点时适用。 |
SPEL 表达式用于计算输入消息中的请求参数。
7.19.1. 选项
使用单引号 () 将表达式属性的文字值括起来。
例如,要设置固定的消息文本,请使用 。
对于固定目标 userId,请使用 。' SpEL text='Fixed Text' userId='666' |
- twitter.message.update.media-id
-
要与消息关联的媒体 ID。私信只能引用一个媒体 ID。(表达式,默认值:
<none>
) - twitter.message.update.screen-name
-
向其发送私信的用户的屏幕名称。(表达式,默认值:
<none>
) - 推特.message.update.text
-
私信文本。URL 编码。最大长度为 10,000 个字符。(表达式,默认值:
payload
) - twitter.message.update.user-id
-
向其发送私信的用户的用户 ID。(表达式,默认值:
<none>
)
7.20. Twitter 更新下沉
更新身份验证用户的当前文本(例如 Tweeting)。
对于每次更新尝试,都会将更新文本与进行身份验证的用户最近的推文进行比较。 任何会导致重复的尝试都将被阻止,从而导致 403 错误。 用户不能连续两次提交相同的文本。 |
虽然不受 API 的速率限制,但用户一次可以创建的推文数量受到限制。 标准 API 的更新限制为 3 小时内 300 个。 如果用户发布的更新数量达到当前允许的限制,此方法将返回 HTTP 403 错误。
您可以在此处找到 Update API 的详细信息:developer.twitter.com/en/docs/tweets/post-and-engage/api-reference/post-statuses-update
7.20.1. 选项
按前缀分组的属性:
推特
- 附件 URL
-
(SPEL 表达式)为了使 URL 不计入扩展推文的文本正文中,请提供 URL 作为推文附件。此 URL 必须是推文永久链接或私信深层链接。任意的非 Twitter URL 必须保留在文本文本中。传递给 attachment_url 参数的 URL 与推文永久链接或私信深层链接不匹配,将在创建推文时失败并导致异常。(表达式,默认值:
<none>
) - 显示坐标
-
(SPEL 表达式)是否在发送 Tweet 的确切坐标上放置一个固定点。(表达式,默认值:
<none>
) - in-reply-to-status-id (回复状态 ID)
-
(SPEL 表达式)更新要回复的现有文本的 ID。注意:除非在文本文本中提及此参数引用的推文的作者,否则将忽略此参数。因此,你必须在更新中包含 @username,其中 username 是引用推文的作者。设置 inReplyToStatusId 时,auto_populate_reply_metadata也会自动设置。稍后 确保从原始推文中查找领先@mentions,并从那里添加到新推文中。随着回复链的增长,这会将@mentions附加到扩展推文的元数据中,直到达到@mentions限制。如果原始推文已被删除,则回复将失败。(表达式,默认值:
<none>
) - 媒体 ID
-
(SPEL 表达式)要与推文关联的media_ids的逗号分隔列表。你可以在一条推文中包含最多 4 张照片、1 张动画 GIF 或 1 个视频。有关上传媒体的更多详细信息,请参阅上传媒体。(表达式,默认值:
<none>
) - 地点 ID
-
(SPEL 表达式)世界上的一个地方。(表达式,默认值:
<none>
) - 发短信
-
(SPEL 表达式)文本的文本将更新。URL 编码。t.co 链接换行都会影响字符数。默认为消息的有效负载(表达式,默认值:
payload
)
7.21. 波前沉
Wavefront 接收器使用 Messages<?>,将其转换为 Wavefront 数据格式的指标,并将指标直接发送到 Wavefront 或 Wavefront 代理。
支持常见的 ETL 使用案例,其中现有(历史)指标数据必须清理、转换并存储在 Wavefront 中以供进一步分析。
7.21.1. 选项
Wavefront sink 具有以下选项:
- wavefront.api-token
-
Wavefront API 访问令牌。(字符串,默认值:
<none>
) - wavefront.metric-表达式
-
计算结果为度量值的 SPEL 表达式。(表达式,默认值:
<none>
) - wavefront.metric-名称
-
指标的名称。默认为应用程序名称。(字符串,默认值:
<none>
) - wavefront.proxy-uri
-
Wavefront 代理的 URL。(字符串,默认值:
<none>
) - wavefront.source
-
发出指标的唯一应用程序、主机、容器或实例。(字符串,默认值:
<none>
) - wavefront.tag-表达式
-
与指标关联的自定义元数据的集合。点标签不能为空。键的有效字符:字母数字、连字符 ('-')、下划线('_')、点 ('.')。对于值,允许使用任何字符,包括空格。要包含双引号,请使用反斜杠对其进行转义,反斜杠不能是标签值中的最后一个字符。点标签键和值组合的最大允许长度为 254 个字符(255 个字符,包括分隔键和值的“=”字符)。如果值较长,则拒绝并记录该点(Map<String、Expression>,默认值:
<none>
) - wavefront.timestamp 表达式
-
一个 SPEL 表达式,计算结果为指标的时间戳(可选)。(表达式,默认值:
<none>
) - wavefront.uri 中
-
Wavefront 环境的 URL。(字符串,默认值:
<none>
)
7.22. Websocket 接收器
一个简单的 Websocket Sink 实现。
7.22.1. 选项
支持以下选项:
- websocket.consumer.log 级
-
netty 通道的 logLevel 。默认值为 <tt>WARN</tt> (String, default:
<none>
) - websocket.consumer.path
-
WebsocketSink 使用者需要连接的路径。默认值为 <tt>/websocket</tt> (String,默认值:
/websocket
) - websocket.consumer.port
-
Netty 服务器侦听的端口。默认值为 <tt>9292</tt> (整数,默认值:
9292
) - websocket.consumer.ssl
-
是否创建一个 {@link io.netty.handler.ssl.SslContext}。(布尔值,默认值:
false
) - websocket.consumer.threads
-
Netty 的线程数 {@link io.netty.channel.EventLoopGroup}。默认值为 <tt>1</tt> (整数,默认值:
1
)
7.22.2. 示例
要验证 websocket-sink 是否接收来自其他 spring-cloud-stream 应用程序的消息,你可以使用 遵循简单的端到端设置。
第 3 步:部署websocket-sink
最后,在 mode 中启动 websocket-sink,以便您可以在日志中看到 the 产生的消息:trace
time-source
java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
--logging.level.org.springframework.cloud.fn.consumer.websocket=TRACE
您应该开始在启动 WebsocketSink 的控制台中看到日志消息,如下所示:
Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]
7.22.3. 执行器
您可以使用它来访问发送和接收的最后一条消息。您必须
通过提供 来启用它。默认情况下,它通过 .下面是一个示例输出:Endpoint
n
--endpoints.websocketconsumertrace.enabled=true
host:port/websocketconsumertrace
[
{
"timestamp": 1445453703508,
"info": {
"type": "text",
"direction": "out",
"id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
"payload": "2015-10-21 20:54:33"
}
},
...
{
"timestamp": 1445453703506,
"info": {
"type": "text",
"direction": "out",
"id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
"payload": "2015-10-21 20:54:32"
}
}
]
还有一个简单的 HTML 页面,您可以在其中的文本区域看到转发的消息。您可以访问
它直接通过您的浏览器。host:port
7.23. ZeroMQ 接收器
“zeromq” sink 支持将消息发送到 ZeroMQ 套接字。
7.23.3. 选项
zeromq sink 有以下选项:
- zeromq.consumer.connect-url
-
用于连接到 ZeroMQ 套接字的连接 URL。(字符串,默认值:
<none>
) - zeromq.consumer.socket类型
-
连接应建立的 Socket Type。(SocketType,默认值:
<none>
,可能的值:PAIR,PUBSUBREQ,REP,DEALER,ROUTER,PULL,PUSH,XPUB,XSUB,STREAM) - zeromq.consumer.topic
-
一个 Topic SpEL 表达式,用于在向订阅者发送消息之前评估主题。(表达式,默认值:
<none>
)