Consumer Group相关命令

Consumer Group是kafka提供的可扩展且具有容错性的消费者机制。组内有一个或多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费。


当我们没有指定group时,kafka console其实在后台为我们默认创建了一个group,例如console-consumer-12345

$ kafka-console-consumer --bootstrap-server $BOOTSTRAP_SERVER --topic first-topic

设置group

使用--group参数可以设置group:

$ kafka-console-consumer --bootstrap-server $BOOTSTRAP_SERVER --topic topic_name --group group_name

例如:

first-topic有三个partition,如果先启动两个consumer来读它,会发现数据随机在两个consumer里被消费:

image-20220123144055000

image-20220123144143606

image-20220123144203775


查看consumer-group信息

使用kafka-consumer-groups.sh可以查看所有consumer group列表:

 kafka-consumer-groups --bootstrap-server=$BOOTSTRAP_SERVER --list

image-20220123144315742

使用describe可以查看某个consumer group的详细信息,例如当前连接的consumer,LAG等:

kafka-consumer-groups --bootstrap-server=$BOOTSTRAP_SERVER --describe --group first-consumer-group

image-20220123144852542

  • 由于此时两个consumer一直和producer保持连接,所以LAG为0

  • CONSUMER-ID这一列能看到总共有两个不同的consumer id


如果topic中有新的数据,且当前的consumer没有接收数据会产生LAG

主动把两个consumer的连接都断掉,然后使用producer往topic里写数据:

image-20220123145243838

  • 由于此时没有consumer连接,producer新写入到topic的数据就会产生堆积
  • 如果consumer重新连接会读取到新写入的数据,再执行describe命令时不会看到有LAG

Reset Offsets

consumer group只能消费一次topic里的数据, 因为consumer消费到数据后会实时提交offset。

如果想重新开始遍历之前的数据,可以进行reset操作。


不加任何参数执行kafka-consumer-groups.sh, 看到有一个选项reset-offsets,里面以不同方式(--to-datetime, --by-period, --to-earliest, --to-latest, --shift-by, --from-file, --to-current)对offset进行重置:

image-20201206155347797


例如将first-consumer-group重置到最早的位置:

$ kafka-consumer-groups --bootstrap-server=$BOOTSTRAP_SERVER --group first-consumer-group --reset-offsets --to-earliest --topic first-topic  --execute

image-20220123151506309

可以选择某个topic进行重置,或者使用all-topics对所有topic进行offset重置

启动consumer,此时会重新获取topic中所有的数据:

image-20220123151644470

shift-by重置

shift-by表示向前或向后重置N个单位offset,如果单位是负值则向前重置。

$ kafka-consumer-groups --bootstrap-server=$BOOTSTRAP_SERVER --group first-consumer-group --reset-offsets --shift-by -2 --topic first-topic  --execute

image-20220123152015174

启动consumer,此时会重新到获取3个partition的6条数据:

image-20220123152158183


仅当没有consumer连接时,才可以重置。
如果有consumer连接,则不能重置,会报以下错误:

image-20220123152256364