MSK Connect - ElasticSearch Sink

This section describes how to use MSK Connect to deliver data from an MSK topic to AWS OpenSearch (or Elasticsearch).

The overall flow is: MSK topic → MSK Connect (Elasticsearch Sink Connector) → Amazon OpenSearch.

Create an OpenSearch cluster

The steps for creating the OpenSearch cluster are omitted here. Once the domain is ready, copy its Domain URL.

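The endpoint can also be looked up with the AWS CLI. A minimal sketch, assuming a VPC-based domain; the domain name test-es is a placeholder, and for a public domain the field is DomainStatus.Endpoint instead:

# Fetch the VPC endpoint of the OpenSearch domain (domain name is a placeholder)
aws opensearch describe-domain \
  --domain-name test-es \
  --query 'DomainStatus.Endpoints.vpc' \
  --output text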

Network connectivity between MSK Connect and OpenSearch must be opened up; for example, allow the MSK cluster's security group as an inbound source in the OpenSearch domain's security group.
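
A minimal sketch of that security-group rule with the AWS CLI; both group IDs are placeholders:

# Allow HTTPS (443) from the MSK cluster's security group into the OpenSearch domain's security group
aws ec2 authorize-security-group-ingress \
  --group-id sg-0opensearch0example \
  --protocol tcp \
  --port 443 \
  --source-group sg-0msk0example00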

Upload the Elasticsearch Sink Connector

Download the Elasticsearch Sink Connector package from https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch.


After downloading, upload the zip file to S3:

❯ aws s3 cp ~/Downloads/confluentinc-kafka-connect-elasticsearch-11.1.10.zip s3://{your-bucket}

In the MSK console, upload the plugin as a custom plugin.


Enter the S3 path, give the plugin a name, and then click Create.

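The same plugin registration can be done with the AWS CLI. A sketch, with the plugin name, bucket ARN, and object key as placeholders:

# Register the uploaded zip as an MSK Connect custom plugin
aws kafkaconnect create-custom-plugin \
  --name elasticsearch-sink-plugin \
  --content-type ZIP \
  --location '{"s3Location": {"bucketArn": "arn:aws:s3:::your-bucket", "fileKey": "confluentinc-kafka-connect-elasticsearch-11.1.10.zip"}}'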

Create the MSK Connect connector

Click Create connector.


Select the plugin created in the previous step.


Give the connector a name.


Select the MSK cluster and the authentication method.

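To see which bootstrap endpoints (plaintext, TLS, IAM) the cluster exposes, the broker list can be fetched with the AWS CLI; the cluster ARN is a placeholder:

# List the bootstrap broker strings of the MSK cluster
aws kafka get-bootstrap-brokers \
  --cluster-arn arn:aws:kafka:ap-southeast-1:111122223333:cluster/your-cluster/xxxxxxxx-xxxx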

Enter the Elasticsearch Sink Connector configuration:


  • topics: the MSK topic to pull data from
  • connection.url: the Elasticsearch endpoint address
# Basic configuration for our connector
name=sink-elastic-distributed
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
# We can have parallelism here so we have two tasks!
tasks.max=2
topics=pg1.public.name
# the input topic has a schema, so we enable schema conversion here too
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

# ElasticSearch connector specific configuration
# http://docs.confluent.io/3.3.0/connect/connect-elasticsearch/docs/configuration_options.html
connection.url=https://vpc-test-es-f6rxybex5274ml434qa6ovslpi.ap-southeast-1.es.amazonaws.com
type.name=kafka-connect
# because our keys from the topics are null, we have key.ignore=true
key.ignore=true

Other configuration parameters are documented at: https://docs.confluent.io/kafka-connect-elasticsearch/current/configuration_options.html

For example, username/password authentication for Elasticsearch is configured with the connection.username and connection.password options.

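Before putting credentials into the connector configuration, they can be sanity-checked from a host inside the VPC. A sketch, assuming basic authentication (fine-grained access control) is enabled on the domain; ES_USER and ES_PASS are placeholders:

# Verify the endpoint and credentials before wiring them into the connector
curl -u "${ES_USER}:${ES_PASS}" "https://${ES_URL}/_cluster/health?pretty"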

Select the MSK Connect service execution role.

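The service execution role must be assumable by MSK Connect, i.e. trust the kafkaconnect.amazonaws.com service principal. A minimal sketch of creating such a role; the role name is a placeholder, and you still need to attach permissions that match your cluster's authentication setup:

# Trust policy that lets MSK Connect assume the role
cat > msk-connect-trust.json <<'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "Service": "kafkaconnect.amazonaws.com" },
      "Action": "sts:AssumeRole"
    }
  ]
}
EOF

aws iam create-role \
  --role-name msk-connect-es-sink-role \
  --assume-role-policy-document file://msk-connect-trust.json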

Choose the transport settings (encryption in transit).


Choose where MSK Connect delivers its logs; sending them to CloudWatch Logs is recommended, as it makes troubleshooting much easier.

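The console steps above can also be expressed as a single AWS CLI call. A hedged sketch only: every ARN, subnet, security group, endpoint, and version below is a placeholder, and the connector configuration mirrors the properties shown earlier (here with plaintext transport and no client authentication):

# Create the connector via the AWS CLI (all ARNs/IDs/endpoints are placeholders)
aws kafkaconnect create-connector \
  --connector-name sink-elastic-distributed \
  --kafka-connect-version "2.7.1" \
  --capacity '{"provisionedCapacity": {"mcuCount": 1, "workerCount": 2}}' \
  --plugins '[{"customPlugin": {"customPluginArn": "arn:aws:kafkaconnect:ap-southeast-1:111122223333:custom-plugin/elasticsearch-sink-plugin/xxxx", "revision": 1}}]' \
  --service-execution-role-arn arn:aws:iam::111122223333:role/msk-connect-es-sink-role \
  --kafka-cluster '{"apacheKafkaCluster": {"bootstrapServers": "b-1.your-cluster.xxxxxx.kafka.ap-southeast-1.amazonaws.com:9092", "vpc": {"subnets": ["subnet-aaaa", "subnet-bbbb"], "securityGroups": ["sg-0msk0example00"]}}}' \
  --kafka-cluster-client-authentication '{"authenticationType": "NONE"}' \
  --kafka-cluster-encryption-in-transit '{"encryptionType": "PLAINTEXT"}' \
  --log-delivery '{"workerLogDelivery": {"cloudWatchLogs": {"enabled": true, "logGroup": "/msk-connect/es-sink"}}}' \
  --connector-configuration '{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "2",
    "topics": "pg1.public.name",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "connection.url": "https://vpc-test-es-xxxxxxxx.ap-southeast-1.es.amazonaws.com",
    "type.name": "kafka-connect",
    "key.ignore": "true"
  }'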

Verification

After the connector is created, a new consumer group appears on the Kafka topic, showing that MSK Connect continuously pulls the latest records from Kafka and writes them to Elasticsearch.

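The new group can also be listed with the standard Kafka CLI tools from any client host; the bootstrap address is a placeholder, and Kafka Connect sinks typically use a group named connect-<connector-name>:

# List consumer groups; the sink connector appears as connect-<connector-name>
bin/kafka-consumer-groups.sh \
  --bootstrap-server b-1.your-cluster.xxxxxx.kafka.ap-southeast-1.amazonaws.com:9092 \
  --list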

Query Elasticsearch: MSK Connect automatically creates the corresponding index and keeps inserting the topic's records into it:


curl "https://${ES_URL}/_cat/indices"
curl "https://${ES_URL}/${topic_name}/_search?q=*"