应用

5. 来源

5.1. CDC 源

变更数据捕获 (CDC),用于从各种数据库捕获和流式传输变更事件。 目前支持 、 、 和 数据库。sourceMySQLPostgreSQLMongoDBOracleSQL Serverspring-doc.cn

它基于 Debezium Embedded Connector 构建,允许通过不同的消息绑定器(如 Apache Kafka、RabbitMQ 和所有 Spring Cloud Stream 支持代理)捕获和流式传输数据库更改。CDC Sourcespring-doc.cn

它支持所有 Debezium 配置属性。只需将前缀添加到现有的 Debezium 属性中即可。例如,要设置 Debezium 的属性,请改用 source 属性。cdc.config.connector.classcdc.config.connector.classspring-doc.cn

我们为最常用的 Debezium 属性提供了方便的快捷方式。例如,您可以使用我们的快捷方式来代替长 Debezium 属性。下表列出了所有可用的快捷键以及它们所代表的 Debezium 属性。 Debezium 属性(例如 )总是优先于快捷键!cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnectorcdc.connector=mysqlcdc.config.XXXspring-doc.cn

CDC 源引入了基于 MetadataStore 服务的新默认配置。Later 提供了各种微服务友好的方式来存储偏移元数据。BackingOffsetStorespring-doc.cn

5.1.1. 选项

按前缀分组的属性:spring-doc.cn

美国疾病控制与预防中心
配置

