本节我们将创建MSK Connect,它是沟通 Aurora与MSK之间的桥梁,既能访问Aurora集群,又可以往MSK topic里写数据。配置较为复杂,我们将一一道来
在MSK控制台的MSK Connect
部分,选择Create connectors
:
第一步需要用到上一节我们准备好的Debezium MySQL Connector, 将它的S3路径填写上去:
下一步输入Connector的名称和描述:
接下来是选择目标MSK集群,这里已经提前创建好:
在下面的Connector Configuration
部分,使用如下的设置:
connector.class=io.debezium.connector.mysql.MySqlConnector
tasks.max=1
database.hostname=<aurora-database-writer-instance-endpoint, 例如database-1.cluster-cp68mt5zk18a.ap-southeast-1.rds.amazonaws.com>
database.port=3306
database.user=admin
database.password=<your-secret>
database.server.id=123456
database.server.name=ecommerce-server
database.include.list=ecommerce
database.history.kafka.topic=dbhistory.ecommerce
database.history.kafka.bootstrap.servers=<bootstrap servers, 例如b-2.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-1.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-3.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092>
include.schema.changes=true
connector.class
和tasks.max
是必填的配置:
connector.class
是connector使用的java类tasks.max
:connector创建任务的最大数量其他就是Debezium MySQL connector
的特有配置:
database.hostname
:Aurora数据库的写节点endpointdatabase.server.name
:数据库的逻辑别名,Debezium将基于此创建一个Kafka topic。database.include.list
:要监控的数据库列表database.history.kafka.topic
:Debedium使用此 Kafka topic 来追踪数据库schema的变化database.history.kafka.bootstrap.servers
:MSK的broker连接地址在Connector capacity设置部分,使用默认的autoscaled
,其他选项都保持默认。
在 autoscaled capacity可以配置以下参数
在Access permissions
部分,第二节我们已经创建好msk-connect-role
,选中它:
在security部分选择Plaintext traffic
:
Logs部分,提前创建好一个Cloudwatch Log Group,MSK Connect的相关操作日志将往里面写:
Logs是可选配置,可以留空
最后Review,并点击创建。创建MSK Connector的过程将持续几分钟,等待其创建完成: