https://www.oracle.com/java/technologies/downloads/#java17
根据本机环境下载软件包进行安装即可:
安装完成后,执行java -version
确认成功:
❯ java -version
java version "17.0.2" 2022-01-18 LTS
Java(TM) SE Runtime Environment (build 17.0.2+8-LTS-86)
Java HotSpot(TM) 64-Bit Server VM (build 17.0.2+8-LTS-86, mixed mode, sharing)
安装IDEA
社区版本下载地址: https://www.jetbrains.com/idea/download/#section=mac
安装好IDEA后,打开并创建新项目。选择maven项目(因为后面要用到kafka-client
依赖):
根据个人情况配置项目:
创建完成后,在根目录找到pom.xml
文件:
我们需要要在这个xml中添加依赖,在 https://mvnrepository.com/artifact/org.apache.kafka 搜索kafka的maven的依赖:
打开后,根据Kafka的版本选择依赖:
将上面这段复制下来,同slf4j
一起添加到pom.xml
(slf4j
是kafka-client依赖的包)。pom.xml
内容最终如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>kafka-learn</artifactId>
<version>1.0</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<!-- 在这里添加依赖-->
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>
</project>
打开maven面板,点击刷新按钮引入依赖:
在src目录下新建包名(例如com.kpingfan.kafka
)和java文件(ProducerDemo.java
):
ProducerDemo.java
内容:
package com.kpingfan.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class ProducerDemo {
public static void main(String[] args) {
String bootstrapServers = "b-1.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-5.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-6.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092"; // 根据Kafka broker地址做替换
// create Producer properties
// 详细参数见:https://kafka.apache.org/documentation/#producerconfigs
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// create the producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
// create a producer record
ProducerRecord<String, String> record =
new ProducerRecord<String, String>("first-topic", "hello world"); // 根据topic名称做替换
// send data - asynchronous
producer.send(record);
// flush data
producer.flush();
// flush and close producer
producer.close();
}
}
运行main函数:
由于本机与Kafka Broker地址不通,所以此消息不会发送成功(可以在本地电脑搭建Kafka并进行测试)。但在控制台能看到相应的提示,这证明代码本身的依赖是满足的