用于 debezium 配置属性的 Spring pass-trough 包装器。所有带有 'cdc.config.' 前缀的属性都是本机 Debezium 属性。前缀被删除,将它们转换为 Debezium io.debezium.config.Configuration。(Map<String, String>,默认值:<none>spring-doc.cn

连接器

cdc.config.connector.class 属性的快捷方式。只要它们彼此不矛盾,它们中的任何一个都可以使用。(ConnectorType,默认值:<none>,可能的值:mysql,postgres,mongodb,oracle,sqlserver)spring-doc.cn

名字

此 sourceConnector 实例的唯一名称。(字符串,默认值:<none>spring-doc.cn

图式

将架构作为出站消息的一部分。(布尔值,默认值:falsespring-doc.cn

cdc.拼合
添加字段

要添加到拼合消息的元数据字段的逗号分隔列表。这些字段将以 “__” 或 “__[<]struct]__” 为前缀,具体取决于结构的规范。(字符串,默认值:<none>spring-doc.cn

添加标头

逗号分隔列表指定要添加到拼合消息标头的元数据字段列表。字段将以 “__” 或 “__[struct]__” 为前缀。(字符串,默认值:<none>spring-doc.cn

删除处理模式

处理已删除记录的选项:(1) 无 - 传递记录,(2) 删除记录,以及 (3) 重写 - 向记录添加“__deleted”字段。(DeleteHandlingMode,默认值:<none>,可能的值:drop,rewrite,nonespring-doc.cn

放置式逻辑删除

默认情况下,Debezium 会生成逻辑删除记录,以对已删除的记录启用 Kafka 压缩。dropTombstone 可以隐藏 Tombstone 记录。(布尔值,默认值:truespring-doc.cn

启用

启用拼合源记录事件 (https://debezium.io/docs/configuration/event-flattening)。(布尔值,默认值:truespring-doc.cn

cdc.offset
提交超时

在取消进程并恢复要在将来尝试提交的偏移数据之前,等待刷新记录并将分区偏移数据提交到偏移存储的最大毫秒数。(持续时间,默认:5000 毫秒spring-doc.cn

flush-interval 刷新间隔

尝试提交偏移量的时间间隔。默认值为 1 分钟。(持续时间,默认:60000 毫秒spring-doc.cn

政策

偏移存储提交策略。(OffsetPolicy,默认值:<none>spring-doc.cn

存储

Kafka 连接器跟踪已处理记录的数量,并定期将计数(作为“偏移量”)存储在预配置的元数据存储中。重新启动时,连接器会从最后记录的源偏移量恢复读数。(OffsetStorageType,默认值:<none>,可能的值:memory,file,kafka,metadataspring-doc.cn

cdc.stream.header
convert-connect-headers (转换连接标头)

当 true 时,{@link org.apache.kafka.connect.header.Header} 将转换为消息头,其中 {@link org.apache.kafka.connect.header.Header#key()} 作为名称,{@link org.apache.kafka.connect.header.Header#value()}。(布尔值,默认值:truespring-doc.cn

抵消

将源记录的偏移元数据序列化到 cdc.offset 下的出站消息标头中。(布尔值,默认值:falsespring-doc.cn

元数据.store.dynamo-db
创建延迟

创建表重试之间的延迟。(整数,默认值:1spring-doc.cn

创建重试

创建表请求的重试次数。(整数,默认值:25spring-doc.cn

read-capacity (读取容量)

读取表上的 capacity。(长,默认值:1spring-doc.cn

桌子

元数据的表名称。(字符串,默认值:<none>spring-doc.cn

生存时间

表条目的 TTL。(整数,默认值:<none>spring-doc.cn

写入容量

表上的写入容量。(长,默认值:1spring-doc.cn

元数据.store.jdbc
地区

此存储中保留的消息的唯一分组标识符。(字符串,默认值:DEFAULTspring-doc.cn

表前缀

自定义表名称的前缀。(字符串,默认值:<none>spring-doc.cn

元数据.store.mongo-db
收集

元数据的 MongoDB 集合名称。(字符串,默认值:metadataStorespring-doc.cn

metadata.store.redis
钥匙

元数据的 Redis 键。(字符串,默认值:<none>spring-doc.cn

metadata.store
类型

指示要配置的元数据存储的类型(默认为 'memory')。您必须包含相应的 Spring 集成依赖项才能使用持久存储。(StoreType,默认值:<none>,可能的值:mongodb,redis,dynamodb,jdbc,zookeeper,hazelcast,memory) spring-doc.cn

元数据.store.zookeeper
连接字符串

HOST:PORT 形式的 Zookeeper 连接字符串。(字符串,默认值:127.0.0.1:2181spring-doc.cn

编码

在 Zookeeper 中存储数据时使用的编码。(字符集,默认值:UTF-8spring-doc.cn

重试间隔

Zookeeper 操作的重试间隔(以毫秒为单位)。(整数,默认值:1000spring-doc.cn

根节点 - 存储条目是此节点的子项。(字符串,默认值:/SpringIntegration-MetadataStorespring-doc.cn

Debezium 属性快捷方式映射

下表列出了所有可用的快捷键以及它们所代表的 Debezium 属性。spring-doc.cn

表 1.表快捷方式属性映射
捷径 源语言 描述

cdc.连接器spring-doc.cn

cdc.config.connector.classspring-doc.cn

mysql: MySqlConnector, : PostgresConnector, : MongodbSourceConnector, : OracleConnector, : SqlServerConnectorpostgresmongodboraclesqlserverspring-doc.cn

cdc.namespring-doc.cn

cdc.config.namespring-doc.cn

cdc.offset.flush-intervalspring-doc.cn

cdc.config.offset.flush.interval.msspring-doc.cn

cdc.offset.commit-timeoutspring-doc.cn

cdc.config.offset.flush.timeout.msspring-doc.cn

cdc.offset.policyspring-doc.cn

cdc.config.offset.commit.policyspring-doc.cn

periodic: PeriodicCommitOffsetPolicy, : AlwaysCommitOffsetPolicyalwaysspring-doc.cn

cdc.offset.storagespring-doc.cn

cdc.config.offset.storagespring-doc.cn

metadata: MetadataStoreOffsetBackingStore, : FileOffsetBackingStore, : KafkaOffsetBackingStore, : MemoryOffsetBackingStorefilekafkamemoryspring-doc.cn

cdc.flattening.drop-tombstonesspring-doc.cn

cdc.config.drop.tombstonesspring-doc.cn

cdc.flattening.delete-handling-modespring-doc.cn

cdc.config.delete.handling.modespring-doc.cn

none: 无, : 丢弃, : 重写droprewritespring-doc.cn

5.1.2. 数据库支持

它使用 Debezium 实用程序,目前支持五个数据存储的 CDC:、、 和 数据库。CDC SourceMySQLPostgreSQLMongoDBOracleSQL Serverspring-doc.cn

5.1.3. 示例和测试

[CdcSourceIntegrationTest]()、[CdcDeleteHandlingIntegrationTest]() 和 [CdcFlatteningIntegrationTest]() 集成测试使用在本地计算机上运行的测试数据库夹具。 我们使用预构建的 debezium docker 数据库镜像。 Maven 构建在 .docker-maven-pluginspring-doc.cn

要从 IDE 运行和调试测试,您需要从命令行部署所需的数据库映像。 以下说明介绍了如何从 Docker 映像运行预配置的测试数据库。spring-doc.cn

MySQL (MySQL的

在 docker 中启动 :debezium/example-mysqlspring-doc.cn

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.0

(可选)使用 client 连接到数据库并创建具有所需凭据的用户:mysqldebeziumspring-doc.cn

docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';

使用以下属性将 CDC 源连接到 MySQL DB:spring-doc.cn

cdc.connector=mysql (1)

cdc.name=my-sql-connector (2)
cdc.config.database.server.id=85744 (2)
cdc.config.database.server.name=my-app-connector (2)

cdc.config.database.user=debezium  (3)
cdc.config.database.password=dbz (3)
cdc.config.database.hostname=localhost (3)
cdc.config.database.port=3306 (3)

cdc.schema=true (4)
cdc.flattening.enabled=true (5)
1 将 CDC 源配置为使用 MySqlConnector。(相当于 设置 )。cdc.config.connector.class=io.debezium.connector.mysql.MySqlConnector
2 用于标识和调度传入事件的元数据。
3 连接到以 用户身份运行的 MySQL 服务器。localhost:3306debezium
4 在事件中包含 Change Event Value 架构。SourceRecord
5 启用 CDC 事件平展

您也可以使用此 mysql 配置运行。CdcSourceIntegrationTests#CdcMysqlTestsspring-doc.cn

PostgreSQL 数据库

从 Docker 镜像启动预配置的 postgres 服务器:debezium/example-postgres:1.0spring-doc.cn

docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:1.0

您可以像这样连接到此服务器:spring-doc.cn

psql -U postgres -h localhost -p 5432

使用以下属性将 CDC 源连接到 PostgreSQL:spring-doc.cn

cdc.connector=postgres (1)
cdc.offset.storage=memory (2)

cdc.name=my-sql-connector (3)
cdc.config.database.server.id=85744 (3)
cdc.config.database.server.name=my-app-connector (3)

cdc.config.database.user=postgres  (4)
cdc.config.database.password=postgres (4)
cdc.config.database..dbname=postgres (4)
cdc.config.database.hostname=localhost (4)
cdc.config.database.port=5432 (4)

cdc.schema=true (5)
cdc.flattening.enabled=true (6)
1 配置为使用 PostgresConnector。等效于设置 。CDC Sourcecdc.config.connector.class=io.debezium.connector.postgresql.PostgresConnector
2 配置 Debezium 引擎以使用(例如 'cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore)后备偏移存储。memory
3 用于标识和调度传入事件的元数据。
4 连接到以 用户身份运行的 PostgreSQL 服务器。localhost:5432postgres
5 在事件中包含 Change Event Value 架构。SourceRecord
6 启用 CDC 事件平展

您也可以使用此 mysql 配置运行。CdcSourceIntegrationTests#CdcPostgresTestsspring-doc.cn

MongoDB 数据库

从 Docker 镜像启动预配置的 mongodb:debezium/example-mongodb:0.10spring-doc.cn

docker run -it --rm --name mongodb -p 27017:27017 -e MONGODB_USER=debezium -e MONGODB_PASSWORD=dbz  debezium/example-mongodb:0.10

初始化清单集合spring-doc.cn

docker exec -it mongodb sh -c 'bash -c /usr/local/bin/init-inventory.sh'

在终端输出中,搜索类似 :mongodbhost: "3f95a8a6516e:27017"spring-doc.cn

2019-01-10T13:46:10.004+0000 I COMMAND  [conn1] command local.oplog.rs appName: "MongoDB Shell" command: replSetInitiate { replSetInitiate: { _id: "rs0", members: [ { _id: 0.0, host: "3f95a8a6516e:27017" } ] }, lsid: { id: UUID("5f477a16-d80d-41f2-9ab4-4ebecea46773") }, $db: "admin" } numYields:0 reslen:22 locks:{ Global: { acquireCount: { r: 36, w: 20, W: 2 }, acquireWaitCount: { W: 1 }, timeAcquiringMicros: { W: 312 } }, Database: { acquireCount: { r: 6, w: 4, W: 16 } }, Collection: { acquireCount: { r: 4, w: 2 } }, oplog: { acquireCount: { r: 2, w: 3 } } } protocol:op_msg 988ms

将条目添加到您的127.0.0.1 3f95a8a6516e/etc/hostsspring-doc.cn

使用以下属性将 CDC 源连接到 MongoDB:spring-doc.cn

cdc.connector=mongodb (1)
cdc.offset.storage=memory (2)

cdc.config.mongodb.hosts=rs0/localhost:27017 (3)
cdc.config.mongodb.name=dbserver1 (3)
cdc.config.mongodb.user=debezium (3)
cdc.config.mongodb.password=dbz (3)
cdc.config.database.whitelist=inventory (3)

cdc.config.tasks.max=1 (4)

cdc.schema=true (5)
cdc.flattening.enabled=true (6)
1 配置为使用 MongoDB Connector。这会映射到 .CDC Sourcecdc.config.connector.class=io.debezium.connector.mongodb.MongodbSourceConnector
2 配置 Debezium 引擎以使用(例如 'cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore)后备偏移存储。memory
3 连接到以 user.localhost:27017debezium
4 debezium.io/docs/connectors/mongodb/#tasks
5 在事件中包含 Change Event Value 架构。SourceRecord
6 启用 CDC 事件平展

您也可以使用此 mysql 配置运行。CdcSourceIntegrationTests#CdcPostgresTestsspring-doc.cn

SQL 服务器

从 Docker 镜像启动 a:sqlserverdebezium/example-postgres:1.0spring-doc.cn

docker run -it --rm --name sqlserver -p 1433:1433 -e ACCEPT_EULA=Y -e MSSQL_PID=Standard -e SA_PASSWORD=Password! -e MSSQL_AGENT_ENABLED=true microsoft/mssql-server-linux:2017-CU9-GDR2

使用 debezium 的 sqlserver 教程中的示例数据进行填充:spring-doc.cn

wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
cat ./inventory.sql | docker exec -i sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'

使用以下属性将 CDC 源连接到 SQLServer:spring-doc.cn

cdc.connector=sqlserver (1)
cdc.offset.storage=memory (2)

cdc.name=my-sql-connector (3)
cdc.config.database.server.id=85744 (3)
cdc.config.database.server.name=my-app-connector (3)

cdc.config.database.user=sa  (4)
cdc.config.database.password=Password! (4)
cdc.config.database..dbname=testDB (4)
cdc.config.database.hostname=localhost (4)
cdc.config.database.port=1433 (4)
1 配置为使用 SqlServerConnector。等效于设置 。CDC Sourcecdc.config.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
2 配置 Debezium 引擎以使用(例如 'cdc.config.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore)后备偏移存储。memory
3 用于标识和调度传入事件的元数据。
4 连接到以用户身份运行的 SQL Server。localhost:1433sa

您也可以使用此 mysql 配置运行。CdcSourceIntegrationTests#CdcSqlServerTestsspring-doc.cn

神谕

从 localhost 启动 Oracle reachable,并使用 Debezium Vagrant 设置中描述的配置、用户和授权进行设置spring-doc.cn

使用 Debezium 的 Oracle 教程中的示例数据进行填充:spring-doc.cn

wget https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-with-oracle-jdbc/init/inventory.sql
cat ./inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1

5.2. 文件源

此应用程序轮询目录并将新文件或其内容发送到 output 通道。 默认情况下,file 源以字节数组的形式提供 File 的内容。 但是,这可以使用 --file.supplier.mode 选项进行自定义:spring-doc.cn

  • ref 提供 java.io.File 引用spring-doc.cn

  • lines 将逐行拆分文件并为每行发出一条新消息spring-doc.cn

  • contents 默认值。以字节数组的形式提供文件的内容spring-doc.cn

使用 时,您还可以提供附加选项 。 如果设置为 true,则底层 FileSplitter 将在实际数据之前和之后发出额外的文件开始和文件结束标记消息。 这 2 个附加标记消息的有效负载为 类型 。如果未明确设置,则选项 withMarkers 默认为 false。--file.supplier.mode=lines--file.supplier.withMarkers=trueFileSplitter.FileMarkerspring-doc.cn

5.2.1. 选项

文件源具有以下选项:spring-doc.cn

按前缀分组的属性:spring-doc.cn

file.consumer 文件
标记 JSON

当 'fileMarkers == true' 时,指定它们是应生成为 FileSplitter.FileMarker 对象还是 JSON。(布尔值,默认值:truespring-doc.cn

模式

用于文件读取源的 FileReadingMode。值包括 'ref' - 文件对象,'lines' - 每行一条消息,或 'contents' - 以字节为单位的内容。(FileReadingMode,默认值:<none>,可能的值:ref,lines,contentsspring-doc.cn

with-markers

设置为 true 可在数据之前/之后发出文件开头/文件结尾标记消息。仅对 FileReadingMode 'lines' 有效。(布尔值,默认值:<none>spring-doc.cn

文件.supplier
empty-时延迟

未检测到新文件时的延迟持续时间。(持续时间,默认:1 秒spring-doc.cn

目录

用于轮询新文件的目录。(文件,默认值:<无>spring-doc.cn

文件名模式

用于匹配文件的简单 ant 模式。(字符串,默认值:<none>spring-doc.cn

文件名-regex

用于匹配文件的正则表达式模式。(模式,默认值:<无>spring-doc.cn

防止重复

设置为 true 以包含防止重复的 AcceptOnceFileListFilter。(布尔值,默认值:truespring-doc.cn

元数据.store.dynamo-db
创建延迟

创建表重试之间的延迟。(整数,默认值:1spring-doc.cn

创建重试

创建表请求的重试次数。(整数,默认值:25spring-doc.cn

read-capacity (读取容量)

读取表上的 capacity。(长,默认值:1spring-doc.cn

桌子

元数据的表名称。(字符串,默认值:<none>spring-doc.cn

生存时间

表条目的 TTL。(整数,默认值:<none>spring-doc.cn

写入容量

表上的写入容量。(长,默认值:1spring-doc.cn

元数据.store.jdbc
地区

此存储中保留的消息的唯一分组标识符。(字符串,默认值:DEFAULTspring-doc.cn

表前缀

自定义表名称的前缀。(字符串,默认值:<none>spring-doc.cn

元数据.store.mongo-db
收集

元数据的 MongoDB 集合名称。(字符串,默认值:metadataStorespring-doc.cn

metadata.store.redis
钥匙

元数据的 Redis 键。(字符串,默认值:<none>spring-doc.cn

metadata.store
类型

指示要配置的元数据存储的类型(默认为 'memory')。您必须包含相应的 Spring 集成依赖项才能使用持久存储。(StoreType,默认值:<none>,可能的值:mongodb,redis,dynamodb,jdbc,zookeeper,hazelcast,memory) spring-doc.cn

元数据.store.zookeeper
连接字符串

HOST:PORT 形式的 Zookeeper 连接字符串。(字符串,默认值:127.0.0.1:2181spring-doc.cn

编码

在 Zookeeper 中存储数据时使用的编码。(字符集,默认值:UTF-8spring-doc.cn

重试间隔

Zookeeper 操作的重试间隔(以毫秒为单位)。(整数,默认值:1000spring-doc.cn

根节点 - 存储条目是此节点的子项。(字符串,默认值:/SpringIntegration-MetadataStorespring-doc.cn

5.3. FTP 源

此源应用程序支持使用 FTP 协议传输文件。 文件将从目录传输到部署应用程序的目录。 默认情况下,源发出的消息以字节数组的形式提供。但是,这可能是 使用选项进行自定义:remotelocal--modespring-doc.cn

  • 裁判提供参考java.io.Filespring-doc.cn

  • 线将逐行拆分文件,并为每行发出一条新消息spring-doc.cn

  • 内容默认值。以字节数组的形式提供文件的内容spring-doc.cn

使用 时,您还可以提供附加选项 。 如果设置为 ,则底层将在实际数据之前和之后发出额外的文件开头文件结尾标记消息。 这 2 个附加标记消息的有效负载为 类型 。如果未明确设置,则选项默认为 。--mode=lines--withMarkers=truetrueFileSplitterFileSplitter.FileMarkerwithMarkersfalsespring-doc.cn

另请参阅 MetadataStore 选项,了解用于防止重新启动时出现重复消息的可能共享持久性存储配置。spring-doc.cn

5.3.1. 输入

N/A (从 FTP 服务器获取文件)。spring-doc.cn

5.3.2. 输出

mode = 内容
头:
有效载荷:

A 填充了文件内容。byte[]spring-doc.cn

模式 = 行
头:
有效载荷:

A 表示每行。Stringspring-doc.cn

第一行前面有一条带有标记有效负载的消息(可选)。 最后一行(可选)后跟带有标记有效负载的消息。STARTENDspring-doc.cn

标记存在和格式由 和 属性确定。with-markersmarkers-jsonspring-doc.cn

模式 = ref
头:

没有。spring-doc.cn

有效载荷:

一个对象。java.io.Filespring-doc.cn

5.3.3. 选项

ftp 源具有以下选项:spring-doc.cn

按前缀分组的属性:spring-doc.cn

file.consumer 文件
标记 JSON

当 'fileMarkers == true' 时,指定它们是应生成为 FileSplitter.FileMarker 对象还是 JSON。(布尔值,默认值:truespring-doc.cn

模式

用于文件读取源的 FileReadingMode。值包括 'ref' - 文件对象,'lines' - 每行一条消息,或 'contents' - 以字节为单位的内容。(FileReadingMode,默认值:<none>,可能的值:ref,lines,contentsspring-doc.cn

with-markers

设置为 true 可在数据之前/之后发出文件开头/文件结尾标记消息。仅对 FileReadingMode 'lines' 有效。(布尔值,默认值:<none>spring-doc.cn

ftp.factory
缓存会话

缓存会话。(布尔值,默认值:<none>spring-doc.cn

客户端模式

用于 FTP 会话的客户端模式。(ClientMode,默认值:<none>,可能的值:ACTIVE,PASSIVEspring-doc.cn

主机

服务器的主机名。(字符串,默认值:localhostspring-doc.cn

密码

用于连接到服务器的密码。(字符串,默认值:<none>spring-doc.cn

港口

服务器的端口。(整数,默认值:21spring-doc.cn

用户名

用于连接到服务器的用户名。(字符串,默认值:<none>spring-doc.cn

ftp.supplier
自动创建本地目录

如果本地目录不存在,则设置为 true 以创建本地目录。(布尔值,默认值:truespring-doc.cn

empty-时延迟

未检测到新文件时的延迟持续时间。(持续时间,默认:1 秒spring-doc.cn

删除远程文件

设置为 true 可在成功传输后删除远程文件。(布尔值,默认值:falsespring-doc.cn

文件名模式

用于匹配要传输的文件的名称的筛选模式。(字符串,默认值:<none>spring-doc.cn

文件名-regex

用于匹配要传输的文件的名称的筛选器正则表达式模式。(模式,默认值:<无>spring-doc.cn

本地目录

用于文件传输的本地目录。(文件,默认值:<无>spring-doc.cn

preserve-timestamp

设置为 true 可保留原始时间戳。(布尔值,默认值:truespring-doc.cn

远程目录

远程 FTP 目录。(字符串,默认值:/spring-doc.cn

远程文件分隔符

远程文件分隔符。(字符串,默认值:/spring-doc.cn

tmp-file-suffix 文件后缀

传输过程中要使用的后缀。(字符串,默认值:.tmpspring-doc.cn

元数据.store.dynamo-db
创建延迟

创建表重试之间的延迟。(整数,默认值:1spring-doc.cn

创建重试

创建表请求的重试次数。(整数,默认值:25spring-doc.cn

read-capacity (读取容量)

读取表上的 capacity。(长,默认值:1spring-doc.cn

桌子

元数据的表名称。(字符串,默认值:<none>spring-doc.cn

生存时间

表条目的 TTL。(整数,默认值:<none>spring-doc.cn

写入容量

表上的写入容量。(长,默认值:1spring-doc.cn

元数据.store.jdbc
地区

此存储中保留的消息的唯一分组标识符。(字符串,默认值:DEFAULTspring-doc.cn

表前缀

自定义表名称的前缀。(字符串,默认值:<none>spring-doc.cn

元数据.store.mongo-db
收集

元数据的 MongoDB 集合名称。(字符串,默认值:metadataStorespring-doc.cn

metadata.store.redis
钥匙

元数据的 Redis 键。(字符串,默认值:<none>spring-doc.cn

metadata.store
类型

指示要配置的元数据存储的类型(默认为 'memory')。您必须包含相应的 Spring 集成依赖项才能使用持久存储。(StoreType,默认值:<none>,可能的值:mongodb,redis,dynamodb,jdbc,zookeeper,hazelcast,memory) spring-doc.cn

元数据.store.zookeeper
连接字符串

HOST:PORT 形式的 Zookeeper 连接字符串。(字符串,默认值:127.0.0.1:2181spring-doc.cn

编码

在 Zookeeper 中存储数据时使用的编码。(字符集,默认值:UTF-8spring-doc.cn

重试间隔

Zookeeper 操作的重试间隔(以毫秒为单位)。(整数,默认值:1000spring-doc.cn

根节点 - 存储条目是此节点的子项。(字符串,默认值:/SpringIntegration-MetadataStorespring-doc.cn

5.3.4. 示例

java -jar ftp_source.jar --ftp.supplier.remote-dir=foo --file.consumer.mode=lines --ftp.factory.host=ftpserver \
         --ftp.factory.username=user --ftp.factory.password=pw --ftp.local-dir=/foo

sources.adoc 中未解析的指令 - include::https://raw.githubusercontent.com/spring-cloud/stream-applications/main/applications/source/geode-source/README.adoc[tags=ref-doc]spring-doc.cn

5.4. Http 源

一个源应用程序,用于侦听 HTTP 请求并将正文作为消息负载发出。 如果 Content-Type 与 或 匹配,则有效负载将为 String, 否则,有效负载将是一个字节数组。text/*application/jsonspring-doc.cn

有效载荷:

如果内容类型匹配或text/*application/jsonspring-doc.cn

如果内容类型不匹配或text/*application/jsonspring-doc.cn

5.4.2. 选项

http 源支持以下配置属性:spring-doc.cn

按前缀分组的属性:spring-doc.cn

http.cors 网站
允许凭据

浏览器是否应包含与正在批注的请求的域关联的任何 Cookie。(布尔值,默认值:<none>spring-doc.cn

允许的标头

在实际请求期间可以使用的请求标头列表。(String[],默认值:<none>spring-doc.cn

允许的来源

允许的源列表,例如 https://domain1.com。(String[],默认值:<none>spring-doc.cn

http
mapped-request-headers 请求标头

将要映射的标头。(String[],默认值:<none>spring-doc.cn

路径模式

HTTP 端点路径映射。(字符串,默认值:/spring-doc.cn

服务器
港口

服务器 HTTP 端口。(整数,默认值:8080spring-doc.cn

5.5. JDBC 源码

此源从 RDBMS 轮询数据。 此源代码完全基于 ,因此请参阅 Spring Boot JDBC 支持以获取更多信息。DataSourceAutoConfigurationspring-doc.cn

有效载荷
  • Map<String, Object>when(默认)和jdbc.split == trueList<Map<String, Object>>spring-doc.cn

5.5.2. 选项

jdbc 源具有以下选项:spring-doc.cn

按前缀分组的属性:spring-doc.cn

jdbc.supplier
最大行数

要为查询处理的最大行数。(整数,默认值:0spring-doc.cn

查询

用于选择数据的查询。(字符串,默认值:<none>spring-doc.cn

分裂

是否将 SQL 结果拆分为单个消息。(布尔值,默认值:truespring-doc.cn

更新

一个 SQL 更新语句,用于将轮询的消息标记为 “seen”。(字符串,默认值:<none>spring-doc.cn

spring.datasource
驱动程序类名称

JDBC 驱动程序的完全限定名称。默认情况下,根据 URL 自动检测。(字符串,默认值:<none>spring-doc.cn

密码

数据库的登录密码。(字符串,默认值:<none>spring-doc.cn

网址

数据库的 JDBC URL。(字符串,默认值:<none>spring-doc.cn

用户名

数据库的登录用户名。(字符串,默认值:<none>spring-doc.cn

spring.integration.poller
cron (定时)

Cron 表达式进行轮询。与 'fixedDelay' 和 'fixedRate' 互斥。(字符串,默认值:<none>spring-doc.cn

固定延迟

轮询延迟期。与 'cron' 和 'fixedRate' 互斥。(持续时间,默认值:<无>spring-doc.cn

固定利率

轮询率周期。与 'fixedDelay' 和 'cron' 互斥。(持续时间,默认值:<无>spring-doc.cn

初始延迟

轮询初始延迟。申请了 'fixedDelay' 和 'fixedRate';对于 'cron' 而被忽略。(持续时间,默认值:<无>spring-doc.cn

最大每次轮询消息数

每个轮询周期要轮询的最大消息数。(整数,默认值:<none>spring-doc.cn

接收超时

轮询消息等待多长时间。(持续时间,默认:1 秒spring-doc.cn

另请参阅 Spring Boot 文档,了解其他属性和轮询选项。DataSourceTriggerPropertiesMaxMessagesPropertiesspring-doc.cn

5.6. JMS 源

JMS 源允许从 JMS 接收消息。spring-doc.cn

5.6.1. 选项

JMS 源具有以下选项:spring-doc.cn

按前缀分组的属性:spring-doc.cn

jms.supplier
客户端 ID

长期订阅的客户端 ID。(字符串,默认值:<none>spring-doc.cn

目的地

接收消息的目标 (队列或主题)。(字符串,默认值:<none>spring-doc.cn

消息选择器

消息的选择器。(字符串,默认值:<none>spring-doc.cn

会话事务处理

如果为true,则启用事务并选择DefaultMessageListenerContainer,如果为false,则选择SimpleMessageListenerContainer。(布尔值,默认值:truespring-doc.cn

subscription-durable

对于长期订阅,为 True。(布尔值,默认值:<none>spring-doc.cn

订阅名称

长期订阅或共享订阅的名称。(字符串,默认值:<none>spring-doc.cn

订阅共享

对于共享订阅,则为 True。(布尔值,默认值:<none>spring-doc.cn

spring.jms 的
jndi-名称

连接工厂 JNDI 名称。设置后,优先于其他连接出厂自动配置。(字符串,默认值:<none>spring-doc.cn

发布子域

默认目标类型是否为 topic。(布尔值,默认值:falsespring-doc.cn

spring.jms.listener 中
确认模式

容器的确认模式。默认情况下,侦听器使用自动确认进行交易。(AcknowledgeMode,默认值:<none>,可能的值:AUTO,CLIENT DUPS_OKspring-doc.cn

自动启动

启动时自动启动容器。(布尔值,默认值:truespring-doc.cn

并发

最小并发使用者数。(整数,默认值:<none>spring-doc.cn

最大并发

最大并发使用者数。(整数,默认值:<none>spring-doc.cn

接收超时

用于接收调用的超时。使用 -1 表示无等待接收,使用 0 表示完全没有超时。后者仅在不在事务管理器内运行时才可行,并且通常不鼓励使用,因为它会阻止干净关闭。(持续时间,默认:1 秒spring-doc.cn

5.7. 负载生成器源

发送生成的数据并将其调度到流的源。spring-doc.cn

5.7.1. 选项

load-generator 源具有以下选项:spring-doc.cn

load-generator.generate-timestamp 生成时间戳

是否生成时间戳。(布尔值,默认值:falsespring-doc.cn

load-generator.消息计数

消息计数。(整数,默认值:1000spring-doc.cn

load-generator.消息大小

消息大小。(整数,默认值:1000spring-doc.cn

load-generator.producers

生产者数量。(整数,默认值:1spring-doc.cn

5.8. 邮件源

一个源应用程序,用于侦听 Emails 并将消息正文作为消息负载发出。spring-doc.cn

5.8.1. 选项

邮件源具有以下选项:spring-doc.cn

mail.supplier.charset

用于 byte[] mail-to-string 转换的 charset。(字符串,默认值:UTF-8spring-doc.cn

邮件.supplier.delete

设置为 true 可在下载后删除电子邮件。(布尔值,默认值:falsespring-doc.cn

mail.supplier.expression

配置 SpEL 表达式以选择消息。(字符串,默认值:truespring-doc.cn

邮件.supplier.idle-IMAP

设置为 true 以使用 IdleImap 配置。(布尔值,默认值:falsespring-doc.cn

mail.supplier.java 邮件属性

JavaMail 属性作为名称-值对的新行分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:<无>spring-doc.cn

mail.supplier.mark-as-read

设置为 true 可将电子邮件标记为已读。(布尔值,默认值:falsespring-doc.cn

邮件.supplier.url

用于连接到邮件服务器的邮件连接 URL,例如 'imaps://username:[email protected]:993/Inbox'。(URLName,默认值:<none>spring-doc.cn

mail.supplier.user-flag

当服务器不支持 \Recently 时标记邮件的标志。(字符串,默认值:<none>spring-doc.cn

5.9. MongoDB 源

此源轮询来自 MongoDB 的数据。 此源完全基于 ,因此请参阅 Spring Boot MongoDB 支持以获取更多信息。MongoDataAutoConfigurationspring-doc.cn

5.9.1. 选项

mongodb 源具有以下选项:spring-doc.cn

按前缀分组的属性:spring-doc.cn

mongodb.supplier
收集

要查询的 MongoDB 集合。(字符串,默认值:<none>spring-doc.cn

查询

MongoDB 查询。(字符串,默认值:{ }spring-doc.cn

查询表达式

MongoDB 中的 SpEL 表达式查询 DSL 样式。(表达式,默认值:<none>spring-doc.cn

分裂

是否将查询结果拆分为单个消息。(布尔值,默认值:truespring-doc.cn

update-expression (更新表达式)

MongoDB 中的 SpEL 表达式更新 DSL 样式。(表达式,默认值:<none>spring-doc.cn

spring.data.mongodb
附加主机

其他服务器主机。不能使用 URI 设置,或者如果未指定 'host' 。其他主机将使用默认的 mongo 端口 27017,如果您想使用不同的端口,可以使用 “host:port” 语法。(List<String>,默认值:<none>spring-doc.cn

身份验证数据库

身份验证数据库名称。(字符串,默认值:<none>spring-doc.cn

自动索引创建

是否启用自动索引创建。(布尔值,默认值:<none>spring-doc.cn

数据库

数据库名称。(字符串,默认值:<none>spring-doc.cn

字段命名策略

要使用的 FieldNamingStrategy 的完全限定名称。(类<?>,默认值:<无>spring-doc.cn

主机

Mongo 服务器主机。不能使用 URI 设置。(字符串,默认值:<none>spring-doc.cn

密码

mongo 服务器的登录密码。不能使用 URI 设置。(字符[],默认值:<无>spring-doc.cn

港口

Mongo 服务器端口。不能使用 URI 设置。(整数,默认值:<none>spring-doc.cn

副本集名称

集群所需的副本集名称。不能使用 URI 设置。(字符串,默认值:<none>spring-doc.cn

URI

Mongo 数据库 URI。覆盖主机、端口、用户名、密码和数据库。(字符串,默认值:mongodb://localhost/testspring-doc.cn

用户名

mongo 服务器的登录用户。不能使用 URI 设置。(字符串,默认值:<none>spring-doc.cn

uuid 表示

将 UUID 转换为 BSON 二进制值时使用的表示形式。(UuidRepresentation,默认值:java-legacy,可能的值:UNSPECIFIED,STANDARD,C_SHARP_LEGACY,JAVA_LEGACY,PYTHON_LEGACY)spring-doc.cn

另请参阅 Spring Boot 文档以获取其他属性。 请参阅 和 了解轮询选项。MongoPropertiesTriggerPropertiesspring-doc.cn

5.10. MQTT 源码

允许从 MQTT 接收消息的 Source。spring-doc.cn

有效载荷:

5.10.2. 选项

mqtt 源具有以下选项:spring-doc.cn

按前缀分组的属性:spring-doc.cn

MQTT 协议
clean-session

客户端和服务器是否应在重启和重新连接时记住状态。(布尔值,默认值:truespring-doc.cn

连接超时

连接超时(以秒为单位)。(整数,默认值:30spring-doc.cn

保持活动间隔

ping 间隔(以秒为单位)。(整数,默认值:60spring-doc.cn

密码

连接到代理时使用的密码。(字符串,默认值:guestspring-doc.cn

坚持

'memory' 或 'file'。(字符串,默认值:memoryspring-doc.cn

持久性目录

Persistence 目录。(字符串,默认值:/tmp/pahospring-doc.cn

SSL 属性

MQTT 客户端 SSL 属性。(Map<String, String>,默认值:<none>spring-doc.cn

网址

MQTT 代理的位置(逗号分隔的列表)。(String[],默认值:[tcp://localhost:1883]spring-doc.cn

用户名

连接到 broker 时要使用的用户名。(字符串,默认值:guestspring-doc.cn

mqtt.supplier (英语)
二元的

true 将有效负载保留为 bytes。(布尔值,默认值:falsespring-doc.cn

字符集

用于将字节转换为 String 的字符集(当 binary 为 false 时)。(字符串,默认值:UTF-8spring-doc.cn

客户端 ID

标识客户端。(字符串,默认:stream.client.id.sourcespring-doc.cn

QoS

QoS;所有主题的单个值或以逗号分隔的列表以匹配主题。(整数 [],默认值:[0]spring-doc.cn

主题

源将订阅的主题(逗号分隔)。(String[],默认值:[stream.mqtt]spring-doc.cn

5.11. RabbitMQ 源码

“rabbit” 源允许从 RabbitMQ 接收消息。spring-doc.cn

在部署流之前,队列必须存在;它们不是自动创建的。 您可以使用 RabbitMQ Web UI 轻松创建队列。spring-doc.cn

5.11.1. 输入

不适用spring-doc.cn

5.11.2. 输出

有效载荷

5.11.3. 选项

rabbit 源具有以下选项:spring-doc.cn

按前缀分组的属性:spring-doc.cn

rabbit.supplier 的
enable-retry (启用重试)

如果为 true,则启用重试。(布尔值,默认值:falsespring-doc.cn

初始重试间隔

启用重试时的初始重试间隔。(整数,默认值:1000spring-doc.cn

mapped-request-headers 请求标头

将要映射的标头。(String[],默认值:[STANDARD_REQUEST_HEADERS]spring-doc.cn

最大尝试次数

启用重试时的最大传递尝试次数。(整数,默认值:3spring-doc.cn

最大重试间隔

启用重试时的最大重试间隔。(整数,默认值:30000spring-doc.cn

own-connection

如果为 true,则根据引导属性使用单独的连接。(布尔值,默认值:falsespring-doc.cn

队列

源将侦听消息的队列。(String[],默认值:<none>spring-doc.cn

重新排队

是否应将被拒绝的邮件重新排队。(布尔值,默认值:truespring-doc.cn

重试乘数

启用 retry 时重试回退乘数。(双精度,默认值:2spring-doc.cn

交易

通道是否事务处理。(布尔值,默认值:falsespring-doc.cn

spring.rabbitmq 的
address-shuffle-mode 地址随机模式

用于对配置的地址进行随机排序的模式。(AddressShuffleMode,默认值:none,可能的值:NONE,RANDOM,INORDER)spring-doc.cn

地址

客户端应连接到的地址的逗号分隔列表。设置后,将忽略 host 和 port。(字符串,默认值:<none>spring-doc.cn

通道 rpc-timeout

通道中 RPC 调用的继续超时。将其设为零可永久等待。(持续时间,默认:10 分钟spring-doc.cn

连接超时

连接超时。将其设为零可永久等待。(持续时间,默认值:<无>spring-doc.cn

主机

RabbitMQ 主机。如果设置了地址,则忽略。(字符串,默认值:localhostspring-doc.cn

密码

登录以对代理进行身份验证。(字符串,默认值:guestspring-doc.cn

港口

RabbitMQ 端口。如果设置了地址,则忽略。默认为 5672,如果启用了 SSL,则为 5671。(整数,默认值:<none>spring-doc.cn

publisher-confirm-type

确认使用的发布者类型。(ConfirmType,默认值:<none>,可能的值:SIMPLE,CORRELATED,NONE)spring-doc.cn

publisher-returns

是否启用发布者退货。(布尔值,默认值:falsespring-doc.cn

请求通道最大值

客户端请求的每个连接的通道数。使用 0 表示无限制。(整数,默认值:2047spring-doc.cn

请求的心跳

请求的检测信号超时;零表示无。如果未指定 duration 后缀,则将使用秒。(持续时间,默认值:<无>spring-doc.cn

用户名

登录用户以向 broker 进行身份验证。(字符串,默认值:guestspring-doc.cn

虚拟主机

连接到 broker 时使用的虚拟主机。(字符串,默认值:<none>spring-doc.cn

另请参阅 Spring Boot 文档,了解代理连接和侦听器属性的其他属性。spring-doc.cn

关于重试的说明
使用默认的 ackModeAUTO)requeuetrue) 选项,将重试失败的消息传送 无限期。 由于 rabbit source 中没有太多的处理,因此 source 本身失败的风险很小,除非 由于某种原因,下游未连接。 将 requeue 设置为 false 将导致邮件在第一次尝试时被拒绝(并可能发送到 Dead Letter Exchange/Queue(如果代理是这样配置的)。 的 enableRetry 选项允许配置重试参数,以便可以重试失败的消息投放,并且 最终在重试用尽时丢弃(或死信)。 传递线程在重试间隔期间暂停。 重试选项包括 enableRetrymaxAttemptsinitialRetryIntervalretryMultipliermaxRetryInterval。 消息传送失败并显示 MessageConversionException 永远不会重试;假设是,如果消息 无法转换,则后续尝试也将失败。 此类消息将被丢弃(或死信)。Binder

5.11.4. 构建

$ ./mvnw clean install -PgenerateApps
$ cd apps

您可以在此处找到相应的基于 Binder 的项目。 然后你可以 cd 到其中一个文件夹并构建它:spring-doc.cn

$ ./mvnw clean package

5.11.5. 示例

java -jar rabbit-source.jar --rabbit.queues=

5.12. Amazon S3 源

此源应用程序支持使用 Amazon S3 协议传输文件。 文件从目录 (S3 存储桶) 传输到部署应用程序的目录。remotelocalspring-doc.cn

默认情况下,源发出的消息以字节数组的形式提供。但是,这可能是 使用选项进行自定义:--modespring-doc.cn

  • 裁判提供参考java.io.Filespring-doc.cn

  • 线将逐行拆分文件,并为每行发出一条新消息spring-doc.cn

  • 内容默认值。以字节数组的形式提供文件的内容spring-doc.cn

使用 时,您还可以提供附加选项 。 如果设置为 ,则底层将在实际数据之前和之后发出额外的文件开头文件结尾标记消息。 这 2 个附加标记消息的有效负载为 类型 。如果未明确设置,则选项默认为 。--mode=lines--withMarkers=truetrueFileSplitterFileSplitter.FileMarkerwithMarkersfalsespring-doc.cn

另请参阅 MetadataStore 选项,了解用于防止重新启动时出现重复消息的可能共享持久性存储配置。spring-doc.cn

模式 = 行
头:
有效载荷:

A 表示每行。Stringspring-doc.cn

第一行前面有一条带有标记有效负载的消息(可选)。 最后一行(可选)后跟带有标记有效负载的消息。STARTENDspring-doc.cn

标记存在和格式由 和 属性确定。with-markersmarkers-jsonspring-doc.cn

模式 = ref
头:

没有。spring-doc.cn

有效载荷:

一个对象。java.io.Filespring-doc.cn

5.12.3. 选项

s3 源具有以下选项:spring-doc.cn

按前缀分组的属性:spring-doc.cn

file.consumer 文件
标记 JSON

当 'fileMarkers == true' 时,指定它们是应生成为 FileSplitter.FileMarker 对象还是 JSON。(布尔值,默认值:truespring-doc.cn

模式

用于文件读取源的 FileReadingMode。值包括 'ref' - 文件对象,'lines' - 每行一条消息,或 'contents' - 以字节为单位的内容。(FileReadingMode,默认值:<none>,可能的值:ref,lines,contentsspring-doc.cn

with-markers

设置为 true 可在数据之前/之后发出文件开头/文件结尾标记消息。仅对 FileReadingMode 'lines' 有效。(布尔值,默认值:<none>spring-doc.cn

元数据.store.dynamo-db
创建延迟

创建表重试之间的延迟。(整数,默认值:1spring-doc.cn

创建重试

创建表请求的重试次数。(整数,默认值:25spring-doc.cn

read-capacity (读取容量)

读取表上的 capacity。(长,默认值:1spring-doc.cn

桌子

元数据的表名称。(字符串,默认值:<none>spring-doc.cn

生存时间

表条目的 TTL。(整数,默认值:<none>spring-doc.cn

写入容量

表上的写入容量。(长,默认值:1spring-doc.cn

元数据.store.jdbc
地区

此存储中保留的消息的唯一分组标识符。(字符串,默认值:DEFAULTspring-doc.cn

表前缀

自定义表名称的前缀。(字符串,默认值:<none>spring-doc.cn

元数据.store.mongo-db
收集

元数据的 MongoDB 集合名称。(字符串,默认值:metadataStorespring-doc.cn

metadata.store.redis
钥匙

元数据的 Redis 键。(字符串,默认值:<none>spring-doc.cn

metadata.store
类型

指示要配置的元数据存储的类型(默认为 'memory')。您必须包含相应的 Spring 集成依赖项才能使用持久存储。(StoreType,默认值:<none>,可能的值:mongodb,redis,dynamodb,jdbc,zookeeper,hazelcast,memory) spring-doc.cn

元数据.store.zookeeper
连接字符串

HOST:PORT 形式的 Zookeeper 连接字符串。(字符串,默认值:127.0.0.1:2181spring-doc.cn

编码

在 Zookeeper 中存储数据时使用的编码。(字符集,默认值:UTF-8spring-doc.cn

重试间隔

Zookeeper 操作的重试间隔(以毫秒为单位)。(整数,默认值:1000spring-doc.cn

根节点 - 存储条目是此节点的子项。(字符串,默认值:/SpringIntegration-MetadataStorespring-doc.cn

s3.common
端点 URL

用于连接到 s3 兼容存储的可选终端节点 URL。(字符串,默认值:<none>spring-doc.cn

path-style-access (路径样式访问)

使用路径样式访问。(布尔值,默认值:falsespring-doc.cn

s3.supplier
自动创建本地目录

创建或不创建本地目录。(布尔值,默认值:truespring-doc.cn

删除远程文件

处理后删除或不删除远程文件。(布尔值,默认值:falsespring-doc.cn

文件名模式

用于筛选远程文件的模式。(字符串,默认值:<none>spring-doc.cn

文件名-regex

用于过滤远程文件的 regexp。(模式,默认值:<无>spring-doc.cn

仅列表

设置为 true 可返回 s3 对象元数据,而不将文件复制到本地目录。(布尔值,默认值:falsespring-doc.cn

本地目录

用于存储文件的本地目录。(文件,默认值:<无>spring-doc.cn

preserve-timestamp

将远程文件的时间戳传输到本地文件。(布尔值,默认值:truespring-doc.cn

远程目录

AWS S3 存储桶资源。(字符串,默认值:bucketspring-doc.cn

远程文件分隔符

远程文件分隔符。(字符串,默认值:/spring-doc.cn

tmp-file-suffix 文件后缀

临时文件后缀。(字符串,默认值:.tmpspring-doc.cn

5.12.4. Amazon AWS 通用选项

Amazon S3 源(与所有其他 Amazon AWS 应用程序一样)基于 Spring Cloud AWS 项目作为其基础,其自动配置 类由 Spring Boot 自动使用。 请查阅其文档,了解必需和有用的自动配置属性。spring-doc.cn

其中一些与 AWS 凭证有关:spring-doc.cn

其他用于 AWS 定义:Regionspring-doc.cn

对于 AWS:Stackspring-doc.cn

5.12.5. 示例

java -jar s3-source.jar --s3.remoteDir=/tmp/foo --file.consumer.mode=lines

5.13. SFTP 源

此源应用程序支持使用 SFTP 协议传输文件。 文件将从目录传输到部署应用程序的目录。 默认情况下,源发出的消息以字节数组的形式提供。但是,这可能是 使用选项进行自定义:remotelocal--modespring-doc.cn

  • 裁判提供参考java.io.Filespring-doc.cn

  • 线将逐行拆分文件,并为每行发出一条新消息spring-doc.cn

  • 内容默认值。以字节数组的形式提供文件的内容spring-doc.cn

使用 时,您还可以提供附加选项 。 如果设置为 ,则底层将在实际数据之前和之后发出额外的文件开头文件结尾标记消息。 这 2 个附加标记消息的有效负载为 类型 。如果未明确设置,则选项默认为 。--mode=lines--withMarkers=truetrueFileSplitterFileSplitter.FileMarkerwithMarkersfalsespring-doc.cn

有关高级配置选项,请参阅 sftp-supplierspring-doc.cn

另请参阅 MetadataStore 选项,了解用于防止重新启动时出现重复消息的可能共享持久性存储配置。spring-doc.cn

5.13.1. 输入

N/A (从 SFTP 服务器获取文件)。spring-doc.cn

5.13.2. 输出

mode = 内容
头:
有效载荷:

A 填充了文件内容。byte[]spring-doc.cn

模式 = 行
头:
有效载荷:

A 表示每行。Stringspring-doc.cn

第一行前面有一条带有标记有效负载的消息(可选)。 最后一行(可选)后跟带有标记有效负载的消息。STARTENDspring-doc.cn

标记存在和格式由 和 属性确定。with-markersmarkers-jsonspring-doc.cn

模式 = ref
头:
有效载荷:

一个对象。java.io.Filespring-doc.cn

5.13.3. 选项

ftp 源具有以下选项:spring-doc.cn

按前缀分组的属性:spring-doc.cn

file.consumer 文件
标记 JSON

当 'fileMarkers == true' 时,指定它们是应生成为 FileSplitter.FileMarker 对象还是 JSON。(布尔值,默认值:truespring-doc.cn

模式

用于文件读取源的 FileReadingMode。值包括 'ref' - 文件对象,'lines' - 每行一条消息,或 'contents' - 以字节为单位的内容。(FileReadingMode,默认值:<none>,可能的值:ref,lines,contentsspring-doc.cn

with-markers

设置为 true 可在数据之前/之后发出文件开头/文件结尾标记消息。仅对 FileReadingMode 'lines' 有效。(布尔值,默认值:<none>spring-doc.cn

元数据.store.dynamo-db
创建延迟

创建表重试之间的延迟。(整数,默认值:1spring-doc.cn

创建重试

创建表请求的重试次数。(整数,默认值:25spring-doc.cn

read-capacity (读取容量)

读取表上的 capacity。(长,默认值:1spring-doc.cn

桌子

元数据的表名称。(字符串,默认值:<none>spring-doc.cn

生存时间

表条目的 TTL。(整数,默认值:<none>spring-doc.cn

写入容量

表上的写入容量。(长,默认值:1spring-doc.cn

元数据.store.jdbc
地区

此存储中保留的消息的唯一分组标识符。(字符串,默认值:DEFAULTspring-doc.cn

表前缀

自定义表名称的前缀。(字符串,默认值:<none>spring-doc.cn

元数据.store.mongo-db
收集

元数据的 MongoDB 集合名称。(字符串,默认值:metadataStorespring-doc.cn

metadata.store.redis
钥匙

元数据的 Redis 键。(字符串,默认值:<none>spring-doc.cn

metadata.store
类型

指示要配置的元数据存储的类型(默认为 'memory')。您必须包含相应的 Spring 集成依赖项才能使用持久存储。(StoreType,默认值:<none>,可能的值:mongodb,redis,dynamodb,jdbc,zookeeper,hazelcast,memory) spring-doc.cn

元数据.store.zookeeper
连接字符串

HOST:PORT 形式的 Zookeeper 连接字符串。(字符串,默认值:127.0.0.1:2181spring-doc.cn

编码

在 Zookeeper 中存储数据时使用的编码。(字符集,默认值:UTF-8spring-doc.cn

重试间隔

Zookeeper 操作的重试间隔(以毫秒为单位)。(整数,默认值:1000spring-doc.cn

根节点 - 存储条目是此节点的子项。(字符串,默认值:/SpringIntegration-MetadataStorespring-doc.cn

sftp.supplier
自动创建本地目录

如果本地目录不存在,则设置为 true 以创建本地目录。(布尔值,默认值:truespring-doc.cn

empty-时延迟

未检测到新文件时的延迟持续时间。(持续时间,默认:1 秒spring-doc.cn

删除远程文件

设置为 true 可在成功传输后删除远程文件。(布尔值,默认值:falsespring-doc.cn

目录

工厂 “name.directory” 对的列表。(String[],默认值:<none>spring-doc.cn

工厂

工厂名称到工厂的映射。(Map<String,Factory>,默认值:<none>spring-doc.cn

公平

True 表示多个服务器/目录的公平轮换。默认情况下,这是 false,因此如果源具有多个条目,则将在访问其他源之前收到这些条目。(布尔值,默认值:falsespring-doc.cn

文件名模式

用于匹配要传输的文件的名称的筛选模式。(字符串,默认值:<none>spring-doc.cn

文件名-regex

用于匹配要传输的文件的名称的筛选器正则表达式模式。(模式,默认值:<无>spring-doc.cn

仅列表

设置为 true 可返回文件元数据,而不返回整个负载。(布尔值,默认值:falsespring-doc.cn

本地目录

用于文件传输的本地目录。(文件,默认值:<无>spring-doc.cn

max-fetch (最大获取)

每次轮询要获取的最大远程文件数;默认 unlimited 。在列出文件或构建任务启动请求时不适用。(整数,默认值:<none>spring-doc.cn

preserve-timestamp

设置为 true 可保留原始时间戳。(布尔值,默认值:truespring-doc.cn

远程目录

远程 FTP 目录。(字符串,默认值:/spring-doc.cn

远程文件分隔符

远程文件分隔符。(字符串,默认值:/spring-doc.cn

重命名远程文件

成功传输后,必须将解析为新名称 remote files 的 SPEL 表达式重命名为该表达式。(表达式,默认值:<none>spring-doc.cn

设置为 true 可流式传输文件,而不是复制到本地目录。(布尔值,默认值:falsespring-doc.cn

tmp-file-suffix 文件后缀

传输过程中要使用的后缀。(字符串,默认值:.tmpspring-doc.cn

sftp.supplier.factory
允许未知密钥

如果为 True,则允许未知或更改的键。(布尔值,默认值:falsespring-doc.cn

主机

服务器的主机名。(字符串,默认值:localhostspring-doc.cn

已知主机表达式

解析为已知主机文件位置的 SpEL 表达式。(表达式,默认值:<none>spring-doc.cn

密码短语

用户私钥的密码。(字符串,默认值:<空字符串>spring-doc.cn

密码

用于连接到服务器的密码。(字符串,默认值:<none>spring-doc.cn

港口

服务器的端口。(整数,默认值:22spring-doc.cn

私钥

用户私钥的资源位置。(资源,默认值:<无>spring-doc.cn

用户名

用于连接到服务器的用户名。(字符串,默认值:<none>spring-doc.cn

sftp.supplier.排序方式
属性

要排序依据的文件列表条目的属性(FILENAME、ATIME:上次访问时间、MTIME:上次修改时间)。(属性,默认值:<无>spring-doc.cn

迪尔

排序方向(ASC 或 DESC)。(Dir,默认值:<none>spring-doc.cn

5.13.4. 示例

java -jar sftp_source.jar --sftp.supplier.remote-dir=foo --file.mode=lines --sftp.supplier.factory.host=sftpserver \
         --sftp.supplier.factory.username=user --sftp.supplier.factory.password=pw --sftp.supplier.local-dir=/foo

5.14. 系统日志

syslog 源通过 UDP 和/或 TCP 接收 SYSLOG 数据包。支持 RFC3164 (BSD) 和 RFC5424 格式。spring-doc.cn

5.14.1. 选项

syslog.supplier.buffer-size

解码消息时使用的缓冲区大小;较大的邮件将被拒绝。(整数,默认值:2048spring-doc.cn

syslog.supplier.nio

是否使用 NIO(当支持大量连接时)。(布尔值,默认值:falsespring-doc.cn

syslog.supplier.port

要侦听的端口。(整数,默认值:1514spring-doc.cn

syslog.supplier.protocol

用于 SYSLOG 的协议(tcp 或 udp)。(协议,默认值:<none>,可能的值:tcp,udp,bothspring-doc.cn

syslog.supplier.reverse-lookup

是否对 incoming socket 执行反向查找。(布尔值,默认值:falsespring-doc.cn

syslog.supplier.rfc

'5424' 或 '3164' - 根据 RFC 的 syslog 格式;3164 又名“BSD”格式。(字符串,默认值:3164spring-doc.cn

syslog.supplier.socket-超时

套接字超时。(整数,默认值:0spring-doc.cn

5.15. TCP 协议

源充当服务器,并允许远程方连接到它并通过原始 tcp 套接字提交数据。tcpspring-doc.cn

TCP 是一种流协议,需要一些机制来在网络上构建消息。许多解码器是 available,默认值为 'CRLF',它与 Telnet 兼容。spring-doc.cn

TCP 源应用程序生成的消息具有有效负载。byte[]spring-doc.cn

5.15.1. 选项

按前缀分组的属性:spring-doc.cn

TCP 协议
蔚来

是否使用 NIO。(布尔值,默认值:falsespring-doc.cn

港口

要侦听的端口;0 让操作系统选择一个端口。(整数,默认值:1234spring-doc.cn

反向查找

对远程 IP 地址执行反向 DNS 查找;如果为 false,则邮件报头中仅包含 IP 地址。(布尔值,默认值:falsespring-doc.cn

套接字超时

未收到数据时关闭套接字之前的超时 (ms)。(整数,默认值:120000spring-doc.cn

使用直接缓冲区

是否使用直接缓冲区。(布尔值,默认值:falsespring-doc.cn

tcp.supplier
缓冲区大小

解码消息时使用的缓冲区大小;较大的邮件将被拒绝。(整数,默认值:2048spring-doc.cn

译码器

接收消息时使用的解码器。(编码,默认值:<none>,可能的值:CRLF,LF,NULL,STXETX,RAW,L1,L2,L4spring-doc.cn

5.15.2. 可用的解码器

文本数据
CRLF (默认)

以回车 (0x0d) 后跟换行符 (0x0a) 结尾的文本spring-doc.cn

如果

由换行符终止的文本 (0x0a)spring-doc.cn

以 null 字节 (0x00) 结尾的文本spring-doc.cn

STXETX

文本前面有 STX (0x02) 并以 ETX (0x03) 结尾spring-doc.cn

文本和二进制数据

no structure - 客户端通过关闭套接字来指示完整的消息spring-doc.cn

L1 系列

数据前面有一个单字节(无符号)长度字段(最多支持 255 个字节)spring-doc.cn

L2 (二层)

数据前面有一个两字节(无符号)长度的字段(最多 2 个16-1 字节)spring-doc.cn

L4 系列

数据前面有一个四字节(带符号)长度的字段(最多 2 个31-1 字节)spring-doc.cn

5.16. 时间源

时间源将每隔一段时间简单地发出一个包含当前时间的 String。spring-doc.cn

5.16.1. 选项

时间源具有以下选项:spring-doc.cn

spring.integration.poller.cron

Cron 表达式进行轮询。与 'fixedDelay' 和 'fixedRate' 互斥。(字符串,默认值:<none>spring-doc.cn

spring.integration.poller.fixed-delay

轮询延迟期。与 'cron' 和 'fixedRate' 互斥。(持续时间,默认值:<无>spring-doc.cn

spring.integration.poller.fixed-rate

轮询率周期。与 'fixedDelay' 和 'cron' 互斥。(持续时间,默认值:<无>spring-doc.cn

spring.integration.poller.initial-delay

轮询初始延迟。申请了 'fixedDelay' 和 'fixedRate';对于 'cron' 而被忽略。(持续时间,默认值:<无>spring-doc.cn

每次轮询 spring.integration.poller.max 条消息数

每个轮询周期要轮询的最大消息数。(整数,默认值:<none>spring-doc.cn

spring.integration.poller.receive-timeout

轮询消息等待多长时间。(持续时间,默认:1 秒spring-doc.cn

time.date-格式

日期值的格式。(字符串,默认值:MM/dd/yy HH:mm:ssspring-doc.cn

5.17. Twitter 消息源

重复检索过去 30 天内的直接消息(发送和接收),按时间倒序排序。 已释放的消息将缓存(在缓存中)以防止重复。 默认情况下,使用 in-memory。MetadataStoreSimpleMetadataStorespring-doc.cn

控制消息的数量或返回的消息。twitter.message.source.countspring-doc.cn

这些属性控制消息轮询间隔。 必须与使用的 API 速率限制保持一致spring.cloud.stream.pollerspring-doc.cn

5.17.1. 选项

按前缀分组的属性:spring-doc.cn

spring.integration.poller
cron (定时)

Cron 表达式进行轮询。与 'fixedDelay' 和 'fixedRate' 互斥。(字符串,默认值:<none>spring-doc.cn

固定延迟

轮询延迟期。与 'cron' 和 'fixedRate' 互斥。(持续时间,默认值:<无>spring-doc.cn

固定利率

轮询率周期。与 'fixedDelay' 和 'cron' 互斥。(持续时间,默认值:<无>spring-doc.cn

初始延迟

轮询初始延迟。申请了 'fixedDelay' 和 'fixedRate';对于 'cron' 而被忽略。(持续时间,默认值:<无>spring-doc.cn

最大每次轮询消息数

每个轮询周期要轮询的最大消息数。(整数,默认值:<none>spring-doc.cn

接收超时

轮询消息等待多长时间。(持续时间,默认:1 秒spring-doc.cn

推特连接
访问令牌

您的 Twitter 令牌。(字符串,默认值:<none>spring-doc.cn

访问令牌密钥

您的 Twitter 令牌密钥。(字符串,默认值:<none>spring-doc.cn

consumer-key (使用者密钥)

您的 Twitter 密钥。(字符串,默认值:<none>spring-doc.cn

消费者密钥

您的 Twitter 秘密。(字符串,默认值:<none>spring-doc.cn

已启用调试

启用 Twitter4J 调试模式。(布尔值,默认值:falsespring-doc.cn

raw-json 格式

启用缓存 Twitter API 返回的原始(原始)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 json 表示形式。当设置为 True 时,结果将使用原始的 Twitter APISs json 表示形式。(布尔值,默认值:truespring-doc.cn

推特.message.source
计数

要返回的最大事件数。默认为 20。最大 50 (整数,默认值:20spring-doc.cn

5.18. Twitter 搜索源

Twitter 的标准搜索 API(搜索/推文)允许对最近或热门推文的索引进行简单查询。这提供了对过去 7 天内发布的最近推文样本的连续搜索。“公共”API 集的一部分。Sourcespring-doc.cn

返回与指定查询匹配的相关推文的集合。spring-doc.cn

使用属性可控制连续搜索请求之间的间隔。速率限制 - 每 30 分钟窗口 180 个请求(例如 ~6 r/m,~ 1 个请求/10 秒)spring.cloud.stream.pollerspring-doc.cn

查询属性允许按关键字进行查询,并按时间和地理位置筛选结果。twitter.searchspring-doc.cn

和 根据搜索 API 控制结果分页。twitter.search.counttwitter.search.pagespring-doc.cn

注意:Twitter 的搜索服务以及 Search API 并不是详尽的推文来源。并非所有推文都会被编入索引或通过搜索界面提供。spring-doc.cn

5.18.1. 选项

按前缀分组的属性:spring-doc.cn

spring.integration.poller
cron (定时)

Cron 表达式进行轮询。与 'fixedDelay' 和 'fixedRate' 互斥。(字符串,默认值:<none>spring-doc.cn

固定延迟

轮询延迟期。与 'cron' 和 'fixedRate' 互斥。(持续时间,默认值:<无>spring-doc.cn

固定利率

轮询率周期。与 'fixedDelay' 和 'cron' 互斥。(持续时间,默认值:<无>spring-doc.cn

初始延迟

轮询初始延迟。申请了 'fixedDelay' 和 'fixedRate';对于 'cron' 而被忽略。(持续时间,默认值:<无>spring-doc.cn

最大每次轮询消息数

每个轮询周期要轮询的最大消息数。(整数,默认值:<none>spring-doc.cn

接收超时

轮询消息等待多长时间。(持续时间,默认:1 秒spring-doc.cn

推特连接
访问令牌

您的 Twitter 令牌。(字符串,默认值:<none>spring-doc.cn

访问令牌密钥

您的 Twitter 令牌密钥。(字符串,默认值:<none>spring-doc.cn

consumer-key (使用者密钥)

您的 Twitter 密钥。(字符串,默认值:<none>spring-doc.cn

消费者密钥

您的 Twitter 秘密。(字符串,默认值:<none>spring-doc.cn

已启用调试

启用 Twitter4J 调试模式。(布尔值,默认值:falsespring-doc.cn

raw-json 格式

启用缓存 Twitter API 返回的原始(原始)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 json 表示形式。当设置为 True 时,结果将使用原始的 Twitter APISs json 表示形式。(布尔值,默认值:truespring-doc.cn

计数

每个页面要返回的推文数(例如,每个请求),最多 100 条。(整数,默认值:100spring-doc.cn

将搜索的推文限制为给定的语言,由 http://en.wikipedia.org/wiki/ISO_639-1 指定。(字符串,默认值:<none>spring-doc.cn

在再次从最近的推文开始搜索之前,要向后搜索(从最新到最早的推文)的页面数(例如请求)。向后搜索的推文总数为 (page * count) (整数,默认值:3spring-doc.cn

查询

按搜索查询字符串搜索推文。(字符串,默认值:<none>spring-doc.cn

从最近空响应重新启动

从最新的 tweets 重新开始搜索 empty response。仅在第一次重启后应用(例如,当 since_id != UNBOUNDED 时)(布尔值,默认值:falsespring-doc.cn

result-type

指定您希望接收的搜索结果类型。当前默认值为 “mixed”。有效值包括:mixed :在响应中同时包含常用结果和实时结果。recent :仅返回响应中的最新结果 popular :仅返回响应中最受欢迎的结果(ResultType,默认值:<none>,可能的值:popular,mixed,recent)spring-doc.cn

因为

如果指定,则返回自给定日期以来的推文。日期的格式应为 YYYY-MM-DD。(字符串,默认值:<none>spring-doc.cn

twitter.search.geocode
纬度

用户的纬度。(Double,默认值:-1spring-doc.cn

经度

用户的经度。(Double,默认值:-1spring-doc.cn

半径

围绕(纬度、经度)点的半径(以公里为单位)。(Double,默认值:-1spring-doc.cn

5.19. Twitter 流源

实时推文流式处理 FilterSample API 支持。spring-doc.cn

  • 返回与一个或多个筛选条件谓词匹配的公有状态。 多个参数允许使用与 Streaming API 的单个连接。 提示:、 、 和 字段与 OR 运算符组合在一起! 查询并返回与 OR 匹配的推文 由 用户 创建。Filter APItrackfollowlocationstrack=foofollow=1234test1234spring-doc.cn

  • 这将返回所有公共状态的小型随机样本。 默认访问级别返回的推文是相同的,因此,如果两个不同的客户端连接到此终端节点,它们将看到相同的推文。Sample APIspring-doc.cn

默认访问级别最多允许 400 个跟踪关键词、5,000 个关注用户 ID 和 25 个 0.1-360 度位置框。spring-doc.cn

5.19.1. 选项

按前缀分组的属性:spring-doc.cn

推特连接
访问令牌

您的 Twitter 令牌。(字符串,默认值:<none>spring-doc.cn

访问令牌密钥

您的 Twitter 令牌密钥。(字符串,默认值:<none>spring-doc.cn

consumer-key (使用者密钥)

您的 Twitter 密钥。(字符串,默认值:<none>spring-doc.cn

消费者密钥

您的 Twitter 秘密。(字符串,默认值:<none>spring-doc.cn

已启用调试

启用 Twitter4J 调试模式。(布尔值,默认值:falsespring-doc.cn

raw-json 格式

启用缓存 Twitter API 返回的原始(原始)JSON 对象。当设置为 False 时,结果将使用 Twitter4J 的 json 表示形式。当设置为 True 时,结果将使用原始的 Twitter APISs json 表示形式。(布尔值,默认值:truespring-doc.cn

推特.stream.filter
计数

指示在过渡到实时流之前要流式传输的先前状态的数量。(整数,默认值:0spring-doc.cn

filter-level (过滤器级别)

筛选条件级别将流中显示的推文限制为具有最小 filterLevel 属性值的推文。none (无)、low (低) 或 medium (中等) 之一。(FilterLevel,默认值:<none>spring-doc.cn

跟随

按 ID 指定要从中接收公共推文的用户。(List<Long>,默认值:<none>spring-doc.cn

语言

指定流的推文语言。(List<String>,默认值:<none>spring-doc.cn

地点

要跟踪的位置。内部表示为 2D 数组。边界框无效:52.38、4.90、51.51、-0.12。第一对必须是框的 SW 角(List<BoundingBox>,默认值:<none>spring-doc.cn

跟踪

指定要跟踪的关键字。(List<String>,默认值:<none>spring-doc.cn

推特
类型

<缺少文档>(StreamType,默认值:<none>,可能的值:sample,filter,firehose,linkspring-doc.cn

5.20. Websocket 源码

通过 Web 套接字生成消息的源。Websocketspring-doc.cn

5.20.1. 选项

按前缀分组的属性:spring-doc.cn

websocket.supplier
允许的来源

允许的源。(字符串,默认值:*spring-doc.cn

路径

公开服务器 WebSocket 处理程序的路径。(字符串,默认值:/websocketspring-doc.cn

websocket.supplier.sock-js
使

在服务器上启用 Sockjs 服务。默认值为 'false' (布尔值,默认值: falsespring-doc.cn

5.20.2. 示例

要验证 websocket-source 是否从 Websocket 客户端接收消息,您可以使用以下简单的端到端设置。spring-doc.cn

第 1 步:启动 Kafka
第 2 步:在特定端口上部署,例如 8080websocket-source
第 3 步:在 8080 端口路径 “/websocket” 上连接一个 websocket 客户端,并发送一些消息。

您可以启动 Kafka 控制台使用者并在其中查看消息。spring-doc.cn

5.21. ZeroMQ 源码

“zeromq” 源允许从 ZeroMQ 接收消息。spring-doc.cn

5.21.1. 输入

不适用spring-doc.cn

5.21.2. 输出

有效载荷

5.21.3. 选项

zeromq 源具有以下选项:spring-doc.cn

zeromq.supplier.bind-port

Bind Port 用于创建 ZeroMQ Socket;0 选择随机端口。(整数,默认值:0spring-doc.cn

zeromq.supplier.connect-url

ZeroMQ 套接字的连接 URL。(字符串,默认值:<none>spring-doc.cn

zeromq.supplier.consume-delay

未收到数据时从 ZeroMQ Socket 消耗的延迟。(持续时间,默认:1 秒spring-doc.cn

zeromq.supplier.socket类型

连接应进行的 Socket Type。(SocketType,默认值:<none>,可能的值:PAIR,PUBSUBREQ,REP,DEALER,ROUTER,PULL,PUSH,XPUB,XSUB,STREAM)spring-doc.cn

zeromq.supplier.topics

要订阅的主题。(String[],默认值:[]spring-doc.cn

另请参阅 Spring Boot 文档,了解代理连接和侦听器属性的其他属性。spring-doc.cn

5.21.4. 构建

$ ./mvnw clean install -PgenerateApps
$ cd apps

您可以在此处找到相应的基于 Binder 的项目。 然后你可以 cd 到其中一个文件夹并构建它:spring-doc.cn

$ ./mvnw clean package

5.21.5. 示例

java -jar zeromq-source.jar --zeromq.supplier.connectUrl=tcp://server:port --zeromq.supplier.topics=

6. 处理器

6.1. 聚合器处理器

聚合器处理器使应用程序能够将传入消息聚合到组中,并将其发布到输出目标中。spring-doc.cn

java -jar aggregator-processor-kafka-<version>.jar --aggregator.message-store-type=jdbcspring-doc.cn

如果要对 RabbitMQ 运行 kafka,请将 kafka 更改为 rabbit。spring-doc.cn

有效载荷

6.1.2. 选项

按前缀分组的属性:spring-doc.cn

聚合
集合体

聚合策略的 SpEL 表达式。默认值是有效负载的集合。(表达式,默认值:<none>spring-doc.cn

相关

相关键的 SPEL 表达式。默认为 correlationId 标头。(表达式,默认值:<none>spring-doc.cn

组超时

超时到过期未完成的组的 SPEL 表达式。(表达式,默认值:<none>spring-doc.cn

消息存储实体

持久性消息存储实体:RDBMS 中的表前缀、MongoDB 中的集合名称等(字符串,默认值:<none>spring-doc.cn

消息存储类型

消息存储类型。(字符串,默认值:<none>spring-doc.cn

释放

用于发布策略的 SPEL 表达式。默认基于 sequenceSize 标头。(表达式,默认值:<none>spring-doc.cn

spring.data.mongodb
身份验证数据库

身份验证数据库名称。(字符串,默认值:<none>spring-doc.cn

自动索引创建

是否启用自动索引创建。(布尔值,默认值:<none>spring-doc.cn

数据库

数据库名称。(字符串,默认值:<none>spring-doc.cn

字段命名策略

要使用的 FieldNamingStrategy 的完全限定名称。(类<?>,默认值:<无>spring-doc.cn

网格 fs-数据库

<缺少文档>(字符串,默认值:<none>spring-doc.cn

主机

Mongo 服务器主机。不能使用 URI 设置。(字符串,默认值:<none>spring-doc.cn

密码

mongo 服务器的登录密码。不能使用 URI 设置。(字符[],默认值:<无>spring-doc.cn

港口

Mongo 服务器端口。不能使用 URI 设置。(整数,默认值:<none>spring-doc.cn

副本集名称

集群所需的副本集名称。不能使用 URI 设置。(字符串,默认值:<none>spring-doc.cn

URI

Mongo 数据库 URI。不能使用 host、port、credentials 和 replica set name 进行设置。(字符串,默认值:mongodb://localhost/testspring-doc.cn

用户名

mongo 服务器的登录用户。不能使用 URI 设置。(字符串,默认值:<none>spring-doc.cn

uuid 表示

将 UUID 转换为 BSON 二进制值时使用的表示形式。(UuidRepresentation,默认值:java-legacy,可能的值:UNSPECIFIED,STANDARD,C_SHARP_LEGACY,JAVA_LEGACY,PYTHON_LEGACY)spring-doc.cn

spring.datasource
出错时继续

初始化数据库时发生错误时是否停止。(布尔值,默认值:falsespring-doc.cn

数据

数据 (DML) 脚本资源引用。(List<String>,默认值:<none>spring-doc.cn

数据密码

用于执行 DML 脚本的数据库密码(如果不同)。(字符串,默认值:<none>spring-doc.cn

数据用户名

执行 DML 脚本的数据库的用户名(如果不同)。(字符串,默认值:<none>spring-doc.cn

驱动程序类名称

JDBC 驱动程序的完全限定名称。默认情况下,根据 URL 自动检测。(字符串,默认值:<none>spring-doc.cn

嵌入式数据库连接

嵌入式数据库的连接详细信息。默认为 Classpath 上可用的最合适的嵌入式数据库。(EmbeddedDatabaseConnection,默认值:<none>,可能的值:NONE,H2,DERBY,HSQL,HSQLDB)spring-doc.cn

生成唯一名称

是否生成随机数据源名称。(布尔值,默认值:truespring-doc.cn

初始化模式

在确定是否应使用可用的 DDL 和 DML 脚本执行 DataSource 初始化时应用的模式。(DataSourceInitializationMode,默认值:embedded,可能的值:ALWAYS,EMBEDDED,NEVERspring-doc.cn

jndi-名称

数据源的 JNDI 位置。设置时,将忽略 Class、url、username 和 password。(字符串,默认值:<none>spring-doc.cn

名字

如果 “generate-unique-name” 为 false,则要使用的数据源名称。使用嵌入式数据库时默认为 “testdb”,否则为 null。(字符串,默认值:<none>spring-doc.cn

密码

数据库的登录密码。(字符串,默认值:<none>spring-doc.cn

平台

在 DDL 或 DML 脚本中使用的平台(例如 schema-${platform}.sql 或 data-${platform}.sql)。(字符串,默认值:allspring-doc.cn

图式

架构 (DDL) 脚本资源引用。(List<String>,默认值:<none>spring-doc.cn

架构密码

用于执行 DDL 脚本的数据库密码(如果不同)。(字符串,默认值:<none>spring-doc.cn

架构用户名

执行 DDL 脚本的数据库的用户名(如果不同)。(字符串,默认值:<none>spring-doc.cn

分隔符

SQL 初始化脚本中的语句分隔符。(字符串,默认值:;spring-doc.cn

sql-script-encoding

SQL 脚本编码。(字符集,默认值:<无>spring-doc.cn

类型

要使用的连接池实现的完全限定名称。默认情况下,它是从 Classpath 中自动检测到的。(类<DataSource>,默认值:<none>spring-doc.cn

网址

数据库的 JDBC URL。(字符串,默认值:<none>spring-doc.cn

用户名

数据库的登录用户名。(字符串,默认值:<none>spring-doc.cn

spring.mongodb.embedded
特征

要启用的功能的逗号分隔列表。默认情况下,使用已配置版本的默认值。(Set<Feature>,默认值:[sync_delay]spring-doc.cn

版本

要使用的 Mongo 版本。(字符串,默认值:3.5.5spring-doc.cn

spring.redis 的
客户端名称

要在使用 CLIENT SETNAME 的连接上设置的客户端名称。(字符串,默认值:<none>spring-doc.cn

客户端类型

要使用的客户端类型。默认情况下,根据 Classpath 自动检测。(ClientType,默认值:<none>,可能的值:LETTUCE,JEDISspring-doc.cn

连接超时

连接超时。(持续时间,默认值:<无>spring-doc.cn

数据库

连接工厂使用的数据库索引。(整数,默认值:0spring-doc.cn

主机

Redis 服务器主机。(字符串,默认值:localhostspring-doc.cn

密码

redis 服务器的登录密码。(字符串,默认值:<none>spring-doc.cn

港口

Redis 服务器端口。(整数,默认值:6379spring-doc.cn

SSL协议

是否启用 SSL 支持。(布尔值,默认值:falsespring-doc.cn

超时

读取超时。(持续时间,默认值:<无>spring-doc.cn

网址

连接 URL。覆盖 host、port 和 password。忽略 User。示例:redis://user:[email protected]:6379 (字符串,默认值:<none>spring-doc.cn

用户名

redis 服务器的登录用户名。(字符串,默认值:<none>spring-doc.cn

6.2. 桥接处理器

一个处理器,通过简单地将传入的有效负载传递给出站来桥接 input 和 output。spring-doc.cn

有效载荷

6.3. Filter处理器

筛选器处理器使应用程序能够检查传入的有效负载,然后对其应用谓词,以决定是否需要继续记录。 例如,如果传入的有效负载是 type 并且您想要过滤掉少于 5 个字符的任何内容,则可以运行过滤器处理器,如下所示。Stringspring-doc.cn

java -jar filter-processor-kafka-<version>.jar --filter.function.expression=payload.length() > 4spring-doc.cn

如果要对 RabbitMQ 运行 kafka,请将 kafka 更改为 rabbit。spring-doc.cn

有效载荷

您可以将任何类型作为有效负载传递,然后对其应用 SpEL 表达式以进行筛选。 如果传入类型为 且内容类型设置为 或 ,则应用程序会将 转换为 。byte[]text/plainapplication/jsonbyte[]Stringspring-doc.cn

6.3.2. 选项

filter.function.expression

要针对要筛选的请求消息应用的布尔 SpEL 表达式。(表达式,默认值:<none>spring-doc.cn

6.4. Groovy 处理器

对消息应用 Groovy 脚本的处理器。spring-doc.cn

6.4.1. 选项

groovy-processor 处理器具有以下选项:spring-doc.cn

groovy-processor.script

对用于处理消息的脚本的引用。(资源,默认值:<无>spring-doc.cn

groovy-processor.variables

变量绑定作为名称-值对的新行分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:<无>spring-doc.cn

groovy-processor.variables-位置

包含自定义脚本变量绑定的属性文件的位置。(资源,默认值:<无>spring-doc.cn

6.5. Header Enricher 处理器

使用 header-enricher 应用程序添加消息标头。spring-doc.cn

标头以换行分隔的键值对的形式提供,其中键是标头名称,值是 SpEL 表达式。 例如。--headers='foo=payload.someProperty \n bar=payload.otherProperty'spring-doc.cn

6.5.1. 选项

header-enricher 处理器具有以下选项:spring-doc.cn

header.enricher.headers

\n 分隔的属性表示标头,其中的值是 SpEL 表达式,例如 foo='bar' \n baz=payload.baz。(属性,默认值:<无>spring-doc.cn

header.enricher.overwrite

设置为 true 可覆盖任何现有邮件标头。(布尔值,默认值:falsespring-doc.cn

6.6. Http 请求处理器

一个处理器应用程序,它向 HTTP 资源发出请求并将响应正文作为消息负载发出。spring-doc.cn

6.6.1. 输入

任何必需的 HTTP 标头都必须通过 or 属性显式设置。请参阅下面的示例。 标头值也可用于构造:headersheaders-expressionspring-doc.cn

  • 在属性中引用时的请求正文。body-expressionspring-doc.cn

  • 在属性中引用的 HTTP 方法。http-method-expressionspring-doc.cn

  • 在属性中引用时的 URL。url-expressionspring-doc.cn

有效载荷

默认情况下,有效负载用作 POST 请求的请求正文,可以是任何 Java 类型。 对于 GET 请求,它应为空 String。 payload 还可用于构造:spring-doc.cn

  • 在属性中引用时的请求正文。body-expressionspring-doc.cn

  • 在属性中引用的 HTTP 方法。http-method-expressionspring-doc.cn

  • 在属性中引用时的 URL。url-expressionspring-doc.cn

底层 WebClient 支持 Jackson JSON 序列化,以在必要时支持任何请求和响应类型。 默认情况下,该属性可以设置为应用程序类路径中的任何类。 请注意,用户定义的有效负载类型需要向 pom 文件添加所需的依赖项。expected-response-typeString.classspring-doc.cn

6.6.2. 输出

没有 HTTP 消息标头映射到出站消息。spring-doc.cn

有效载荷

原始输出对象是 ResponseEntity<?>它的任何字段(例如、、)或访问器方法 () 都可以作为 . 默认情况下,出站 Message 负载是响应正文。 请注意, ResponseEntity (由表达式引用) 默认情况下不能由 Jackson 反序列化,但可以呈现为 .bodyheadersstatusCodereply-expression#rootHashMapspring-doc.cn

6.6.3. 选项

http-request 处理器有以下选项:spring-doc.cn

6.6.4. 选项

按前缀分组的属性:spring-doc.cn

http.request 请求
身体表情

一个 SPEL 表达式,用于从传入消息派生请求正文。(表达式,默认值:<none>spring-doc.cn

预期响应类型

用于解释响应的类型。(类<?>,默认值:<无>spring-doc.cn

headers-表达式

用于派生要使用的 http 标头映射的 SPEL 表达式。(表达式,默认值:<none>spring-doc.cn

http-method-expression 表达式

一个 SpEL 表达式,用于从传入消息中派生请求方法。(表达式,默认值:<none>spring-doc.cn

回复表达式

用于计算最终结果的 SPEL 表达式,应用于整个 http {@link org.springframework.http.ResponseEntity}。(表达式,默认值:<none>spring-doc.cn

超时

请求超时(以毫秒为单位)。(长,默认值:30000spring-doc.cn

url 表达式

针对传入消息的 SPEL 表达式,用于确定要使用的 URL。(表达式,默认值:<none>spring-doc.cn

spring.codec
日志请求详细信息

是否在 DEBUG 级别记录表单数据,在 TRACE 级别记录标头。(布尔值,默认值:falsespring-doc.cn

最大内存大小

每当需要聚合 input stream 时可以缓冲的字节数的限制。这仅适用于自动配置的 WebFlux 服务器和 WebClient 实例。默认情况下,未设置此项,在这种情况下,将应用单个编解码器的默认值。默认情况下,大多数编解码器限制为 256K。(DataSize,默认值:<none>spring-doc.cn

6.7. 图像识别处理器

使用 Inception 模型进行分类的处理器 在实时图像中分为不同的类别(例如标签)。spring-doc.cn

模型实现了一个深度卷积神经网络,可以在困难的视觉识别任务上实现合理的性能 - 在某些领域(如图像识别)的性能达到或超过人类的表现。spring-doc.cn

模型的输入是二进制数组的图像。spring-doc.cn

输出是以下格式的 JSON 消息:spring-doc.cn

{
  "labels" : [
     {"giant panda":0.98649305}
  ]
}

Result 包含已识别类别的名称(例如 label)以及图像表示此类别的置信度(例如 confidence)。spring-doc.cn

如果 the 设置为大于 1 的值,则结果将包含最可能的标签。例如,将返回:response-seizeresponse-seizeresponse-size=3spring-doc.cn

{
  "labels": [
    {"giant panda":0.98649305},
    {"badger":0.010562794},
    {"ice bear":0.001130851}
  ]
}
有效载荷

如果传入类型为 且内容类型设置为 ,则应用程序会将输入图像处理为并输出增强图像有效负载和 json 标头。byte[]application/octet-streambyte[]byte[]spring-doc.cn

6.7.2. 选项

image.recognition.cache-model

缓存预先训练的 TensorFlow 模型。(布尔值,默认值:truespring-doc.cn

image.recognition.debug-output

<缺少文档> (布尔值,默认值:falsespring-doc.cn

image.recognition.debug-output-path

<缺少文档> (字符串,默认值:image-recognition-result.pngspring-doc.cn

image.recognition.model

预先训练的 TensorFlow 图像识别模型。请注意,模型必须与所选模型类型匹配!(字符串,默认值:https://storage.googleapis.com/mobilenet_v2/checkpoints/mobilenet_v2_1.4_224.tgz#mobilenet_v2_1.4_224_frozen.pbspring-doc.cn

image.recognition.model-type

支持三种不同的预训练 tensorflow 图像识别模型:Inception、MobileNetV1 和 MobileNetV2 1。Inception Graph 使用 'input' 作为输入,使用 'output' 作为输出。2. MobileNetV2 预训练模型:https://github.com/tensorflow/models/tree/master/research/slim/nets/mobilenet#pretrained-models - 标准化图像大小始终为正方形(例如 H=W) - 图形使用“input”作为输入,“MobilenetV2/Predictions/Reshape_1”作为输出。3. MobileNetV1 预训练模型:https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet_v1.md#pre-trained-models - 图形使用“input”作为输入,“MobilenetV1/Predictions/Reshape_1”作为输出。(ModelType,默认值:<none>,可能的值:inception,mobilenetv1,mobilenetv2spring-doc.cn

image.recognition.normalized-image-size

标准化图像大小。(整数,默认值:224spring-doc.cn

image.recognition.response-size

已识别图像的数量。(整数,默认值:5spring-doc.cn

6.8. 对象检测处理器

Object Detection 处理器为 TensorFlow Object Detection API 提供开箱即用的支持。它允许在单个图像或图像流中实时定位和识别多个对象。Object Detection 处理器构建在对象检测功能之上。spring-doc.cn

您必须为 Processor 提供预先训练的对象检测模型和相应的对象标签spring-doc.cn

以下是一些合理的配置默认值:spring-doc.cn

下图显示了一个 Spring Cloud Data Flow,即流式管道,它实时预测输入图像流中的对象类型。spring-doc.cn

SCDF TensorFlow 对象检测架构

处理器的输入是一个图像字节数组,输出是一个增强的图像,以及一个名为 的标头,它提供检测到的对象的文本描述:detected_objectsspring-doc.cn

{
  "labels" : [
     {"name":"person", "confidence":0.9996774,"x1":0.0,"y1":0.3940161,"x2":0.9465165,"y2":0.5592592,"cid":1},
     {"name":"person", "confidence":0.9996604,"x1":0.047891676,"y1":0.03169123,"x2":0.941098,"y2":0.2085562,"cid":1},
     {"name":"backpack", "confidence":0.96534747,"x1":0.15588468,"y1":0.85957795,"x2":0.5091308,"y2":0.9908878,"cid":23},
     {"name":"backpack", "confidence":0.963343,"x1":0.1273736,"y1":0.57658505,"x2":0.47765,"y2":0.6986431,"cid":23}
  ]
}

标头格式为:detected_objectsspring-doc.cn

  • object-nameconfidence - 检测到的对象(例如标签)的人类可读名称,其置信度为 [0-1] 之间的浮点数spring-doc.cn

  • x1y1x2y2 - 响应还提供表示为 的检测到的对象的边界框。坐标是相对于图像大小的大小的。(x1, y1, x2, y2)spring-doc.cn

  • cid - 在提供的标签配置文件中定义的分类标识符。spring-doc.cn

有效载荷

传入类型为 ,内容类型为 。处理器处理输入图像并输出增强图像有效负载和 JSON 标头 ()。byte[]application/octet-streambyte[]byte[]detected_objectsspring-doc.cn

6.8.2. 选项

对象.detection.cache-model

<缺少文档> (布尔值,默认值:truespring-doc.cn

对象.detection.confidence

<缺少文档> (浮点数,默认值:0.4spring-doc.cn

object.detection.debug-output

<缺少文档> (布尔值,默认值:falsespring-doc.cn

object.detection.debug-output-path

<缺少文档> (字符串,默认值:object-detection-result.pngspring-doc.cn

对象.detection.labels

标签 URI。(字符串,默认值:https://storage.googleapis.com/scdf-tensorflow-models/object-detection/mscoco_label_map.pbtxtspring-doc.cn

对象.detection.model

预先训练的 TensorFlow 对象检测模型。(字符串,默认值:https://download.tensorflow.org/models/object_detection/ssdlite_mobilenet_v2_coco_2018_05_09.tar.gz#frozen_inference_graph.pbspring-doc.cn

object.detection.response-size 对象

<缺少文档> (整数,默认值: <none>spring-doc.cn

对象.detection.with-masks

<缺少文档> (布尔值,默认值:falsespring-doc.cn

6.9. 语义分割处理器

基于最先进的 DeepLab Tensorflow 模型的图像语义分割。spring-doc.cn

这是将图像的每个像素与类标签(如花朵、人物、道路、天空、海洋或汽车)相关联的过程。 与生成实例感知区域掩码的 不同,生成类感知掩码的 。 有关实施的信息,请改用对象检测服务Semantic SegmentationInstance SegmentationSemantic SegmentationInstance Segmentationspring-doc.cn

它使用语义分割函数库和 TensorFlow 服务Semantic Segmentation Processorspring-doc.cn

有效载荷

传入类型为 ,内容类型为 。处理器处理输入图像并输出增强图像有效负载和 json 标头。byte[]application/octet-streambyte[]byte[]spring-doc.cn

处理器的输入是一个图像字节数组,输出是一个增强的图像字节数组,以及一个格式为 JSON 标头:semantic_segmentationspring-doc.cn

[
    [ 0, 0, 0 ],
    [ 127, 127, 127 ],
    [ 255, 255, 255 ]
    ...
]

输出标头 json 格式表示从输入图像计算的彩色像素图。spring-doc.cn

6.9.2. 选项

语义.segmentation.color-map-uri

每个预训练模型都基于特定的对象颜色映射。预定义的选项包括: - classpath:/colormap/citymap_colormap.json - classpath:/colormap/ade20k_colormap.json - classpath:/colormap/black_white_colormap.json - classpath:/colormap/mapillary_colormap.json (字符串,默认值:classpath:/colormap/citymap_colormap.jsonspring-doc.cn

语义.segmentation.debug-output

save output image 在本地 debugOutputPath 路径中。(布尔值,默认值:falsespring-doc.cn

semantic.segmentation.debug-output-path

<缺少文档> (字符串,默认值:semantic-segmentation-result.pngspring-doc.cn

semantic.segmentation.mask-transparency

计算的分段蒙版图像的 Alpha 颜色。(浮点数,默认值:0.45spring-doc.cn

语义.segmentation.model

预先训练的 TensorFlow 语义分割模型。(字符串,默认值:https://download.tensorflow.org/models/deeplabv3_mnv2_cityscapes_train_2018_02_05.tar.gz#frozen_inference_graph.pbspring-doc.cn

semantic.segmentation.output-type

指定输出图像类型。您可以返回带有计算的蒙版叠加的输入图像,也可以单独返回蒙版。(OutputType,默认值:<none>,可能的值:blended,maskspring-doc.cn

6.10. 脚本处理器

使用脚本转换消息的处理器。脚本正文是直接提供的 作为属性值。可以指定脚本的语言 (groovy/javascript/ruby/python)。spring-doc.cn

6.10.1. 选项

script-processor 处理器具有以下选项:spring-doc.cn

script-processor.language 语言

script 属性中文本的语言。支持:groovy、javascript、ruby、python。(字符串,默认值:<none>spring-doc.cn

script-processor.脚本

脚本的文本。(字符串,默认值:<none>spring-doc.cn

script-processor.variables

变量绑定作为名称-值对的新行分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:<无>spring-doc.cn

脚本处理器.变量位置

包含自定义脚本变量绑定的属性文件的位置。(资源,默认值:<无>spring-doc.cn

6.11. 分路器处理器

splitter 应用程序构建在 Spring Integration 中的同名概念之上,并允许将单个消息拆分为多个不同的消息。 处理器使用一个函数,该函数将 a 作为输入,然后根据各种属性生成 as 输出(见下文)。 您可以使用 SPEL 表达式或分隔符来指定要如何拆分传入消息。Message<?>List<Message<?>spring-doc.cn

有效载荷

如果传入类型为 且内容类型设置为 或 ,则应用程序会将 转换为 。byte[]text/plainapplication/jsonbyte[]Stringspring-doc.cn

6.11.2. 选项

splitter.apply-sequence

在 header 中添加关联/序列信息,以方便以后的聚合。(布尔值,默认值:truespring-doc.cn

splitter.charset

将基于文本的文件中的字节转换为 String 时使用的字符集。(字符串,默认值:<none>spring-doc.cn

splitter.delimiters

当 expression 为 null 时,在对 {@link String} 有效负载进行标记时使用的分隔符。(字符串,默认值:<none>spring-doc.cn

splitter.expression

用于拆分有效负载的 SPEL 表达式。(字符串,默认值:<none>spring-doc.cn

splitter.file-markers 文件标记

设置为 true 或 false 以使用包含(或不包含)文件开头/结尾标记的 {@code FileSplitter}(按行拆分基于文本的文件)。(布尔值,默认值:<none>spring-doc.cn

splitter.markers-json 文件

当 'fileMarkers == true' 时,指定它们是应生成为 FileSplitter.FileMarker 对象还是 JSON。(布尔值,默认值:truespring-doc.cn

6.12. 转换处理器

Transformer 处理器允许您根据 SPEL 表达式转换消息有效负载结构。spring-doc.cn

下面是如何运行此应用程序的示例。spring-doc.cn

java -jar transform-processor-kafka-<version>.jar --spel.function.expression=payload.toUpperCase()spring-doc.cn

如果要对 RabbitMQ 运行 kafka,请将 kafka 更改为 rabbit。spring-doc.cn

有效载荷

传入消息可以包含任何类型的有效负载。spring-doc.cn

6.12.2. 选项

spel.function.expression

要应用的 SpEL 表达式。(字符串,默认值:<none>spring-doc.cn

6.13. Twitter 趋势和趋势位置处理器

可以返回热门主题或热门主题的 Locations 的处理器。 该属性 允许 选择查询类型。twitter.trend.trend-query-typespring-doc.cn

对于此模式,设置为 。twitter.trend.trend-query-typetrendspring-doc.cn

基于 Trends API 的处理器。 返回特定纬度、经度位置附近的热门主题spring-doc.cn

6.13.2. 获取趋势位置

对于此模式,设置为 。twitter.trend.trend-query-typetrendLocationspring-doc.cn

按位置检索热门主题的完整列表或附近的位置列表。spring-doc.cn

如果未提供 , 参数,则处理器将执行 Trends Available API 并返回 Twitter 具有其热门主题信息的位置。latitudelongitudespring-doc.cn

如果提供了 , 参数,则处理器将执行 Trends Closest API 并返回 Twitter 具有其热门主题信息、最接近指定位置的位置。latitudelongitudespring-doc.cn

响应是一个数组,用于编码位置的 WOEID 和一些其他人类可读的信息,例如位置所属的规范名称和国家/地区。locationsspring-doc.cn

6.13.3. 选项

按前缀分组的属性:spring-doc.cn

推特.trend.closest
纬度

如果提供 long 参数,则可用的趋势位置将按距离(距离坐标对最近到最远)进行排序。经度的有效范围为 -180.0 到 +180.0(西为负,东为正)。(表达式,默认值:<none>spring-doc.cn

离子

如果提供了 lat 参数,则可用的趋势位置将按距离(从最近到最远的距离)进行排序。经度的有效范围为 -180.0 到 +180.0(西为负,东为正)。(表达式,默认值:<none>spring-doc.cn

推特
位置 ID

雅虎Where On Earth ID 要返回其趋势信息的位置。全局信息可通过使用 1 作为 WOEID 来获得。(表达式,默认值:payloadspring-doc.cn

趋势查询类型

<缺少文档>(TrendQueryType,默认值:<none>,可能的值:trend,trendLocationspring-doc.cn

7. 水槽

7.1. Cassandra 接收器

此接收器应用程序将其收到的每条消息的内容写入 Cassandra。spring-doc.cn

它需要 JSON String 的有效负载,并使用其属性映射到表列。spring-doc.cn

有效载荷

一个 JSON 字符串或字节数组,表示要持久保存的实体(或实体列表)。spring-doc.cn

7.1.2. 选项

cassandra sink 具有以下选项:spring-doc.cn

spring.cassandra.compression

Cassandra 二进制协议支持的压缩。(压缩,默认值:none,可能的值:LZ4,SNAPPY,NONE) spring-doc.cn

spring.cassandra.config

要使用的配置文件的位置。(资源,默认值:<无>spring-doc.cn

spring.cassandra.contact-points

群集节点地址采用 'host:port' 格式,或简单的 'host' 以使用配置的端口。(List<String>,默认值:[127.0.0.1:9042]spring-doc.cn

spring.cassandra.keyspace-名称

要使用的 Keyspace 名称。(字符串,默认值:<none>spring-doc.cn

spring.cassandra.local-datacenter 的

被视为 “local” 的数据中心。联系点应来自此数据中心。(字符串,默认值:<none>spring-doc.cn

spring.cassandra.password

服务器的登录密码。(字符串,默认值:<none>spring-doc.cn

spring.cassandra.port

如果联系点未指定端口,则要使用的端口。(整数,默认值:9042spring-doc.cn

spring.cassandra.schema-action

启动时要采取的架构操作。(字符串,默认值:spring-doc.cn

spring.cassandra.session-name

Cassandra 会话的名称。(字符串,默认值:<none>spring-doc.cn

spring.cassandra.ssl 中

启用 SSL 支持。(布尔值,默认值:falsespring-doc.cn

spring.cassandra.username

服务器的登录用户。(字符串,默认值:<none>spring-doc.cn

7.2. Analytics 接收器

Sink 应用程序,构建在 Analytics Consumer 之上,用于计算输入消息的分析,并将分析作为指标发布到各种监控系统。它利用千分尺库在最流行的监控系统中提供统一的编程体验,并公开 Spring 表达式语言 (SpEL) 属性,用于定义如何从输入数据计算指标 Name、Values 和 Tags。spring-doc.cn

分析接收器可以生成两种指标类型:spring-doc.cn

  • 计数器 - 报告一个指标,即一个计数,该指标以固定的正量递增。计数器可用于计算数据随时间变化的速率。spring-doc.cn

  • 仪表 - 报告当前值。仪表的典型示例是集合或 map 的大小,或者处于 Running 状态的线程数。spring-doc.cn

仪表(例如 Counter 或 Gauge)由其 和 唯一标识(术语 dimensions 和 tags 可互换使用)。维度允许对特定的命名量度进行切片,以便向下钻取和推理数据。namedimensionsspring-doc.cn

由于量度由其 和 唯一标识,因此您可以为每个量度分配多个标签(e.g. key/值对),但之后无法随机更改这些标签!如果具有相同名称的指标具有不同的标签集,Prometheus 等监控系统将会抱怨。namedimensions

使用 or 属性设置输出分析指标的名称。如果未设置,则指标名称默认为应用程序名称。analytics.nameanalytics.name-expressionspring-doc.cn

使用 , 属性向量度添加一个或多个标签。在属性定义中使用的将在量度中显示为标记名称。TAG_VALUE 是一个表达式,用于动态计算传入消息的标签值。analytics.tag.expression.<TAG_NAME>=<TAG_VALUE>TAG_NAMESpELspring-doc.cn

表达式使用 and 关键字来访问消息的标头和有效负载值。SpELheaderspayloadspring-doc.cn

你可以使用 literals (例如 ) 来设置具有固定值的标签。'fixed value'

所有 Stream 应用程序都支持三种最流行的监控系统,并且您可以通过声明方式启用它们。 您只需将 micrometer meter-registry 依赖项添加到应用程序中,即可添加对其他监控系统的支持。WavefrontPrometheusInfluxDBAnalytics Sinkspring-doc.cn

请访问 Spring Cloud 数据流流监控,了解有关配置监控系统的详细说明。以下快速代码段可以帮助您开始。spring-doc.cn

  • 要启用 Prometheus 计量注册表,请设置以下属性。spring-doc.cn

management.metrics.export.prometheus.enabled=true
management.metrics.export.prometheus.rsocket.enabled=true
management.metrics.export.prometheus.rsocket.host=<YOUR PROMETHEUS-RSOKET PROXI URI
management.metrics.export.prometheus.rsocket.port=7001
  • 要启用 Wavefront 仪表注册表,请设置以下属性。spring-doc.cn

management.metrics.export.wavefront.enabled=true
management.metrics.export.wavefront.api-token=YOUR WAVEFRONT KEY
management.metrics.export.wavefront.uri=YOUR WAVEFRONT URI
management.metrics.export.wavefront.source=UNIQUE NAME TO IDENTIFY YOUR APP
  • 要启用 InfluxDB 计量注册表,请设置以下属性。spring-doc.cn

management.metrics.export.influx.enabled=true
management.metrics.export.influx.uri={influxdb-server-url}
如果启用了 Data Flow Server Monitoring,则将重复使用提供的指标配置。Analytics Sink

下图说明了如何帮助收集股票交易业务内部的实时管道。Analytics Sinkspring-doc.cn

Analytics Architecture
有效载荷

传入消息可以包含任何类型的有效负载。spring-doc.cn

7.2.2. 选项

按前缀分组的属性:spring-doc.cn

分析学
金额表达式

用于计算输出度量值(例如 amount)的 SPEL 表达式。它默认为 1.0(表达式,默认值:<none>spring-doc.cn

米型

Micrometer meter 类型,用于向后端报告指标。(MeterType,默认值:<none>,可能的值:counter,gaugespring-doc.cn

名字

输出指标的名称。'name' 和 'nameExpression' 是互斥的。只能设置其中一个。(字符串,默认值:<none>spring-doc.cn

名称表达式

一个 SPEL 表达式,用于计算输入消息的输出指标名称。'name' 和 'nameExpression' 是互斥的。只能设置其中一个。(表达式,默认值:<none>spring-doc.cn

analytics.tag
表达

从 SpEL 表达式计算标签。单个 SPEL 表达式可以生成一个值数组,这反过来意味着不同的名称/值标签。每个 name/value 标签都会产生一个单独的计量增量。标签表达式格式为:analytics.tag.expression。[tag-name]=[SPEL 表达式](Map<String,表达式>,默认值:<none>spring-doc.cn

固定

已弃用:请将 analytics.tag.expression 与文本 SPEL 表达式一起使用。自定义、固定的 Tags。这些标签具有常量值,创建一次,然后与每个发布的指标一起发送。定义固定标记的约定为:<code> analytics.tag.fixed。[标签名称]=[标签值] </code> (Map<String, String>,默认值:<无>spring-doc.cn

7.3. Elasticsearch 接收器

Sink,用于将文档索引到 Elasticsearch 中。spring-doc.cn

此 Elasticsearch 接收器仅支持为 JSON 文档编制索引。 它使用来自输入目标的数据,然后将其索引到 Elasticsearch。 输入数据可以是纯 json 字符串,也可以是表示 JSON 的 a。 它还接受 Elasticsearch 提供的数据。 但是,这种情况很少见,因为中间件不太可能将记录保存为 。 这主要用于消费者的直接调用。java.util.MapXContentBuilderXContentBuilderspring-doc.cn

7.3.1. 选项

Elasticsearch 接收器具有以下选项:spring-doc.cn

elasticsearch.consumer.async

指示索引操作是否为异步操作。默认情况下,索引是同步完成的。(布尔值,默认值:falsespring-doc.cn

elasticsearch.consumer.batch-size

每个请求要编制索引的项目数。它默认为 1。对于大于 1 的值,将使用批量索引 API。(整数,默认值:1spring-doc.cn

elasticsearch.consumer.group-timeout

超时(以毫秒为单位),当批量索引处于活动状态时,将刷新消息组。它默认为 -1,这意味着不会自动刷新空闲消息组。(长整型,默认值:-1spring-doc.cn

elasticsearch.consumer.id

要编制索引的文档的 ID。如果设置,则 INDEX_ID Headers 值将基于每条消息覆盖此属性。(表达式,默认值:<none>spring-doc.cn

elasticsearch.consumer.index

索引的名称。如果设置,则 INDEX_NAME Headers 值将基于每条消息覆盖此属性。(字符串,默认值:<none>spring-doc.cn

elasticsearch.consumer.routing

指示要路由到的分片。如果未提供,Elasticsearch 将默认使用文档 ID 的哈希值。(字符串,默认值:<none>spring-doc.cn

elasticsearch.consumer.timeout-seconds

分片可用的超时时间。如果未设置,则默认为 Elasticsearch 客户端设置的 1 分钟。(长整型,默认值:0spring-doc.cn

7.3.2. 运行此 sink 的示例

  1. 从文件夹 :elasticsearch-sink./mvnw clean packagespring-doc.cn

  2. CD 应用程序spring-doc.cn

  3. cd 复制到适当的 Binder 生成的应用程序(Kafka 或 RabbitMQ)spring-doc.cn

  4. ./mvnw clean packagespring-doc.cn

  5. 确保您正在运行 Elasticsearch。例如,您可以使用以下命令将其作为 docker 容器运行。docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2spring-doc.cn

  6. 如果中间件(Kafka 或 RabbitMQ)尚未运行,请启动它。spring-doc.cn

  7. java -jar target/elasticsearch-sink-<kafka|rabbit>-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.input.destination=els-in --elasticsearch.consumer.index=testingspring-doc.cn

  8. 将一些 JSON 数据发送到中间件目标。例如:{"foo":"bar"}spring-doc.cn

  9. 验证数据是否已编制索引:curl localhost:9200/testing/_searchspring-doc.cn

7.4. 文件接收器

文件接收器应用将其收到的每条消息写入文件。spring-doc.cn

有效载荷

7.4.2. 选项

有以下选项:file-sinkspring-doc.cn

文件.consumer.binary

一个标志,用于指示是否应禁止在写入后添加换行符。(布尔值,默认值:falsespring-doc.cn

文件.consumer.charset

编写文本内容时使用的字符集。(字符串,默认值:UTF-8spring-doc.cn

文件.consumer.directory

目标文件的父目录。(文件,默认值:<无>spring-doc.cn

文件.consumer.directory-expression

要为目标文件的父目录计算的表达式。(字符串,默认值:<none>spring-doc.cn

文件.consumer.mode

如果目标文件已存在,则要使用的 FileExistsMode。(FileExistsMode,默认值:<none>,可能的值:APPEND APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE,REPLACE_IF_MODIFIEDspring-doc.cn

file.consumer.name

目标文件的名称。(字符串,默认值:file-consumerspring-doc.cn

file.consumer.name-表达式

要计算目标文件名称的表达式。(字符串,默认值:<none>spring-doc.cn

file.consumer.suffix

要附加到文件名的后缀。(字符串,默认值:<空字符串>spring-doc.cn

7.5. FTP 接收器

FTP 接收器是将文件从传入邮件推送到 FTP 服务器的简单选项。spring-doc.cn

它使用 ,因此传入消息可以是对象、(文件内容) 或 (file content 的数组)。ftp-outbound-adapterjava.io.FileStringbytesspring-doc.cn

要使用此接收器,您需要用户名和密码才能登录。spring-doc.cn

默认情况下,如果未指定任何名称,则 Spring Integration 将使用。 将确定文件名 根据 中的标头值(如果存在),或者如果 的有效负载已经是 ,则它将 使用该文件的原始名称。o.s.i.file.DefaultFileNameGeneratorDefaultFileNameGeneratorfile_nameMessageHeadersMessagejava.io.File
有效载荷

7.5.3. 输出

N/A (写入 FTP 服务器)。spring-doc.cn

7.5.4. 选项

ftp 接收器有以下选项:spring-doc.cn

按前缀分组的属性:spring-doc.cn

ftp.consumer 的
自动创建目录

是否创建远程目录。(布尔值,默认值:truespring-doc.cn

filename-expression 文件名

用于生成远程文件名的 SPEL 表达式。(字符串,默认值:<none>spring-doc.cn

模式

如果远程文件已存在,则要执行的操作。(FileExistsMode,默认值:<none>,可能的值:APPEND APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE,REPLACE_IF_MODIFIEDspring-doc.cn

远程目录

远程 FTP 目录。(字符串,默认值:/spring-doc.cn

远程文件分隔符

远程文件分隔符。(字符串,默认值:/spring-doc.cn

临时远程目录

如果 '#isUseTemporaryFilename()' 为 true,则将写入文件的临时目录。(字符串,默认值:/spring-doc.cn

tmp-file-suffix 文件后缀

传输过程中要使用的后缀。(字符串,默认值:.tmpspring-doc.cn

使用临时文件名

是否写入临时文件并重命名。(布尔值,默认值:truespring-doc.cn

ftp.factory
缓存会话

缓存会话。(布尔值,默认值:<none>spring-doc.cn

客户端模式

用于 FTP 会话的客户端模式。(ClientMode,默认值:<none>,可能的值:ACTIVE,PASSIVEspring-doc.cn

主机

服务器的主机名。(字符串,默认值:localhostspring-doc.cn

密码

用于连接到服务器的密码。(字符串,默认值:<none>spring-doc.cn

港口

服务器的端口。(整数,默认值:21spring-doc.cn

用户名

用于连接到服务器的用户名。(字符串,默认值:<none>spring-doc.cn

sinks.adoc 中未解析的指令 - include::https://raw.githubusercontent.com/spring-cloud/stream-applications/main/applications/sink/geode-sink/README.adoc[tags=ref-doc]spring-doc.cn

7.6. JDBC 接收器

JDBC sink 允许您将传入的有效负载持久化到 RDBMS 数据库中。spring-doc.cn

该属性表示 where (连同冒号) 是可选的成对。 在这种情况下,值是通过生成的表达式(如 )计算的,因此这样我们就可以从对象属性直接映射到表列。 例如,我们有一个 JSON 有效负载,如下所示:jdbc.consumer.columnsCOLUMN_NAME[:EXPRESSION_FOR_VALUE]EXPRESSION_FOR_VALUEpayload.COLUMN_NAMEspring-doc.cn

{
  "name": "My Name",
  "address": {
     "city": "Big City",
     "street": "Narrow Alley"
  }
}

因此,我们可以使用 ,并使用配置将其插入到表中:namecitystreetspring-doc.cn

--jdbc.consumer.columns=name,city:address.city,street:address.street

只要底层 JDBC 驱动程序支持,此接收器就支持批量插入。 批量插入通过 和 属性进行配置: 传入消息将聚合,直到消息出现,然后作为批处理插入。 如果毫秒过去了,没有新消息,则即使聚合批处理小于 ,也会插入聚合批处理,从而限制最大延迟。batch-sizeidle-timeoutbatch-sizeidle-timeoutbatch-sizespring-doc.cn

该模块还使用 Spring Boot 的DataSource支持来配置数据库连接,因此诸如etc.之类的属性适用。spring.datasource.url

7.6.1. 示例

java -jar jdbc-sink.jar --jdbc.consumer.tableName=names --jdbc.consumer.columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test
有效载荷

7.6.2. 选项

jdbc sink 具有以下选项:spring-doc.cn

按前缀分组的属性:spring-doc.cn

jdbc.consumer
批量大小

将数据刷新到数据库表的消息数阈值。(整数,默认值:1spring-doc.cn

逗号分隔基于冒号的列名对和要插入/更新的值的 SPEL 表达式。在初始化时使用名称来颁发 DDL。(字符串,默认值:payload:payload.toString()spring-doc.cn

空闲超时

数据自动刷新到数据库表时的空闲超时(以毫秒为单位)。(长整型,默认值:-1spring-doc.cn

初始化

'true'、'false' 或表的自定义初始化脚本的位置。(字符串,默认值:falsespring-doc.cn

表名

要写入的表的名称。(字符串,默认值:messagesspring-doc.cn

spring.datasource
驱动程序类名称

JDBC 驱动程序的完全限定名称。默认情况下,根据 URL 自动检测。(字符串,默认值:<none>spring-doc.cn

密码

数据库的登录密码。(字符串,默认值:<none>spring-doc.cn

网址

数据库的 JDBC URL。(字符串,默认值:<none>spring-doc.cn

用户名

数据库的登录用户名。(字符串,默认值:<none>spring-doc.cn

7.7. 日志接收器

sink 使用应用程序记录器输出数据以供检查。logspring-doc.cn

请理解 sink 使用无类型的处理程序,这会影响实际日志记录的执行方式。 这意味着,如果 content-type 是 textual,则原始有效负载字节将转换为 String,否则将记录原始字节。 请参阅用户指南中的更多信息。logspring-doc.cn

7.7.1. 选项

日志接收器具有以下选项:spring-doc.cn

log.expression

一个 SPEL 表达式(针对传入消息),用于评估为记录的消息。(字符串,默认值:payloadspring-doc.cn

log.level 级别

记录消息的级别。(级别,默认值:<none>,可能的值:FATAL,ERROR,WARN,INFO,DEBUG,TRACEspring-doc.cn

log.name

要使用的 Logger 的名称。(字符串,默认值:<none>spring-doc.cn

7.8. MongoDB 接收器

此接收器应用程序将传入数据提取到 MongoDB 中。 此应用程序完全基于 ,因此请参阅 Spring Boot MongoDB 支持以获取更多信息。MongoDataAutoConfigurationspring-doc.cn

7.8.1. 输入

有效载荷

7.8.2. 选项

mongodb sink 具有以下选项:spring-doc.cn

按前缀分组的属性:spring-doc.cn

mongodb.consumer
收集

用于存储数据的 MongoDB 集合。(字符串,默认值:<none>spring-doc.cn

集合表达式

用于评估 MongoDB 集合的 SPEL 表达式。(表达式,默认值:<none>spring-doc.cn

spring.data.mongodb
附加主机

其他服务器主机。不能使用 URI 设置,或者如果未指定 'host' 。其他主机将使用默认的 mongo 端口 27017,如果您想使用不同的端口,可以使用 “host:port” 语法。(List<String>,默认值:<none>spring-doc.cn

身份验证数据库

身份验证数据库名称。(字符串,默认值:<none>spring-doc.cn

自动索引创建

是否启用自动索引创建。(布尔值,默认值:<none>spring-doc.cn

数据库

数据库名称。(字符串,默认值:<none>spring-doc.cn

字段命名策略

要使用的 FieldNamingStrategy 的完全限定名称。(类<?>,默认值:<无>spring-doc.cn

主机

Mongo 服务器主机。不能使用 URI 设置。(字符串,默认值:<none>spring-doc.cn

密码

mongo 服务器的登录密码。不能使用 URI 设置。(字符[],默认值:<无>spring-doc.cn

港口

Mongo 服务器端口。不能使用 URI 设置。(整数,默认值:<none>spring-doc.cn

副本集名称

集群所需的副本集名称。不能使用 URI 设置。(字符串,默认值:<none>spring-doc.cn

URI

Mongo 数据库 URI。覆盖主机、端口、用户名、密码和数据库。(字符串,默认值:mongodb://localhost/testspring-doc.cn

用户名

mongo 服务器的登录用户。不能使用 URI 设置。(字符串,默认值:<none>spring-doc.cn

uuid 表示

将 UUID 转换为 BSON 二进制值时使用的表示形式。(UuidRepresentation,默认值:java-legacy,可能的值:UNSPECIFIED,STANDARD,C_SHARP_LEGACY,JAVA_LEGACY,PYTHON_LEGACY)spring-doc.cn

7.9. MQTT 接收器

此模块将消息发送到 MQTT。spring-doc.cn

有效载荷:

7.9.2. 选项

mqtt sink 有以下选项:spring-doc.cn

按前缀分组的属性:spring-doc.cn

MQTT 协议
clean-session

客户端和服务器是否应在重启和重新连接时记住状态。(布尔值,默认值:truespring-doc.cn

连接超时

连接超时(以秒为单位)。(整数,默认值:30spring-doc.cn

保持活动间隔

ping 间隔(以秒为单位)。(整数,默认值:60spring-doc.cn

密码

连接到代理时使用的密码。(字符串,默认值:guestspring-doc.cn

坚持

'memory' 或 'file'。(字符串,默认值:memoryspring-doc.cn

持久性目录

Persistence 目录。(字符串,默认值:/tmp/pahospring-doc.cn

SSL 属性

MQTT 客户端 SSL 属性。(Map<String, String>,默认值:<none>spring-doc.cn

网址

MQTT 代理的位置(逗号分隔的列表)。(String[],默认值:[tcp://localhost:1883]spring-doc.cn

用户名

连接到 broker 时要使用的用户名。(字符串,默认值:guestspring-doc.cn

mqtt.consumer
异步

是否使用 async sends。(布尔值,默认值:falsespring-doc.cn

字符集

用于将 String 有效负载转换为 byte[] 的字符集。(字符串,默认值:UTF-8spring-doc.cn

客户端 ID

标识客户端。(字符串,默认:stream.client.id.sinkspring-doc.cn

QoS

要使用的服务质量。(整数,默认值:1spring-doc.cn

保留

是否设置 'retained' 标志。(布尔值,默认值:falsespring-doc.cn

主题

接收器将发布到的主题。(字符串,默认:stream.mqttspring-doc.cn

7.10. pgcopy 接收器

使用 PostgreSQL COPY 命令将其传入负载写入 RDBMS 的模块。spring-doc.cn

7.10.1. 输入

有效载荷

Column expression 将根据消息进行评估,并且表达式通常只与一种类型(例如 Map 或 bean 等)兼容。spring-doc.cn

7.10.2. 输出

不适用spring-doc.cn

7.10.3. 选项

jdbc sink 具有以下选项:spring-doc.cn

spring.datasource.driver-class-name

JDBC 驱动程序的完全限定名称。默认情况下,根据 URL 自动检测。(字符串,默认值:<none>spring-doc.cn

spring.datasource.password

数据库的登录密码。(字符串,默认值:<none>spring-doc.cn

spring.datasource.url

数据库的 JDBC URL。(字符串,默认值:<none>spring-doc.cn

spring.datasource.username

数据库的登录用户名。(字符串,默认值:<none>spring-doc.cn

该模块还使用 Spring Boot 的DataSource支持来配置数据库连接,因此诸如etc.之类的属性适用。spring.datasource.url

7.10.4. 构建

$ ./mvnw clean install -PgenerateApps
$ cd apps

您可以在此处找到相应的基于 Binder 的项目。 然后,你可以 cd 到其中一个文件夹并构建它:spring-doc.cn

$ ./mvnw clean package

要运行集成测试,请在 localhost 上启动 PostgreSQL 数据库:spring-doc.cn

    docker run -e POSTGRES_PASSWORD=spring -e POSTGRES_DB=test -p 5432:5432 -d postgres:latest

7.10.5. 示例

java -jar pgcopy-sink.jar --tableName=names --columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test

7.11. RabbitMQ 接收器

该模块向 RabbitMQ 发送消息。spring-doc.cn

7.11.1. 选项

rabbit sink 具有以下选项:spring-doc.cn

(有关 RabbitMQ 连接属性,请参阅 Spring Boot 文档)spring-doc.cn

按前缀分组的属性:spring-doc.cn

转换器 bean 名称

自定义消息转换器的 Bean 名称;如果省略,则使用SimpleMessageConverter。如果为 'jsonConverter',将为您创建一个 Jackson2JsonMessageConverter bean。(字符串,默认值:<none>spring-doc.cn

交换

Exchange name - 如果提供,则由 exchangeNameExpression 覆盖。(字符串,默认值:<空字符串>spring-doc.cn

交换表达式

计算结果为 exchange 名称的 SPEL 表达式。(表达式,默认值:<none>spring-doc.cn

标头映射的最后一个

在映射出站消息的报头时,请确定报头是在转换邮件之前还是之后映射的。(布尔值,默认值:truespring-doc.cn

mapped-request-headers 请求标头

将要映射的标头。(String[],默认值:[*]spring-doc.cn

own-connection

如果为 true,则根据引导属性使用单独的连接。(布尔值,默认值:falsespring-doc.cn

持久传递模式

当 'amqp_deliveryMode' 标头不存在时的默认传递模式,对于 PERSISTENT,则为 true。(布尔值,默认值:falsespring-doc.cn

路由密钥

路由密钥 - 如果提供,则由 routingKeyExpression 覆盖。(字符串,默认值:<none>spring-doc.cn

路由密钥表达式

计算结果为路由密钥的 SPEL 表达式。(表达式,默认值:<none>spring-doc.cn

spring.rabbitmq 的
address-shuffle-mode 地址随机模式

用于对配置的地址进行随机排序的模式。(AddressShuffleMode,默认值:none,可能的值:NONE,RANDOM,INORDER)spring-doc.cn

地址

客户端应连接到的地址的逗号分隔列表。设置后,将忽略 host 和 port。(字符串,默认值:<none>spring-doc.cn

通道 rpc-timeout

通道中 RPC 调用的继续超时。将其设为零可永久等待。(持续时间,默认:10 分钟spring-doc.cn

连接超时

连接超时。将其设为零可永久等待。(持续时间,默认值:<无>spring-doc.cn

主机

RabbitMQ 主机。如果设置了地址,则忽略。(字符串,默认值:localhostspring-doc.cn

密码

登录以对代理进行身份验证。(字符串,默认值:guestspring-doc.cn

港口

RabbitMQ 端口。如果设置了地址,则忽略。默认为 5672,如果启用了 SSL,则为 5671。(整数,默认值:<none>spring-doc.cn

publisher-confirm-type

确认使用的发布者类型。(ConfirmType,默认值:<none>,可能的值:SIMPLE,CORRELATED,NONE)spring-doc.cn

publisher-returns

是否启用发布者退货。(布尔值,默认值:falsespring-doc.cn

请求通道最大值

客户端请求的每个连接的通道数。使用 0 表示无限制。(整数,默认值:2047spring-doc.cn

请求的心跳

请求的检测信号超时;零表示无。如果未指定 duration 后缀,则将使用秒。(持续时间,默认值:<无>spring-doc.cn

用户名

登录用户以向 broker 进行身份验证。(字符串,默认值:guestspring-doc.cn

虚拟主机

连接到 broker 时使用的虚拟主机。(字符串,默认值:<none>spring-doc.cn

7.12. Redis 接收器

向 Redis 发送消息。spring-doc.cn

7.12.1. 选项

redis sink 具有以下选项:spring-doc.cn

按前缀分组的属性:spring-doc.cn

redis.consumer
钥匙

存储到键时使用的文本键名称。(字符串,默认值:<none>spring-doc.cn

键表达式

用于存储到键的 SPEL 表达式。(字符串,默认值:<none>spring-doc.cn

队列

在队列中存储时使用的文本队列名称。(字符串,默认值:<none>spring-doc.cn

队列表达式

用于队列的 SpEL 表达式。(字符串,默认值:<none>spring-doc.cn

主题

发布到主题时要使用的文本主题名称。(字符串,默认值:<none>spring-doc.cn

主题表达式

用于主题的 SPEL 表达式。(字符串,默认值:<none>spring-doc.cn

spring.data.redis 的
客户端名称

要在使用 CLIENT SETNAME 的连接上设置的客户端名称。(字符串,默认值:<none>spring-doc.cn

客户端类型

要使用的客户端类型。默认情况下,根据 Classpath 自动检测。(ClientType,默认值:<none>,可能的值:LETTUCE,JEDISspring-doc.cn

连接超时

连接超时。(持续时间,默认值:<无>spring-doc.cn

数据库

连接工厂使用的数据库索引。(整数,默认值:0spring-doc.cn

主机

Redis 服务器主机。(字符串,默认值:localhostspring-doc.cn

密码

redis 服务器的登录密码。(字符串,默认值:<none>spring-doc.cn

港口

Redis 服务器端口。(整数,默认值:6379spring-doc.cn

SSL协议

是否启用 SSL 支持。(布尔值,默认值:falsespring-doc.cn

超时

读取超时。(持续时间,默认值:<无>spring-doc.cn

网址

连接 URL。覆盖 host、port 和 password。忽略 User。示例:redis://user:[email protected]:6379 (字符串,默认值:<none>spring-doc.cn

用户名

redis 服务器的登录用户名。(字符串,默认值:<none>spring-doc.cn

spring.data.redis.jedis.pool
启用

是否启用池。如果“commons-pool2”可用,则自动启用。使用 Jedis,在 Sentinel 模式下隐式启用池化,此设置仅适用于单节点设置。(布尔值,默认值:<none>spring-doc.cn

最大活动

池在给定时间可分配的最大连接数。使用负值表示无限制。(整数,默认值:8spring-doc.cn

max-idle (最大空闲)

池中 “空闲” 连接的最大数量。使用负值表示无限数量的空闲连接。(整数,默认值:8spring-doc.cn

最大等待

当池耗尽时,连接分配在引发异常之前应阻止的最长时间。使用负值可无限期阻止。(持续时间,默认值:-1msspring-doc.cn

最小空闲

目标是要在池中维护的最小空闲连接数。仅当 it 和驱逐运行之间的时间均为正数时,此设置才有效。(整数,默认值:0spring-doc.cn

驱逐运行之间的时间

空闲对象 evictor 线程运行之间的时间。当为正数时,空闲对象驱逐线程将启动,否则不执行空闲对象驱逐。(持续时间,默认值:<无>spring-doc.cn

spring.data.redis.lettuce.pool
启用

是否启用池。如果“commons-pool2”可用,则自动启用。使用 Jedis,在 Sentinel 模式下隐式启用池化,此设置仅适用于单节点设置。(布尔值,默认值:<none>spring-doc.cn

最大活动

池在给定时间可分配的最大连接数。使用负值表示无限制。(整数,默认值:8spring-doc.cn

max-idle (最大空闲)

池中 “空闲” 连接的最大数量。使用负值表示无限数量的空闲连接。(整数,默认值:8spring-doc.cn

最大等待

当池耗尽时,连接分配在引发异常之前应阻止的最长时间。使用负值可无限期阻止。(持续时间,默认值:-1msspring-doc.cn

最小空闲

目标是要在池中维护的最小空闲连接数。仅当 it 和驱逐运行之间的时间均为正数时,此设置才有效。(整数,默认值:0spring-doc.cn

驱逐运行之间的时间

空闲对象 evictor 线程运行之间的时间。当为正数时,空闲对象驱逐线程将启动,否则不执行空闲对象驱逐。(持续时间,默认值:<无>spring-doc.cn

spring.data.redis.sentinel
主人

Redis 服务器的名称。(字符串,默认值:<none>spring-doc.cn

节点

以逗号分隔的 “host:port” 对列表。(List<String>,默认值:<none>spring-doc.cn

密码

用于使用 Sentinel 进行身份验证的密码。(字符串,默认值:<none>spring-doc.cn

用户名

用于向 Sentinel 进行身份验证的登录用户名。(字符串,默认值:<none>spring-doc.cn

7.13. 路由器接收器

此应用程序将消息路由到命名通道。spring-doc.cn

7.13.1. 选项

router sink 有以下选项:spring-doc.cn

router.default-output-channel

将不可路由的消息发送到何处。(字符串,默认值:nullChannelspring-doc.cn

router.destination 映射

目标映射为名称-值对的新行分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:<无>spring-doc.cn

router.expression 的

要应用于消息的表达式,以确定要路由到的通道。请注意,文本、json 或 xml 等内容类型的负载连线格式是 byte[] 而不是 String!。有关如何处理字节数组有效负载内容的文档。(表达式,默认值:<none>spring-doc.cn

router.refresh-delay

检查脚本更改的频率(以毫秒为单位)(如果存在);< 0 表示不刷新。(整数,默认值:60000spring-doc.cn

router.resolution-required

是否需要通道分辨率。(布尔值,默认值:falsespring-doc.cn

router.script 的

返回通道或通道映射解析键的 groovy 脚本的位置。(资源,默认值:<无>spring-doc.cn

router.variables 的

变量绑定作为名称-值对的新行分隔字符串,例如 'foo=bar\n baz=car'。(属性,默认值:<无>spring-doc.cn

router.variables-位置

包含自定义脚本变量绑定的属性文件的位置。(资源,默认值:<无>spring-doc.cn

由于这是动态路由器,因此会根据需要创建目标;因此,默认情况下,仅当 绑定到目标时出现问题时,才会使用 and。defaultOutputChannelresolutionRequiredBinder

您可以使用 该属性限制动态绑定的创建。 默认情况下,所有解析的目标都将动态绑定;如果此属性具有逗号分隔的 destination names,则只会绑定这些名称。 解析到不在此列表中的目标的邮件将被路由到 ,该 也必须显示在列表中。spring.cloud.stream.dynamicDestinationsdefaultOutputChannelspring-doc.cn

destinationMappings用于将评估结果映射到实际目标名称。spring-doc.cn

7.13.2. 基于 SPEL 的路由

该表达式根据消息进行计算,并返回通道名称或通道名称映射的键。spring-doc.cn

有关更多信息,请参阅 Spring 中的“路由器和 Spring 表达式语言 (SpEL)”小节 集成参考手册 配置通用路由器 部分。spring-doc.cn

从 Spring Cloud Stream 2.0 开始,和内容类型的消息线格式不是! 这是对 SCSt 1.x 的更改,它将这些类型视为字符串! 根据内容类型,可以使用不同的技术来处理有效负载。对于纯内容类型,可以使用 SpEL 表达式将八位字节有效负载转换为字符串。对于类型,jsonPath() SpEL 实用程序 已经支持可互换的字符串和字节数组内容。这同样适用于内容类型和 #xpath() SPEL 实用程序。jsontextxmlbyte[]Stringbyte[]textnew String(payload)jsonxml

例如,对于内容类型,应使用:textspring-doc.cn

 new String(payload).contains('a');

对于内容类型 SpEL 表达式,如下所示:jsonspring-doc.cn

 #jsonPath(payload, '$.person.name')

7.13.3. 基于 Groovy 的路由

还可以使用 Groovy 脚本代替 SpEL 表达式。让我们在文件系统中创建一个 Groovy 脚本,网址为 “file:/my/path/router.groovy”,或 “classpath:/my/path/router.groovy”:spring-doc.cn

println("Groovy processing payload '" + payload + "'")
if (payload.contains('a')) {
    return "foo"
}
else {
    return "bar"
}

如果要将变量值传递给脚本,可以使用 variables 选项或 (可选)使用 propertiesLocation 选项将路径传递给包含绑定的 properties 文件。 文件中的所有属性都将作为变量提供给脚本。您可以同时指定 variablespropertiesLocation,在这种情况下,作为变量提供的任何重复值都会覆盖 propertiesLocation 中提供的值。 请注意,payloadheaders 是隐式绑定的,以允许您访问消息中包含的数据。spring-doc.cn

有关更多信息,请参阅 Spring 集成参考手册 Groovy 支持spring-doc.cn

7.14. RSocket 接收器

RSocket sink 使用 RSocket 协议的 fire and forget 策略发送数据。spring-doc.cn

7.14.1. 选项

rsocket sink 有以下选项:spring-doc.cn

rsocket.consumer.host

RSocket 主机。(字符串,默认值:localhostspring-doc.cn

rsocket.consumer.port

RSocket 端口。(整数,默认值:7000spring-doc.cn

rsocket.consumer.route

用于 RSocket 的路由。(字符串,默认值:<none>spring-doc.cn

rsocket.consumer.uri 中

URI 的 URI 中,该 URI 可用于基于 websocket 的传输。(URI,默认值:<none>spring-doc.cn

7.15. Amazon S3 接收器

此接收器应用程序支持将对象传输到 Amazon S3 存储桶。 文件负载(和递归目录)将传输到部署应用程序的目录(S3 存储桶)中。remotelocalspring-doc.cn

此接收器接受的消息必须包含以下 as 之一:payloadspring-doc.cn

7.15.1. 选项

s3 接收器具有以下选项:spring-doc.cn

按前缀分组的属性:spring-doc.cn

s3.common
端点 URL

用于连接到 s3 兼容存储的可选终端节点 URL。(字符串,默认值:<none>spring-doc.cn

path-style-access (路径样式访问)

使用路径样式访问。(布尔值,默认值:falsespring-doc.cn

s3.consumer
ACL

S3 对象访问控制列表。(CannedAccessControlList,默认值:<none>,可能的值:private,public-read,public-read-write,authenticated-read,log-delivery-write,bucket-owner-read,bucket-owner-full-control,aws-exec-read)spring-doc.cn

ACL 表达式

用于评估 S3 对象访问控制列表的表达式。(表达式,默认值:<none>spring-doc.cn

用于存储目标文件的 AWS 存储桶。(字符串,默认值:<none>spring-doc.cn

存储桶表达式

用于评估 AWS 存储桶名称的表达式。(表达式,默认值:<none>spring-doc.cn

键表达式

用于评估 S3 对象键的表达式。(表达式,默认值:<none>spring-doc.cn

基于 的目标生成的应用程序可以通过注入到 bean 中的 and/或 进行增强。 有关更多详细信息,请参阅 Spring 集成 AWS 支持。AmazonS3SinkConfigurationS3MessageHandler.UploadMetadataProviderS3ProgressListenerS3MessageHandlerspring-doc.cn

7.15.2. Amazon AWS 通用选项

Amazon S3 Sink(与所有其他 Amazon AWS 应用程序一样)基于 Spring Cloud AWS 项目作为其基础,其自动配置 类由 Spring Boot 自动使用。 请查阅其文档,了解必需和有用的自动配置属性。spring-doc.cn

其中一些与 AWS 凭证有关:spring-doc.cn

其他用于 AWS 定义:Regionspring-doc.cn

对于 AWS:Stackspring-doc.cn

例子
java -jar s3-sink.jar --s3.bucket=/tmp/bar

7.16. SFTP Sink 接收器

SFTP 接收器是将文件从传入邮件推送到 SFTP 服务器的简单选项。spring-doc.cn

它使用 ,因此传入消息可以是对象、(文件内容) 或 (file content 的数组)。sftp-outbound-adapterjava.io.FileStringbytesspring-doc.cn

要使用此接收器,您需要用户名和密码才能登录。spring-doc.cn

默认情况下,如果未指定任何名称,则 Spring Integration 将使用。 将确定文件名 根据 中的标头值(如果存在),或者如果 的有效负载已经是 ,则它将 使用该文件的原始名称。o.s.i.file.DefaultFileNameGeneratorDefaultFileNameGeneratorfile_nameMessageHeadersMessagejava.io.File

配置选项时,评估的根对象是应用程序上下文,例如。sftp.factory.known-hosts-expressionsftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'spring-doc.cn

7.16.1. 输入

有效载荷

7.16.2. 输出

N/A(写入 SFTP 服务器)。spring-doc.cn

7.16.3. 选项

sftp 接收器具有以下选项:spring-doc.cn

按前缀分组的属性:spring-doc.cn

sftp.consumer
自动创建目录

是否创建远程目录。(布尔值,默认值:truespring-doc.cn

filename-expression 文件名

用于生成远程文件名的 SPEL 表达式。(字符串,默认值:<none>spring-doc.cn

模式

如果远程文件已存在,则要执行的操作。(FileExistsMode,默认值:<none>,可能的值:APPEND APPEND_NO_FLUSH,FAIL,IGNORE,REPLACE,REPLACE_IF_MODIFIEDspring-doc.cn

远程目录

远程 FTP 目录。(字符串,默认值:/spring-doc.cn

远程文件分隔符

远程文件分隔符。(字符串,默认值:/spring-doc.cn

临时远程目录

如果 'isUseTemporaryFilename()' 为 true,则将写入文件的临时目录。(字符串,默认值:/spring-doc.cn

tmp-file-suffix 文件后缀

传输过程中要使用的后缀。(字符串,默认值:.tmpspring-doc.cn

使用临时文件名

是否写入临时文件并重命名。(布尔值,默认值:truespring-doc.cn

sftp.consumer.factory
允许未知密钥

如果为 True,则允许未知或更改的键。(布尔值,默认值:falsespring-doc.cn

缓存会话

缓存会话。(布尔值,默认值:<none>spring-doc.cn

主机

服务器的主机名。(字符串,默认值:localhostspring-doc.cn

已知主机表达式

解析为已知主机文件位置的 SpEL 表达式。(表达式,默认值:<none>spring-doc.cn

密码短语

用户私钥的密码。(字符串,默认值:<空字符串>spring-doc.cn

密码

用于连接到服务器的密码。(字符串,默认值:<none>spring-doc.cn

港口

服务器的端口。(整数,默认值:22spring-doc.cn

私钥

用户私钥的资源位置。(资源,默认值:<无>spring-doc.cn

用户名

用于连接到服务器的用户名。(字符串,默认值:<none>spring-doc.cn

7.17. TCP 接收器

此模块使用 Encoder 将消息写入 TCP。spring-doc.cn

TCP 是一种流协议,需要一些机制来在网络上构建消息。许多编码器是 available,默认值为 'CRLF'。spring-doc.cn

7.17.1. 选项

tcp sink 具有以下选项:spring-doc.cn

按前缀分组的属性:spring-doc.cn

tcp.consumer
字符集

从 bytes 转换为 String 时使用的字符集。(字符串,默认值:UTF-8spring-doc.cn

关闭

是否在每条消息后关闭套接字。(布尔值,默认值:falsespring-doc.cn

编码器

发送消息时使用的编码器。(编码,默认值:<none>,可能的值:CRLF,LF,NULL,STXETX,RAW,L1,L2,L4spring-doc.cn

主机

此接收器将连接到的主机。(字符串,默认值:<none>spring-doc.cn

TCP 协议
蔚来

是否使用 NIO。(布尔值,默认值:falsespring-doc.cn

港口

要侦听的端口;0 让操作系统选择一个端口。(整数,默认值:1234spring-doc.cn

反向查找

对远程 IP 地址执行反向 DNS 查找;如果为 false,则邮件报头中仅包含 IP 地址。(布尔值,默认值:falsespring-doc.cn

套接字超时

未收到数据时关闭套接字之前的超时 (ms)。(整数,默认值:120000spring-doc.cn

使用直接缓冲区

是否使用直接缓冲区。(布尔值,默认值:falsespring-doc.cn

7.17.2. 可用的编码器

文本数据
CRLF (默认)

以回车 (0x0d) 后跟换行符 (0x0a) 结尾的文本spring-doc.cn

如果

由换行符终止的文本 (0x0a)spring-doc.cn

以 null 字节 (0x00) 结尾的文本spring-doc.cn

STXETX

文本前面有 STX (0x02) 并以 ETX (0x03) 结尾spring-doc.cn

文本和二进制数据

no structure - 客户端通过关闭套接字来指示完整的消息spring-doc.cn

L1 系列

数据前面有一个单字节(无符号)长度字段(最多支持 255 个字节)spring-doc.cn

L2 (二层)

数据前面有一个两字节(无符号)长度的字段(最多 2 个16-1 字节)spring-doc.cn

L4 系列

数据前面有一个四字节(带符号)长度的字段(最多 2 个31-1 字节)spring-doc.cn

7.18. 吞吐量接收器

Sink,它将对消息进行计数,并按选定的时间间隔记录观察到的吞吐量。spring-doc.cn

7.18.1. 选项

吞吐量接收器具有以下选项:spring-doc.cn

吞吐量.报告每毫秒

报告的频率。(整数,默认值:1000spring-doc.cn

7.19. Twitter 消息接收器

从身份验证用户向指定用户发送私信。 需要将 JSON POST 正文和标头设置为 。Content-Typeapplication/jsonspring-doc.cn

收到用户的消息后,您可以在 24 小时内发送最多 5 条消息作为响应。 收到的每条消息都会重置 24 小时窗口和分配的 5 条消息。 在 24 小时内发送第 6 条消息或在 24 小时时段外发送消息将计入速率限制。 此行为仅在使用 POST direct_messages/events/new 端点时适用。

SPEL 表达式用于计算输入消息中的请求参数。spring-doc.cn

7.19.1. 选项

使用单引号 () 将表达式属性的文字值括起来。 例如,要设置固定的消息文本,请使用 。 对于固定目标 userId,请使用 。'SpELtext='Fixed Text'userId='666'
twitter.message.update.media-id

要与消息关联的媒体 ID。私信只能引用一个媒体 ID。(表达式,默认值:<none>spring-doc.cn

twitter.message.update.screen-name

向其发送私信的用户的屏幕名称。(表达式,默认值:<none>spring-doc.cn

推特.message.update.text

私信文本。URL 编码。最大长度为 10,000 个字符。(表达式,默认值:payloadspring-doc.cn

twitter.message.update.user-id

向其发送私信的用户的用户 ID。(表达式,默认值:<none>spring-doc.cn

7.20. Twitter 更新下沉

更新身份验证用户的当前文本(例如 Tweeting)。spring-doc.cn

对于每次更新尝试,都会将更新文本与进行身份验证的用户最近的推文进行比较。 任何会导致重复的尝试都将被阻止,从而导致 403 错误。 用户不能连续两次提交相同的文本。

虽然不受 API 的速率限制,但用户一次可以创建的推文数量受到限制。 标准 API 的更新限制为 3 小时内 300 个。 如果用户发布的更新数量达到当前允许的限制,此方法将返回 HTTP 403 错误。spring-doc.cn

7.20.1. 选项

按前缀分组的属性:spring-doc.cn

推特
附件 URL

(SPEL 表达式)为了使 URL 不计入扩展推文的文本正文中,请提供 URL 作为推文附件。此 URL 必须是推文永久链接或私信深层链接。任意的非 Twitter URL 必须保留在文本文本中。传递给 attachment_url 参数的 URL 与推文永久链接或私信深层链接不匹配,将在创建推文时失败并导致异常。(表达式,默认值:<none>spring-doc.cn

显示坐标

(SPEL 表达式)是否在发送 Tweet 的确切坐标上放置一个固定点。(表达式,默认值:<none>spring-doc.cn

in-reply-to-status-id (回复状态 ID)

(SPEL 表达式)更新要回复的现有文本的 ID。注意:除非在文本文本中提及此参数引用的推文的作者,否则将忽略此参数。因此,你必须在更新中包含 @username,其中 username 是引用推文的作者。设置 inReplyToStatusId 时,auto_populate_reply_metadata也会自动设置。稍后 确保从原始推文中查找领先@mentions,并从那里添加到新推文中。随着回复链的增长,这会将@mentions附加到扩展推文的元数据中,直到达到@mentions限制。如果原始推文已被删除,则回复将失败。(表达式,默认值:<none>spring-doc.cn

媒体 ID

(SPEL 表达式)要与推文关联的media_ids的逗号分隔列表。你可以在一条推文中包含最多 4 张照片、1 张动画 GIF 或 1 个视频。有关上传媒体的更多详细信息,请参阅上传媒体。(表达式,默认值:<none>spring-doc.cn

地点 ID

(SPEL 表达式)世界上的一个地方。(表达式,默认值:<none>spring-doc.cn

发短信

(SPEL 表达式)文本的文本将更新。URL 编码。t.co 链接换行都会影响字符数。默认为消息的有效负载(表达式,默认值:payloadspring-doc.cn

推特.update.location
纬度

此推文所指位置的纬度。除非此参数在 -90.0 到 +90.0(北为正)范围内(包括 -90.0 到 +90.0 为正)内,否则将忽略此参数。如果没有相应的 long 参数,它也将被忽略。(表达式,默认值:<none>spring-doc.cn

离子

此推文所指位置的经度。经度的有效范围为 -180.0 到 +180.0(东为正),包括正数。如果超出该范围、不是数字、禁用geo_enabled或没有相应的 lat 参数,则此参数将被忽略。(表达式,默认值:<none>spring-doc.cn

7.21. 波前沉

Wavefront 接收器使用 Messages<?>,将其转换为 Wavefront 数据格式的指标,并将指标直接发送到 Wavefront 或 Wavefront 代理。spring-doc.cn

支持常见的 ETL 使用案例,其中现有(历史)指标数据必须清理、转换并存储在 Wavefront 中以供进一步分析。spring-doc.cn

7.21.1. 选项

Wavefront sink 具有以下选项:spring-doc.cn

wavefront.api-token

Wavefront API 访问令牌。(字符串,默认值:<none>spring-doc.cn

wavefront.metric-表达式

计算结果为度量值的 SPEL 表达式。(表达式,默认值:<none>spring-doc.cn

wavefront.metric-名称

指标的名称。默认为应用程序名称。(字符串,默认值:<none>spring-doc.cn

wavefront.proxy-uri

Wavefront 代理的 URL。(字符串,默认值:<none>spring-doc.cn

wavefront.source

发出指标的唯一应用程序、主机、容器或实例。(字符串,默认值:<none>spring-doc.cn

wavefront.tag-表达式

与指标关联的自定义元数据的集合。点标签不能为空。键的有效字符:字母数字、连字符 ('-')、下划线('_')、点 ('.')。对于值,允许使用任何字符,包括空格。要包含双引号,请使用反斜杠对其进行转义,反斜杠不能是标签值中的最后一个字符。点标签键和值组合的最大允许长度为 254 个字符(255 个字符,包括分隔键和值的“=”字符)。如果值较长,则拒绝并记录该点(Map<String、Expression>,默认值:<none>spring-doc.cn

wavefront.timestamp 表达式

一个 SPEL 表达式,计算结果为指标的时间戳(可选)。(表达式,默认值:<none>spring-doc.cn

wavefront.uri 中

Wavefront 环境的 URL。(字符串,默认值:<none>spring-doc.cn

7.22. Websocket 接收器

一个简单的 Websocket Sink 实现。spring-doc.cn

7.22.1. 选项

支持以下选项:spring-doc.cn

websocket.consumer.log 级

netty 通道的 logLevel 。默认值为 <tt>WARN</tt> (String, default: <none>spring-doc.cn

websocket.consumer.path

WebsocketSink 使用者需要连接的路径。默认值为 <tt>/websocket</tt> (String,默认值:/websocketspring-doc.cn

websocket.consumer.port

Netty 服务器侦听的端口。默认值为 <tt>9292</tt> (整数,默认值:9292spring-doc.cn

websocket.consumer.ssl

是否创建一个 {@link io.netty.handler.ssl.SslContext}。(布尔值,默认值:falsespring-doc.cn

websocket.consumer.threads

Netty 的线程数 {@link io.netty.channel.EventLoopGroup}。默认值为 <tt>1</tt> (整数,默认值:1spring-doc.cn

7.22.2. 示例

要验证 websocket-sink 是否接收来自其他 spring-cloud-stream 应用程序的消息,你可以使用 遵循简单的端到端设置。spring-doc.cn

第 1 步:启动 Rabbitmq
第 2 步:部署time-source
第 3 步:部署websocket-sink

最后,在 mode 中启动 websocket-sink,以便您可以在日志中看到 the 产生的消息:tracetime-sourcespring-doc.cn

java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
	--logging.level.org.springframework.cloud.fn.consumer.websocket=TRACE

您应该开始在启动 WebsocketSink 的控制台中看到日志消息,如下所示:spring-doc.cn

Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]

7.22.3. 执行器

您可以使用它来访问发送和接收的最后一条消息。您必须 通过提供 来启用它。默认情况下,它通过 .下面是一个示例输出:Endpointn--endpoints.websocketconsumertrace.enabled=truehost:port/websocketconsumertracespring-doc.cn

 [
   {
    "timestamp": 1445453703508,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
      "payload": "2015-10-21 20:54:33"
    }
  },
  ...
  {
    "timestamp": 1445453703506,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
      "payload": "2015-10-21 20:54:32"
    }
  }
]

还有一个简单的 HTML 页面,您可以在其中的文本区域看到转发的消息。您可以访问 它直接通过您的浏览器。host:portspring-doc.cn

7.23. ZeroMQ 接收器

“zeromq” sink 支持将消息发送到 ZeroMQ 套接字。spring-doc.cn

7.23.1. 输入

7.23.2. 输出

有效载荷

不适用spring-doc.cn

7.23.3. 选项

zeromq sink 有以下选项:spring-doc.cn

zeromq.consumer.connect-url

用于连接到 ZeroMQ 套接字的连接 URL。(字符串,默认值:<none>spring-doc.cn

zeromq.consumer.socket类型

连接应建立的 Socket Type。(SocketType,默认值:<none>,可能的值:PAIR,PUBSUBREQ,REP,DEALER,ROUTER,PULL,PUSH,XPUB,XSUB,STREAM)spring-doc.cn

zeromq.consumer.topic

一个 Topic SpEL 表达式,用于在向订阅者发送消息之前评估主题。(表达式,默认值:<none>spring-doc.cn

7.23.4. 构建

$ ./mvnw clean install -PgenerateApps
$ cd apps

您可以在此处找到相应的基于 Binder 的项目。 然后你可以 cd 到其中一个文件夹并构建它:spring-doc.cn

$ ./mvnw clean package

7.23.5. 示例

java -jar zeromq-sink.jar --zeromq.consumer.connectUrl=tcp://server:port --zeromq.consumer.topic=