Kafka安装、部署、测试
一、介绍
Apache Kafka 是一个分布式流处理平台(Distributed Streaming Platform),它最初由 LinkedIn 公司开发,后来贡献给了 Apache 基金会。
简单来说,Kafka 经常被归类为“消息队列”(Message Queue, MQ)的一种,但它的功能和性能远超传统的消息队列。
为了让你更容易理解,我们可以分三个层面来讲:通俗比喻、核心概念、以及它主要用来做什么。
通俗比喻:Kafka 是什么?
你可以把 Kafka 想象成一个巨大的、分布式的、极高性能的“日志记录本”。
- 生产者(Producer):就像是负责写日记的人。不管是网站的点击流、银行的转账记录,还是服务器的报错日志,他们只管往这个本子的末尾疯狂地写数据。
- Kafka 集群(Broker):就是负责保管这些日志本的图书管理员。为了防止本子写满了或者丢失,管理员会把本子撕成几份(分区),甚至复印好几份(副本)存在不同的柜子(服务器)里。
- 消费者(Consumer):就像是来阅读日记的人。
- 传统的信件(传统MQ)通常是“你看完就销毁了”。
- Kafka 的特点是:你看完了,日志还在那里。你可以现在看,也可以明天看,甚至可以让另一个人从头再看一遍。
核心技术概念
如果你是技术人员,以下几个术语是必须掌握的:
- Topic(主题):
- 数据的分类。比如“用户注册数据”是一个 Topic,“订单数据”是另一个 Topic。
- Partition(分区):
- 为了实现高性能,一个 Topic 会被切分成多个 Partition。
- 作用:并发读写。比如一个 Topic 有 3 个分区,理论上就可以同时有 3 个人并行写入,速度提升 3 倍。
- Producer(生产者) & Consumer(消费者):
- 发消息的和收消息的客户端。
- Broker(代理/服务器):
- 运行 Kafka 进程的服务器节点。一个 Kafka 集群由多个 Broker 组成。
- Consumer Group(消费者组):
- 这是 Kafka 的精髓。一个组内有多个消费者,它们配合起来消费一个 Topic。
- 规则:组内的消费者不会重复消费同一条消息(负载均衡)。不同组之间互不干扰(广播模式)。
Kafka 为什么这么火?(特点)
- 高吞吐量(High Throughput):
- 它每秒可以处理百万级的消息。即便是在普通的硬件上,它的性能也非常惊人。
- 原理:使用了顺序读写磁盘(Sequential I/O)和零拷贝(Zero Copy)技术。
- 持久化(Durability):
- 数据存储在磁盘上,不会因为程序挂掉而丢失。你可以设置数据保留 7 天、30 天甚至永久。
- 可扩展性(Scalability):
- 需要处理更多数据?加服务器(Broker)就行,系统会自动重新平衡。
- 解耦(Decoupling):
- 生产者只管发,不需要知道谁在收。这让微服务架构变得非常灵活。
Kafka 主要用来做什么?(应用场景)
- 日志收集系统:
- 这是 Kafka 最初的用途。把成千上万台服务器的日志统一收集起来,发送到 Kafka,然后由后端系统(如 ELK Stack)去分析。
- 用户活动跟踪:
- 记录用户在网站上的每一次点击、浏览、搜索。这些数据量巨大,Kafka 可以轻松抗住,然后推给推荐系统做实时推荐。
- 微服务通信:
- 服务 A 完成了一个任务,发个消息给 Kafka,服务 B 和服务 C 订阅这个消息并分别执行后续操作(比如:用户下单 -> 发消息 -> 库存服务减库存 + 积分服务加积分)。
- 实时流处理:
- 结合 Flink、Spark Streaming 或 Kafka Streams,对数据进行实时的计算(比如实时监控股票价格、实时反欺诈检测)。
二、下载
自行选择版本,本文以3.2.0版本为例,且不使用zookeeper,前面的2.12以及2.13表示scala的版本,后面的3.2.0表示kafka的版本
三、安装
-
解压下载下来的压缩包
tar -zxvf kafka_2.12-3.2.0.tgz如果使用zookeeper进行服务管理,则可跳过这段
-
使用
kafka-storage脚本为集群生成一个id,这个id将在同一个集群中所有实例使用./bin/kafka-storage.sh random-uuid -
格式化存储目录 (注意:如果使用kraft做服务管理的话就要使用config目录下的kraft目录下的
server.properties配置文件)./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties -
如果有多实例,则需要都操作一遍步骤1和步骤2,可自行编写脚本进行处理
-
-
启动 KafkaServer,
-daemon表示在后台启动,不加此参数 KafkaServer 将在控制台关闭的时候停止bin/kafka-server-start.sh <-daemon> config/kraft/server.properties
四、测试
- 创建一个topic(下面指令将创建一个 topic 名称为
foo,分区数(partitions)为1,副本数(replication-factor)为2)bin/kafka-topics.sh --create --topic foo --partitions 1 --replication-factor 2 --bootstrap-server ip:port - 启动消息消费者,用于测试消息是否能成功接收(通过下面的指令启动的控制台消费者将监听topic名称为
foo的主题)bin/kafka-console-consumer.sh --bootstrap-server ip:port --topic foo - 启动消息生产者,配合消息消费者测试消息是否能成功发送(通过下面的指令启动的控制台生产者将向topic名称为
foo的主题发送消息)bin/kafka-console-producer.sh --bootstrap-server ip:port --topic foo