FileSream Source Connector - Standalone模式

本章开始我们提到,Kafka Connect的Worker 分为两种模式,单机模式(standalone)和分布式模式

单机模式比较简单,但是功能也受限,只有一些特殊的场景会使用到,例如收集主机的日志、或者用户开发测试环境。

本节我们先部署一个单机模式的Connector,它持续地从文件中读取最近数据,将其传输至Kafka Topic

image-20220508152206067

运行Filestream connector

创建filestream-connector目录,我们将在这个目录下创建worker的配置文件:

mkdir filestream-connector && cd filestream-connector

创建worker.properties文件:

touch worker.properties

更新内容如下:

# from more information, visit: http://docs.confluent.io/3.2.0/connect/userguide.html#common-worker-configs
bootstrap.servers=127.0.0.1:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# we always leave the internal key to JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false
# Rest API
rest.port=8086
rest.host.name=127.0.0.1
# this config is only for standalone workers
offset.storage.file.filename=standalone.offsets
offset.flush.interval.ms=10000

创建filestream-standalone.properties文件:

touch filestream-standalone.properties

其内容如下:

# These are standard kafka connect parameters, need for ALL connectors
name=filestream-standalone
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
# Parameters can be found here: https://github.com/apache/kafka/blob/trunk/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
file=file1.txt
topic=filestream-standalone

创建file1.txt:

touch file1.txt

在当前目录下运行一个docker容器:

docker run --rm -it -v "$(pwd)":/tutorial --net=host landoop/fast-data-dev bash
  • 将当前目录下的三个文件(两个properties文件,一个txt文件)映射到docker的/tutorial目录下

  • 使用host网络,这样能以127.0.0.1访问本机的各个端口。

进入容器内的/tutorial目录:

image-20220508162619932

容器内可以执行kafka相关的命名。我们先把filestream-standalone topic创建好:

kafka-topics --create --topic filestream-standalone --partitions 3 --replication-factor 1 --zookeeper 127.0.0.1:2181 
kafka-topics --zookeeper 127.0.0.1:2181 --list # 查看topic列表

image-20220508162826871

运行standalone模式的connect worker:

# Usage is connect-standalone worker.properties connector1.properties [connector2.properties connector3.properties]
connect-standalone worker.properties filestream-standalone.properties

image-20220508164225562


测试FileStream Connector效果

在本地的file1.txt中插入一些数据行:

image-20220508163050376

file1.txt会被映射到docker镜像中的tutorial/file1.txt,然后被Kafka Connect捕捉到文件的更新,将更新传送至Kafka topic。

访问本地8080端口的Ladoop UI,进入Topic页面:

image-20220508163106073

我们看到filestream-standalone topic中已经有了文件新插入的数据:

image-20220508163150413

我们按Ctrl + C主动停止运行掉connect运行:

image-20220508164620057

此时回到filestream-connector目录, 会发现有一个standalone.offset文件,里面记录了kafka connect实时读取文件的位置:

image-20220508163226305

在配置properties时,有一行offset.storage.file.filename=standalone.offsets 指定了这个文件名称

往file1.txt中新增几行数据:

image-20220508163317973

重新运行connect:

connect-standalone worker.properties filestream-standalone.properties

查看filestream-standalone topic的数据,发现kafka connect仅传入了新添加的数据:

image-20220508163603348

所以,Kafka connect的一大优点是保证生产者和消费者服务的高可用性,重启服务后生产者恢复到之前读取的位置。