本节我们将创建MSK Connect,它是沟通 Aurora与MSK之间的桥梁,既能访问Aurora集群,又可以往MSK topic里写数据。配置较为复杂,我们将一一道来

在MSK控制台的MSK Connect部分,选择Create connectors:

第一步需要用到上一节我们准备好的Debezium MySQL Connector, 将它的S3路径填写上去:

下一步输入Connector的名称和描述:

接下来是选择目标MSK集群,这里已经提前创建好:

在下面的Connector Configuration部分,使用如下的设置:
connector.class=io.debezium.connector.mysql.MySqlConnector
tasks.max=1
database.hostname=<aurora-database-writer-instance-endpoint, 例如database-1.cluster-cp68mt5zk18a.ap-southeast-1.rds.amazonaws.com>
database.port=3306
database.user=admin
database.password=<your-secret>
database.server.id=123456
database.server.name=ecommerce-server
database.include.list=ecommerce
database.history.kafka.topic=dbhistory.ecommerce
database.history.kafka.bootstrap.servers=<bootstrap servers, 例如b-2.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-1.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-3.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092>
include.schema.changes=true
connector.class和tasks.max是必填的配置:
connector.class 是connector使用的java类tasks.max :connector创建任务的最大数量其他就是Debezium MySQL connector的特有配置:
database.hostname :Aurora数据库的写节点endpointdatabase.server.name :数据库的逻辑别名,Debezium将基于此创建一个Kafka topic。database.include.list :要监控的数据库列表database.history.kafka.topic :Debedium使用此 Kafka topic 来追踪数据库schema的变化database.history.kafka.bootstrap.servers :MSK的broker连接地址在Connector capacity设置部分,使用默认的autoscaled ,其他选项都保持默认。

在 autoscaled capacity可以配置以下参数

在Access permissions部分,第二节我们已经创建好msk-connect-role,选中它:

在security部分选择Plaintext traffic:

Logs部分,提前创建好一个Cloudwatch Log Group,MSK Connect的相关操作日志将往里面写:
Logs是可选配置,可以留空

最后Review,并点击创建。创建MSK Connector的过程将持续几分钟,等待其创建完成:
