This section describes how to use MSK Connect to ship data from an MSK topic to AWS OpenSearch (or Elasticsearch).
The overall flow is as follows:
The steps for creating the OpenSearch cluster are omitted here; once it is created, copy the Domain URL:
Network connectivity between MSK Connect and OpenSearch must be in place; one way is to allow inbound traffic from the MSK cluster's security group in the OpenSearch security group.
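As a sketch, this could also be done from the AWS CLI; the security group IDs below are placeholders:

# Hypothetical security group IDs -- replace with your own.
# Allow the MSK cluster's security group to reach OpenSearch over HTTPS (443).
aws ec2 authorize-security-group-ingress \
    --group-id sg-0opensearch0example \
    --protocol tcp \
    --port 443 \
    --source-group sg-0mskcluster0example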
Download the Elasticsearch Sink Connector package from https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch:
After downloading, upload the zip file to S3:
❯ aws s3 cp ~/Downloads/confluentinc-kafka-connect-elasticsearch-11.1.10.zip s3://{your-bucket}
Upload the plugin on the MSK console page:
Enter the S3 path, give the plugin a name, then click Create (or use the CLI as sketched below):
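A minimal sketch of registering the same custom plugin from the AWS CLI, assuming the zip from the previous step is already in your bucket (the bucket and plugin names are placeholders):

# Register the uploaded zip as an MSK Connect custom plugin
aws kafkaconnect create-custom-plugin \
    --name elasticsearch-sink-plugin \
    --content-type ZIP \
    --location 's3Location={bucketArn=arn:aws:s3:::your-bucket,fileKey=confluentinc-kafka-connect-elasticsearch-11.1.10.zip}'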
Click Create connector:
Select the plugin created in the previous step:
Give the connector a name:
Select the MSK cluster and the authentication method:
Enter the Elasticsearch Sink Connector configuration:
topics: the MSK topic to pull data from
connection.url: the Elasticsearch endpoint address

# Basic configuration for our connector
name=sink-elastic-distributed
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
# We can have parallelism here so we have two tasks!
tasks.max=2
topics=pg1.public.name
# the input topic has a schema, so we enable schemas conversion here too
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
# ElasticSearch connector specific configuration
# http://docs.confluent.io/3.3.0/connect/connect-elasticsearch/docs/configuration_options.html
connection.url=https://vpc-test-es-f6rxybex5274ml434qa6ovslpi.ap-southeast-1.es.amazonaws.com
type.name=kafka-connect
# because our keys from the topics are null, we have key.ignore=true
key.ignore=true
The remaining configuration parameters are documented at: https://docs.confluent.io/kafka-connect-elasticsearch/current/configuration_options.html
For example, Elasticsearch username/password authentication:
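A minimal sketch of such a configuration, using the connector's connection.username and connection.password options (the credential values below are placeholders):

# Hypothetical credentials -- replace with your Elasticsearch/OpenSearch user
connection.username=es-admin
connection.password=your-password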
Select the MSK Connect IAM role:
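The role must trust the MSK Connect service principal. A minimal sketch of creating such a role with the AWS CLI (the role name is a placeholder; attach the permissions policies your cluster's authentication method requires separately):

# Trust policy allowing MSK Connect to assume the role
cat > trust-policy.json <<'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "Service": "kafkaconnect.amazonaws.com" },
      "Action": "sts:AssumeRole"
    }
  ]
}
EOF
aws iam create-role \
    --role-name msk-connect-es-sink-role \
    --assume-role-policy-document file://trust-policy.json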
Select the transmission method (encryption in transit):
Select where MSK Connect logs are delivered; collecting them in CloudWatch Logs is recommended, which makes troubleshooting errors much easier:
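Once logs are flowing into CloudWatch Logs they can be tailed from the CLI; a sketch, assuming a hypothetical log group name chosen when creating the connector:

# Log group name is an assumption -- use the one configured for the connector
aws logs tail /msk-connect/sink-elastic-distributed --follow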
After the connector is created, a new consumer group appears on the Kafka topic, which shows that MSK Connect is continuously fetching the latest data from Kafka and delivering it into Elasticsearch:
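This can be verified with the standard Kafka CLI tools. By Kafka Connect convention, a sink's consumer group is named connect-<connector name>, so for the connector above it should be connect-sink-elastic-distributed ($MSK_BOOTSTRAP_SERVERS is a placeholder for your cluster's bootstrap string):

# List consumer groups; connect-sink-elastic-distributed should appear
kafka-consumer-groups.sh \
    --bootstrap-server $MSK_BOOTSTRAP_SERVERS \
    --list
# Inspect its lag to confirm it is consuming from pg1.public.name
kafka-consumer-groups.sh \
    --bootstrap-server $MSK_BOOTSTRAP_SERVERS \
    --describe --group connect-sink-elastic-distributed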
Query the data in Elasticsearch: MSK Connect automatically creates the corresponding index and keeps inserting the topic's data into it:
curl https://${ES_URL}/_cat/indices
curl "https://${ES_URL}/${topic_name}/_search?q=*"