对于最新的稳定版本,请使用 Spring Integration 6.4.0! |
Debezium 支持
Debezium 引擎、变更数据捕获 (CDC) 入站通道适配器。
它允许捕获数据库更改事件,将其转换为消息,并在以后流式传输到出站通道。DebeziumMessageProducer
你需要在你的项目中包含 Spring 集成的 Debezium 依赖项:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-debezium</artifactId>
<version>6.3.6</version>
</dependency>
compile "org.springframework.integration:spring-integration-debezium:6.3.6"
您还需要为输入 Database 包含一个 debezium 连接器依赖项。 例如,要将 Debezium 与 PostgreSQL 一起使用,您将需要 postgres debezium 连接器:
-
Maven
-
Gradle
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<version>${debezium-version}</version>
</dependency>
compile "io.debezium:debezium-connector-postgres:{debezium-version}"
将 替换为与正在使用的版本兼容的版本。 |
入站 Debezium 通道适配器
Debezium 适配器需要一个预配置的实例。DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>>
debezium-supplier 提供了一个开箱即用的 Spring Boot 自动配置,带有一个方便的 DebeziumProperties 配置抽象。 |
Debezium Java DSL 可以从提供的 以及普通的 Debezium 配置(例如 )创建实例。
稍后对于一些具有固执己见的配置和序列化格式的常见用例来说可能很方便。 |
此外,还可以使用以下配置属性进行调整:DebeziumMessageProducer
-
contentType
- 允许处理 (default) 和 Message 内容。 contentType 与为提供的 .JSON
AVRO
PROTOBUF
must
SerializationFormat
DebeziumEngine.Builder
-
enableBatch
- 当设置为 (default) 时,Debezium 适配器将为从源数据库接收的每个数据更改事件发送 NEW。 如果设置为 ,则适配器将为从 Debezium 引擎接收的每批 single 向下游发送一个。 这样的有效负载是不可序列化的,需要自定义序列化/反序列化实现。false
Message
ChangeEvent
true
Message
ChangeEvent
-
enableEmptyPayload
- 启用对逻辑删除(又名删除)消息的支持。 在数据库行删除时,Debezium 可以发送一个逻辑删除更改事件,该事件与已删除的行具有相同的键和值 . 默认为 。Optional.empty
false
-
headerMapper
- 允许选择标头并将其转换为标头的自定义实现。 默认实现为 . 默认情况下,所有标头都已映射。HeaderMapper
ChangeEvent
Message
DefaultDebeziumHeaderMapper
setHeaderNamesToMap
-
taskExecutor
- 为 Debezium 引擎设置自定义。TaskExecutor
以下代码片段演示了此 channel adapter 的各种配置:
使用 Java 配置进行配置
Spring 下面的 Boot 应用程序显示了如何使用 Java 配置配置入站适配器的示例:
@SpringBootApplication
public class DebeziumJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(DebeziumJavaApplication.class)
.web(WebApplicationType.NONE)
.run(args);
}
@Bean
public MessageChannel debeziumInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer debeziumMessageProducer(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
MessageChannel debeziumInputChannel) {
DebeziumMessageProducer debeziumMessageProducer =
new DebeziumMessageProducer(debeziumEngineBuilder);
debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
return debeziumMessageProducer;
}
@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(Message<?> message) {
Object destination = message.getHeaders().get(DebeziumHeaders.DESTINATION); (1)
String key = new String((byte[]) message.getHeaders().get(DebeziumHeaders.KEY)); (2)
String payload = new String((byte[]) message.getPayload()); (3)
System.out.println("KEY: " + key + ", DESTINATION: " + destination + ", PAYLOAD: " + payload);
}
}
1 | 事件所针对的逻辑目标的名称。
通常,目标由配置选项、数据库名称和表名组成。例如:。topic.prefix my-topic.inventory.orders |
2 | 包含已更改表的 key 的架构和已更改行的实际 key。
在连接器创建事件时,键架构及其相应的键有效负载都包含已更改表(或唯一约束)中每一列的字段。PRIMARY KEY |
3 | 与键一样,payload 也有一个 schema 部分和一个 payload value 部分。 的 schema 部分包含描述 payload value 部分的 Envelope 结构的架构,包括其嵌套字段。 创建、更新或删除数据的操作的 Change 事件都具有具有 envelope 结构的值 payload。 |
的 and/or 允许分别禁用密钥或有效负载的消息内架构内容。 |
同样,我们可以配置 以批量处理传入的更改事件:DebeziumMessageProducer
@Bean
public MessageProducer debeziumMessageProducer(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
MessageChannel debeziumInputChannel) {
DebeziumMessageProducer debeziumMessageProducer = new DebeziumMessageProducer(debeziumEngineBuilder);
debeziumMessageProducer.setEnableBatch(true);
debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
return debeziumMessageProducer;
}
@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(List<ChangeEvent<Object, Object>> payload) {
System.out.println(payload);
}
Debezium Java DSL 支持
它通过工厂和实现提供了一个方便的 Java DSL Fluent API。spring-integration-debezium
Debezium
DebeziumMessageProducerSpec
Debezium Java DSL 的入站通道适配器是:
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder = ...
IntegrationFlow.from(
Debezium.inboundChannelAdapter(debeziumEngineBuilder)
.headerNames("special*")
.contentType("application/json")
.enableBatch(false))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
或者从本机 debezium 配置属性创建实例,并默认为序列化格式。DebeziumMessageProducerSpec
JSON
Properties debeziumConfig = ...
IntegrationFlow
.from(Debezium.inboundChannelAdapter(debeziumConfig))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
以下 Spring Boot 应用程序提供了使用 Java DSL 配置入站适配器的示例:
@SpringBootApplication
public class DebeziumJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(DebeziumJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow debeziumInbound(
DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder) {
return IntegrationFlow
.from(Debezium
.inboundChannelAdapter(debeziumEngineBuilder)
.headerNames("special*")
.contentType("application/json")
.enableBatch(false))
.handle(m -> System.out.println(new String((byte[]) m.getPayload())))
.get();
}
}