Operating MSK from the Command Line

Getting the ZooKeeper and broker connection addresses

List the MSK clusters:

kongpingfan:~/environment $ aws kafka list-clusters | jq '.ClusterInfoList[].ClusterArn'
"arn:aws:kafka:ap-southeast-1:145197526627:cluster/MSKDemo/89d04308-2643-4e80-b6e2-fe996354f056-4"

Get the details of a specific cluster:

aws kafka describe-cluster --cluster-arn <cluster-arn>

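Every command in this walkthrough takes the cluster ARN, so it can help to capture it once in a variable. The sketch below is only an illustration: `first_cluster_arn`, `arn_region`, and `arn_cluster_name` are hypothetical helper names, and the ARN layout is assumed to match the one printed by `list-clusters` above.

```shell
#!/bin/sh
# Hypothetical helpers (not part of the AWS CLI). They assume an ARN of the form
# arn:aws:kafka:<region>:<account>:cluster/<name>/<uuid>, as in the output above.

# Print the ARN of the first cluster in the account (needs the AWS CLI and credentials).
first_cluster_arn() {
    aws kafka list-clusters --query 'ClusterInfoList[0].ClusterArn' --output text
}

# Print the region: the 4th colon-separated field of the ARN.
arn_region() {
    printf '%s\n' "$1" | cut -d: -f4
}

# Print the cluster name: the 2nd slash-separated field of the ARN.
arn_cluster_name() {
    printf '%s\n' "$1" | cut -d/ -f2
}

# Example using the ARN from the listing above (no AWS call is made here).
ARN="arn:aws:kafka:ap-southeast-1:145197526627:cluster/MSKDemo/89d04308-2643-4e80-b6e2-fe996354f056-4"
arn_region "$ARN"
arn_cluster_name "$ARN"
```

With credentials configured, `CLUSTER_ARN=$(first_cluster_arn)` could then be passed to `--cluster-arn` in the commands that follow.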

Get the cluster's ZooKeeper connection string:

# aws kafka describe-cluster --cluster-arn <cluster-arn> --query 'ClusterInfo.ZookeeperConnectString'

kongpingfan:~/environment $ aws kafka describe-cluster --cluster-arn arn:aws:kafka:ap-southeast-1:145197526627:cluster/MSKDemo/89d04308-2643-4e80-b6e2-fe996354f056-4 --query 'ClusterInfo.ZookeeperConnectString'
"z-2.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:2181,z-3.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:2181,z-1.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:2181"

Get the cluster's broker addresses:

# aws kafka get-bootstrap-brokers --cluster-arn <cluster-arn>

kongpingfan:~/environment $ aws kafka get-bootstrap-brokers --cluster-arn arn:aws:kafka:ap-southeast-1:145197526627:cluster/MSKDemo/89d04308-2643-4e80-b6e2-fe996354f056-4
{
    "BootstrapBrokerStringSaslIam": "b-2.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9098,b-1.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9098,b-3.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9098", 
    "BootstrapBrokerStringTls": "b-2.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9094,b-1.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9094,b-3.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9094", 
    "BootstrapBrokerString": "b-2.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-1.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-3.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092"
}

The addresses returned by the two commands above are also shown in the AWS console.
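The `get-bootstrap-brokers` output above has one key per listener: port 9092 (`BootstrapBrokerString`), 9094 (`BootstrapBrokerStringTls`), and 9098 (`BootstrapBrokerStringSaslIam`). A minimal sketch of choosing between them follows; the function names and the mode labels `plaintext`, `tls`, and `iam` are made up for this example.

```shell
#!/bin/sh
# Hypothetical helper: map a connection mode to the JSON key returned by
# `aws kafka get-bootstrap-brokers`. The mode labels are invented for this sketch.
bootstrap_key() {
    case "$1" in
        plaintext) echo "BootstrapBrokerString" ;;         # port 9092
        tls)       echo "BootstrapBrokerStringTls" ;;      # port 9094
        iam)       echo "BootstrapBrokerStringSaslIam" ;;  # port 9098
        *)         echo "unknown mode: $1" >&2; return 1 ;;
    esac
}

# Print the broker list for cluster ARN $1 and mode $2.
# Needs the AWS CLI and credentials, so it is only defined here, not called.
get_brokers() {
    key=$(bootstrap_key "$2") || return 1
    aws kafka get-bootstrap-brokers --cluster-arn "$1" --query "$key" --output text
}

# Usage (not run here): KAFKA=$(get_brokers "<cluster-arn>" plaintext)
```

The rest of this walkthrough uses the plaintext listener. The TLS and IAM listeners would probably also need client-side security settings, which are not covered here.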

Creating a topic

# Get the Kafka broker addresses
export KAFKA=$(aws kafka get-bootstrap-brokers --cluster-arn <cluster-arn-here> --query BootstrapBrokerString --output text)

kafka-topics --bootstrap-server $KAFKA --list # List the topics

kafka-topics --bootstrap-server $KAFKA --create --replication-factor 3 --partitions 3 --topic topic1  # Create a topic

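Running the `--create` command a second time for the same topic fails because the topic already exists. One way to make the step repeatable is to check the `--list` output first. The sketch below assumes `--list` prints one topic name per line; `topic_exists` and `ensure_topic` are hypothetical names.

```shell
#!/bin/sh
# Hypothetical helper: succeed if the topic named $1 appears as an exact,
# whole line on stdin (so "topic1" does not match "topic10").
topic_exists() {
    grep -Fxq -- "$1"
}

# Create topic $1 only if it is missing. Needs kafka-topics on PATH and $KAFKA
# set as shown above, so it is only defined here, not called.
ensure_topic() {
    if kafka-topics --bootstrap-server "$KAFKA" --list | topic_exists "$1"; then
        echo "topic $1 already exists"
    else
        kafka-topics --bootstrap-server "$KAFKA" --create \
            --replication-factor 3 --partitions 3 --topic "$1"
    fi
}

# Usage (not run here): ensure_topic topic1
```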

Producing and consuming messages

Open a new terminal and start a consumer:

export KAFKA=$(aws kafka get-bootstrap-brokers --cluster-arn <cluster-arn-here> --query BootstrapBrokerString --output text)
kafka-console-consumer --bootstrap-server $KAFKA --topic topic1 --from-beginning

In the original terminal, run:

kafka-console-producer --bootstrap-server $KAFKA --topic topic1

Then type messages at the producer prompt to send them to the topic. Each line is sent as one message.

The consumer terminal receives the messages in real time.
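The interactive session above can also be scripted, since the console producer reads messages from standard input. This is a sketch only: `gen_messages`, `produce_n`, and `consume_n` are hypothetical names, and the `message-N` payload format is arbitrary.

```shell
#!/bin/sh
# Hypothetical helper: print $1 numbered test messages, one per line.
gen_messages() {
    i=1
    while [ "$i" -le "$1" ]; do
        printf 'message-%d\n' "$i"
        i=$((i + 1))
    done
}

# Send $2 generated messages to topic $1.
# Needs kafka-console-producer and $KAFKA, so it is only defined here.
produce_n() {
    gen_messages "$2" | kafka-console-producer --bootstrap-server "$KAFKA" --topic "$1"
}

# Read at most $2 messages from the beginning of topic $1, then exit.
consume_n() {
    kafka-console-consumer --bootstrap-server "$KAFKA" --topic "$1" \
        --from-beginning --max-messages "$2"
}

# Usage (not run here): produce_n topic1 5 && consume_n topic1 5
```

`--max-messages` makes the consumer exit once it has read that many messages, rather than waiting indefinitely.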

ZooKeeper shell

Get the ZooKeeper connection string:

# aws kafka list-clusters --query 'ClusterInfoList[*].[ClusterName, ClusterArn, ZookeeperConnectString]'


kongpingfan:~/environment $ aws kafka list-clusters --query 'ClusterInfoList[*].[ClusterName, ClusterArn, ZookeeperConnectString]'
[
    [
        "MSKDemo", 
        "arn:aws:kafka:ap-southeast-1:145197526627:cluster/MSKDemo/89d04308-2643-4e80-b6e2-fe996354f056-4", 
        "z-2.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:2181,z-3.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:2181,z-1.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:2181"
    ]
]

Connect using the ZooKeeper connection string:

# zookeeper-shell <zookeeper-connect-string>

Once inside the shell, run ls /brokers/topics to list all topics.
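The same listing can be obtained without an interactive session, because `zookeeper-shell` accepts a command after the connection string. The sketch below assumes the listing is printed as a bracketed, comma-separated line such as `[topic1, __consumer_offsets]`, which may vary between versions; `parse_zk_list` and `list_topics_zk` are hypothetical names.

```shell
#!/bin/sh
# Hypothetical helper: turn a ZooKeeper listing such as "[a, b]" read from stdin
# into one name per line. The bracketed format is an assumption.
parse_zk_list() {
    sed -e 's/^\[//' -e 's/\]$//' | tr ',' '\n' | sed 's/^ *//'
}

# List topic names through ZooKeeper for connection string $1.
# Needs zookeeper-shell on PATH, so it is only defined here, not called.
list_topics_zk() {
    # Keep only output lines that start with "[", i.e. the listing itself.
    zookeeper-shell "$1" ls /brokers/topics | grep '^\[' | parse_zk_list
}

# Usage (not run here): list_topics_zk "<zookeeper-connect-string>"
```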