In this section, we will write a Kafka Streams program that counts, in real time, how many times each word appears in the messages of a topic. The workflow is shown in the figure below:
Steps 1, 2: A Kafka producer continuously sends messages to input-topic.
Step 3: The WordCount Streams app reads the messages from input-topic in real time, counts the occurrences of every word, and writes the results to output-topic.
Steps 4, 5: A Kafka consumer reads the computed results from output-topic in real time.
In this section we will use Cloud9 to develop and deploy the whole project. For more options for Kafka development tooling, see my earlier article.
Create a new project with the Maven archetype command:
mkdir kafka-project && cd kafka-project
mvn archetype:generate -DgroupId=com.kpingfan.kafka -DartifactId=streams-app -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
Once generation completes, the project structure looks like this:
.
└── streams-app
    ├── pom.xml
    └── src
        ├── main
        │   └── java
        │       └── com
        │           └── kpingfan
        │               └── kafka
        │                   └── App.java
        └── test
            └── java
                └── com
                    └── kpingfan
                        └── kafka
                            └── AppTest.java
Create a new file WordCountApp.java in the src/main/java/com/kpingfan/kafka directory with the following content:
package com.kpingfan.kafka;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Properties;

public class WordCountApp {
    public static void main(String[] args) throws Exception {
        // Four parameters are required:
        // STREAMS_INPUT_TOPIC: the topic Kafka Streams continuously reads raw messages from
        // STREAMS_OUTPUT_TOPIC: the topic Kafka Streams writes the real-time results to
        // STREAMS_APPLICATION_ID: the Kafka Streams app id, used to distinguish different jobs
        // BOOTSTRAP_SERVERS: used to connect to MSK
        final String STREAMS_INPUT_TOPIC = System.getenv("STREAMS_INPUT_TOPIC");
        final String STREAMS_OUTPUT_TOPIC = System.getenv("STREAMS_OUTPUT_TOPIC");
        final String STREAMS_APPLICATION_ID = System.getenv("STREAMS_APPLICATION_ID");
        final String BOOTSTRAP_SERVERS = System.getenv("BOOTSTRAP_SERVERS");
        if (STREAMS_APPLICATION_ID == null || STREAMS_INPUT_TOPIC == null ||
                STREAMS_OUTPUT_TOPIC == null || BOOTSTRAP_SERVERS == null) {
            throw new Exception("Required STREAMS_APPLICATION_ID, STREAMS_INPUT_TOPIC, " +
                    "STREAMS_OUTPUT_TOPIC, BOOTSTRAP_SERVERS");
        }

        final Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, STREAMS_APPLICATION_ID);
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream(STREAMS_INPUT_TOPIC);

        // Split each string read from the input topic on spaces, then count the words.
        // For example, the input "hello world hello java" yields (hello, 2), (world, 1), (java, 1)
        final KTable<String, Long> counts = source
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" "))) // split the input string on spaces
                .groupBy((key, value) -> value) // group by word, so occurrences can be counted
                .count();

        // Write the real-time results to the output topic:
        counts.toStream().to(STREAMS_OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();

        // Close the streams client cleanly when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
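To make the transformation in the topology concrete, here is the same lowercase-split-group-count logic expressed with plain Java streams, with no Kafka involved. This is only an illustration of what each input line contributes to the counts, not part of the app itself; the class and method names are made up for the example:

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class WordCountLogic {
    // The per-line transformation the topology applies:
    // lowercase, split on spaces, count occurrences per word.
    static Map<String, Long> count(String line) {
        return Arrays.stream(line.toLowerCase().split(" "))
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        // Map iteration order is unspecified; the counts are (hello, 2), (world, 1), (java, 1)
        System.out.println(count("hello world hello java"));
    }
}
```

The real app differs in one important way: the KTable accumulates counts across all messages ever seen, whereas this snippet counts a single line.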
Update pom.xml, replacing its contents with:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.kpingfan.kafka</groupId>
  <artifactId>streams-app</artifactId>
  <version>1.0</version>

  <properties>
    <maven.compiler.source>17</maven.compiler.source>
    <maven.compiler.target>17</maven.compiler.target>
  </properties>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <mainClass>com.kpingfan.kafka.WordCountApp</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <!-- dependencies are declared here -->
  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.7.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.25</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-streams</artifactId>
      <version>2.7.1</version>
    </dependency>
  </dependencies>
</project>
The pom.xml declares the project's dependencies and the rules for compiling and packaging the jar file.
Build the project:
mvn clean compile assembly:single
When the build succeeds, a target folder is created in the current directory containing the jar file:
At this point, our Streams app is fully developed.
As introduced at the beginning, the WordCount app's workflow is:
A Kafka producer continuously sends messages to input-topic.
The WordCount Streams app reads the messages from input-topic in real time, counts the occurrences of every word, and writes the results to output-topic.
A Kafka consumer reads the computed results from output-topic in real time.
So we first need to create the two topics in advance:
# Get the broker connection string
export KAFKA=$(aws kafka get-bootstrap-brokers --cluster-arn arn:aws:kafka:ap-southeast-1:145197526627:cluster/MSKDemo/89d04308-2643-4e80-b6e2-fe996354f056-4 --query BootstrapBrokerString --output text)
# create input and output topics
kafka-topics --bootstrap-server $KAFKA --create --replication-factor 3 --partitions 3 --topic streams-plaintext-input
kafka-topics --bootstrap-server $KAFKA --create --replication-factor 3 --partitions 3 --topic streams-wordcount-output
Run the streams-app-1.0-jar-with-dependencies.jar generated in the previous step; note that it requires four environment variables:
STREAMS_INPUT_TOPIC=streams-plaintext-input STREAMS_OUTPUT_TOPIC=streams-wordcount-output STREAMS_APPLICATION_ID=streams-app1 BOOTSTRAP_SERVERS=$KAFKA java -jar target/streams-app-1.0-jar-with-dependencies.jar
Now we can start a producer to write messages to the input topic, and a separate consumer to verify that the WordCount app's results arrive in real time:
# start a Kafka console consumer (run in a new terminal)
kafka-console-consumer --bootstrap-server $KAFKA \
--topic streams-wordcount-output \
--value-deserializer org.apache.kafka.common.serialization.LongDeserializer \
--property print.key=true \
--property key.separator=" "
# start a Kafka console producer (run in a new terminal)
kafka-console-producer --bootstrap-server $KAFKA --topic streams-plaintext-input
Use the producer to write a few lines of test strings:
On the consumer side, the WordCount app's results appear in real time:
Note that by default there may be a 20-30 second delay between sending a message and seeing the computed result; this comes mainly from Kafka Streams' record cache and its default commit interval of 30 seconds, which batch aggregation updates before forwarding them downstream.
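If you care more about latency than about the volume of downstream writes, both knobs can be tightened when building the Properties for the app. A sketch; the values here are illustrative, not recommendations:

```java
// Commit (and therefore forward aggregation results) every second instead of every 30s
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
// Disable the record cache so every count update is emitted immediately
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
```

With the cache disabled, the output topic receives one record per input word occurrence rather than consolidated updates, so expect more traffic on streams-wordcount-output.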
The Java app above runs on only one machine, so it is a single point of failure. For high availability we can also run the WordCount app on several machines or containers at the same time: instances sharing the same application id join the same consumer group, and Kafka assigns each one a subset of the input partitions. Deploying with EKS + StatefulSet is a good choice: it both keeps the application highly available and does not affect the real-time computation results.
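The EKS deployment could be sketched roughly as follows. This is only an outline, not a tested manifest: the image name and broker string are placeholders you must fill in, while the topic names and app id match the ones used above. All three replicas share STREAMS_APPLICATION_ID, which is what makes them split the input partitions among themselves:

```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: wordcount-streams
spec:
  serviceName: wordcount-streams
  replicas: 3                 # three instances share the work via the same application id
  selector:
    matchLabels:
      app: wordcount-streams
  template:
    metadata:
      labels:
        app: wordcount-streams
    spec:
      containers:
        - name: streams-app
          image: <your-registry>/streams-app:1.0   # placeholder: your pushed image
          env:
            - name: STREAMS_INPUT_TOPIC
              value: streams-plaintext-input
            - name: STREAMS_OUTPUT_TOPIC
              value: streams-wordcount-output
            - name: STREAMS_APPLICATION_ID
              value: streams-app1
            - name: BOOTSTRAP_SERVERS
              value: "<msk-bootstrap-brokers>"     # placeholder: the MSK broker string
```

A StatefulSet (rather than a Deployment) gives each pod a stable identity, which plays well with Kafka Streams' local state directories if you later attach persistent volumes.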