本节将介绍MSK Connect连接Postgres的配置,Postgres和MySQL相比原理类似,但在配置上略有不同。MySQL通过Binlog进行数据CDC,而Postgres通过WAL做逻辑复制进行CDC。
如果对Postgres的逻辑复制不熟悉,建议先阅读 https://rds.kpingfan.com/01-logical-replication/
实验流程如下:

Aurora Postgres开启逻辑复制,需要将rds.logical_replication参数设置为1,所以我们先提前创建好一个参数组:

类型选择DB Cluster Parameter Group,命名为aurora-postgres13-cdc:

创建完成后,修改rds.logical_replication值,将其设置为1并保存:

点击创建数据库:

选择Standard create + Postgres-Compatible Edition,选择Postgres 13版本:

为cluster命名,设置数据库密码:

这里不创建Replica,部署在默认VPC,使用默认安全组:

在Additional configuration里,选择上一步创建的参数组:

点击创建。大概十分钟左右创建完成
由于我们的EC2要连接到Aurora Postgres,所以我们将5432放开给上一节创建的机器访问。
default安全组放开5432端口:

Postgres创建完成后,可以在Cloud 9上测试是否连通:

执行show wal_level命令,输出logical,证明已经开启逻辑复制:
postgres=> show wal_level;
wal_level
-----------
logical
(1 row)
如果显示的是replica,则需要检查Aurora Postgres集群,是否使用了默认参数组。
和 https://msk.kpingfan.com/04.kafka-connect/02.%E5%87%86%E5%A4%87iam-role/ 操作一致,如果之前创建过,可略过本部分
本部分,我们将从Debebium官网( https://debezium.io/releases/ )下载Postgres connector plugin,并将其打包上传到S3, 以备后面的步骤使用。
官网上目前最新的版本是1.9,将其下载到本地:

下载下来是tar.gz格式。由于MSK Connect只能使用ZIP或JAR格式,所以需要将其先转为ZIP格式:
$ tar xzf debezium-connector-postgres-1.9.0.Final-plugin.tar.gz
$ cd debezium-connector-postgres/
$ zip -9 ../debezium-connector-postgres-1.9.0.zip *
$ cd ..
最后将zip压缩包上传到S3上,并记录上传的路径:
aws s3 cp debezium-connector-postgres-1.9.0.zip s3://any-s3-buckets
本实验中MSK Connect将读取Postgres以下维度的数据变化:

在postgres中执行以下命令:
create table game (id1 int, id2 int);
insert into game values (1,2);
insert into game values (2,3);

创建一个发布:
CREATE PUBLICATION alltables FOR ALL TABLES;

由于此时还没有创建MSK Connect,故repliation slots为空
如果MSK配置了auto.create.topics.enable=true,则跳过本部分。
否则创建一个新的topic:
kafka-topics --bootstrap-server $KAFKA --create --replication-factor 3 --partitions 3 --topic pgaaa.public.game
在MSK控制台的MSK Connect部分,选择Create connectors:

第一步需要用到上一节我们准备好的Debezium Postgres Connector, 将它的S3路径填写上去:

下一步输入Connector的名称和描述:

接下来是选择目标MSK集群,这里已经提前创建好:

在下面的Connector Configuration部分,使用如下的设置:
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.user=postgres
database.dbname=postgres
tasks.max=1
database.hostname=pg2.cluster-cp68mt5zk18a.ap-southeast-1.rds.amazonaws.com
database.password=<Your password>
database.server.name=pgaaa
plugin.name=pgoutput
database.port=5432
publication.name=alltables
schema.include=public
table.include.list=public.game
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.fields=op,db,table,schema,lsn,source.ts_ms
里面的参数有点多,可参考 https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-connector-properties 我们挑重点的进行解释:
alltables这个发布database.server.name=pgaaa: MSK Connect将在MSK中创建名为pgaaa.public.game的主题,并将所有数据写入此主题,如果订阅了其他schema和其他table,则创建主题名为<database.server.name>.<schema_name>.<table_name>table.include.list=public.game: 白名单机制,我们只订阅public schema下面game表的变化。如果对以上参数不理解,强烈建议阅读拙作: https://rds.kpingfan.com/01-logical-replication/
继续往下,其他选项都保持默认。
在Access permissions部分,我们已经创建好msk-connect-role,选中它:

在security部分选择Plaintext traffic:

进入下一步,Logs部分我们留空,继续进入下一步。
最后Review,并点击创建。创建MSK Connector的过程将持续几分钟,等待其创建完成。
在创建进入到后段,在postgres中执行 select * from pg_replication_slots;语句,已经能看到名为debezium的slot:

MSK Connect创建完成后,我们可以从game表中读取存量数据, kafka-console-consumer --bootstrap-server $KAFKA --from-beginning --topic pgaaa.public.game:

往postgres里新增一条数据, insert into game values(41,51);:

在topic里能实时读到新增的数据:
