获取MSK集群列表:
kongpingfan:~/environment $ aws kafka list-clusters | jq .ClusterInfoList[].ClusterArn
"arn:aws:kafka:ap-southeast-1:145197526627:cluster/MSKDemo/89d04308-2643-4e80-b6e2-fe996354f056-4"
获取某个集群的详细信息:
aws kafka describe-cluster --cluster-arn <cluster-arn>
获取集群的Zookeeper连接地址:
# aws kafka describe-cluster --cluster-arn <cluster-arn> --query 'ClusterInfo.ZookeeperConnectString'
kongpingfan:~/environment $ aws kafka describe-cluster --cluster-arn arn:aws:kafka:ap-southeast-1:145197526627:cluster/MSKDemo/89d04308-2643-4e80-b6e2-fe996354f056-4 --query 'ClusterInfo.ZookeeperConnectString'
"z-2.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:2181,z-3.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:2181,z-1.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:2181"
获取集群的Broker地址:
# aws kafka get-bootstrap-brokers --cluster-arn <cluster-arn>
kongpingfan:~/environment $ aws kafka get-bootstrap-brokers --cluster-arn arn:aws:kafka:ap-southeast-1:145197526627:cluster/MSKDemo/89d04308-2643-4e80-b6e2-fe996354f056-4
{
"BootstrapBrokerStringSaslIam": "b-2.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9098,b-1.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9098,b-3.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9098",
"BootstrapBrokerStringTls": "b-2.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9094,b-1.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9094,b-3.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9094",
"BootstrapBrokerString": "b-2.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-1.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-3.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092"
}
上面两条命令获取到的地址(Zookeeper连接地址和Broker地址)在AWS控制台的MSK集群页面里也可以看到:
# 获取kafka broker地址
export KAFKA=$(aws kafka get-bootstrap-brokers --cluster-arn <cluster-arn-here> --query BootstrapBrokerString --output text)
kafka-topics --bootstrap-server $KAFKA --list # 查看topic列表
kafka-topics --bootstrap-server $KAFKA --create --replication-factor 3 --partitions 3 --topic topic1 # 创建topic
打开一个新的终端窗口,启动消费者命令:
export KAFKA=$(aws kafka get-bootstrap-brokers --cluster-arn <cluster-arn-here> --query BootstrapBrokerString --output text)
kafka-console-consumer --bootstrap-server $KAFKA --topic topic1 --from-beginning
在原来的终端中执行:
kafka-console-producer --bootstrap-server $KAFKA --topic topic1
并发送消息到topic
在消费者端可以实时地接收到消息:
获取zookeeper的连接地址:
# aws kafka list-clusters --query 'ClusterInfoList[*].[ClusterName, ClusterArn, ZookeeperConnectString]'
kongpingfan:~/environment $ aws kafka list-clusters --query 'ClusterInfoList[*].[ClusterName, ClusterArn, ZookeeperConnectString]'
[
[
"MSKDemo",
"arn:aws:kafka:ap-southeast-1:145197526627:cluster/MSKDemo/89d04308-2643-4e80-b6e2-fe996354f056-4",
"z-2.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:2181,z-3.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:2181,z-1.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:2181"
]
]
根据zookeeper-connect-string进行连接:
# zookeeper-shell <zookeeper-connect-string>
进入shell后,执行 `ls /brokers/topics` 命令获取到所有的topic列表: