This section demonstrates Avro schema-based serialization and deserialization.
Suppose we are building a geolocation service that stores the coordinates (latitude and longitude), a timestamp, and a deviceId (a UUID) uploaded by devices:
- deviceId: string
- time: long
- latitude: float
- longitude: float
The corresponding Avro schema is:
{
    "namespace": "locations",
    "name": "LocationReceived",
    "type": "record",
    "fields": [
        {
            "name": "deviceId",
            "type": "string"
        },
        {
            "name": "latitude",
            "type": "float"
        },
        {
            "name": "longitude",
            "type": "float"
        },
        {
            "name": "time",
            "type": {
                "type": "long",
                "logicalType": "timestamp-millis"
            }
        }
    ]
}
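The time field's logicalType of timestamp-millis means it is stored as milliseconds since the Unix epoch. As a quick standard-library sketch of that conversion (the producer below does the same thing inline):

```python
from datetime import datetime, timezone

def to_timestamp_millis(dt: datetime) -> int:
    """Convert a datetime to epoch milliseconds, as timestamp-millis expects."""
    return int(dt.timestamp() * 1000)

def from_timestamp_millis(millis: int) -> datetime:
    """Convert epoch milliseconds back into an aware UTC datetime."""
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc)

dt = datetime(2023, 5, 1, 12, 0, 0, tzinfo=timezone.utc)
millis = to_timestamp_millis(dt)
print(millis)  # 1682942400000
```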
Since Python is straightforward to read, we will use Python code to demonstrate producing and consuming messages.
First, install the Python dependencies:
pip3 install confluent_kafka requests fastavro structlog avro kafkian
Save the following as producer.py, replacing KAFKA_BOOTSTRAP_SERVERS with the value for your environment:
import random
import uuid
from datetime import datetime

import structlog
from confluent_kafka import avro
from kafkian import Producer
from kafkian.serde.avroserdebase import AvroRecord
from kafkian.serde.serialization import AvroSerializer, AvroStringKeySerializer

logger = structlog.getLogger(__name__)

# Replace KAFKA_BOOTSTRAP_SERVERS and SCHEMA_REGISTRY_URL with values for your environment
KAFKA_BOOTSTRAP_SERVERS = "b-2.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-1.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-3.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092"
SCHEMA_REGISTRY_URL = "http://localhost:8085"

value_schema_str = """
{
    "namespace": "locations",
    "name": "LocationReceived",
    "type": "record",
    "fields": [
        {
            "name": "deviceId",
            "type": "string"
        },
        {
            "name": "latitude",
            "type": "float"
        },
        {
            "name": "longitude",
            "type": "float"
        },
        {
            "name": "time",
            "type": {
                "type": "long",
                "logicalType": "timestamp-millis"
            }
        }
    ]
}
"""

class LocationReceived(AvroRecord):
    _schema = avro.loads(value_schema_str)

PRODUCER_CONFIG = {
    'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS
}

producer = Producer(
    PRODUCER_CONFIG,
    key_serializer=AvroStringKeySerializer(SCHEMA_REGISTRY_URL),
    value_serializer=AvroSerializer(SCHEMA_REGISTRY_URL)
)

def produce_location_received(device_id: str, latitude: float, longitude: float, time: datetime):
    message = LocationReceived(dict(
        deviceId=device_id,
        latitude=latitude,
        longitude=longitude,
        time=int(time.timestamp() * 1000),  # timestamp-millis expects epoch milliseconds
    ))
    try:
        # sync=True blocks until the broker acknowledges delivery
        producer.produce('location_ingress', device_id, message, sync=True)
        print("produce to 'location_ingress' topic with", device_id, message)
    except Exception:
        logger.exception("Failed to produce LocationReceived event")

if __name__ == '__main__':
    # Simulate three devices reporting coordinates in a loop
    devices = [str(uuid.uuid4()) for _ in range(3)]
    try:
        while True:
            device = random.choice(devices)
            produce_location_received(
                device,
                59.3363 + random.random() / 100,
                18.0262 + random.random() / 100,
                datetime.now()
            )
    except KeyboardInterrupt:
        pass
In produce_location_received, we can see that messages are sent to the location_ingress topic, so we create that topic in advance:
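The original topic-creation command is not shown here; one way to create it from Python is via the AdminClient that ships with the confluent_kafka package installed above. The partition and replication counts below are illustrative; adjust them to your cluster:

```python
def create_topic(bootstrap_servers: str,
                 name: str = 'location_ingress',
                 partitions: int = 3,
                 replication: int = 3) -> None:
    """Create a Kafka topic, blocking until the broker confirms it."""
    # AdminClient is part of the confluent_kafka package installed earlier
    from confluent_kafka.admin import AdminClient, NewTopic

    admin = AdminClient({'bootstrap.servers': bootstrap_servers})
    futures = admin.create_topics(
        [NewTopic(name, num_partitions=partitions, replication_factor=replication)]
    )
    for topic, future in futures.items():
        future.result()  # raises if creation failed
        print("created topic", topic)

# create_topic(KAFKA_BOOTSTRAP_SERVERS)  # uncomment with your broker list
```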
After the topic is created, run producer.py to start sending simulated coordinate data; after a while, press Ctrl + C to stop it:
If we now inspect the location_ingress topic, we can see the data we sent:
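Note that the raw bytes on the topic are not plain JSON: serializers that follow the Confluent wire format (as kafkian's Avro serde does) prefix each message with a magic byte 0x00 and a 4-byte big-endian schema registry ID, followed by the Avro-encoded payload. A standard-library sketch of parsing that header:

```python
import struct

def parse_confluent_header(raw: bytes):
    """Split a Confluent-framed message into (schema_id, avro_payload)."""
    if len(raw) < 5 or raw[0] != 0:
        raise ValueError("not a Confluent wire-format message")
    schema_id = struct.unpack('>I', raw[1:5])[0]
    return schema_id, raw[5:]

# Synthetic example: schema ID 42 followed by a dummy Avro payload
frame = b'\x00' + struct.pack('>I', 42) + b'\x02hi'
schema_id, payload = parse_confluent_header(frame)
print(schema_id)  # 42
```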
Save the following as consumer.py, replacing KAFKA_BOOTSTRAP_SERVERS with the value for your environment:
import structlog
from kafkian import Consumer
from kafkian.serde.deserialization import AvroDeserializer

logger = structlog.getLogger(__name__)

# Replace KAFKA_BOOTSTRAP_SERVERS and SCHEMA_REGISTRY_URL with values for your environment
KAFKA_BOOTSTRAP_SERVERS = "b-2.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-1.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-3.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092"
SCHEMA_REGISTRY_URL = "http://localhost:8085"

CONSUMER_CONFIG = {
    'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS,
    'default.topic.config': {
        'auto.offset.reset': 'earliest',
    },
    'group.id': 'notifications'
}

def repr_message(message):
    return {
        'topic': message.topic,
        'key': message.key,
        'value': message.value,
    }

def handler():
    consumer = Consumer(
        CONSUMER_CONFIG,
        topics=['location_ingress'],
        key_deserializer=AvroDeserializer(schema_registry_url=SCHEMA_REGISTRY_URL),
        value_deserializer=AvroDeserializer(schema_registry_url=SCHEMA_REGISTRY_URL),
    )
    for message in consumer:
        message_rep = repr_message(message)
        logger.info("Handling message", **message_rep)
        consumer.commit()  # commit offsets after each handled message

if __name__ == '__main__':
    try:
        handler()
    except KeyboardInterrupt:
        pass
Running the script, we successfully consume the messages from the topic, all in a consistent format:
References: