Confluent Schema Registry介绍

数据模型的模式演变 ( Schema Evolution )

我们首先需要理解下 Kafka 是如何储存数据的。

从本质上来看 Kafka 与 Hadoop 比较类似,都可以粗略看做是一种分布式文件系统。需要注意的是,Kafka 是将数据以 Byte Array 格式储存在 Topic 中。这个设计直接导致了 Kafka 在 Cluster 和 Topic Level 这个层级上是没有任何数据类型校对的,这一点与数据库表的概念完全不同。也就是说应用端可以随意往 Topic 中写入任何格式的数据对象,这些都被 Kafka 视为合理操作。

尽管 Kafka 存储不关心自己存了什么,但 Producer 和 Consumer 还是有很强的数据约束的,并且二者在 Event Sourcing 系统中,通过 Kafka Topic 建立通信,也就是说 Consumer 必须能够成功反序列出 Producer 发出的数据,才能保证应用正常运行。

image-20211229221910766

假如 Producer 发送如下的数据对象给 Kafka,这里我们用 Kotlin 建立个 Pojo 数据类:

// Partner V1 
data class Partner (
    val id: String,
    val firstname: String,
    val name: String,
    val email: String
)

Producer 发送的 Partner 数据对象被序列化后,会被 Consumer 在另一端读出,并做反序列化操作。

在运维上通常会碰到以下与数据相关的问题:

  • Consumer 作为 Downstream 微服务,不一定需要全部 Partner 字段(比如不需要 age)
  • 在一个 Topic 被多个 Consumer 消费时,一旦 Producer 数据模型更新了,需要更新全部 Consumer 数据模型
  • 假如 Partner V1 添加(或更改,删除字段)了一个新字段(见下面 Partner V2 代码),然后用新的数据模型重新部署了 Producer 和 Consumer,之前储存在 Kafka Topic 的旧数据(V1)怎么处理?
// Partner V2
data class Partner (
    val id: String,
    val firstname: String,
    val name: String,
    val gender: String,
    val age: Int,
    val address: String
)
  • Producer, Consumer 在在线升级中,先升级哪个服务? 会不会出现 Downtime?
  • 如何能用一个数据模型读取 Topic 中全部不同版本的历史数据?
  • 如何能够保证多人开发的微服务系统中,不会因为交流不及时引发矛盾?

image-20211229222111105

解决方案

数据版本向上,向下兼容是个在数据仓库建模中常见的问题。通常在这种涉及到历史数据变更的问题,在行业中一般用 Slow Changing Dimension 的范式解决。在 Kafka 中一般会使用 Avro,Json 或者 Protobuf 类型的数据格式来存储,三者都具备对字段描述的属性。

思路上大体可以粗略做下面的约束:

  • 表/数据模型的主键,设为 required,且唯一字段
  • 将未来有可能被删除的字段设置一个初始值
  • 避免使用 Enum 类型,因为增添、删除 Enum 类型很难做 Schema Evolution
  • 如果需要重命名字段,可以使用 Alias 代替
  • 如果添加字段,一定要设置其初始值(这样读取历史数据时才不会出问题)
  • Avro 中的 required 的字段绝对不能删除,反之,只能删除有 default 值的字段
  • 字段的数据类型不能被更改(比如 String 改为 Integer)

其实仔细思考下也不难懂,主键是数据 ID 的标识,当然不可以删。如果增加的新字段在历史版本中不存在的话,当然是要设置一个初始值,才能让之前的数据被新 Schema 初始化。

什么是Confluent Schema Registry

Schema Registry是在Confluent公司开发的产品,它对Kafka一个比较大的增强是:它使得Kafka的数据流必须符合注册的Schema,从而增强了可用性。而且部署时,它并不是 Kafka Cluster 的一部分,而是独立在 Kafka 之外单独部署的。

image-20211229222607653

为什么schema registry不整合在kafka broker,因为这样会打破kafka优秀的特性: 就Kafka而言,它甚至不知道你的资料是否是整数或是字串,这样才能保持Kafka的高性能。

所以:

  • Schema Registry需要是独立的组件
  • 生产者和消费者需要能够与之对话
  • 必须商定通用的格式(Avro, JSON, protobuf…)
  • 它需要支持格式变动
  • 它需要是轻量级的

Schema-Registry是为元数据管理提供的服务,同样提供了RESTful接口用来存储和获取schemas,它能够保存数据格式变化的所有版本。

总结

  • 补充了 Schema Evolution 后,我们的微服务变得更加 Robust。

  • 作为消费者,可以在 Producer 升级后继续流畅地运行,同时Producer 也可以自行独立升级。

  • 有了 Schema Registry 的支持后,会对 Topic 中的数据做严格的输入检查,从而避免了工业事故和不必要的运维成本(数据重新建模、Refactoring 等)。


参考:

https://zhuanlan.zhihu.com/p/284979007

https://iter01.com/516577.html