Cloud 9默认已经安装好java和maven环境,对于开发十分便捷
在Cloud 9环境下开发Kafka代码,方式有两种:
本节将分别介绍这两种方式
在上一节中我们在IDEA运行了
ProducerDemo.java
的main函数,其实在运行之前IDEA已经执行了build步骤。这时会在target目录下生成对应的
ProducerDemo.class
文件,我们可以将其上传到cloud 9环境并运行:
在cloud 9中新建一个目录(例如java
),选择Upload Local Files
,这样将把IDEA的项目文件上传到此目录下:
选择Select Folder
:
将IDEA的kafka项目根目录上传, 上传后在Cloud 9的目录结构和IDEA项目是一致的:
在根目录(能列出pom.xml
)下执行:
mvn exec:java -Dexec.mainClass="com.kpingfan.kafka.ProducerDemo" # Class根据实际名称做替换
maven首先会安装依赖,并运行main函数,成功地往topic里写入一条消息:
此时如果提前在另一个控制台启动consumer,会实时的看到消息输出:
使用maven命令创建一个新项目:
mkdir kafka-project & cd kafka-project
mvn archetype:generate -DgroupId=com.kpingfan.kafka -DartifactId=kafka-app -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
项目结构如下:
.
└── kafka-app
├── pom.xml
└── src
├── main
│ └── java
│ └── com
│ └── kpingfan
│ └── kafka
│ └── App.java
└── test
└── java
└── com
└── kpingfan
└── kafka
└── AppTest.java
将properties
和dependency
部分添加到pom.xml
:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.kpingfan.kafka</groupId>
<artifactId>kafka-app</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>kafka-app</name>
<url>http://maven.apache.org</url>
<!-- 这一部分声明使用java编译器的版本,一定要添加!-->
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>
</project>
在src/main/java/com/kpingfan/kafka
目录下新建ProducerDemo.java
, 内容不变:
package com.kpingfan.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class ProducerDemo {
public static void main(String[] args) {
String bootstrapServers = "b-1.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-5.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-6.mskdemo.mxqzz7.c4.kafka.ap-southeast-1.amazonaws.com:9092";
// create Producer properties
// 详细参数见:https://kafka.apache.org/documentation/#producerconfigs
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// create the producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
// create a producer record
ProducerRecord<String, String> record =
new ProducerRecord<String, String>("first-topic", "hello world");
// send data - asynchronous
producer.send(record);
// flush data
producer.flush();
// flush and close producer
producer.close();
}
}
进行build:
cd kafka-app
mvn package
build完成后会提示success:
执行程序:
mvn exec:java -Dexec.mainClass="com.kpingfan.kafka.ProducerDemo"
此时如果提前在另一个控制台启动consumer,会实时的看到消息输出:
参考: https://mkyong.com/maven/how-to-create-a-java-project-with-maven/