Topic, Consumer, and Producer Commands

First, set the BOOTSTRAP_SERVER environment variable. All of the commands below use it to connect to the Kafka cluster.

For example:

export BOOTSTRAP_SERVER=b-2.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-4.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-6.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092
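
Because every later command depends on this variable, it can help to sanity-check it before going further. The following is only a minimal sketch: `list_brokers` is a hypothetical helper name (it is not part of the Kafka tooling), and it only checks that each entry looks like host:port, not that the broker is reachable.

```shell
# Hypothetical helper: print one broker per line and fail on a malformed entry.
# It checks the host:port shape only; it does not contact the brokers.
list_brokers() {
  servers="$1"
  if [ -z "$servers" ]; then
    echo "bootstrap server list is empty" >&2
    return 1
  fi
  # Turn commas into newlines, then validate each entry in turn.
  printf '%s\n' "$servers" | tr ',' '\n' | while IFS= read -r entry; do
    host=${entry%:*}
    port=${entry##*:}
    case "$entry" in
      *:*) ;;
      *) echo "missing port: $entry" >&2; exit 1 ;;
    esac
    case "$port" in
      ''|*[!0-9]*) echo "bad port: $entry" >&2; exit 1 ;;
    esac
    [ -n "$host" ] || { echo "missing host: $entry" >&2; exit 1; }
    printf '%s\n' "$entry"
  done
}

# Only run the check when the variable is actually set.
if [ -n "${BOOTSTRAP_SERVER:-}" ]; then
  list_brokers "$BOOTSTRAP_SERVER"
fi
```

With the export above, this should print the three broker addresses, one per line.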

Topic Commands

Create a topic:

$ kafka-topics --bootstrap-server $BOOTSTRAP_SERVER --topic first-topic --create --replication-factor=3 --partitions=3
Created topic first-topic.

List the existing topics:

$ kafka-topics --bootstrap-server $BOOTSTRAP_SERVER --list


Show the details of a topic:

$ kafka-topics --bootstrap-server $BOOTSTRAP_SERVER --topic first-topic --describe

Topic: first-topic      PartitionCount: 3       ReplicationFactor: 3    Configs: min.insync.replicas=2,message.format.version=2.7-IV2,unclean.leader.election.enable=true
        Topic: first-topic      Partition: 0    Leader: 6       Replicas: 6,4,2 Isr: 6,4,2
        Topic: first-topic      Partition: 1    Leader: 4       Replicas: 4,2,3 Isr: 4,2,3
        Topic: first-topic      Partition: 2    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
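
In the output above, each partition line lists its assigned replicas and the replicas currently in sync (Isr). As a rough sketch, the helper below flags partitions whose Isr list is shorter than their Replicas list. `under_replicated` is a hypothetical name, and the parsing assumes the line layout shown above. The built-in `--under-replicated-partitions` option of `kafka-topics --describe` provides a similar filter.

```shell
# Hypothetical helper: read `kafka-topics --describe` output on stdin and
# print the partitions that have fewer in-sync replicas than assigned replicas.
under_replicated() {
  awk '
    $3 == "Partition:" {
      replicas = ""; isr = ""
      # Locate the values that follow the "Replicas:" and "Isr:" labels.
      for (i = 1; i < NF; i++) {
        if ($i == "Replicas:") replicas = $(i + 1)
        if ($i == "Isr:")      isr = $(i + 1)
      }
      # Compare the number of comma-separated broker IDs in each list.
      if (split(replicas, r, ",") > split(isr, s, ",")) {
        print "partition " $4 " replicas=" replicas " isr=" isr
      }
    }
  '
}
```

It could be used as `kafka-topics --bootstrap-server $BOOTSTRAP_SERVER --topic first-topic --describe | under_replicated`. For the healthy topic shown above it prints nothing.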

Delete a topic:

$ kafka-topics --bootstrap-server $BOOTSTRAP_SERVER --topic first-topic --delete

Producer Commands

Write data to a topic (the topic is re-created first, because it was deleted above):

$ kafka-topics --bootstrap-server $BOOTSTRAP_SERVER --topic first-topic --create --replication-factor=3 --partitions=3
$ kafka-console-producer --bootstrap-server $BOOTSTRAP_SERVER --topic first-topic 


The acknowledgment mode can be set with the --producer-property option, for example acks=all:

$ kafka-console-producer --bootstrap-server $BOOTSTRAP_SERVER --topic first-topic --producer-property acks=all

Producer with keys:

# Produce message with keys
$ kafka-console-producer --bootstrap-server $BOOTSTRAP_SERVER --topic first-topic --property parse.key=true --property key.separator=,
> key,value
> another key,another value

With parse.key=true set, every input line must contain the key separator. If a message is sent without a key, the producer fails with an error.
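
Typing keyed messages by hand becomes tedious when testing, so the lines can also be generated and piped into the console producer, which reads from standard input. This is a small sketch only: `keyed_records` and the `user-N` / `order-N` values are made up for illustration.

```shell
# Hypothetical helper: emit COUNT records as "key<separator>value" lines.
# The separator defaults to "," to match key.separator=, used above.
keyed_records() {
  count="$1"
  sep="${2:-,}"
  i=1
  while [ "$i" -le "$count" ]; do
    printf 'user-%d%sorder-%d\n' "$i" "$sep" "$i"
    i=$((i + 1))
  done
}
```

For example, `keyed_records 3 | kafka-console-producer --bootstrap-server $BOOTSTRAP_SERVER --topic first-topic --property parse.key=true --property key.separator=,` would send three keyed messages. Because every generated line contains the separator, the missing-key error is avoided.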

Consumer with keys:

$ kafka-console-consumer --bootstrap-server $BOOTSTRAP_SERVER --topic first-topic --from-beginning --property print.key=true --property key.separator=,
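
Once keys are printed, the consumer output can be post-processed like any other text. As an illustrative sketch, the helper below counts messages per key. `count_by_key` is a hypothetical name, and it assumes that keys do not themselves contain the separator.

```shell
# Hypothetical helper: read "key<separator>value" lines on stdin and print
# "key count" pairs sorted by key. The separator defaults to ",".
count_by_key() {
  sep="${1:-,}"
  awk -F"$sep" '{ count[$1]++ } END { for (k in count) print k, count[k] }' | sort
}
```

The console consumer normally runs until interrupted, so for a pipeline like this it helps to add `--timeout-ms 5000` (or `--max-messages`) to the consumer command above so that it exits, and then append `| count_by_key`. Messages that were produced without a key are printed with the key `null`.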

Consumer Commands

Read messages starting from the latest offset (the consumer does not receive data that was already in the topic):

$ kafka-console-consumer --bootstrap-server $BOOTSTRAP_SERVER --topic first-topic


To read all of the data in the topic from the beginning, use the --from-beginning option:

$ kafka-console-consumer --bootstrap-server $BOOTSTRAP_SERVER --topic first-topic --from-beginning
