python操作MSK

本节将演示如何使用 Python 向 MSK 发布消息,以及如何从 MSK 接收消息。

安装依赖

pip3 install boto3 kafka-python

消费端

将以下内容保存为 consumer.py,其中 region_name 和 msk_cluster_arn 两处请根据实际情况替换:

#!/usr/bin/python3

from kafka import KafkaConsumer
import boto3
import logging as log
import sys

# Logging setup for the consumer script: timestamped records at INFO and above.
_LOG_FORMAT = '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
_LOG_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
log.basicConfig(level=log.INFO, format=_LOG_FORMAT, datefmt=_LOG_DATE_FORMAT)


def main(args):
    """Consume records from an MSK topic and log each one.

    Looks up the cluster's bootstrap brokers through the MSK API, subscribes
    a ``KafkaConsumer`` to the topic and logs every received record until the
    process is interrupted or an error occurs.

    Args:
        args: Command-line arguments (``sys.argv``); currently unused.

    Returns:
        0 when the consumer stopped normally (including Ctrl-C), 1 when an
        error was logged.
    """
    region_name = 'ap-southeast-1'  # replace with your region
    msk_cluster_arn = 'arn:aws:kafka:ap-southeast-1:145197526627:cluster/MSKDemo/89d04308-2643-4e80-b6e2-fe996354f056-4'  # replace with your cluster ARN
    topic = "topic1"
    consumer = None
    try:
        # Get the bootstrap servers using the MSK API.
        msk = boto3.client('kafka', region_name=region_name)
        response = msk.get_bootstrap_brokers(ClusterArn=msk_cluster_arn)
        # NOTE(review): this key is presumably only present when the cluster
        # exposes a plaintext listener; a TLS-only cluster would need
        # 'BootstrapBrokerStringTls' plus a security_protocol -- confirm.
        bootstrap_servers = response['BootstrapBrokerString']

        # Consume messages using KafkaConsumer.
        consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)
        # subscribe() is documented to take a list of topic names.
        consumer.subscribe([topic])
        for msg in consumer:
            log.info("Consumer Record: \n{}".format(msg))
    except KeyboardInterrupt:
        # The consume loop has no natural end; Ctrl-C is the expected way out.
        log.info("Interrupted, shutting down consumer.")
    except Exception:
        # Top-level boundary: keep the traceback and report failure to the shell.
        log.exception("Failed to consume messages from MSK")
        return 1
    finally:
        # Always release the broker connections held by the consumer.
        if consumer is not None:
            consumer.close()
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv))

运行:

image-20211223123714468

生产端

将以下内容保存为 producer.py,其中 region_name 和 msk_cluster_arn 两处请根据实际情况替换:

#!/usr/bin/python3

from kafka import KafkaProducer
import boto3
import logging as log
import sys

# Logging setup for the producer script: timestamped records at INFO and above.
_LOG_FORMAT = '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
_LOG_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
log.basicConfig(level=log.INFO, format=_LOG_FORMAT, datefmt=_LOG_DATE_FORMAT)


def main(args):
    """Publish a single test message to an MSK topic.

    Looks up the cluster's bootstrap brokers through the MSK API, sends one
    UTF-8 encoded message with a ``KafkaProducer`` and waits for the broker
    to acknowledge it before reporting success.

    Args:
        args: Command-line arguments (``sys.argv``); currently unused.

    Returns:
        0 when the message was acknowledged, 1 when an error was logged.
    """
    region_name = 'ap-southeast-1'  # replace with your region
    msk_cluster_arn = 'arn:aws:kafka:ap-southeast-1:145197526627:cluster/MSKDemo/89d04308-2643-4e80-b6e2-fe996354f056-4'  # replace with your cluster ARN
    topic = "topic1"
    message = "hello-from-python-msk-client"
    producer = None
    try:
        # Get the bootstrap servers using the MSK API.
        msk = boto3.client('kafka', region_name=region_name)
        response = msk.get_bootstrap_brokers(ClusterArn=msk_cluster_arn)
        # NOTE(review): this key is presumably only present when the cluster
        # exposes a plaintext listener; a TLS-only cluster would need
        # 'BootstrapBrokerStringTls' plus a security_protocol -- confirm.
        bootstrap_servers = response['BootstrapBrokerString']

        # Produce the message using KafkaProducer.
        producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
        future = producer.send(topic, value=message.encode('utf-8'))
        producer.flush()
        # send() is asynchronous: resolving the future re-raises any delivery
        # error, so success is only logged for an acknowledged message.
        future.get(timeout=10)
        log.info("Message published successfully.")
    except Exception:
        # Top-level boundary: keep the traceback and report failure to the shell.
        log.exception("Failed to publish message to MSK")
        return 1
    finally:
        # Always release the broker connections held by the producer.
        if producer is not None:
            producer.close()
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv))

运行producer.py,消息发布成功:

image-20211223123759484

此时在消费端可以看到实时接收的消息:

image-20211223123832733