Kafka Connect Worker
分为两种模式,单机模式和分布式模式。 本节介绍分布式模式(distributed mode)的使用, 我们将部署分布式模式的Connector,它持续地从文件中读取最近数据,将其传输至Kafka Topic:
我们先启动一个容器,在里面创建topic filestream-distributed
,用于后面的实验接收文件中新增的数据:
docker run --rm -it --net=host landoop/fast-data-dev bash
kafka-topics --create --topic filestream-distributed --partitions 3 --replication-factor 1 --zookeeper 127.0.0.1:2181
从Ladoop UI进入Connectors
页面:
点击NEW
,创建新的Connector:
Lensesio已经帮我们内置好了三十多个Source Connector
和Sink Connector
。这里我们选择File Connector
:
将以下配置粘帖进去:
# These are standard kafka connect parameters, need for ALL connectors
name=filestream-distributed
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
# Parameters can be found here: https://github.com/apache/kafka/blob/trunk/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
file=file2.txt
topic=filestream-distributed
# Added configuration for the distributed mode:
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
点击创建:
connector已经创建好,我们在里面配置了读取file2.txt
文件的变化,应该如何往这个文件新增行呢?
首先进入容器内部shell:
在当前目录下创建file2.txt
,并往里面新写入几行数据:
此时查看filestream-distributed
topic中的数据,File Connector帮我们把新增的数据传入到了这个topic中: