First, set the BOOTSTRAP_SERVER environment variable. Every later command uses it to connect to the Kafka cluster. For example:
export BOOTSTRAP_SERVER=b-2.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-4.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-6.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092
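Because every later command depends on this variable, it can be worth sanity-checking it before continuing. The following is only a minimal sketch and not part of the original walkthrough: check_bootstrap is a hypothetical helper, and the host:port pattern is a simplifying assumption (for instance, it does not accept IPv6 literals).

```shell
# Hypothetical helper (not from the original walkthrough): succeed only if
# the argument is a comma-separated list of host:port entries.
check_bootstrap() {
  printf '%s\n' "$1" |
    grep -Eq '^[A-Za-z0-9._-]+:[0-9]+(,[A-Za-z0-9._-]+:[0-9]+)*$'
}

# Example: report the result for the current value of BOOTSTRAP_SERVER.
if check_bootstrap "${BOOTSTRAP_SERVER:-}"; then
  echo "BOOTSTRAP_SERVER looks well-formed"
else
  echo "BOOTSTRAP_SERVER is unset or malformed" >&2
fi
```

This only checks the shape of the string. It does not confirm that the brokers are reachable.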
Create a topic:
$ kafka-topics --bootstrap-server $BOOTSTRAP_SERVER --topic first-topic --create --replication-factor=3 --partitions=3
Created topic first-topic.
List the existing topics:
$ kafka-topics --bootstrap-server $BOOTSTRAP_SERVER --list
Show the details of a topic:
$ kafka-topics --bootstrap-server $BOOTSTRAP_SERVER --topic first-topic --describe
Topic: first-topic PartitionCount: 3 ReplicationFactor: 3 Configs: min.insync.replicas=2,message.format.version=2.7-IV2,unclean.leader.election.enable=true
Topic: first-topic Partition: 0 Leader: 6 Replicas: 6,4,2 Isr: 6,4,2
Topic: first-topic Partition: 1 Leader: 4 Replicas: 4,2,3 Isr: 4,2,3
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
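In the output above, every partition's Isr list has the same members as its Replicas list, so all replicas are in sync. As a rough sketch of how that check could be scripted, the hypothetical function under_replicated below reads --describe output on stdin and prints any partition whose Isr list is shorter than its Replicas list. It assumes the field layout shown above, which may differ between Kafka versions.

```shell
# Hypothetical helper: print partitions whose in-sync replica (Isr) list is
# shorter than the assigned replica list. Assumes the --describe layout
# shown above ("... Partition: N ... Replicas: a,b,c Isr: a,b").
under_replicated() {
  awk '
    /Partition: / {
      partition = ""; replicas = ""; isr = ""
      # Each label is followed by its value in the next field.
      for (i = 1; i < NF; i++) {
        if ($i == "Partition:") partition = $(i + 1)
        if ($i == "Replicas:")  replicas  = $(i + 1)
        if ($i == "Isr:")       isr       = $(i + 1)
      }
      # split() returns the number of comma-separated entries.
      if (split(isr, a, ",") < split(replicas, b, ","))
        print "partition " partition ": replicas=" replicas " isr=" isr
    }'
}
```

It could be used as a filter on the command above:
$ kafka-topics --bootstrap-server $BOOTSTRAP_SERVER --topic first-topic --describe | under_replicated
No output means no partition was flagged.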
Delete a topic:
$ kafka-topics --bootstrap-server $BOOTSTRAP_SERVER --topic first-topic --delete
Write data to a topic (first-topic was deleted above, so recreate it first):
$ kafka-topics --bootstrap-server $BOOTSTRAP_SERVER --topic first-topic --create --replication-factor=3 --partitions=3
$ kafka-console-producer --bootstrap-server $BOOTSTRAP_SERVER --topic first-topic
The acknowledgment mode can be set with the --producer-property parameter, for example acks=all:
$ kafka-console-producer --bootstrap-server $BOOTSTRAP_SERVER --topic first-topic --producer-property acks=all
Producer with keys:
# Produce message with keys
$ kafka-console-producer --bootstrap-server $BOOTSTRAP_SERVER --topic first-topic --property parse.key=true --property key.separator=,
> key,value
> another key,another value
In this mode, sending a message without a key makes the producer report an error.
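When the producer is fed from a file or a pipe, one way to avoid that error is to filter the input first. The sketch below is an illustration under stated assumptions, not part of the original walkthrough: keyed_only is a hypothetical helper that passes through lines containing the separator and reports the rest on stderr.

```shell
# Hypothetical helper: pass through lines that contain the key separator
# (default ","); report the others on stderr instead of forwarding them.
keyed_only() {
  sep=${1:-,}
  # The "|| [ -n ... ]" part keeps a final line that lacks a trailing newline.
  while IFS= read -r line || [ -n "$line" ]; do
    case "$line" in
      *"$sep"*) printf '%s\n' "$line" ;;
      *)        printf 'skipped (no key): %s\n' "$line" >&2 ;;
    esac
  done
}
```

Assuming a hypothetical input file messages.txt, it could sit in front of the keyed producer:
$ keyed_only < messages.txt | kafka-console-producer --bootstrap-server $BOOTSTRAP_SERVER --topic first-topic --property parse.key=true --property key.separator=,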
Consumer with keys:
$ kafka-console-consumer --bootstrap-server $BOOTSTRAP_SERVER --topic first-topic --from-beginning --property print.key=true --property key.separator=,
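With print.key=true and key.separator=, set, each record is printed on one line as key,value. As a sketch of how a script might process that output, the hypothetical function split_records below splits each line at the first separator. It assumes keys never contain a comma; values may.

```shell
# Hypothetical helper: split "key,value" lines (as printed by the keyed
# consumer) at the first comma. Assumes keys contain no comma.
split_records() {
  while IFS= read -r line || [ -n "$line" ]; do
    case "$line" in
      *,*)
        key=${line%%,*}    # everything before the first comma
        value=${line#*,}   # everything after the first comma
        printf 'key=[%s] value=[%s]\n' "$key" "$value"
        ;;
      *)
        printf 'no separator: %s\n' "$line" >&2
        ;;
    esac
  done
}
```

It could be appended to the consumer command with a pipe, for example `... --property key.separator=, | split_records`.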
Read messages from the latest offset (the consumer will not see data that was already in the topic):
$ kafka-console-consumer --bootstrap-server $BOOTSTRAP_SERVER --topic first-topic
To read all the data in the topic from the beginning, use the --from-beginning parameter:
$ kafka-console-consumer --bootstrap-server $BOOTSTRAP_SERVER --topic first-topic --from-beginning