Restricting Consumers to Fetch Data from the Same AZ

Since Kafka 2.4, Kafka supports fetching data from follower replicas, which can greatly reduce the cross-AZ data transfer cost incurred when consumers read data.

In Kafka's earlier design, to keep consumer reads consistent, consumers were only allowed to read data from the leader replica; follower replicas simply held backup copies. In the cloud, a Kafka cluster spans multiple AZs. Since data could only be consumed from the leader replica, consumers inevitably had to fetch data across AZs, and that bandwidth is usually expensive; locality could not be exploited to reduce the costly cross-AZ traffic. Cross-AZ access also adds latency:


Kafka therefore introduced this feature, which both saves on traffic cost and reduces the latency of cross-AZ access:



In MSK, broker.rack is already set on every broker by default; for example, us-west-2a corresponds to usw2-az1.
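To check which AZ ID each AZ name maps to in your own account, the AWS CLI can list the mapping (a sketch; the region is an assumption, and the command requires configured AWS credentials):

```shell
# AZ IDs (e.g. usw2-az1) are consistent across AWS accounts,
# while AZ names (e.g. us-west-2a) are randomized per account.
aws ec2 describe-availability-zones \
  --region us-west-2 \
  --query "AvailabilityZones[].{Name:ZoneName,Id:ZoneId}" \
  --output table
```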


To have consumers fetch partitions from the same AZ, two more parameters need to be configured:

replica.selector.class: configured on the broker side; set it to org.apache.kafka.common.replica.RackAwareReplicaSelector.

client.rack: configured on the consumer side; it must match the broker.rack value set on the brokers, indicating which rack to fetch data from.
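Put together, the two settings form a small configuration fragment (a sketch; the rack ID apse1-az2 is only an example and must match an AZ ID of your own cluster):

```properties
# Broker side (for MSK, applied through a Custom Configuration):
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

# Consumer side (e.g. in consumer.properties, or set programmatically):
client.rack=apse1-az2
```

With these in place, a fetch request from a consumer whose client.rack matches a follower's broker.rack can be served by that follower instead of the leader.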

Enabling replica.selector.class in MSK

Create a new Custom Configuration in MSK:


Clicking Create configuration opens a new page:


Set replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector and save:


Refresh the previous page, select the configuration just created, and save the change:


After a short while, the change takes effect across the entire MSK cluster.
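To confirm the setting has propagated, one way is to describe a broker's configuration with the kafka-configs tool (a sketch; the broker id 1 and the $KAFKA bootstrap address are assumptions):

```shell
kafka-configs --bootstrap-server $KAFKA \
  --entity-type brokers --entity-name 1 \
  --describe --all | grep replica.selector.class
```

If the tool supports the --all flag, configs inherited from the cluster default are printed as well, not only per-broker overrides.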

Testing from the client

First, look up the AZ ID of the EC2 instance:

curl -w '\n' http://169.254.169.254/latest/meta-data/placement/availability-zone-id 


When consuming the topic from the command line, append the --consumer-property flag to set client.rack:

kafka-console-consumer --bootstrap-server=$KAFKA --topic first-topic --consumer-property client.rack=apse1-az2


For how to develop and run Kafka Java code, please refer to my earlier post.

client.rack can also be set in Java code:

package com.kpingfan.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemo {
    public static void main(String[] args) {
        Logger logger = LoggerFactory.getLogger(ConsumerDemo.class.getName());

        String bootstrapServer = "b-2.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-6.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-4.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092";  // replace with your actual MSK broker addresses
        String topicName = "first-topic";  // create this topic ahead of time
        String groupId = "application1";

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
      
        // fetch partition data from the apse1-az2 rack
        properties.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "apse1-az2");
      
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // subscribe consumer to our topic(s)
        consumer.subscribe(Arrays.asList(topicName));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records) {
                logger.info("Key: " + record.key() + " Value: " + record.value());
                logger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
            }
        }

    }
}

Run the code; the output shows that client.rack is no longer empty but takes the specified value:


The program fetches data from the topic normally:



References:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica

https://zhuanlan.zhihu.com/p/324497008

https://developers.redhat.com/blog/2020/04/29/consuming-messages-from-closest-replicas-in-apache-kafka-2-4-0-and-amq-streams#log_comparison