WordCount Example Application

In this section we will write a Kafka Streams program that counts, in real time, the occurrences of each word in the messages of a topic. The data flow is shown below:

(figure: message flow from the producer through input-topic, the WordCount Streams app, and output-topic to the consumer)

Steps 1–2: a Kafka producer continuously sends messages to input-topic.

Step 3: the WordCount Streams app continuously reads messages from input-topic, counts the occurrences of every word, and writes the results to output-topic.

Steps 4–5: a Kafka consumer reads the computed results from output-topic in real time.

Creating the Streams App Project

In this section we will use AWS Cloud9 to develop and deploy the whole project.
For more on choosing development tools for Kafka projects, see my earlier writing.

Create a new project with Maven:

mkdir kafka-project && cd kafka-project

mvn archetype:generate -DgroupId=com.kpingfan.kafka -DartifactId=streams-app -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

After it completes, the project structure looks like this:

.
└── streams-app
    ├── pom.xml
    └── src
        ├── main
        │   └── java
        │       └── com
        │           └── kpingfan
        │               └── kafka
        │                   └── App.java
        └── test
            └── java
                └── com
                    └── kpingfan
                        └── kafka
                            └── AppTest.java

Under src/main/java/com/kpingfan/kafka, create WordCountApp.java with the following content:

package com.kpingfan.kafka;


import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Properties;

public class WordCountApp {

    public static void main(String[] args) throws Exception {
        // Four parameters are required:
        // STREAMS_INPUT_TOPIC: the topic Kafka Streams continuously reads raw messages from
        // STREAMS_OUTPUT_TOPIC: the topic Kafka Streams writes the real-time results to
        // STREAMS_APPLICATION_ID: the Kafka Streams application id, used to distinguish different jobs
        // BOOTSTRAP_SERVERS: the broker addresses used to connect to MSK
        final String STREAMS_INPUT_TOPIC = System.getenv("STREAMS_INPUT_TOPIC");
        final String STREAMS_OUTPUT_TOPIC = System.getenv("STREAMS_OUTPUT_TOPIC");
        final String STREAMS_APPLICATION_ID = System.getenv("STREAMS_APPLICATION_ID");
        final String BOOTSTRAP_SERVERS = System.getenv("BOOTSTRAP_SERVERS");

        if (STREAMS_APPLICATION_ID == null || STREAMS_INPUT_TOPIC == null ||
                STREAMS_OUTPUT_TOPIC == null || BOOTSTRAP_SERVERS == null) {
            throw new Exception("Required STREAMS_APPLICATION_ID, STREAMS_INPUT_TOPIC, " +
                    "STREAMS_OUTPUT_TOPIC, BOOTSTRAP_SERVERS");
        }

        final Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, STREAMS_APPLICATION_ID);
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream(STREAMS_INPUT_TOPIC);

        // Split each line read from the input topic on spaces and count the words.
        // For example, the input "hello world hello java" produces (hello, 2), (world, 1), (java, 1)
        final KTable<String, Long> counts = source
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" ")))  // lower-case and split each line into words
                .groupBy((key, value) -> value)  // group the records by word
                .count();                        // count the records in each group
        
        // Write the continuously updated results to the output topic:
        counts.toStream().to(STREAMS_OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();

        // Close the Streams client cleanly when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
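The flatMapValues → groupBy → count chain above can be illustrated without a broker. Below is a minimal plain-Java sketch of the same per-line logic (the class and method names are hypothetical, not part of the Streams API):

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

public class WordCountLogic {

    // The same transformation the topology applies:
    // lower-case each line, split on spaces, then count occurrences per word.
    static Map<String, Long> count(String... lines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            Arrays.stream(line.toLowerCase().split(" "))
                  .forEach(word -> counts.merge(word, 1L, Long::sum));
        }
        return counts;
    }

    public static void main(String[] args) {
        // Mirrors the example in the code comment: "hello world hello java"
        System.out.println(count("hello world hello java"));
        // prints {hello=2, java=1, world=1}
    }
}
```

The real topology differs in one important way: the KTable holds a continuously updated count across all messages ever seen, not a per-batch result, so each new occurrence of a word emits a fresh, higher total downstream.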

Update pom.xml, replacing its contents with:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.kpingfan.kafka</groupId>
    <artifactId>streams-app</artifactId>
    <version>1.0</version>
    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
    </properties>
    
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.kpingfan.kafka.WordCountApp</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>

<!--    dependencies -->
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.7.1</version>
        </dependency>

    </dependencies>
</project>

The pom.xml declares the project's dependencies and the rules for assembling the jar file.

Compile and package the app:

mvn clean compile assembly:single 

After the build succeeds, a target directory appears under the project, containing the jar file:

(screenshot: the generated target/streams-app-1.0-jar-with-dependencies.jar)

At this point our Streams app is fully developed.

Running and Testing the Streams App

As described at the start of this section, the WordCount app works as follows:

  1. A Kafka producer continuously sends messages to input-topic.

  2. The WordCount Streams app continuously reads messages from input-topic, counts the occurrences of every word, and writes the results to output-topic.

  3. A Kafka consumer reads the computed results from output-topic in real time.

So before anything else we need to create the two topics:

# get the broker connection string
export KAFKA=$(aws kafka get-bootstrap-brokers --cluster-arn arn:aws:kafka:ap-southeast-1:145197526627:cluster/MSKDemo/89d04308-2643-4e80-b6e2-fe996354f056-4 --query BootstrapBrokerString --output text)


# create input and output topics
kafka-topics --bootstrap-server $KAFKA --create --replication-factor 3 --partitions 3 --topic streams-plaintext-input 
kafka-topics --bootstrap-server $KAFKA --create --replication-factor 3 --partitions 3 --topic streams-wordcount-output

Run the streams-app-1.0-jar-with-dependencies.jar produced in the previous step; note that four environment variables are required:

STREAMS_INPUT_TOPIC=streams-plaintext-input STREAMS_OUTPUT_TOPIC=streams-wordcount-output STREAMS_APPLICATION_ID=streams-app1 BOOTSTRAP_SERVERS=$KAFKA java -jar target/streams-app-1.0-jar-with-dependencies.jar 

(screenshot: the Streams app starting up)

Now we can start a producer to write messages into the input topic, and a consumer to verify that the WordCount app's results arrive in real time:

# start a Kafka console consumer (run this in a new terminal)
kafka-console-consumer --bootstrap-server $KAFKA \
    --topic streams-wordcount-output \
    --value-deserializer org.apache.kafka.common.serialization.LongDeserializer \
    --property print.key=true  \
    --property key.separator=" " 

# start a Kafka console producer (run this in a new terminal)
kafka-console-producer --bootstrap-server $KAFKA --topic streams-plaintext-input 
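The --value-deserializer flag above is needed because the counts were written with Serdes.Long(), which encodes each value as 8 raw bytes rather than printable text. A pure-Java sketch of what is on the wire (ByteBuffer stands in for Kafka's LongSerializer, which uses the same 8-byte big-endian encoding; the class name is hypothetical):

```java
import java.nio.ByteBuffer;

public class LongSerdeDemo {

    // Encode a count the way Kafka's LongSerializer does: 8 bytes, big-endian.
    static byte[] encode(long count) {
        return ByteBuffer.allocate(8).putLong(count).array();
    }

    // Decode the way LongDeserializer does.
    static long decode(byte[] wire) {
        return ByteBuffer.wrap(wire).getLong();
    }

    public static void main(String[] args) {
        byte[] wire = encode(2L);
        // The payload is not printable text, which is why the console consumer
        // must be started with LongDeserializer rather than the default
        // StringDeserializer.
        System.out.println(wire.length + " bytes -> " + decode(wire));
        // prints 8 bytes -> 2
    }
}
```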

Write a few lines of test text with the producer:

(screenshot: test lines typed into the console producer)

On the consumer side, the WordCount results arrive in real time:

(screenshot: word counts printed by the console consumer)

Note that by default there can be a delay of 20–30 seconds between sending a message and seeing its computed result.
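That delay comes mainly from Kafka Streams' record cache and its commit interval: commit.interval.ms defaults to 30000 ms (when exactly-once is off), so results are only forwarded periodically. If faster feedback is needed, both settings can be lowered in the Properties passed to KafkaStreams. A sketch, using the string keys behind StreamsConfig.COMMIT_INTERVAL_MS_CONFIG and StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG (the class name here is hypothetical):

```java
import java.util.Properties;

public class LowLatencyConfig {

    // Streams settings that trade throughput for lower end-to-end latency.
    static Properties lowLatency() {
        Properties config = new Properties();
        // commit.interval.ms: how often Streams flushes state and forwards
        // results downstream (default 30000 ms without exactly-once)
        config.put("commit.interval.ms", "1000");
        // cache.max.bytes.buffering: 0 disables the record cache so every
        // count update is forwarded immediately instead of being batched
        config.put("cache.max.bytes.buffering", "0");
        return config;
    }

    public static void main(String[] args) {
        System.out.println(lowLatency().getProperty("commit.interval.ms"));
        // prints 1000
    }
}
```

The trade-off: with the cache disabled, every single input word produces an output record, so the output topic carries many more intermediate counts.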

High-Availability Deployment

The Java app above runs on only one machine, which leaves a single point of failure. For high availability we can run the WordCount app on several machines or containers at the same time: because all instances share the same STREAMS_APPLICATION_ID, Kafka Streams automatically rebalances the input partitions among them. Deploying on EKS with a StatefulSet is a good fit here, keeping the application highly available without affecting the correctness of the real-time results.