本节将演示如何使用 Python 向 MSK 发布消息并从 MSK 接收消息。首先安装依赖:
pip3 install boto3 kafka-python
将以下内容保存为 consumer.py,其中 region_name 和 msk_cluster_arn 部分根据实际情况做替换:
#!/usr/bin/python3
from kafka import KafkaConsumer
import boto3
import logging as log
import sys
# Configure root logging: timestamped records at INFO level and above.
log.basicConfig(
    level=log.INFO,
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
)
def main(args):
    """Consume records from an MSK topic and log each one.

    Looks up the cluster's bootstrap brokers through the MSK API, subscribes
    a ``KafkaConsumer`` to the topic, and logs every record it receives. The
    loop runs until the process is interrupted or an error occurs.

    Args:
        args: Command-line arguments (currently unused).
    """
    region_name = 'ap-southeast-1'  # replace with your region
    msk_cluster_arn = 'arn:aws:kafka:ap-southeast-1:145197526627:cluster/MSKDemo/89d04308-2643-4e80-b6e2-fe996354f056-4'  # replace with your cluster ARN
    topic = "topic1"

    consumer = None
    try:
        # Get the bootstrap servers using the MSK API.
        msk = boto3.client('kafka', region_name=region_name)
        response = msk.get_bootstrap_brokers(ClusterArn=msk_cluster_arn)
        # NOTE(review): this key holds the plaintext broker list; clusters
        # that only allow TLS presumably expose 'BootstrapBrokerStringTls'
        # instead -- confirm against your cluster's configuration.
        bootstrap_servers = response['BootstrapBrokerString']

        # Consume messages using KafkaConsumer.
        consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)
        # subscribe() is documented to take a list of topic names.
        consumer.subscribe([topic])
        for msg in consumer:
            log.info("Consumer Record: \n{}".format(msg))
    except Exception as ex:
        # log.exception keeps the traceback, which str(ex) alone would drop.
        log.exception("Failed to consume from MSK: %s", ex)
    finally:
        # Release broker connections even on errors or Ctrl+C.
        if consumer is not None:
            consumer.close()
# Run the consumer only when this file is executed as a script.
if __name__ == "__main__":
    main(sys.argv)
运行 consumer.py:
将以下内容保存为 producer.py,其中 region_name 和 msk_cluster_arn 部分根据实际情况做替换:
#!/usr/bin/python3
from kafka import KafkaProducer
import boto3
import logging as log
import sys
# Configure root logging: timestamped records at INFO level and above.
log.basicConfig(
    level=log.INFO,
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
)
def main(args):
    """Publish a single test message to an MSK topic.

    Looks up the cluster's bootstrap brokers through the MSK API, sends one
    UTF-8 encoded message with a ``KafkaProducer``, and logs success only
    after the broker has acknowledged the record.

    Args:
        args: Command-line arguments (currently unused).
    """
    region_name = 'ap-southeast-1'  # replace with your region
    msk_cluster_arn = 'arn:aws:kafka:ap-southeast-1:145197526627:cluster/MSKDemo/89d04308-2643-4e80-b6e2-fe996354f056-4'  # replace with your cluster ARN
    topic = "topic1"
    message = "hello-from-python-msk-client"

    producer = None
    try:
        # Get the bootstrap servers using the MSK API.
        msk = boto3.client('kafka', region_name=region_name)
        response = msk.get_bootstrap_brokers(ClusterArn=msk_cluster_arn)
        # NOTE(review): this key holds the plaintext broker list; clusters
        # that only allow TLS presumably expose 'BootstrapBrokerStringTls'
        # instead -- confirm against your cluster's configuration.
        bootstrap_servers = response['BootstrapBrokerString']

        # Produce the message using KafkaProducer.
        producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
        message_bytes = message.encode('utf-8')
        future = producer.send(topic, value=message_bytes)
        producer.flush()
        # send() is asynchronous and flush() does not raise on a failed
        # delivery; resolving the future surfaces any send error so that
        # success is only logged when the record was really written.
        future.get(timeout=10)
        log.info("Message published successfully.")
    except Exception as ex:
        # log.exception keeps the traceback, which str(ex) alone would drop.
        log.exception("Failed to publish to MSK: %s", ex)
    finally:
        # Release broker connections even when publishing failed.
        if producer is not None:
            producer.close()
# Run the producer only when this file is executed as a script.
if __name__ == "__main__":
    main(sys.argv)
运行 producer.py,消息发布成功:
此时在消费端可以看到实时接收的消息: