Running an Avro-based Producer & Consumer

This section demonstrates Avro schema-based serialization and deserialization.

Suppose we are building a geolocation service that stores the coordinates (longitude and latitude) uploaded by devices, together with a timestamp and a deviceId (a UUID):

- deviceId: string

- time: long

- latitude: float

- longitude: float

The corresponding Avro schema is as follows:

{
   "namespace": "locations",
   "name": "LocationReceived",
   "type": "record",
   "fields" : [
     {
       "name" : "deviceId",
       "type" : "string"
     },
     {
       "name" : "latitude",
       "type" : "float"
     },
     {
       "name" : "longitude",
       "type" : "float"
     },
     {
        "name": "time",
        "type": {
            "type": "long",
            "logicalType": "timestamp-millis"
        }
     }
   ]
}

Since Python is simple and readable, we will use Python code to demonstrate producing and consuming messages.

Produce Messages

First, install the Python dependencies:

pip3 install confluent_kafka requests fastavro structlog avro kafkian

Save the following as producer.py, replacing KAFKA_BOOTSTRAP_SERVERS on line 14 with your own brokers:

import random
import uuid
from datetime import datetime

import structlog
from confluent_kafka import avro
from kafkian import Producer
from kafkian.serde.avroserdebase import AvroRecord
from kafkian.serde.serialization import AvroSerializer, AvroStringKeySerializer

logger = structlog.getLogger(__name__)

# Replace KAFKA_BOOTSTRAP_SERVERS and SCHEMA_REGISTRY_URL to match your environment
KAFKA_BOOTSTRAP_SERVERS = "b-2.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-1.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-3.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092"
SCHEMA_REGISTRY_URL = "http://localhost:8085"

value_schema_str = """
{
   "namespace": "locations",
   "name": "LocationReceived",
   "type": "record",
   "fields" : [
     {
       "name" : "deviceId",
       "type" : "string"
     },
     {
       "name" : "latitude",
       "type" : "float"
     },
     {
       "name" : "longitude",
       "type" : "float"
     },
     {
        "name": "time",
        "type": {
            "type": "long",
            "logicalType": "timestamp-millis"
        }
     }
   ]
}
"""


class LocationReceived(AvroRecord):
    _schema = avro.loads(value_schema_str)


PRODUCER_CONFIG = {
    'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS
}

producer = Producer(
    PRODUCER_CONFIG,
    key_serializer=AvroStringKeySerializer(SCHEMA_REGISTRY_URL),
    value_serializer=AvroSerializer(SCHEMA_REGISTRY_URL)
)


def produce_location_received(device_id: str, latitude: float, longitude: float, time: datetime):
    message = LocationReceived(dict(
        deviceId=device_id,
        latitude=latitude,
        longitude=longitude,
        time=int(time.timestamp() * 1000),
    ))

    try:
        producer.produce('location_ingress', device_id, message, sync=True)
        print("produce to 'location_ingress' topic with", device_id, message)
    except Exception:
        logger.exception("Failed to produce LocationReceived event")


if __name__ == '__main__':
    devices = [str(uuid.uuid4()) for _ in range(3)]
    try:
        while True:
            device = random.choice(devices)
            produce_location_received(
                device,
                59.3363 + random.random() / 100,
                18.0262 + random.random() / 100,
                datetime.now()
            )
    except KeyboardInterrupt:
        pass
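Note that produce_location_received converts the datetime to epoch milliseconds by hand (`int(time.timestamp() * 1000)`), matching the timestamp-millis logical type declared in the schema. With a fixed, timezone-aware instant the conversion is deterministic:

```python
from datetime import datetime, timezone

# A fixed UTC instant so the result is reproducible
dt = datetime(2021, 12, 30, 0, 32, 1, tzinfo=timezone.utc)

# Same conversion the producer performs before sending
millis = int(dt.timestamp() * 1000)
print(millis)  # 1640824321000
```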

On line 71 we can see that messages are produced to the location_ingress topic, so we create it in advance:

image-20211230003201794
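Alternatively, if the Kafka CLI tools are available, the topic can also be created from the command line. The partition and replication settings below are assumptions for a 3-broker cluster; adjust them for your own setup:

```shell
kafka-topics.sh --create \
  --bootstrap-server b-1.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092 \
  --topic location_ingress \
  --partitions 3 \
  --replication-factor 3
```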

Once the topic is created, run producer.py to simulate sending coordinate data; after a while, press Ctrl + C to stop it:

image-20211230003315695

If we now inspect the location_ingress topic, we can see the data we produced:

image-20211230003301890

Consume Messages

Save the following as consumer.py, replacing KAFKA_BOOTSTRAP_SERVERS on line 7 with your own brokers:

import structlog
from kafkian import Consumer
from kafkian.serde.deserialization import AvroDeserializer


logger = structlog.getLogger(__name__)
KAFKA_BOOTSTRAP_SERVERS = "b-2.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-1.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-3.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092"
SCHEMA_REGISTRY_URL = "http://localhost:8085"


CONSUMER_CONFIG = {
    'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS,
    'default.topic.config': {
        'auto.offset.reset': 'earliest',
    },
    'group.id': 'notifications'
}


def repr_message(message):
    return {
        'topic': message.topic,
        'key': message.key,
        'value': message.value,
    }


def handler():
    consumer = Consumer(
        CONSUMER_CONFIG,
        topics=['location_ingress'],
        key_deserializer=AvroDeserializer(schema_registry_url=SCHEMA_REGISTRY_URL),
        value_deserializer=AvroDeserializer(schema_registry_url=SCHEMA_REGISTRY_URL),
    )

    for message in consumer:
        message_rep = repr_message(message)
        logger.info("Handling message", **message_rep)
        consumer.commit()


if __name__ == '__main__':
    try:
        handler()
    except KeyboardInterrupt:
        pass

Run the script; it successfully consumes the messages from the topic, and every message has a consistent, schema-governed format:

image-20211230004750914
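That consistent format comes from the Confluent wire format, which Schema Registry-aware serializers such as AvroSerializer/AvroDeserializer use: each message starts with a magic byte 0 and a 4-byte big-endian schema ID, followed by the Avro-encoded body. A minimal sketch of parsing that framing (the schema ID 7 and the body here are made-up values):

```python
import struct

def parse_confluent_framing(payload: bytes):
    """Split a Confluent-framed message into (schema_id, avro_body)."""
    # '>bI' = big-endian: 1-byte magic, 4-byte unsigned schema ID
    magic, schema_id = struct.unpack('>bI', payload[:5])
    if magic != 0:
        raise ValueError("not Confluent Avro wire format")
    return schema_id, payload[5:]

# Build a fake framed message: magic byte 0, schema ID 7, then the Avro body
framed = struct.pack('>bI', 0, 7) + b'avro-body'
schema_id, body = parse_confluent_framing(framed)
```

The deserializer uses the extracted schema ID to fetch the writer's schema from Schema Registry before decoding the body.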


Reference:

https://medium.com/@saabeilin/kafka-hands-on-part-ii-producing-and-consuming-messages-in-python-44d5416f582e