本章开始我们提到,Kafka Connect的Worker 分为两种模式,单机模式(standalone)和分布式模式
单机模式比较简单,但是功能也受限,只有一些特殊的场景会使用到,例如收集主机的日志、或者用户开发测试环境。
本节我们先部署一个单机模式的Connector,它持续地从文件中读取最近数据,将其传输至Kafka Topic
创建filestream-connector
目录,我们将在这个目录下创建worker的配置文件:
mkdir filestream-connector && cd filestream-connector
创建worker.properties
文件:
touch worker.properties
更新内容如下:
# from more information, visit: http://docs.confluent.io/3.2.0/connect/userguide.html#common-worker-configs
bootstrap.servers=127.0.0.1:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# we always leave the internal key to JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false
# Rest API
rest.port=8086
rest.host.name=127.0.0.1
# this config is only for standalone workers
offset.storage.file.filename=standalone.offsets
offset.flush.interval.ms=10000
创建filestream-standalone.properties
文件:
touch filestream-standalone.properties
其内容如下:
# These are standard kafka connect parameters, need for ALL connectors
name=filestream-standalone
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
# Parameters can be found here: https://github.com/apache/kafka/blob/trunk/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
file=file1.txt
topic=filestream-standalone
创建file1.txt
:
touch file1.txt
在当前目录下运行一个docker容器:
docker run --rm -it -v "$(pwd)":/tutorial --net=host landoop/fast-data-dev bash
将当前目录下的三个文件(两个properties文件,一个txt文件)映射到docker的/tutorial
目录下
使用host网络,这样能以127.0.0.1
访问本机的各个端口。
进入容器内的/tutorial
目录:
容器内可以执行kafka相关的命名。我们先把filestream-standalone
topic创建好:
kafka-topics --create --topic filestream-standalone --partitions 3 --replication-factor 1 --zookeeper 127.0.0.1:2181
kafka-topics --zookeeper 127.0.0.1:2181 --list # 查看topic列表
运行standalone模式的connect worker:
# Usage is connect-standalone worker.properties connector1.properties [connector2.properties connector3.properties]
connect-standalone worker.properties filestream-standalone.properties
在本地的file1.txt
中插入一些数据行:
file1.txt
会被映射到docker镜像中的tutorial/file1.txt
,然后被Kafka Connect捕捉到文件的更新,将更新传送至Kafka topic。
访问本地8080端口的Ladoop UI,进入Topic页面:
我们看到filestream-standalone
topic中已经有了文件新插入的数据:
我们按Ctrl + C
主动停止运行掉connect运行:
此时回到filestream-connector
目录, 会发现有一个standalone.offset
文件,里面记录了kafka connect实时读取文件的位置:
在配置properties时,有一行offset.storage.file.filename=standalone.offsets
指定了这个文件名称
往file1.txt中新增几行数据:
重新运行connect:
connect-standalone worker.properties filestream-standalone.properties
查看filestream-standalone
topic的数据,发现kafka connect仅传入了新添加的数据:
所以,Kafka connect的一大优点是保证生产者和消费者服务的高可用性,重启服务后生产者恢复到之前读取的位置。