Consumer Group
是kafka提供的可扩展且具有容错性的消费者机制。组内有一个或多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费。
当我们没有指定group时,kafka console其实在后台为我们默认创建了一个group,例如console-consumer-12345
$ kafka-console-consumer --bootstrap-server $BOOTSTRAP_SERVER --topic first-topic
使用--group
参数可以设置group:
$ kafka-console-consumer --bootstrap-server $BOOTSTRAP_SERVER --topic topic_name --group group_name
例如:
first-topic
有三个partition,如果先启动两个consumer来读它,会发现数据随机在两个consumer里被消费:
使用kafka-consumer-groups.sh
可以查看所有consumer group列表:
kafka-consumer-groups --bootstrap-server=$BOOTSTRAP_SERVER --list
使用describe可以查看某个consumer group的详细信息,例如当前连接的consumer,LAG等:
kafka-consumer-groups --bootstrap-server=$BOOTSTRAP_SERVER --describe --group first-consumer-group
由于此时两个consumer一直和producer保持连接,所以LAG为0
CONSUMER-ID这一列能看到总共有两个不同的consumer id
如果topic中有新的数据,且当前的consumer没有接收数据会产生LAG
主动把两个consumer的连接都断掉,然后使用producer往topic里写数据:
- 由于此时没有consumer连接,producer新写入到topic的数据就会产生堆积
- 如果consumer重新连接会读取到新写入的数据,再执行describe命令时不会看到有LAG
consumer group只能消费一次topic里的数据, 因为consumer消费到数据后会实时提交offset。
如果想重新开始遍历之前的数据,可以进行reset操作。
不加任何参数执行kafka-consumer-groups.sh
, 看到有一个选项reset-offsets
,里面以不同方式(--to-datetime, --by-period, --to-earliest, --to-latest, --shift-by, --from-file, --to-current
)对offset进行重置:
例如将first-consumer-group
重置到最早的位置:
$ kafka-consumer-groups --bootstrap-server=$BOOTSTRAP_SERVER --group first-consumer-group --reset-offsets --to-earliest --topic first-topic --execute
可以选择某个topic进行重置,或者使用
all-topics
对所有topic进行offset重置
启动consumer,此时会重新获取topic中所有的数据:
shift-by
表示向前或向后重置N个单位offset,如果单位是负值则向前重置。
$ kafka-consumer-groups --bootstrap-server=$BOOTSTRAP_SERVER --group first-consumer-group --reset-offsets --shift-by -2 --topic first-topic --execute
启动consumer,此时会重新到获取3个partition的6条数据:
仅当没有consumer连接时,才可以重置。
如果有consumer连接,则不能重置,会报以下错误: