CDC测试

本节我们进行MSK Connector测试,我们预期的结果是——向Aurora数据表中插入或更新数据,在MSK的topic里能实时看到这些操作细节。

创建Topic

如果MSK使用了 cluster configuration 并将 auto.create.topics.enable 属性设置为 true,则可直接进入下一步的操作。

否则要提前创建三个topic:dbhistory.ecommerce, ecommerce-server, ecommerce-server.ecommerce.orders。 MSK Connector将往这几个topic中写数据。

image-20211231082542494

创建topic可用cli:kafka-topics --bootstrap-server $BROKER_URL --create --replication-factor 10 --partitions 3 --topic $TOPIC_NAME

测试

打开两个终端,分别对MSK的ecommerce-serverecommerce-server.ecommerce.orders 主题进行监控:

kafka-console-consumer --bootstrap-server  $BROKER_URL \
--topic ecommerce-server --from-beginning

kafka-console-consumer --bootstrap-server  $BROKER_URL \
--topic ecommerce-server.ecommerce.orders --from-beginning
  • ecommerce-server主题是Debezium监控schema变化用的
  • ecommerce-server.ecommerce.orders(database server: ecommerce-server, database: ecommerce, table: orders)主题是Debezium监控表数据变化用的

image-20211231083047546

在第三个终端里,我们将往Aurora里插入数据:

$ mysql -h <aurora-database-writer-instance-endpoint> -u <database-user> -p

# 连接上去后创建数据库和数据表
CREATE DATABASE ecommerce;

USE ecommerce

CREATE TABLE orders (
       order_id VARCHAR(255),
       customer_id VARCHAR(255),
       item_description VARCHAR(255),
       price DECIMAL(6,2),
       order_date DATETIME DEFAULT CURRENT_TIMESTAMP
);

在第一个终端里, 我们能看到schema的变化已经被MSK Connect抓取到:

image-20211231083502580

往数据表里插入几条数据:

INSERT INTO orders VALUES ("123456", "123", "A super noisy mechanical keyboard", "50.00", "2021-08-16 10:11:12");
INSERT INTO orders VALUES ("123457", "123", "An extremely wide monitor", "500.00", "2021-08-16 11:12:13");
INSERT INTO orders VALUES ("123458", "123", "A too sensible microphone", "150.00", "2021-08-16 12:13:14");

在第二个终端里,我们看到表数据的变化已经被MSK Connect抓取到:

image-20211231083620933

CDC(change data capature)会持续由MSK Connect管理并运行