我们首先需要理解下 Kafka 是如何储存数据的。
从本质上来看 Kafka 与 Hadoop 比较类似,都可以粗略看做是一种分布式文件系统。需要注意的是,Kafka 是将数据以 Byte Array 格式储存在 Topic 中。这个设计直接导致了 Kafka 在 Cluster 和 Topic Level 这个层级上是没有任何数据类型校对的,这一点与数据库表的概念完全不同。也就是说应用端可以随意往 Topic 中写入任何格式的数据对象,这些都被 Kafka 视为合理操作。
尽管 Kafka 存储不关心自己存了什么,但 Producer 和 Consumer 还是有很强的数据约束的,并且二者在 Event Sourcing 系统中,通过 Kafka Topic 建立通信,也就是说 Consumer 必须能够成功反序列出 Producer 发出的数据,才能保证应用正常运行。
假如 Producer 发送如下的数据对象给 Kafka,这里我们用 Kotlin 建立个 Pojo 数据类:
// Partner V1
data class Partner (
val id: String,
val firstname: String,
val name: String,
val email: String
)
Producer 发送的 Partner 数据对象被序列化后,会被 Consumer 在另一端读出,并做反序列化操作。
在运维上通常会碰到以下与数据相关的问题:
// Partner V2
data class Partner (
val id: String,
val firstname: String,
val name: String,
val gender: String,
val age: Int,
val address: String
)
数据版本向上,向下兼容是个在数据仓库建模中常见的问题。通常在这种涉及到历史数据变更的问题,在行业中一般用 Slow Changing Dimension 的范式解决。在 Kafka 中一般会使用 Avro,Json 或者 Protobuf 类型的数据格式来存储,三者都具备对字段描述的属性。
思路上大体可以粗略做下面的约束:
其实仔细思考下也不难懂,主键是数据 ID 的标识,当然不可以删。如果增加的新字段在历史版本中不存在的话,当然是要设置一个初始值,才能让之前的数据被新 Schema 初始化。
Schema Registry
是在Confluent公司开发的产品,它对Kafka一个比较大的增强是:它使得Kafka的数据流必须符合注册的Schema,从而增强了可用性。而且部署时,它并不是 Kafka Cluster 的一部分,而是独立在 Kafka 之外单独部署的。
为什么schema registry
不整合在kafka broker,因为这样会打破kafka优秀的特性:
就Kafka而言,它甚至不知道你的资料是否是整数或是字串,这样才能保持Kafka的高性能。
所以:
Schema-Registry
是为元数据管理提供的服务,同样提供了RESTful接口用来存储和获取schemas,它能够保存数据格式变化的所有版本。
补充了 Schema Evolution 后,我们的微服务变得更加 Robust。
作为消费者,可以在 Producer 升级后继续流畅地运行,同时Producer 也可以自行独立升级。
有了 Schema Registry 的支持后,会对 Topic 中的数据做严格的输入检查,从而避免了工业事故和不必要的运维成本(数据重新建模、Refactoring 等)。
参考: