附:Postgres篇

本节将介绍MSK Connect连接Postgres的配置,Postgres和MySQL相比原理类似,但在配置上略有不同。MySQL通过Binlog进行数据CDC,而Postgres通过WAL做逻辑复制进行CDC。
如果对Postgres的逻辑复制不熟悉,建议先阅读 https://rds.kpingfan.com/01-logical-replication/

实验流程如下:

image-20220427073857818

  • MSK Connect使用逻辑复制,订阅Postgres中的主题。持续接收到Game表中变化的数据
  • MSK Connect将数据实时写入MSK中特定主题,供其他消费者使用

创建参数组

Aurora Postgres开启逻辑复制,需要将rds.logical_replication参数设置为1,所以我们先提前创建好一个参数组:

image-20220414210635222

类型选择DB Cluster Parameter Group,命名为aurora-postgres13-cdc:

image-20220414210740998

创建完成后,修改rds.logical_replication值,将其设置为1并保存:

image-20220414210831285

创建Aurora Postgres

点击创建数据库:

image-20220414211141678

选择Standard create + Postgres-Compatible Edition,选择Postgres 13版本:

image-20220414211453009

为cluster命名,设置数据库密码:

image-20220414211535443

这里不创建Replica,部署在默认VPC,使用默认安全组:

image-20220414211646394

在Additional configuration里,选择上一步创建的参数组:

image-20220414211739809

点击创建。大概十分钟左右创建完成

更改安全组

由于我们的EC2要连接到Aurora Postgres,所以我们将5432放开给上一节创建的机器访问。

default安全组放开5432端口:

image-20220414212146103

测试连接

Postgres创建完成后,可以在Cloud 9上测试是否连通:

image-20220414212403484

执行show wal_level命令,输出logical,证明已经开启逻辑复制:

postgres=> show wal_level;
 wal_level 
-----------
 logical
(1 row)

如果显示的是replica,则需要检查Aurora Postgres集群,是否使用了默认参数组。

准备IAM Role

https://msk.kpingfan.com/04.kafka-connect/02.%E5%87%86%E5%A4%87iam-role/ 操作一致,如果之前创建过,可略过本部分

配置Debezium Postgres Connector

本部分,我们将从Debebium官网( https://debezium.io/releases/ )下载Postgres connector plugin,并将其打包上传到S3, 以备后面的步骤使用。

官网上目前最新的版本是1.9,将其下载到本地:

image-20220414212859829

下载下来是tar.gz格式。由于MSK Connect只能使用ZIP或JAR格式,所以需要将其先转为ZIP格式:

$ tar xzf debezium-connector-postgres-1.9.0.Final-plugin.tar.gz
$ cd debezium-connector-postgres/
$ zip -9 ../debezium-connector-postgres-1.9.0.zip *
$ cd ..

最后将zip压缩包上传到S3上,并记录上传的路径:

aws s3 cp debezium-connector-postgres-1.9.0.zip s3://any-s3-buckets

创建数据表

本实验中MSK Connect将读取Postgres以下维度的数据变化:

  • database :postgres(连接上去的默认数据库,我们将不再创建新库)
  • schema: public(默认的schema,我们将不创建新的schema)
  • table : game, 我们需要创建一张新表

image-20220427074112176

在postgres中执行以下命令:

create table game (id1 int, id2 int);
insert into game   values (1,2);
insert into game   values (2,3);

image-20220427074453267

创建一个发布:

CREATE PUBLICATION alltables FOR ALL TABLES;

image-20220427074523768

由于此时还没有创建MSK Connect,故repliation slots为空

创建MSK Topic

如果MSK配置了auto.create.topics.enable=true,则跳过本部分。

否则创建一个新的topic:

kafka-topics --bootstrap-server $KAFKA --create --replication-factor 3 --partitions 3 --topic pgaaa.public.game 

创建MSK Connect

在MSK控制台的MSK Connect部分,选择Create connectors:

image-20211230225730490

第一步需要用到上一节我们准备好的Debezium Postgres Connector, 将它的S3路径填写上去:

image-20220414213537618

下一步输入Connector的名称和描述:

image-20220414213609077

接下来是选择目标MSK集群,这里已经提前创建好:

  • 它和Aurora在同一个VPC
  • Aurora的安全组放通了MSK所在的安全组,保证网络访问的通畅
  • 没有配置SSL/SASL认证

image-20220414222820604

在下面的Connector Configuration部分,使用如下的设置:

connector.class=io.debezium.connector.postgresql.PostgresConnector
database.user=postgres
database.dbname=postgres
tasks.max=1
database.hostname=pg2.cluster-cp68mt5zk18a.ap-southeast-1.rds.amazonaws.com
database.password=<Your password>
database.server.name=pgaaa
plugin.name=pgoutput
database.port=5432
publication.name=alltables

schema.include=public
table.include.list=public.game
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.fields=op,db,table,schema,lsn,source.ts_ms

里面的参数有点多,可参考 https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-connector-properties 我们挑重点的进行解释:

  • plugin.name – Using the logical decoding feature, an output plug-in enables clients to consume changes to the transaction log in a user-friendly manner. Debezium supports decoderbufs, wal2json and pgoutput plug-ins. Both wal2json and pgoutput are available in Amazon RDS for PostgreSQL and Amazon Aurora PostgreSQL. decoderbufs requires a separate installation and it is excluded from the option. Among the 2 supported plug-ins, pgoutput is selected because it is the standard logical decoding output plug-in in PostgreSQL 10+ and has better performance for large transactions .
  • publication.name – 上一部分,我们在Postgres创建了alltables这个发布
  • key.converter/value.converter – Although Avro serialization is recommended, JSON is a format that can be generated without schema registry and can be read by DeltaStreamer.
  • transforms* – A Debezium event data has a complex structure that provides a wealth of information. It can be quite difficult to process such a structure using DeltaStreamer. Debezium’s event flattening single message transformation (SMT) is configured to flatten the output payload.
  • database.server.name=pgaaa: MSK Connect将在MSK中创建名为pgaaa.public.game的主题,并将所有数据写入此主题,如果订阅了其他schema和其他table,则创建主题名为<database.server.name>.<schema_name>.<table_name>
  • table.include.list=public.game: 白名单机制,我们只订阅public schema下面game表的变化。

如果对以上参数不理解,强烈建议阅读拙作: https://rds.kpingfan.com/01-logical-replication/

继续往下,其他选项都保持默认。

Access permissions部分,我们已经创建好msk-connect-role,选中它:

image-20211230235143609

在security部分选择Plaintext traffic:

image-20211230235239808

进入下一步,Logs部分我们留空,继续进入下一步。

最后Review,并点击创建。创建MSK Connector的过程将持续几分钟,等待其创建完成。


在创建进入到后段,在postgres中执行 select * from pg_replication_slots;语句,已经能看到名为debezium的slot:

image-20220427075800685

测试

MSK Connect创建完成后,我们可以从game表中读取存量数据, kafka-console-consumer --bootstrap-server $KAFKA --from-beginning --topic pgaaa.public.game

image-20220427081131168

往postgres里新增一条数据, insert into game values(41,51);

image-20220427081214930

在topic里能实时读到新增的数据:

image-20220427081235033