在Kafka2.4之后,kafka支持从follower副本读取数据, 可以大大降低consumer访问数据时的跨az费用。
在早先kafka的设计中,为了使consumer读取数据能够保持一致,只允许consumer读取leader副本的数据——follower replica只是单纯地做备份数据。在云上,Kafka存在于多个az,由于只能从leader replica消费数据,那么不得不进行跨az获取数据,而这些流量带宽通常是比较昂贵的,无法利用本地性来减少昂贵的跨az流量。另外,跨az也增加了延迟:
所以kafka推出这个功能,一则节省流量费用,二则降低跨az访问带来的延迟:
在MSK中,每个broker默认已经设置好了broker.rack
,例如us-west-2a
对应usw2-az1
要实现访问同az的partition,还需要再配置两个参数:
replica.selector.class:在broker端配置,值设置为为org.apache.kafka.common.replica.RackAwareReplicaSelector
client.rack:consumer端配置,这个参数需要和broker端指定的broker.rack
相同,表示去哪个rack中获取数据。
在MSK中创建新的Custom Configuration
:
点击Create configuration
后会打开新的页面:
设置replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
,并保存:
在上一个页面刷新,选择刚才创建的configuration,保存更改:
过一段时间后在整个MSK集群生效。
首先我们要知道EC2所在的az:
curl -w '\n' http://169.254.169.254/latest/meta-data/placement/availability-zone-id
如果使用命令行访问topic,则在后面加--consumer-property
参数,设置client.rack:
kafka-console-consumer --bootstrap-server=$KAFKA --topic first-topic --consumer-property client.rack=apse1-az2
关于如何开发并运行Kafka Java Code,请参考拙作
在java代码中,也可以设置client.rack
:
package com.kpingfan.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerDemo {
public static void main(String[] args) {
Logger logger = LoggerFactory.getLogger(ConsumerDemo.class.getName());
String bootstrapServer = "b-2.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-6.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-4.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092"; // 根据实际MSK broker地址做替换
String topicName = "first-topic"; // 提前创建好这个topic
String groupId = "application1";
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
// 设置从apse1-az2读取partition的数据
properties.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "apse1-az2");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// create consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
// subscribe consumer to our topic(s)
consumer.subscribe(Arrays.asList(topicName));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
logger.info("Key: " + record.key() + " Value: " + record.value());
logger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
}
}
}
}
运行代码,从输出中发现client.rack
不再为空,而是使用了指定的值:
程序可以正常的获取到topic中的数据: