Preface
I noticed today that Kafka has already moved on to version 3.3.1, and I wasn't sure I would still recognize it, so I followed the QUICKSTART for a quick hands-on run and also tried Kafka Connect and Kafka Streams, which I had never used before. Here is a record of the whole process, for anyone who is interested.
I have to say, today's Kafka really is starting to look like a real-time platform.
Downloading and Installing Kafka
Download the Kafka tarball:
cd /tmp/
wget --no-check-certificate https://dlcdn.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz
Extract the tarball:
tar zxvf kafka_2.13-3.3.1.tgz
This yields the distribution directory:
kafka_2.13-3.3.1
Delete the tarball:
rm -rf kafka_2.13-3.3.1.tgz
Move the distribution into the installation directory:
mv kafka_2.13-3.3.1 /usr/local/
Kafka is now downloaded and installed; the installation directory is /usr/local/kafka_2.13-3.3.1.
Configuring and Starting Kafka
Configure Java:
vim /usr/local/kafka_2.13-3.3.1/bin/kafka-run-class.sh
264 fi
265
266 JAVA_HOME=/usr/local/jdk1.8.0_341
267
268 # Which java to use
269 if [ -z "$JAVA_HOME" ]; then
270   JAVA="java"
271 else
272   JAVA="$JAVA_HOME/bin/java"
273 fi
The Java installation directory is /usr/local/jdk1.8.0_341.
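The path above is simply where the JDK happens to live on my machine; adjust JAVA_HOME to your own JDK location. A quick sanity check that the JVM is usable:
/usr/local/jdk1.8.0_341/bin/java -version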
Kafka supports two modes, ZooKeeper and KRaft; KRaft mode is used here.
Generate a cluster UUID:
KAFKA_CLUSTER_ID="$(/usr/local/kafka_2.13-3.3.1/bin/kafka-storage.sh random-uuid)"
Check the cluster UUID:
echo $KAFKA_CLUSTER_ID
SsrPi8k6RnmKX8yVKxAvPg
Format the log directories:
/usr/local/kafka_2.13-3.3.1/bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /usr/local/kafka_2.13-3.3.1/config/kraft/server.properties
Formatting /tmp/kraft-combined-logs with metadata.version 3.3-IV3.
Start Kafka:
/usr/local/kafka_2.13-3.3.1/bin/kafka-server-start.sh /usr/local/kafka_2.13-3.3.1/config/kraft/server.properties > /dev/null 2>&1 &
Check the Kafka log:
tail -f /usr/local/kafka_2.13-3.3.1/logs/server.log
[2022-10-13 16:21:33,876] INFO Kafka version: 3.3.1 (org.apache.kafka.common.utils.AppInfoParser)
[2022-10-13 16:21:33,876] INFO Kafka commitId: e23c59d00e687ff5 (org.apache.kafka.common.utils.AppInfoParser)
[2022-10-13 16:21:33,876] INFO Kafka startTimeMs: 1665649293875 (org.apache.kafka.common.utils.AppInfoParser)
[2022-10-13 16:21:33,878] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)
Seeing "Kafka Server started" means Kafka has started successfully; it listens on port 9092.
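Optionally, as an extra check that the broker is reachable on that port, listing topics should return without error (the list is still empty at this point):
/usr/local/kafka_2.13-3.3.1/bin/kafka-topics.sh --list --bootstrap-server localhost:9092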
To stop Kafka later (the remaining steps need the broker running):
ps aux | grep java | grep kafka | grep -v grep | awk '{print $2}' | xargs kill
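Alternatively, the stop script bundled in the same bin directory should do the same job:
/usr/local/kafka_2.13-3.3.1/bin/kafka-server-stop.sh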
Creating a Kafka Topic
Create a topic named quickstart-events:
/usr/local/kafka_2.13-3.3.1/bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
Created topic quickstart-events.
Describe the topic:
/usr/local/kafka_2.13-3.3.1/bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events TopicId: 6f0Vc-_cR66g_7PYkFp2vQ PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: quickstart-events Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Writing and Reading Kafka Topic Data with the Command-Line Tools
Open a terminal window and run the producer command:
/usr/local/kafka_2.13-3.3.1/bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>
Type any string and press Enter; each line is written as one record to the topic quickstart-events.
Open another terminal window and run the consumer command:
/usr/local/kafka_2.13-3.3.1/bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
Every line you type and submit with Enter in the producer window shows up as the same line in the consumer window.
When you are done writing and reading, press Ctrl-C in each of the two terminal windows to stop.
Importing and Exporting File Data with Kafka Connect
Kafka Connect is plugin-based. Kafka already ships with a connector JAR for importing and exporting files, along with example configuration files, so only a few small changes are needed on top of them.
Configure Kafka Connect:
vim /usr/local/kafka_2.13-3.3.1/config/connect-standalone.properties
21 key.converter=org.apache.kafka.connect.storage.StringConverter
22 value.converter=org.apache.kafka.connect.storage.StringConverter
25 key.converter.schemas.enable=false
26 value.converter.schemas.enable=false
41 plugin.path=/usr/local/kafka_2.13-3.3.1/libs/connect-file-3.3.1.jar
Create the files test.source.txt and test.sink.txt:
touch /tmp/test.source.txt
touch /tmp/test.sink.txt
We need to start a Source Connector that imports data appended to the file test.source.txt into the Kafka topic quickstart-events, and a Sink Connector that exports the data from the topic quickstart-events into the file test.sink.txt.
Configure the Source Connector:
vim /usr/local/kafka_2.13-3.3.1/config/connect-file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/test.source.txt
topic=quickstart-events
Configure the Sink Connector:
vim /usr/local/kafka_2.13-3.3.1/config/connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/test.sink.txt
topics=quickstart-events
Start the Source Connector and Sink Connector in standalone mode:
/usr/local/kafka_2.13-3.3.1/bin/connect-standalone.sh /usr/local/kafka_2.13-3.3.1/config/connect-standalone.properties /usr/local/kafka_2.13-3.3.1/config/connect-file-source.properties /usr/local/kafka_2.13-3.3.1/config/connect-file-sink.properties
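Once the worker is up, Kafka Connect also exposes a REST API (by default on port 8083 in this standalone setup); listing the loaded connectors is a quick way to confirm that both connectors are running, and should return a JSON array containing local-file-source and local-file-sink:
curl http://localhost:8083/connectors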
Append data to the file test.source.txt:
echo "1" >> /tmp/test.source.txt
echo "2" >> /tmp/test.source.txt
echo "3" >> /tmp/test.source.txt
Check the file test.sink.txt:
tail -f /tmp/test.sink.txt
1
2
3
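Since the source connector writes each line into the topic quickstart-events, the same records can also be double-checked with the console consumer used earlier:
/usr/local/kafka_2.13-3.3.1/bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092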
Press Ctrl-C to stop the Connect worker.
Writing and Reading Kafka Topic Data with the Java API
Add the Maven dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.3.1</version>
</dependency>
Write data to the Kafka topic:
package io.github.hodgepodge.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @author yurun
 */
public class ProducerMain {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("linger.ms", 1);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<>(props);

        // Send 100 records to quickstart-events, keyed by the index,
        // with "index-timestamp" as the value.
        int total = 100;
        for (int index = 0; index < total; index++) {
            producer.send(new ProducerRecord<>("quickstart-events",
                    Integer.toString(index),
                    index + "-" + System.currentTimeMillis()));
        }

        producer.close();
    }
}
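After ProducerMain finishes, the records can be checked with the console consumer used earlier (or with the ConsumerMain class below); adding the standard print.key property makes the index keys visible alongside the values:
/usr/local/kafka_2.13-3.3.1/bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --property print.key=true --bootstrap-server localhost:9092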
Read data from the Kafka topic:
package io.github.hodgepodge.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @author yurun
 */
public class ConsumerMain {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", StringDeserializer.class.getName());
        props.setProperty("value.deserializer", StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("quickstart-events"));

        // Poll up to 100 times, printing whatever records arrive in each poll.
        int index = 0;
        int total = 100;
        while (index++ < total) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset: %d, key: %s, value: %s%n", record.offset(), record.key(), record.value());
            }
        }

        consumer.close();
    }
}
Processing Data with Kafka Streams
Add the Maven dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.3.1</version>
</dependency>
We read the data from the topic quickstart-events, process it, and write the result to the topic quickstart-events-split. The processing logic is very simple: split the string value on the delimiter "-" and keep the first substring.
package io.github.hodgepodge.kafka;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

/**
 * @author yurun
 */
public class StreamMain {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Read from quickstart-events, keep only the part of the value before
        // the first "-", and write the result to quickstart-events-split.
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("quickstart-events");
        source
                .mapValues(value -> value.split("-", -1)[0])
                .to("quickstart-events-split");

        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();

        // Let the topology run for 30 seconds, then shut down.
        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            // do nothing
        }

        streams.close();
    }
}
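To see the result, consume the topic quickstart-events-split while StreamMain is running; depending on the broker's auto-topic-creation setting, the output topic may need to be created first:
/usr/local/kafka_2.13-3.3.1/bin/kafka-topics.sh --create --topic quickstart-events-split --bootstrap-server localhost:9092
/usr/local/kafka_2.13-3.3.1/bin/kafka-console-consumer.sh --topic quickstart-events-split --from-beginning --bootstrap-server localhost:9092
Each value should be the part before the first "-", i.e. the index produced by ProducerMain.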
Closing Thoughts
There is quite a lot to Kafka Connect and Kafka Streams; I will find time to study them properly later.