本节将介绍MSK Connect连接Postgres的配置,Postgres和MySQL相比原理类似,但在配置上略有不同。MySQL通过Binlog进行数据CDC,而Postgres通过WAL做逻辑复制进行CDC。
如果对Postgres的逻辑复制不熟悉,建议先阅读 https://rds.kpingfan.com/01-logical-replication/
实验流程如下:
Aurora Postgres开启逻辑复制,需要将rds.logical_replication
参数设置为1,所以我们先提前创建好一个参数组:
类型选择DB Cluster Parameter Group
,命名为aurora-postgres13-cdc
:
创建完成后,修改rds.logical_replication
值,将其设置为1并保存:
点击创建数据库:
选择Standard create
+ Postgres-Compatible Edition
,选择Postgres 13版本:
为cluster命名,设置数据库密码:
这里不创建Replica,部署在默认VPC,使用默认安全组:
在Additional configuration里,选择上一步创建的参数组:
点击创建。大概十分钟左右创建完成
由于我们的EC2要连接到Aurora Postgres,所以我们将5432放开给上一节创建的机器访问。
default安全组放开5432端口:
Postgres创建完成后,可以在Cloud 9上测试是否连通:
执行show wal_level
命令,输出logical,证明已经开启逻辑复制:
postgres=> show wal_level;
wal_level
-----------
logical
(1 row)
如果显示的是replica,则需要检查Aurora Postgres集群,是否使用了默认参数组。
和 https://msk.kpingfan.com/04.kafka-connect/02.%E5%87%86%E5%A4%87iam-role/ 操作一致,如果之前创建过,可略过本部分
本部分,我们将从Debebium官网( https://debezium.io/releases/ )下载Postgres connector plugin,并将其打包上传到S3, 以备后面的步骤使用。
官网上目前最新的版本是1.9,将其下载到本地:
下载下来是tar.gz
格式。由于MSK Connect只能使用ZIP或JAR格式,所以需要将其先转为ZIP格式:
$ tar xzf debezium-connector-postgres-1.9.0.Final-plugin.tar.gz
$ cd debezium-connector-postgres/
$ zip -9 ../debezium-connector-postgres-1.9.0.zip *
$ cd ..
最后将zip压缩包上传到S3上,并记录上传的路径:
aws s3 cp debezium-connector-postgres-1.9.0.zip s3://any-s3-buckets
本实验中MSK Connect将读取Postgres以下维度的数据变化:
在postgres中执行以下命令:
create table game (id1 int, id2 int);
insert into game values (1,2);
insert into game values (2,3);
创建一个发布:
CREATE PUBLICATION alltables FOR ALL TABLES;
由于此时还没有创建MSK Connect,故repliation slots为空
如果MSK配置了auto.create.topics.enable=true
,则跳过本部分。
否则创建一个新的topic:
kafka-topics --bootstrap-server $KAFKA --create --replication-factor 3 --partitions 3 --topic pgaaa.public.game
在MSK控制台的MSK Connect
部分,选择Create connectors
:
第一步需要用到上一节我们准备好的Debezium Postgres Connector, 将它的S3路径填写上去:
下一步输入Connector的名称和描述:
接下来是选择目标MSK集群,这里已经提前创建好:
在下面的Connector Configuration
部分,使用如下的设置:
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.user=postgres
database.dbname=postgres
tasks.max=1
database.hostname=pg2.cluster-cp68mt5zk18a.ap-southeast-1.rds.amazonaws.com
database.password=<Your password>
database.server.name=pgaaa
plugin.name=pgoutput
database.port=5432
publication.name=alltables
schema.include=public
table.include.list=public.game
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.fields=op,db,table,schema,lsn,source.ts_ms
里面的参数有点多,可参考 https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-connector-properties 我们挑重点的进行解释:
alltables
这个发布database.server.name=pgaaa
: MSK Connect将在MSK中创建名为pgaaa.public.game
的主题,并将所有数据写入此主题,如果订阅了其他schema和其他table,则创建主题名为<database.server.name>.<schema_name>.<table_name>
table.include.list=public.game
: 白名单机制,我们只订阅public schema下面game表的变化。如果对以上参数不理解,强烈建议阅读拙作: https://rds.kpingfan.com/01-logical-replication/
继续往下,其他选项都保持默认。
在Access permissions
部分,我们已经创建好msk-connect-role
,选中它:
在security部分选择Plaintext traffic
:
进入下一步,Logs部分我们留空,继续进入下一步。
最后Review,并点击创建。创建MSK Connector的过程将持续几分钟,等待其创建完成。
在创建进入到后段,在postgres中执行 select * from pg_replication_slots;
语句,已经能看到名为debezium
的slot:
MSK Connect创建完成后,我们可以从game表中读取存量数据, kafka-console-consumer --bootstrap-server $KAFKA --from-beginning --topic pgaaa.public.game
:
往postgres里新增一条数据, insert into game values(41,51);
:
在topic里能实时读到新增的数据: