此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1Spring中文文档

此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1Spring中文文档

Debezium 引擎,更改数据捕获 (CDC) 入站通道适配器。 允许捕获数据库更改事件,将它们转换为消息,然后流式传输到出站通道。DebeziumMessageProducerSpring中文文档

您需要将 spring 集成 Debezium 依赖项包含在您的项目中:Spring中文文档

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-debezium</artifactId>
    <version>6.3.2-SNAPSHOT</version>
</dependency>
compile "org.springframework.integration:spring-integration-debezium:6.3.2-SNAPSHOT"

您还需要为输入数据库添加 debezium 连接器依赖项。 例如,要将 Debezium 与 PostgreSQL 一起使用,您将需要 postgres debezium 连接器:Spring中文文档

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-postgres</artifactId>
    <version>${debezium-version}</version>
</dependency>
compile "io.debezium:debezium-connector-postgres:{debezium-version}"

将 替换为与正在使用的版本兼容的版本。debezium-versionspring-integration-debeziumSpring中文文档

将 替换为与正在使用的版本兼容的版本。debezium-versionspring-integration-debeziumSpring中文文档

入站 Debezium 通道适配器

Debezium 适配器需要预配置的实例。DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>>Spring中文文档

debezium-supplier 提供了一个开箱即用的 Spring Boot 自动配置,其中包含一个方便的 DebeziumProperties 配置抽象。DebeziumEngine.BuilderSpring中文文档

Debezium Java DSL 可以从提供的 以及普通的 Debezium 配置(例如 )创建实例。 对于一些具有自以为是的配置和序列化格式的常见用例,稍后可以很方便。DebeziumMessageProducerDebeziumEngine.Builderjava.util.PropertiesSpring中文文档

此外,还可以使用以下配置属性进行调整:DebeziumMessageProducerSpring中文文档

  • contentType- 允许处理(默认)和消息内容。 contentType 应与为 提供的 .JSONAVROPROTOBUFmustSerializationFormatDebeziumEngine.BuilderSpring中文文档

  • enableBatch- 当设置为(默认)时,debezium 适配器将为从源数据库收到的每个数据更改事件发送 new。 如果设置为 then 适配器会为从 Debezium 引擎接收的每批数据向下游发送一个。 此类有效负载不可序列化,需要自定义序列化/反序列化实现。falseMessageChangeEventtrueMessageChangeEventSpring中文文档

  • enableEmptyPayload- 启用对逻辑删除(也称为删除)消息的支持。 在数据库行删除时,Debezium 可以发送一个逻辑删除更改事件,该事件的键与已删除的行相同,值为 。 默认值为 。Optional.emptyfalseSpring中文文档

  • headerMapper- 自定义实现,允许选择标头并将其转换为标头。 默认实现为 提供 的 setter。 默认情况下,映射所有标头。HeaderMapperChangeEventMessageDefaultDebeziumHeaderMappersetHeaderNamesToMapSpring中文文档

  • taskExecutor- 为 Debezium 引擎设置自定义。TaskExecutorSpring中文文档

以下代码片段演示了此通道适配器的各种配置:Spring中文文档

使用 Java 配置进行配置

以下 Spring Boot 应用程序显示了如何使用 Java 配置配置入站适配器的示例:Spring中文文档

@SpringBootApplication
public class DebeziumJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(DebeziumJavaApplication.class)
                .web(WebApplicationType.NONE)
                .run(args);
    }

    @Bean
    public MessageChannel debeziumInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer debeziumMessageProducer(
            DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
            MessageChannel debeziumInputChannel) {

        DebeziumMessageProducer debeziumMessageProducer =
            new DebeziumMessageProducer(debeziumEngineBuilder);
        debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
        return debeziumMessageProducer;
    }

    @ServiceActivator(inputChannel = "debeziumInputChannel")
    public void handler(Message<?> message) {

        Object destination = message.getHeaders().get(DebeziumHeaders.DESTINATION); (1)

        String key = new String((byte[]) message.getHeaders().get(DebeziumHeaders.KEY)); (2)

        String payload = new String((byte[]) message.getPayload()); (3)

        System.out.println("KEY: " + key + ", DESTINATION: " + destination + ", PAYLOAD: " + payload);
    }

}
1 事件所针对的逻辑目标的名称。 通常,目标由配置选项、数据库名称和表名称组成。例如:。topic.prefixmy-topic.inventory.orders
2 包含更改表的键和更改行的实际键的架构。 在连接器创建事件时,密钥架构及其相应的密钥有效负载都包含已更改表(或唯一约束)中每列的字段。PRIMARY KEY
3 与密钥一样,有效负载具有架构部分和有效负载值部分。 架构部分包含描述有效负载值部分的包络结构的架构,包括其嵌套字段。 用于创建、更新或删除数据的操作的更改事件都具有具有信封结构的值有效负载。

和/或允许分别禁用密钥或有效负载的消息内模式内容。key.converter.schemas.enable=falsevalue.converter.schemas.enable=falseSpring中文文档

同样,我们可以配置 批量处理传入的更改事件:DebeziumMessageProducerSpring中文文档

@Bean
public MessageProducer debeziumMessageProducer(
        DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder,
        MessageChannel debeziumInputChannel) {

    DebeziumMessageProducer debeziumMessageProducer = new DebeziumMessageProducer(debeziumEngineBuilder);
	debeziumMessageProducer.setEnableBatch(true);
    debeziumMessageProducer.setOutputChannel(debeziumInputChannel);
    return debeziumMessageProducer;
}

@ServiceActivator(inputChannel = "debeziumInputChannel")
public void handler(List<ChangeEvent<Object, Object>> payload) {
    System.out.println(payload);
}

debezium-supplier 提供了一个开箱即用的 Spring Boot 自动配置,其中包含一个方便的 DebeziumProperties 配置抽象。DebeziumEngine.BuilderSpring中文文档

Debezium Java DSL 可以从提供的 以及普通的 Debezium 配置(例如 )创建实例。 对于一些具有自以为是的配置和序列化格式的常见用例,稍后可以很方便。DebeziumMessageProducerDebeziumEngine.Builderjava.util.PropertiesSpring中文文档

1 事件所针对的逻辑目标的名称。 通常,目标由配置选项、数据库名称和表名称组成。例如:。topic.prefixmy-topic.inventory.orders
2 包含更改表的键和更改行的实际键的架构。 在连接器创建事件时,密钥架构及其相应的密钥有效负载都包含已更改表(或唯一约束)中每列的字段。PRIMARY KEY
3 与密钥一样,有效负载具有架构部分和有效负载值部分。 架构部分包含描述有效负载值部分的包络结构的架构,包括其嵌套字段。 用于创建、更新或删除数据的操作的更改事件都具有具有信封结构的值有效负载。

和/或允许分别禁用密钥或有效负载的消息内模式内容。key.converter.schemas.enable=falsevalue.converter.schemas.enable=falseSpring中文文档

Debezium Java DSL 支持

它通过工厂和实现提供了一个方便的 Java DSL 流畅 API。spring-integration-debeziumDebeziumDebeziumMessageProducerSpecSpring中文文档

Debezium Java DSL 的入站通道适配器是:Spring中文文档

 DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>>   debeziumEngineBuilder = ...
 IntegrationFlow.from(
    Debezium.inboundChannelAdapter(debeziumEngineBuilder)
        .headerNames("special*")
        .contentType("application/json")
        .enableBatch(false))
    .handle(m -> System.out.println(new String((byte[]) m.getPayload())))

或者从原生 debezium 配置属性创建实例,并默认为序列化格式。DebeziumMessageProducerSpecJSONSpring中文文档

 Properties debeziumConfig = ...
 IntegrationFlow
    .from(Debezium.inboundChannelAdapter(debeziumConfig))
    .handle(m -> System.out.println(new String((byte[]) m.getPayload())))

以下 Spring Boot 应用程序提供了使用 Java DSL 配置入站适配器的示例:Spring中文文档

@SpringBootApplication
public class DebeziumJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(DebeziumJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow debeziumInbound(
        DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder) {

        return IntegrationFlow
                .from(Debezium
                        .inboundChannelAdapter(debeziumEngineBuilder)
					    .headerNames("special*")
					    .contentType("application/json")
					    .enableBatch(false))
                .handle(m -> System.out.println(new String((byte[]) m.getPayload())))
                .get();
    }

}