本节我们进行MSK Connector测试,我们预期的结果是——向Aurora数据表中插入或更新数据,在MSK的topic里能实时看到这些操作细节。
如果MSK使用了 cluster configuration
并将 auto.create.topics.enable
属性设置为 true
,则可直接进入下一步的操作。
否则要提前创建三个topic:dbhistory.ecommerce, ecommerce-server, ecommerce-server.ecommerce.orders
。 MSK Connector将往这几个topic中写数据。
创建topic可用cli:kafka-topics --bootstrap-server $BROKER_URL --create --replication-factor 10 --partitions 3 --topic $TOPIC_NAME
打开两个终端,分别对MSK的ecommerce-server
和ecommerce-server.ecommerce.orders
主题进行监控:
kafka-console-consumer --bootstrap-server $BROKER_URL \
--topic ecommerce-server --from-beginning
kafka-console-consumer --bootstrap-server $BROKER_URL \
--topic ecommerce-server.ecommerce.orders --from-beginning
ecommerce-server
主题是Debezium监控schema变化用的ecommerce-server.ecommerce.orders
(database server: ecommerce-server
, database: ecommerce
, table: orders
)主题是Debezium监控表数据变化用的在第三个终端里,我们将往Aurora里插入数据:
$ mysql -h <aurora-database-writer-instance-endpoint> -u <database-user> -p
# 连接上去后创建数据库和数据表
CREATE DATABASE ecommerce;
USE ecommerce
CREATE TABLE orders (
order_id VARCHAR(255),
customer_id VARCHAR(255),
item_description VARCHAR(255),
price DECIMAL(6,2),
order_date DATETIME DEFAULT CURRENT_TIMESTAMP
);
在第一个终端里, 我们能看到schema的变化已经被MSK Connect抓取到:
往数据表里插入几条数据:
INSERT INTO orders VALUES ("123456", "123", "A super noisy mechanical keyboard", "50.00", "2021-08-16 10:11:12");
INSERT INTO orders VALUES ("123457", "123", "An extremely wide monitor", "500.00", "2021-08-16 11:12:13");
INSERT INTO orders VALUES ("123458", "123", "A too sensible microphone", "150.00", "2021-08-16 12:13:14");
在第二个终端里,我们看到表数据的变化已经被MSK Connect抓取到:
CDC(change data capature)会持续由MSK Connect管理并运行