Preface

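I noticed today that Kafka has already reached version 3.3.1. Not sure whether I would still recognize it, I ran through the QUICKSTART and also tried Kafka Connect and Kafka Streams, neither of which I had used before. This post records the whole process for anyone who is interested.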

I have to say, Kafka now really does look like a real-time platform.

Download and Install Kafka

Download the Kafka archive:

cd /tmp/

wget --no-check-certificate  https://dlcdn.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz

Extract the archive:

tar zxvf kafka_2.13-3.3.1.tgz

This yields the extracted directory:

kafka_2.13-3.3.1

Delete the archive:

rm -rf kafka_2.13-3.3.1.tgz

Move the extracted directory to the Kafka install location:

mv kafka_2.13-3.3.1 /usr/local/

Kafka is now downloaded and installed. The Kafka installation directory is /usr/local/kafka_2.13-3.3.1.

Configure and Start Kafka

Configure Java by setting JAVA_HOME in kafka-run-class.sh, just above the "# Which java to use" block:

vim /usr/local/kafka_2.13-3.3.1/bin/kafka-run-class.sh

fi

JAVA_HOME=/usr/local/jdk1.8.0_341

# Which java to use
if [ -z "$JAVA_HOME" ]; then
  JAVA="java"
else
  JAVA="$JAVA_HOME/bin/java"
fi

The Java installation directory is /usr/local/jdk1.8.0_341.

Kafka supports two modes, ZooKeeper and KRaft. KRaft mode is used here.

Generate a cluster UUID:

KAFKA_CLUSTER_ID="$(/usr/local/kafka_2.13-3.3.1/bin/kafka-storage.sh random-uuid)"

Print the cluster UUID:

echo $KAFKA_CLUSTER_ID

SsrPi8k6RnmKX8yVKxAvPg
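
The ID printed above is 22 characters drawn from the URL-safe Base64 alphabet, which is the shape a 128-bit UUID takes when Base64-encoded without padding. The following is a minimal sketch of producing an ID of that shape in plain Java. The class and method names are hypothetical, and it assumes (rather than confirms) that this is the encoding kafka-storage.sh uses; for a real cluster, use the random-uuid command shown above.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

/** Hypothetical helper, not part of Kafka: mimics the shape of a KRaft cluster ID. */
class ClusterIdSketch {
    /** Encodes a random 128-bit UUID as 22 URL-safe Base64 characters (no padding). */
    static String randomClusterId() {
        UUID uuid = UUID.randomUUID();
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.putLong(uuid.getMostSignificantBits());
        buffer.putLong(uuid.getLeastSignificantBits());
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buffer.array());
    }

    public static void main(String[] args) {
        // Prints a different 22-character ID on every run.
        System.out.println(randomClusterId());
    }
}
```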

Format the storage directory:

/usr/local/kafka_2.13-3.3.1/bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /usr/local/kafka_2.13-3.3.1/config/kraft/server.properties

Formatting /tmp/kraft-combined-logs with metadata.version 3.3-IV3.

Start Kafka:

/usr/local/kafka_2.13-3.3.1/bin/kafka-server-start.sh /usr/local/kafka_2.13-3.3.1/config/kraft/server.properties > /dev/null 2>&1 &

View the Kafka log:

tail -f /usr/local/kafka_2.13-3.3.1/logs/server.log

[2022-10-13 16:21:33,876] INFO Kafka version: 3.3.1 (org.apache.kafka.common.utils.AppInfoParser)
[2022-10-13 16:21:33,876] INFO Kafka commitId: e23c59d00e687ff5 (org.apache.kafka.common.utils.AppInfoParser)
[2022-10-13 16:21:33,876] INFO Kafka startTimeMs: 1665649293875 (org.apache.kafka.common.utils.AppInfoParser)
[2022-10-13 16:21:33,878] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)

Seeing "Kafka Server started" means Kafka has started successfully. It listens on port 9092.
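
The startTimeMs value in the log is a Unix timestamp in milliseconds. As a small sanity check, the sketch below converts it to a readable time. The UTC+8 offset is an assumption inferred from the log timestamps (16:21:33,876), not something stated in the log itself, and the class name is hypothetical.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

class StartTimeSketch {
    /** Converts epoch milliseconds to a date-time at the given whole-hour UTC offset. */
    static OffsetDateTime toOffsetTime(long epochMillis, int offsetHours) {
        return Instant.ofEpochMilli(epochMillis).atOffset(ZoneOffset.ofHours(offsetHours));
    }

    public static void main(String[] args) {
        long startTimeMs = 1665649293875L; // value copied from the log line above
        System.out.println(toOffsetTime(startTimeMs, 0)); // the same instant in UTC
        System.out.println(toOffsetTime(startTimeMs, 8)); // assumed UTC+8 machine clock
    }
}
```

At UTC+8 the value works out to 16:21:33.875, one millisecond before the timestamp on the log line that reports it.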

Stop Kafka:

ps aux | grep java | grep kafka | grep -v grep | awk '{print $2}' | xargs kill

Create a Kafka Topic

Create a topic named quickstart-events:

/usr/local/kafka_2.13-3.3.1/bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

Created topic quickstart-events.

Describe the topic:

/usr/local/kafka_2.13-3.3.1/bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092

Topic: quickstart-events    TopicId: 6f0Vc-_cR66g_7PYkFp2vQ    PartitionCount: 1    ReplicationFactor: 1    Configs: segment.bytes=1073741824
    Topic: quickstart-events    Partition: 0    Leader: 1    Replicas: 1    Isr: 1

Write and Read Kafka Topic Data with the Command-Line Tools

Open a terminal window and run the producer command:

/usr/local/kafka_2.13-3.3.1/bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092

>

Type any string and press Enter to write one record to the topic quickstart-events.

Open another terminal window and run the consumer command:

/usr/local/kafka_2.13-3.3.1/bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

Each line typed into the producer window appears as the same line in the consumer window once you press Enter.

When you are done writing and reading, press Ctrl-C in each of the two terminal windows to stop them.

Import and Export File Data with Kafka Connect

Kafka Connect is plugin-based. Kafka ships with a connector jar for file import and export, along with sample configuration files, so only small changes on top of those are needed.

Configure Kafka Connect:

vim /usr/local/kafka_2.13-3.3.1/config/connect-standalone.properties

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false

plugin.path=/usr/local/kafka_2.13-3.3.1/libs/connect-file-3.3.1.jar

Create the files test.source.txt and test.sink.txt:

touch /tmp/test.source.txt
touch /tmp/test.sink.txt

We need to start a source connector that imports data written to test.source.txt into the Kafka topic quickstart-events, and a sink connector that exports the data arriving in quickstart-events to test.sink.txt.

Configure the source connector:

vim /usr/local/kafka_2.13-3.3.1/config/connect-file-source.properties

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/test.source.txt
topic=quickstart-events

Configure the sink connector:

vim /usr/local/kafka_2.13-3.3.1/config/connect-file-sink.properties

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/test.sink.txt
topics=quickstart-events

Start the source connector and the sink connector in standalone mode:

/usr/local/kafka_2.13-3.3.1/bin/connect-standalone.sh /usr/local/kafka_2.13-3.3.1/config/connect-standalone.properties /usr/local/kafka_2.13-3.3.1/config/connect-file-source.properties /usr/local/kafka_2.13-3.3.1/config/connect-file-sink.properties

Keep appending data to test.source.txt:

echo "1" >> /tmp/test.source.txt
echo "2" >> /tmp/test.source.txt
echo "3" >> /tmp/test.source.txt
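
The same appends can be made from a program instead of the shell. The following is a minimal sketch using only the JDK; the class and method names are hypothetical. Each value is written with a trailing newline, mirroring what echo does above.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

class AppendLineSketch {
    /** Appends one newline-terminated line, creating the file if it does not exist. */
    static void appendLine(Path file, String line) throws IOException {
        Files.write(file, (line + "\n").getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    public static void main(String[] args) throws IOException {
        Path source = Paths.get("/tmp/test.source.txt"); // path used in the steps above
        for (int i = 4; i <= 6; i++) {
            appendLine(source, Integer.toString(i));
        }
    }
}
```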

View test.sink.txt:

tail -f /tmp/test.sink.txt

1
2
3

Press Ctrl-C to stop.

Write and Read Kafka Topic Data with the Java API

Add the Maven dependency:

<dependency>  
    <groupId>org.apache.kafka</groupId>  
    <artifactId>kafka-clients</artifactId>  
    <version>3.3.1</version>  
</dependency>

Write data to the Kafka topic:

package io.github.hodgepodge.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @author yurun
 */
public class ProducerMain {
    public static void main(String[] args) {
        Properties props = new Properties();

        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("linger.ms", 1);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

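        // try-with-resources closes the producer, which waits for pending sends to complete
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            int total = 100;
            for (int index = 0; index < total; index++) {
                // key: the index; value: "<index>-<send time in epoch millis>"
                producer.send(new ProducerRecord<>("quickstart-events",
                        Integer.toString(index),
                        index + "-" + System.currentTimeMillis()));
            }
        }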
    }
}

Read data from the Kafka topic:

package io.github.hodgepodge.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @author yurun
 */
public class ConsumerMain {
    public static void main(String[] args) {
        Properties props = new Properties();

        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", StringDeserializer.class.getName());
        props.setProperty("value.deserializer", StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("quickstart-events"));

        // Poll 100 times, waiting up to 100 ms each; the counter limits polls, not records
        int index = 0;
        int total = 100;

        while (index++ < total) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset: %d, key: %s, value: %s%n", record.offset(), record.key(), record.value());
            }
        }

        consumer.close();
    }
}

Process Data with Kafka Streams

Add the Maven dependency:

<dependency>  
    <groupId>org.apache.kafka</groupId>  
    <artifactId>kafka-streams</artifactId>  
    <version>3.3.1</version>  
</dependency>

We read the data from the topic quickstart-events, process it, and write the result to the topic quickstart-events-split. The processing logic is simple:

Split the string (the value) on the separator - and keep the first substring.

package io.github.hodgepodge.kafka;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

/**
 * @author yurun
 */
public class StreamMain {
    public static void main(String[] args) {
        Properties props = new Properties();

        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> source = builder.stream("quickstart-events");
        source
                .mapValues(value -> value.split("-", -1)[0])
                .to("quickstart-events-split");

        Topology topology = builder.build();

        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();

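        // Let the topology run for 30 seconds, then shut down
        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            // restore the interrupt flag and fall through to close()
            Thread.currentThread().interrupt();
        }

        streams.close();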
    }
}
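
The transformation inside mapValues can be tried without a broker. The sketch below applies the same expression to a few sample values; the class name, method name, and sample inputs are hypothetical. It also shows one practical effect of the limit argument -1: a value consisting only of the separator still yields an (empty) first field, whereas a plain split("-") would return an empty array and indexing [0] would fail.

```java
class SplitSketch {
    /** Same expression as the mapValues lambda: keep the text before the first "-". */
    static String firstField(String value) {
        return value.split("-", -1)[0];
    }

    public static void main(String[] args) {
        System.out.println(firstField("42-1665649293875")); // value in the ProducerMain format
        System.out.println(firstField("hello"));            // no separator: returned unchanged
        System.out.println(firstField("-").isEmpty());      // limit -1 keeps the empty first field
        System.out.println("-".split("-").length);          // default limit drops the empty fields
    }
}
```

Values written earlier through the console producer or Kafka Connect (such as 1, 2, 3) contain no separator, so they pass through to quickstart-events-split unchanged.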

Conclusion

There is a lot to cover in Kafka Connect and Kafka Streams. I plan to study them properly when I find the time.
