This section describes how to use MSK Connect to deliver data from an MSK topic to AWS OpenSearch (or Elasticsearch).
The overall flow is as follows:

The steps for creating the OpenSearch cluster are omitted here; after it is created, copy the Domain URL:

Network connectivity between MSK Connect and OpenSearch must be in place. One way is to allow inbound traffic from the MSK cluster's security group in the OpenSearch domain's security group.
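The same rule can also be added with the AWS CLI. This is a sketch in which both security group IDs are placeholders; OpenSearch in a VPC serves HTTPS on port 443:

❯ aws ec2 authorize-security-group-ingress \
    --group-id sg-opensearch-placeholder \
    --protocol tcp --port 443 \
    --source-group sg-msk-placeholder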
Download the Elasticsearch Sink Connector package from https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch :

Once downloaded, upload the zip file to S3:
❯ aws s3 cp ~/Downloads/confluentinc-kafka-connect-elasticsearch-11.1.10.zip s3://{your-bucket}
Upload the plugin on the MSK console page:

Enter the S3 path, name the plugin, and click Create:

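If you prefer the CLI to the console for this step, the plugin can also be registered with aws kafkaconnect create-custom-plugin; the plugin name and bucket below are assumptions:

❯ aws kafkaconnect create-custom-plugin \
    --name elasticsearch-sink \
    --content-type ZIP \
    --location '{"s3Location": {"bucketArn": "arn:aws:s3:::{your-bucket}", "fileKey": "confluentinc-kafka-connect-elasticsearch-11.1.10.zip"}}'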
Click Create connector:

Select the plugin created in the previous step:

Name the connector:

Select the MSK cluster and the authentication method:

Enter the configuration for the Elasticsearch Sink Connector:

topics: the MSK topic to pull data from
connection.url: the Elasticsearch endpoint address

# Basic configuration for our connector
name=sink-elastic-distributed
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
# We can have parallelism here so we have two tasks!
tasks.max=2
topics=pg1.public.name
# the input topic has a schema, so we enable schemas conversion here too
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
# ElasticSearch connector specific configuration
# http://docs.confluent.io/3.3.0/connect/connect-elasticsearch/docs/configuration_options.html
connection.url=https://vpc-test-es-f6rxybex5274ml434qa6ovslpi.ap-southeast-1.es.amazonaws.com
type.name=kafka-connect
# because our keys from the topics are null, we have key.ignore=true
key.ignore=true
For other configuration parameters, see: https://docs.confluent.io/kafka-connect-elasticsearch/current/configuration_options.html
For example, to authenticate to Elasticsearch with a username and password:

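A minimal sketch using the connector's connection.username and connection.password options (the values are placeholders):

connection.username=your-es-username
connection.password=your-es-password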
Select the IAM role for MSK Connect:

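The role's trust policy must allow the kafkaconnect.amazonaws.com service principal to assume it. A sketch of creating such a role with the CLI (the role name is an assumption; attach the permission policies separately):

❯ aws iam create-role \
    --role-name msk-connect-es-sink-role \
    --assume-role-policy-document '{
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "kafkaconnect.amazonaws.com"},
            "Action": "sts:AssumeRole"
        }]
    }'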
Select the transmission (encryption in transit) method:

Select where MSK Connect logs are delivered. Collecting them in CloudWatch Logs is recommended, as it makes troubleshooting easier:

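With CloudWatch Logs as the destination, the connector logs can be tailed from the CLI (the log group name here is an assumption; substitute the one you configured):

❯ aws logs tail /msk-connect/es-sink --follow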
After the connector is created, a new consumer group appears on the Kafka cluster, which shows that MSK Connect continuously fetches the latest data from Kafka and delivers it to Elasticsearch:

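This can be verified with the Kafka CLI tools. By default, a Kafka Connect sink connector consumes with a group named connect-<connector-name>, so for the configuration above the group would be connect-sink-elastic-distributed ($BOOTSTRAP_SERVERS is a placeholder for your MSK bootstrap string):

❯ kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP_SERVERS --list
❯ kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP_SERVERS \
    --describe --group connect-sink-elastic-distributed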
Query the data in Elasticsearch: MSK Connect automatically creates the corresponding index and continuously inserts the data from the topic:

curl https://${ES_URL}/_cat/indices
curl "https://${ES_URL}/${topic_name}/_search?q=*"
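For human-readable output, the Elasticsearch search API also accepts the pretty parameter:

curl "https://${ES_URL}/${topic_name}/_search?q=*&pretty"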