FileStream Source Connector - Distributed Mode

A Kafka Connect worker runs in one of two modes: standalone mode and distributed mode. This section covers distributed mode. We will deploy a connector in distributed mode that continuously reads newly appended data from a file and streams it into a Kafka topic:
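For reference, a distributed worker is started from a worker properties file (for example with `connect-distributed.sh`). The sketch below shows the key settings such a file typically contains; the broker address and internal topic names are assumptions, and the fast-data-dev image used in this walkthrough ships with its own pre-built worker configuration:

```properties
# Kafka brokers the worker connects to (assumed address)
bootstrap.servers=127.0.0.1:9092
# Workers sharing the same group.id form one distributed Connect cluster
group.id=connect-cluster
# Default converters for connectors that do not override them
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Internal topics where the cluster stores connector configs, offsets and status
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
```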

image-20220508165733166

Creating the connector

First, start a container and create the topic filestream-distributed inside it; it will receive the lines appended to the file in the experiment below:

docker run --rm -it --net=host landoop/fast-data-dev bash
kafka-topics --create --topic filestream-distributed --partitions 3 --replication-factor 1 --zookeeper 127.0.0.1:2181

image-20220508165902866

Open the Connectors page from the Landoop UI:

image-20220508165944834

Click NEW to create a new connector:

image-20220508170004963

Lensesio ships with more than thirty built-in Source and Sink Connectors. Here we choose the File connector:

image-20220508170024736

Paste in the following configuration:

# These are standard Kafka Connect parameters, needed for ALL connectors
name=filestream-distributed
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
# Parameters can be found here: https://github.com/apache/kafka/blob/trunk/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
file=file2.txt
topic=filestream-distributed
# Added configuration for the distributed mode:
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
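The same connector can also be created without the UI, through the Connect REST API. A minimal sketch, assuming a distributed worker listening on localhost:8083 (the default Connect REST port); the JSON payload mirrors the properties above:

```shell
# Write the connector definition as JSON (same settings as the properties above)
cat > filestream-distributed.json <<'EOF'
{
  "name": "filestream-distributed",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "file2.txt",
    "topic": "filestream-distributed",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"
  }
}
EOF
# Then POST it to the worker's REST endpoint (assumed to be localhost:8083):
#   curl -X POST -H "Content-Type: application/json" \
#        --data @filestream-distributed.json http://localhost:8083/connectors
```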

Click CREATE:

image-20220508170206551

Testing

The connector is now created and configured to watch file2.txt for changes. How do we append new lines to that file?

First, open a shell inside the container:

image-20220508170317782

Create file2.txt in the current directory and write a few new lines into it:

image-20220508170428316
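The step above amounts to a few appends; run these inside the container, in the worker's working directory (the connector resolves the relative path file2.txt against it). The sample lines are arbitrary:

```shell
# Append a few lines to the file the connector is tailing
echo "first line"  >> file2.txt
echo "second line" >> file2.txt
echo "third line"  >> file2.txt
```

Each appended line becomes one record in the filestream-distributed topic.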

Now inspect the data in the filestream-distributed topic: the File connector has forwarded the newly appended lines into it:

image-20220508170515158
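Because the connector was configured with JsonConverter and schemas.enable=true, each record value in the topic is wrapped in a schema/payload envelope rather than being the raw line. A sketch of what a consumed value looks like and how to unwrap it (the sample message below is illustrative, not captured from a real run):

```shell
# With schemas.enable=true the converter wraps each value roughly like this:
msg='{"schema":{"type":"string","optional":false},"payload":"first line"}'

# The original file line sits in the "payload" field:
python3 -c 'import json,sys; print(json.loads(sys.argv[1])["payload"])' "$msg"
```

Consumers that do not want this envelope can set value.converter.schemas.enable=false on the connector instead.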