介绍

当组织具有基于消息传递的发布/订阅体系结构,并且多个生产者和使用者微服务相互通信时,所有这些微服务通常需要就基于架构的协定达成一致。 当此类架构需要发展以适应新的业务需求时,仍然需要现有组件继续工作。 Spring Cloud Stream 提供对独立模式注册表服务器的支持,应用程序可以使用该服务器注册和使用上述模式。 Spring Cloud Stream 模式注册表支持还提供了对基于 avro 的模式注册表客户端的支持,这些客户端实质上提供了与模式注册表通信的消息转换器,以便在消息转换期间协调模式。 Spring Cloud Stream 提供的模式演进支持既适用于上述独立模式注册表,也适用于 Confluent 提供的专门用于 Apache Kafka 的模式注册表。Spring中文文档

Spring Cloud Stream Schema Registry 概述

Spring Cloud Stream Schema Registry 提供对模式演进的支持,以便数据可以随着时间的推移而演进,并且仍然适用于较旧或较新的生产者和使用者,反之亦然。 大多数序列化模型(尤其是旨在跨不同平台和语言实现可移植性的模型)都依赖于描述如何在二进制有效负载中序列化数据的架构。 为了序列化数据并对其进行解释,发送方和接收方都必须有权访问描述二进制格式的架构。 在某些情况下,可以从序列化的有效负载类型或反序列化的目标类型推断架构。 但是,许多应用程序都可以访问描述二进制数据格式的显式架构。 架构注册表允许您以文本格式(通常为 JSON)存储架构信息,并使需要它以二进制格式接收和发送数据的各种应用程序可以访问该信息。 架构可作为元组引用,该元组由以下部分组成:Spring中文文档

Spring Cloud Stream Schema Registry 提供以下组件Spring中文文档

  • 独立架构注册表服务器Spring中文文档

    By default, it is using an H2 database, but server can be used with PostgreSQL or MySQL by providing appropriate datasource configuration.
  • 架构注册表客户端,能够通过与架构注册表通信来封送消息。Spring中文文档

    Currently, the client can communicate to the standalone schema registry or the Confluent Schema Registry.

架构注册表客户端

用于与架构注册表服务器交互的客户端抽象是接口,该接口具有以下结构:SchemaRegistryClientSpring中文文档

public interface SchemaRegistryClient {

    SchemaRegistrationResponse register(String subject, String format, String schema);

    String fetch(SchemaReference schemaReference);

    String fetch(Integer id);

}

Spring Cloud Stream 提供了开箱即用的实现,用于与自己的模式服务器进行交互以及与 Confluent Schema Registry 进行交互。Spring中文文档

可以使用 来配置 Spring Cloud Stream 模式注册表的客户端,如下所示:@EnableSchemaRegistryClientSpring中文文档

@SpringBootApplication
@EnableSchemaRegistryClient
public class ConsumerApplication {

}
默认转换器经过优化,不仅可以缓存来自远程服务器的架构,还可以缓存非常昂贵的 and 方法。 因此,它使用不缓存响应。 如果要更改默认行为,可以直接在代码上使用客户端并将其重写为所需的结果。 为此,必须将该属性添加到应用程序属性中。parse()toString()DefaultSchemaRegistryClientspring.cloud.stream.schemaRegistryClient.cached=true
默认转换器经过优化,不仅可以缓存来自远程服务器的架构,还可以缓存非常昂贵的 and 方法。 因此,它使用不缓存响应。 如果要更改默认行为,可以直接在代码上使用客户端并将其重写为所需的结果。 为此,必须将该属性添加到应用程序属性中。parse()toString()DefaultSchemaRegistryClientspring.cloud.stream.schemaRegistryClient.cached=true

架构注册表客户端属性

架构注册表客户端支持以下属性:Spring中文文档

spring.cloud.stream.schemaRegistryClient.endpoint

架构服务器的位置。 设置此项时,请使用完整的 URL,包括协议 ( 或 ) 、端口和上下文路径。httphttpsSpring中文文档

违约

localhost:8990/Spring中文文档

spring.cloud.stream.schemaRegistryClient.cached

客户端是否应缓存架构服务器响应。 通常设置为 ,因为缓存发生在消息转换器中。 使用架构注册表客户端的客户端应将其设置为 。falsetrueSpring中文文档

违约

falseSpring中文文档

Avro 架构注册表客户端消息转换器

对于在应用程序上下文中注册了 SchemaRegistryClient bean 的应用程序,Spring Cloud Stream 会自动配置用于模式管理的 Apache Avro 消息转换器。 这简化了架构演变,因为接收消息的应用程序可以轻松访问可以与其自己的读取器架构协调的编写器架构。Spring中文文档

对于出站消息,如果绑定的内容类型设置为 ,则激活 ,如以下示例所示:application/*+avroMessageConverterSpring中文文档

spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro

在出站转换期间,消息转换器会尝试推断每个出站消息的架构(基于其类型),并使用 . 如果已找到相同的架构,则检索对它的引用。 如果没有,则注册架构,并提供新的版本号。 消息使用以下方案使用标头发送: ,其中可配置,并从有效负载类型推断出来。SchemaRegistryClientcontentTypeapplication/[prefix].[subject].v[version]+avroprefixsubjectSpring中文文档

例如,该类型的消息可能作为内容类型为 的二进制有效负载发送,其中主题是版本号。Userapplication/vnd.user.v2+avrouser2Spring中文文档

接收消息时,转换器会从传入消息的标头推断架构引用,并尝试检索它。该架构在反序列化过程中用作编写器架构。Spring中文文档

Avro 架构注册表消息转换器属性

如果已通过设置启用了基于 Avro 的架构注册表客户端,则可以通过设置以下属性来自定义注册的行为。spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avroSpring中文文档

spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled(春季.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled)

如果希望转换器使用反射从 POJO 推断架构,请启用。Spring中文文档

违约:falseSpring中文文档

spring.cloud.stream.schema.avro.readerSchema

Avro 通过查看编写器架构(源有效负载)和读取器架构(应用程序有效负载)来比较架构版本。有关详细信息,请参阅 Avro 文档。 如果设置,这将覆盖架构服务器上的任何查找,并使用本地架构作为读取器架构。 违约:nullSpring中文文档

spring.cloud.stream.schema.avro.schema位置

将此属性中列出的任何文件注册到架构服务器。.avscSpring中文文档

违约:emptySpring中文文档

spring.cloud.stream.schema.avro.prefix

要在 Content-Type 标头上使用的前缀。Spring中文文档

违约:vndSpring中文文档

spring.cloud.stream.schema.avro.subject命名策略

确定用于在架构注册表中注册 Avro 架构的使用者名称。有两种实现可用,其中 subject 是架构名称,而 返回使用 Avro 架构命名空间和名称的完全限定主题。可以通过实施来创建自定义策略。org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategyorg.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategyorg.springframework.cloud.stream.schema.avro.SubjectNamingStrategySpring中文文档

违约:org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategySpring中文文档

spring.cloud.stream.schema.avro.ignoreSchemaRegistryServer

忽略任何架构注册表通信。对于测试目的很有用,因此在运行单元测试时,它不会不必要地尝试连接到架构注册表服务器。Spring中文文档

违约:falseSpring中文文档

Apache Avro 消息转换器

Spring Cloud Stream 通过其模块提供对基于模式的消息转换器的支持。 目前,基于架构的消息转换器支持的唯一现成序列化格式是 Apache Avro,未来版本中将添加更多格式。spring-cloud-stream-schema-registry-clientSpring中文文档

该模块包含两种类型的消息转换器,可用于 Apache Avro 序列化:spring-cloud-stream-schema-registry-clientSpring中文文档

  • 使用序列化或反序列化对象的类信息或具有启动时已知位置的架构的转换器。Spring中文文档

  • 使用架构注册表的转换器。它们在运行时定位架构,并随着域对象的发展动态注册新架构。Spring中文文档

支持架构的转换器

它支持通过使用预定义的架构或使用类中可用的架构信息(反射式或包含在 中)来序列化和反序列化消息。 如果提供自定义转换器,则不会创建默认的 AvroSchemaMessageConverter bean。 以下示例显示了自定义转换器:AvroSchemaMessageConverterSpecificRecordSpring中文文档

若要使用自定义转换器,只需将其添加到应用程序上下文中,并选择性地指定要与之关联的一个或多个转换器。 默认值为 。MimeTypesMimeTypeapplication/avroSpring中文文档

如果转换的目标类型是 ,则必须设置架构。GenericRecordSpring中文文档

以下示例演示如何通过注册没有预定义架构的 Apache Avro 来在接收器应用程序中配置转换器。 在此示例中,请注意 mime 类型值为 ,而不是默认值。MessageConverteravro/bytesapplication/avroSpring中文文档

@SpringBootApplication
public static class SinkApplication {

  //...

  @Bean
  public MessageConverter userMessageConverter() {
      return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
  }
}

相反,以下应用程序使用预定义的架构(在类路径上找到)注册转换器:Spring中文文档

@SpringBootApplication
public static class SinkApplication {

  //...

  @Bean
  public MessageConverter userMessageConverter() {
      AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
      converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
      return converter;
  }
}

架构注册表服务器

Spring Cloud Stream 提供了一个模式注册表服务器实现。 要使用它,您可以下载最新版本并将其作为独立应用程序运行:spring-cloud-stream-schema-registry-serverSpring中文文档

wget https://repo1.maven.org/maven2/org/springframework/cloud/spring-cloud-stream-schema-registry-server/4.0.3/spring-cloud-stream-schema-registry-server-4.0.3.jar
java -jar ./spring-cloud-stream-schema-registry-server-4.0.3.jar

您可以将架构注册表嵌入到现有的 Spring Boot Web 应用程序中。 为此,请将项目添加到项目并使用批注,这会将架构注册表服务器 REST 控制器添加到应用程序。 以下示例显示了启用架构注册表的 Spring Boot 应用程序:spring-cloud-stream-schema-registry-core@EnableSchemaRegistryServerSpring中文文档

@SpringBootApplication
@EnableSchemaRegistryServer
public class SchemaRegistryServerApplication {
public static void main(String[] args) {
SpringApplication.run(SchemaRegistryServerApplication.class, args);
}
}

该属性可用于控制架构服务器的根路径(尤其是当它嵌入到其他应用程序中时)。 boolean 属性允许删除架构。默认情况下,此功能处于禁用状态。spring.cloud.stream.schema.server.pathspring.cloud.stream.schema.server.allowSchemaDeletionSpring中文文档

架构注册表服务器使用关系数据库来存储架构。 默认情况下,它使用嵌入式数据库。 您可以使用 Spring Boot SQL 数据库和 JDBC 配置选项来自定义架构存储。Spring中文文档

您可以将架构注册表嵌入到现有的 Spring Boot Web 应用程序中。 为此,请将项目添加到项目并使用批注,这会将架构注册表服务器 REST 控制器添加到应用程序。 以下示例显示了启用架构注册表的 Spring Boot 应用程序:spring-cloud-stream-schema-registry-core@EnableSchemaRegistryServerSpring中文文档

@SpringBootApplication
@EnableSchemaRegistryServer
public class SchemaRegistryServerApplication {
public static void main(String[] args) {
SpringApplication.run(SchemaRegistryServerApplication.class, args);
}
}

架构注册表服务器 API

注册新架构

若要注册新架构,请向终结点发送请求。POST/Spring中文文档

接受包含以下字段的 JSON 有效负载:/Spring中文文档

它的响应是 JSON 中的架构对象,包含以下字段:Spring中文文档

按主题、格式和版本检索现有架构

要按主题、格式和版本检索现有架构,请向终结点发送请求。GET{subject}/{format}/{version}Spring中文文档

它的响应是 JSON 中的架构对象,包含以下字段:Spring中文文档

按主题和格式检索现有架构

若要按主题和格式检索现有架构,请向终结点发送请求。GET/subject/formatSpring中文文档

它的响应是每个 JSON 模式对象的模式列表,其中包含以下字段:Spring中文文档

按 ID 检索现有架构

若要按架构的 ID 检索架构,请向终结点发送请求。GET/schemas/{id}Spring中文文档

它的响应是 JSON 中的架构对象,包含以下字段:Spring中文文档

按主题、格式和版本删除架构

若要删除由其主题、格式和版本标识的架构,请向终结点发送请求。DELETE{subject}/{format}/{version}Spring中文文档

按 ID 删除架构

要按架构的 ID 删除架构,请向终端节点发送请求。DELETE/schemas/{id}Spring中文文档

按主题删除架构

DELETE /{subject}Spring中文文档

按主题删除现有架构。Spring中文文档

本说明仅适用于 Spring Cloud Stream 1.1.0.RELEASE 的用户。 Spring Cloud Stream 1.1.0.RELEASE 使用表名 来存储对象。 是许多数据库实现中的关键字。 为了避免将来发生任何冲突,从 1.1.1.RELEASE 开始,我们选择了存储表的名称。 任何升级的Spring Cloud Stream 1.1.0.RELEASE用户都应在升级之前将其现有架构迁移到新表。schemaSchemaSchemaSCHEMA_REPOSITORY
本说明仅适用于 Spring Cloud Stream 1.1.0.RELEASE 的用户。 Spring Cloud Stream 1.1.0.RELEASE 使用表名 来存储对象。 是许多数据库实现中的关键字。 为了避免将来发生任何冲突,从 1.1.1.RELEASE 开始,我们选择了存储表的名称。 任何升级的Spring Cloud Stream 1.1.0.RELEASE用户都应在升级之前将其现有架构迁移到新表。schemaSchemaSchemaSCHEMA_REPOSITORY

使用 Confluent 的 Schema Registry

缺省配置创建一个 Bean。 如果要使用 Confluent 模式注册表,则需要创建一个 类型的 bean,该 bean 将取代框架默认配置的 bean。下面的示例演示如何创建这样的 Bean:DefaultSchemaRegistryClientConfluentSchemaRegistryClientSpring中文文档

@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
  ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
  client.setEndpoint(endpoint);
  return client;
}
ConfluentSchemaRegistryClient 针对 Confluent 平台版本 4.0.0 进行了测试。
ConfluentSchemaRegistryClient 针对 Confluent 平台版本 4.0.0 进行了测试。

架构注册和解析

为了更好地理解 Spring Cloud Stream 如何注册和解析新模式及其对 Avro 模式比较功能的使用,我们提供了两个单独的小节:Spring中文文档

架构注册过程(序列化)Schema Registration Process (Serialization)

注册过程的第一部分是从通过通道发送的有效负载中提取架构。 Avro 类型(如 或 已包含架构),可以立即从实例中检索该架构。 对于 POJO,如果属性设置为(默认值),则推断架构。SpecificRecordGenericRecordspring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabledtrueSpring中文文档

获得架构后,转换器会从远程服务器加载其元数据(版本)。 首先,它查询本地缓存。如果未找到结果,它将数据提交到服务器,服务器会回复版本控制信息。 转换器始终缓存结果,以避免在架构服务器中查询需要序列化的每条新消息的开销。Spring中文文档

使用架构版本信息,转换器将消息的标头设置为携带版本信息,例如:.contentTypeapplication/vnd.user.v1+avroSpring中文文档

架构解析过程(反序列化)

读取包含版本信息的消息(即,具有如下所述方案的标头)时,转换器会查询架构服务器以获取消息的编写器架构。 找到传入消息的正确架构后,它会检索读取器架构,并使用 Avro 的架构解析支持将其读入读取器定义(设置默认值和任何缺失的属性)。contentTypeSchema Registration Process (Serialization)Spring中文文档

您应该了解编写器架构(编写消息的应用程序)和读取器架构(接收应用程序)之间的区别。 我们建议您花点时间阅读 Avro 术语并了解该过程。 Spring Cloud Stream 始终获取编写器架构以确定如何读取消息。 如果要使 Avro 的模式演进支持正常工作,则需要确保为应用程序正确设置了 a。readerSchema
您应该了解编写器架构(编写消息的应用程序)和读取器架构(接收应用程序)之间的区别。 我们建议您花点时间阅读 Avro 术语并了解该过程。 Spring Cloud Stream 始终获取编写器架构以确定如何读取消息。 如果要使 Avro 的模式演进支持正常工作,则需要确保为应用程序正确设置了 a。readerSchema