在分布式系统中,消息队列扮演着至关重要的角色。从最初的同步服务调用,到异步消息传递,再到高吞吐量的流式处理,消息队列技术不断演进。接下来将深入探讨下 Apache Kafka——这个在流式数据处理领域占据主导地位的消息队列系统。
https://github.com/apache/kafka
一、消息队列的演进
1. 传通服务调用模式
问题:
• 同步阻塞,响应时间长
• 服务间强耦合
• 单点故障影响整个链路
• 难以扩展
2. 消息队列的引入
优势:
• 异步解耦
• 削峰填谷
• 提高系统可用性
• 支持多种消费模式
二、主流消息队列对比
特性 | RabbitMQ | RocketMQ | Kafka | Pulsar |
---|---|---|---|---|
吞吐量 | 中等 | 高 | 极高 | 高 |
延迟 | 低 | 低 | 中等 | 低 |
可靠性 | 高 | 高 | 高 | 高 |
持久化 | 支持 | 支持 | 支持 | 支持 |
事务 | 支持 | 支持 | 支持 | 支持 |
多租户 | 不支持 | 不支持 | 不支持 | 支持 |
云原生 | 一般 | 一般 | 好 | 优秀 |
为什么选择 Kafka?
- 超高吞吐量:单机可达百万级 TPS
- 低延迟:毫秒级消息传递
- 高可靠性:数据持久化 + 副本机制
- 水平扩展:支持集群部署
- 生态丰富:与大数据生态深度集成
三、站在AKF视角去理解Kafka
AKF 拆分原则的三个核心维度
第一维度(X 轴):无状态服务的水平复制
X 轴拆分是最基础且最容易理解的拆分方式,它基于 “水平复制” 的理念,将整个系统复制多份,每一份都可以独立处理用户请求。在微服务架构中,对于那些无状态的服务,如提供查询功能的 API 服务,完全可以通过简单的水平复制来提升系统的处理能力。例如,一个电商平台的商品查询服务,当用户访问量激增时,可以快速部署多个相同的商品查询服务实例,通过负载均衡器将用户请求均匀分配到各个实例上,从而实现系统吞吐量的线性增长。这种拆分方式不涉及对业务逻辑的修改,只需要增加服务实例的数量,操作简单且成本较低,能够快速应对突发流量。
第二维度(Y 轴):按业务功能和数据类型拆分
Y 轴拆分是基于业务功能和数据类型的拆分。在实际业务中,不同的业务模块有着不同的功能和数据处理逻辑,例如电商系统中的用户管理、订单处理、商品管理等模块。通过 Y 轴拆分,将这些不同的业务功能拆分成独立的微服务,每个微服务专注于处理特定的业务逻辑。同时,根据数据类型的不同,将数据也进行相应的分离存储。比如,用户数据存储在用户数据库中,订单数据存储在订单数据库中。这样做的好处是,每个微服务可以独立进行开发、部署和扩展,降低了服务之间的耦合度,提高了系统的可维护性和可扩展性。当订单业务增长迅速时,可以单独对订单微服务及其相关的数据存储进行优化和扩展,而不会影响到其他业务模块。
第三维度(Z 轴):按用户或请求属性拆分
Z 轴拆分主要是基于用户或请求属性进行拆分,常见的拆分方式包括按地域、用户群体等进行划分。例如,一个全球性的社交平台,可以根据用户所在的地理位置,将用户数据和服务分别部署在不同地区的服务器上,这样可以减少用户请求的响应时间,提高用户体验。或者,根据用户的付费等级、活跃度等属性,将用户划分为不同的群体,为不同群体提供针对性的服务和资源分配。对于高活跃度的付费用户,可以为其分配更多的系统资源,确保他们能够获得更流畅的使用体验,同时也能提高企业的服务质量和用户满意度。
四、 Kafka运行机制
1. 运行主流程图
性能优化
• Producer 直连 Broker,避免 Zookeeper 带宽饱和
• Zookeeper 专注协调,不被业务流量干扰
• 分区并行处理,提高吞吐量
可靠性保障
• 多副本机制保证数据安全
• 故障自动恢复,避免数据丢失
架构清晰
• 职责分离:Zookeeper 专注分布式协调
• 避免业务层操作影响 ZK 集群
高可扩展性
• Topic 分区分布在不同 Broker
• 消费者组并行处理
• 支持水平扩展
2. 幂等与重复消费
五、 Kafka 为什么快
1. Producer的处理
ACK的级别
• ack: -1 product 发送数据,broker leader 需要拿到全部broker回复,未回复默认等待10s,否则将踢出ISR队列到OSR(安全系数最高,相对速度最慢)
• ack: 0 product 只管发送,只要推送到socket就是成功(安全系数最低,速度最快)
• ack: 1 product 发送数据 broker leader 正确回复ack则表示成功(中等)
异步解耦机制
• 消息写入 RecordAccumulator 后,Producer 主流程立即解耦
• 主线程不等待网络发送完成,继续处理下一条消息
• 发送和业务逻辑完全分离
批量发送优化
• 消息按 topic + partition 分组,形成 16KB Batch
• 单条消息超过 batch.size 直接打包,解决内存碎片问题
• 减少网络请求次数,提高网络利用率
智能分区策略
• Partitioner 决定消息发送到哪个 Broker
• 基于 key 的分区算法,负载均衡
• 减少跨节点网络开销
网络 I/O 优化
• NioSelector 非阻塞 I/O 处理
• InFlightRequest 缓存机制,单队列默认 5 个请求
• Producer 选择空闲节点发送,避免阻塞
请求管理机制
• max.request.size 控制单次请求大小
• 超过队列限制时 wait,防止内存溢出
• 响应处理与发送并行进行
序列化优化
• 消息经过拦截器后直接序列化
• 避免重复的序列化操作
• 支持多种高效的序列化算法
2. Broker的处理
SocketManager
- Nio接收单队列中保障消息的有序性
- 多线程消费提高效率(每次消费完毕针对socket 执行 mute操作 提高跨topic消费的均衡性)
LogManager
kafka 采用 Append-Only 方式写入数据 (也是一种零拷方案)
(FileChannel.open(path, StandardOpenOption.APPEND))
优点
1. 磁盘 I/O 效率最大化
机械硬盘(HDD)的随机读写需要频繁移动磁头,性能极低(毫秒级延迟),而顺序写入可减少磁头移动,接近内存级效率(微秒级)。即使是固态硬盘(SSD),顺序写入的吞吐量也远高于随机写入。Kafka 面向高吞吐场景(如日志采集、数据同步),顺序写入能充分利用磁盘性能。
2. 简化数据一致性
保障顺序追加可避免旧数据被覆盖,配合分区副本机制,能简单高效地保证多副本间的数据一致性(只需同步追加顺序即可)。若支持随机写入,会引入复杂的并发控制和冲突解决逻辑,降低系统稳定性。
3. 便于日志分段与清理
Kafka 将分区日志拆分为多个固定大小的日志段(Log Segment)(如 1GB / 段),每个段包含 .log(数据)、.index(偏移量索引)、.timeindex(时间戳索引)文件。顺序写入使分段管理更简单,过期日志段可直接删除(无需复杂的碎片整理),符合 Kafka 的日志保留策略(按时间 / 大小清理)。
index offset 和 timestamp offset 为稀疏索引,当消费者请求某个偏移量(如 offset=1000)的消息时:
1. 先确定该偏移量属于哪个日志段(通过比较各段的 baseOffset)。
2. 在对应段的 .index 文件中,通过二分查找找到小于等于 1000 的最大索引项(如offset=990,位置 pos=5000)。
3. 从 .log 文件的 pos=5000 开始顺序扫描,直到找到 offset=1000 的消息(因索引稀疏,需少量后续扫描)。
kafka的数据寻址为 从index offset中找到position(timestamp 会找到 index offset),然后直接调用操作系统内核的seek(Java中 RandomAccessFile.seek -> native)来跳转到文件中的具体位置,避免像传通IO那样从头开始遍历
- 消费完了后 使用file channel 顺序写入 log(顺序写可以达到600M/s 随机写为100k/s)
- 攒4k 使用 mmap 写入 offset index 和 timestamp index
- 为了避免操作超大的log,kafka采取log segment方式
这里有一个比较细的点,为什么都是写Log,在LogManager中些log使用的是file channel,写index 是 mmap?
📊 FileChannel vs mmap 对比
基本特性对比
特性 | send_file | mmap |
---|---|---|
文件类型 | Log 文件 (.log) | Index 文件 (.index, .timeindex) |
主要操作 | 顺序写入消息数据 | 随机读取索引信息 |
访问模式 | Append-only 顺序写入 | 随机定位查询 |
内存管理 | JVM 堆内存 | 操作系统内存 |
系统调用 | 需要 read/write 调用 | 直接内存访问 |
性能与适用场景
性能指标 | send_file | mmap |
---|---|---|
顺序写入 | 优异 | 一般 |
随机读取 | 一般 | 优异 |
批量操作 | 支持良好 | 支持一般 |
系统开销 | 有上下文切换 | 无上下文切换 |
选择原因 | 写入密集,需要精确刷盘控制 | 读取密集,需要快速随机访问 |
示例:
00000000000000000000.index
00000000000000000000.log
00000000000000368746.index
00000000000000368746.log
00000000000000737492.index
00000000000000737492.log
ReplicaManager
ACK
- 请求如果是 0,只要接收就可以直接返回None
- 请求如果是1,leader 本地写入就可以返回
- 请求如果是-1,leader 需要发送一个delay任务,观测hw >= req offset && isr > minIsrCount 就可以返回
数据同步 - 用这种方式也算是一种变相的mq消费,减少Leader的负荷
- Flower 采用loop方式从leader拉取log,每次发送LEO,拉取之后的全部数据,Leader根据各个Flower的LEO取min来决定HW(木桶原理)
- 同时loop方式还减少了网络同步次数
3. Consumer的处理
Broker -> Consumer 数据零拷贝
零拷贝技术 (Zero-Copy)
• 使用 sendfile 系统调用直接从 page cache 发送到网络
• 减少 1 次 CPU 拷贝和 2 次上下文切换
• 避免数据从内核缓冲区拷贝到用户缓冲区再拷贝到 Socket 缓冲区
Page Cache 机制
• 操作系统将磁盘数据缓存到内存中的 page cache
• 热数据常驻内存,避免重复磁盘 I/O
• 利用操作系统的文件系统缓存优化
批量拉取机制
• Consumer 一次拉取多条消息,减少网络请求次数
• 配置 fetch.min.bytes 和 fetch.max.wait.ms 控制批量大小
• 提高网络利用率和吞吐量
顺序读取优化
• Kafka 日志文件采用顺序写入,读取也是顺序的
• 磁盘顺序 I/O 性能接近内存访问速度
• 避免随机 I/O 的寻道时间开销
Offset 快速定位
• 使用操作系统 seek 操作快速跳转到指定 offset
• 结合 .index 文件进行二分查找定位
• 避免传统 I/O 的逐行扫描
分区并行消费
• 多个 Consumer 实例并行处理不同分区
• 每个分区独立消费,互不干扰
• 充分利用多核 CPU 资源
3. CreateTopic的处理
kafka天生是分布式的,如何应对分区分配规则
尽量避免热点这种分布式痛点的
kafka支持:1,机算,2,人算,3,再均衡
感兴趣的可以看下 https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/admin/AdminUtils.scala
核心算法源码:
/**
* 副本分配算法
*
* 算法目标:
* 1. 将分区副本均匀分布到不同Broker上
* 2. 避免副本在同一个Broker上聚集
* 3. 确保负载均衡和容错性
*
* 核心思想:
* - 第一个副本按顺序轮询分配
* - 后续副本通过偏移量避免聚集
* - 当分区数超过Broker数时,增加偏移量
*/
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
replicationFactor: Int,
brokerList: Seq[Int],
fixedStartIndex: Int,
startPartitionId: Int): Map[Int, Seq[Int]] = {
val ret = mutable.Map[Int, Seq[Int]]()
val brokerArray = brokerList.toArray
// 确定第一个副本的起始位置
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
// 当前分区ID,从指定位置开始
var currentPartitionId = math.max(0, startPartitionId)
// 副本偏移量,用于避免副本聚集
// 当分区数 > Broker数时,该参数直接决定下一个partition副本位置
var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
// 为每个分区分配副本
for (_ <- 0 until nPartitions) {
// 核心判断:当分区数超过Broker数时,每分配完一轮Broker就增加偏移量
// 目的:避免副本在同一个Broker上聚集
if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
nextReplicaShift += 1
// 计算第一个副本的位置:(分区ID + 起始索引) % Broker数量
// 实现轮询分配,确保每个Broker的Leader数量均衡
val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
// 为后续副本分配位置
for (j <- 0 until replicationFactor - 1) {
// 调用replicaIndex方法计算后续副本位置
// 确保副本分布在不同Broker上
replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
}
// 将分区ID和对应的副本Broker列表存入结果
ret.put(currentPartitionId, replicaBuffer)
currentPartitionId += 1
}
ret
}
/**
* 计算副本位置的辅助方法
*
* 算法逻辑:
* 1. 计算偏移量:shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
* 2. 计算最终位置:(firstReplicaIndex + shift) % nBrokers
*
* 目的:确保后续副本与第一个副本不在同一个Broker上
*/
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
// 计算偏移量,避免与第一个副本在同一Broker
// 使用 (nBrokers - 1) 确保偏移量不会导致重复
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
// 计算最终位置,使用模运算确保在有效范围内
(firstReplicaIndex + shift) % nBrokers
}
六、 Kafka 如何保障数据可靠性
1. 数据的发送
ACK的级别
ack: -1 product 发送数据,broker leader 需要拿到全部broker回复,未回复默认等待10s,否则将踢出ISR队列到OSR(安全系数最高,相对速度最慢)
ack: 0 product 只管发送,只要推送到socket就是成功(安全系数最低,速度最快)
ack: 1 product 发送数据 broker leader 正确回复ack则表示成功(中等)
安全系数最高的为 ack:-1,会触发kafka的ISR同步机制
kafka的数据一致性由ISR来保障CA的,它是一个弹性的数据最终一致性同步机制,对比zk的数据过半更加灵活,当有节点无响应或落后会被提出ISR队列,由剩下ISR节点和minISR配置来决定数据的可靠性
2. 数据的存储与副本出主机
逃出去的才是真正活下来的
单机把数据落盘做到极致也无法保障主机或机房被炸
kafka对于数据落盘默认依赖与操作系统PageCache的脏页机制
log.flush.interval.messages
刷盘间隔 默认Long.MAX_VALUE
log.flush.interval.ms
刷盘时间间隔 默认Long.MAX_VALUE
log.flush.scheduler.interval.ms
刷盘调度间隔 默认Long.MAX_VALUE
3. 数据的接收
Comsumer 的 metadata信息同步与offset,有Broker的Coordinator存储 group消费进度
七、 反思Kafka的架构设计有什么异同
核心设计理念
Kafka 本身是一个消息队列,其核心思想就是实现异步解耦和削峰填谷。然而,当我们深入分析其内部架构时,会发现 Kafka 的每一层设计都巧妙地运用了消息队列的思想,形成了一个"MQ 套 MQ"的精妙架构。
Kafka 内部的多层 MQ 设计(仅限个人理解 强加的MQ理念)
第一层:Producer 内部 MQ
Producer 生产消息 → RecordAccumulator (消息累加器)
• 设计思想:Producer 将消息放入 RecordAccumulator,实现生产线程与发送线程的解耦
• MQ 特性:异步缓冲、批量处理、背压控制
• 优势:主线程不阻塞,支持高并发生产
第二层:网络传输 MQ
RecordAccumulator → Sender 线程 → Socket Buffer → Broker
• 设计思想:Sender 线程从累加器拉取消息,通过 Socket Buffer 发送到 Broker
• MQ 特性:异步发送、请求队列、网络缓冲
• 优势:网络 I/O 与业务逻辑解耦,支持批量发送
第三层:Broker 请求处理 MQ
Socket Buffer → Request Queue → Request Handler 线程池
• 设计思想:Broker 接收请求后先放入 Request Queue,多线程 Request Handler 异步消费处理
• MQ 特性:请求队列、线程池处理、负载均衡
• 优势:网络接收与业务处理解耦,支持高并发请求处理
第四层:Broker 内部 MQ
RequestLog → Page Cache → Log Segment
• 设计思想:消息先写入 Page Cache,再异步刷盘到 Log Segment
• MQ 特性:内存缓冲、异步刷盘、批量写入
• 优势:利用操作系统缓存,提高写入性能
第五层:副本同步 MQ
Leader Log → Follower 异步拉取 → Follower Log
• 设计思想:Follower 异步从 Leader 拉取日志进行同步
• MQ 特性:异步复制、批量同步、故障恢复
• 优势:保证数据可靠性,不影响主流程性能
第六层:Consumer 拉取 MQ
Broker Log → Consumer 拉取 → Consumer 处理
• 设计思想:Consumer 主动从 Broker 拉取消息进行处理
• MQ 特性:拉取模式、批量消费、Offset 管理
• 优势:消费者控制消费速度,支持多种消费模式