消息的生产与消费流程

消息的生产与消费流程 #

消息生产 #

Producer生产消息
可以指定消息的主题、Key、Value、时间戳、分区策略、可靠性参数
此外客户端还需要配置序列化器、性能优化参数等

分区策略 #

默认分区 #

  • Key != null:对Key进行Hash取模确定分区
  • Key == null:轮询(RoundRobin)分配分区

自定义分区器 #

通过实现 Partitioner 接口自定义路由逻辑

    /**
     * 自定义分区器,确保同一用户的消息总是发送到同一分区
     * 通过对用户ID进行哈希计算,确定分区号
     */
    private static class UserIdPartitioner implements org.apache.kafka.clients.producer.Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, org.apache.kafka.common.Cluster cluster) {
            // 获取主题的分区数
            int partitionCount = cluster.partitionsForTopic(topic).size();
            if (partitionCount <= 0) {
                return 0;
            }
            
            // 如果key为null,使用轮询方式分配分区
            if (key == null) {
                return Utils.toPositive(Utils.murmur2(valueBytes)) % partitionCount;
            }
            
            // 对用户ID进行哈希,确保同一用户的消息总是发送到同一分区
            return Utils.toPositive(Utils.murmur2(keyBytes)) % partitionCount;
        }

        @Override
        public void close() {
            // 无需关闭资源
        }

        @Override
        public void configure(Map<String, ?> configs) {
            // 无需配置
        }
    }

可靠性参数 #

acks #

  • 0:无需确认(可能丢数据)。
  • 1:Leader 写入成功即确认(默认)。
  • -1/all:ISR 所有副本同步成功才确认(最强保证)。

retries #

发送失败时的重试次数。

enable.idempotence #

开启幂等性,避免重试导致的消息重复

transactional.id #

开启事务,保证跨分区、跨 Topic 的原子性

性能优化参数 #

batch.size #

批量发送的字节数阈值,达到阈值时触发发送。

linger.ms #

批量发送的等待时间,即使未达到 batch.size 也会发送。
指定生产者在发送批量消息前等待的时间,当设置此参数后,即便没有达到批量消息的指定大小,到达时间后生产者也会发送批量消息到 broker

compression.type #

消息压缩类型(none/gzip/snappy/lz4/zstd)

max.request.size #

单个请求的最大字节数,防止超大消息。

其他配置 #

序列化器 #

org.apache.kafka.common.serialization.StringSerializer 等

拦截器 #

实现 ProducerInterceptor 接口,在消息发送前后插入自定义逻辑(如添加 TraceID)

连接参数 #

  • bootstrap.servers:Kafka Broker 地址列表(无需全部,用于发现集群)。
  • connections.max.idle.ms:空闲连接的超时时间。

消息存储和分布 #

分区 #

Kafka的分区指的是一个主题的一个分区
主题可以划分为多个分区,每个分区是一个有序的,不可变的消息序列
收到生产者的消息,最终只会被路由到一个分区进行存放
即一条消息,必须属于且仅属于一个分区

副本 #

Kafka使用副本机制为分区提供数据备份存储,实现数据冗余和高可用
每个分区可能会有一个Leader副本和多个Follower副本,具体取决于设置的replicas值
Leader副本负责处理所有读写请求,多个Follower副本会定期从Leader副本拉取数据备份,保证数据一致性

ISR(In-Sync Replicas) #

指的是处于同步状态的副本集合,仅包括那些同步进度跟上Leader的副本

故障切换 #

Kafka集群有一个控制器节点,负责管理副本生命周期和处理分区领导者选举
当Leader副本不可用时,控制器会在ISR中选举新的Leader副本,保证分区高可用

下线判断 #

  1. Follower
    Follower 向 Leader 发送 FetchRequest 后,若在 replica.lag.time.max.ms(默认 10 秒)内未收到响应,会认为 Leader 可能下线,并触发 重新获取元数据(MetadataRequest)操作,确认 Leader 是否仍为当前节点

  2. Leader
    Leader 维护一个 副本列表(ReplicaManager),记录每个 Follower 的最近心跳时间戳。若某个 Follower 的心跳间隔超过 replica.lag.time.max.ms,Leader 会将其从 ISR(In-Sync Replicas) 中移除(但不会立即触发选举,仅标记为 “不同步”)

  3. 控制器

  • ZooKeeper 模式:
    控制器通过监听 ZooKeeper 中 /brokers/ids/[brokerId] 节点的删除事件,判断 Broker 是否下线。若 Leader 所在 Broker 下线,控制器会触发分区的领导者选举。
  • KRaft 模式:
    控制器通过 Raft 协议的 心跳机制(如 AppendEntries 请求)检测节点存活状态。若 Leader 节点在 election.timeout.ms(默认 100-500 毫秒)内未响应心跳,其他节点会发起 Raft 选举,重新选出控制器和分区领导者。

选举流程 #

步骤 1:控制器发起选举请求 #

  • ZooKeeper 模式:
    控制器从 ZooKeeper 的 /controller 节点获取当前分区的 ISR 列表,优先从 ISR 中选择 LEO(Log End Offset)最大的副本 作为新 Leader(选择最多数据的副本作为新的Leader)。
    • 通信逻辑:控制器通过 PartitionStateMachine 调用各 Broker 的 updateMetadata 接口,通知新 Leader 信息。
  • KRaft 模式:
    控制器通过 Raft 协议的 RequestVote 和 AppendEntries 消息协调选举,直接与分区的副本节点通信,确认候选者的 EpochLog Offset 是否符合条件(需满足 “多数派原则”“日志完整性检查”)。

多数派原则
Kafka 使用多数派原则来保证数据的一致性和可用性
在分布式系统中,多数派是指超过一半的节点或副本
生产者发送消息到 Kafka 集群时,只有当消息被写入到多数派副本中,才会被认为是成功提交。
这样即使部分副本所在节点出现故障,只要多数派副本可用,数据就不会丢失,也能保证数据的一致性。
当需要选举新的领导者时,也会基于多数派原则,只有在多数派副本中的副本才有资格被选举为新的领导者,以确保新领导者拥有最新的数据(肯定也是数据最新最多的副本)

日志完整性检查
日志完整性检查是 Kafka 用于确保分区日志数据完整性和一致性的机制
Kafka 中的数据是以日志的形式存储在分区中的,每个分区由多个日志段组成

  • 数据写入时:生产者发送消息到 Kafka 集群,Kafka 会将消息追加到分区的日志中。在写入过程中,会对消息进行校验和验证,确保消息的格式正确、大小符合限制等。同时,Kafka 会为每条消息分配一个唯一的偏移量(offset),用于标识消息在分区中的位置和顺序1。
  • 副本同步时: follower 副本会从 leader 副本拉取消息并写入自己的日志中。在这个过程中,会进行日志完整性检查,确保拉取到的消息与 leader 副本上的消息一致。如果发现不一致,会根据一定的策略进行处理,如重新拉取消息或标记副本为不同步状态。
  • 节点故障恢复时:当 broker 节点发生故障重启后,会对本地存储的分区日志进行完整性检查。它会检查日志文件的格式是否正确、偏移量是否连续、是否存在损坏的消息等。如果发现问题,会尝试通过与其他正常副本进行数据同步来修复日志

步骤 2:副本节点的响应与状态变更 #

控制器直接告诉某个节点被选中了,然后发送消息告知。被选中的是候选者,没被选中的继续当Follower

  • 候选者(Candidate):
    副本节点收到选举请求后,会对比自身的 日志偏移量(Log Offset)领导者纪元(Leader Epoch)(领导者版本)
    • 若自身 Log Offset 大于等于其他候选者,且处于 ISR 中(ZooKeeper 模式),则返回 同意投票(Vote Grant),并将自己状态切换为 Leader
    • 若不符合条件(如 Log 落后),则返回 拒绝投票(Vote Deny),并保持 Follower 状态。
  • 非候选者(Follower): 节点收到新 Leader 的元数据更新后,会通过 MetadataRequest 刷新本地缓存,并向新 Leader 发送 FetchRequest 建立心跳连接,开始同步数据。 步骤 3:选举结果的全局广播
  • 控制器通过 UpdateMetadataRequest 协议向所有 Broker 广播新 Leader 信息,包含:
    • 分区的新 Leader 节点 ID、地址。
    • 新的 ISR 列表(可能因选举调整)。
  • Broker 接收到请求后,更新本地的 分区元数据缓存,并通知消费者和生产者路由变更

脑裂问题 #

脑裂是指部分节点失去与集群之间的网络连接,或者某个节点因为故障与集群断开连接后无法恢复。从而形成多个子集群,各自都认为自己是主集群,导致数据不一致或服务异常

Kafka的脑裂可能的形式 #

分区选举导致

  1. Leader节点与其他节点之间出现网络分区,其他节点无法与Leader通信,于是选举出新Leader
  2. 旧Leader仍然认为自己是Leader,网络恢复后,出现新旧Leader并存的情况,导致生产者向两个 Leader 写入数据,引发日志不一致 Zookeeper分区导致
    早期版本的 Kafka 依赖 ZooKeeper 存储分区 Leader 信息,若 ZooKeeper 集群出现短暂分区,不同 Broker 可能读取到不同的 Leader 元数据,导致对 “谁是当前 Leader” 的认知不一致

Kafka避免脑裂 #

新旧Leader并存:(领导者版本判断)
新 Leader 当选后,Epoch 递增,旧 Leader 收到请求时若发现自身 Epoch 小于当前 Epoch,会拒绝处理并返回 NotLeaderException

底层高效读写 #

架构层优化 #

  1. 分区机制
    一个主题的分为多个分区,多个分区分散在不同Broker上
    写入消息可以在多个Broker上并行写入
    消费者按分区消费,容易进行负载均衡,避免单一分区读压力过大
  2. 副本机制
    分区有个副本,进行读写分流(2.4版本以前不支持,所有读写都由Leader副本承担)

存储层优化 #

  1. 顺序写盘
    Kafka消息是追加式写入,追加到分区日志文件末尾,避免随机写
    顺序写盘速度可达600MB/s以上,远高于随机写的100MB/s
  2. 页缓存
    Kafka消息日志直接使用操作系统页缓存,无需额外JVM堆缓存
    写入时写入页缓存,操作系统异步刷盘
    读取时先找页缓存获取数据,避免频繁磁盘IO
  3. 日志分段
    每个分区的日志文件还会按大小或时间切分为多个Segment文件(如1GB/段)
    可以方便删除老数据,直接删除Segment文件
    索引文件(.index)和数据文件(.log)分离,可以加速消息定位

网络传输优化 #

  1. 零拷贝
    Kafka与网卡之间的数据传输也使用零拷贝
    传统拷贝:磁盘 - 内核缓存 - 用户空间 - 套接字缓存 - 网卡
    零拷贝:磁盘 - 内核缓存 - 网卡(跳过用户空间)
    减少2次上下文切换和3次数据拷贝
  2. 批量处理压缩
    生产者消息是批量发送的,减少网络请求次数
    消息可以压缩,支持GZIP、Snappy、LZ4等压缩算法
    压缩消息减少网络带宽,提升磁盘存储效率

索引查找优化 #

  1. 稀疏索引
    Kafka给每个日志段维护偏移量索引(.index)和时间戳索引(.timeindex)
  • 偏移量索引:记录消息在日志文件内的偏移量,每间隔4KB日志文件建立一个索引项(不精确稠密的稀疏索引)
  • 时间戳索引:映射时间戳到偏移量,便于按时间范围查询
  1. 二分查找
    先通过二分查找在索引文件找到最近的索引
    再从索引的位置开始顺序扫描日志文件,找到目标消息

综论 #

Kafka 通过 分布式分区 实现并行处理,利用 顺序写磁盘 和 零拷贝技术 优化 I/O 性能,结合 批量压缩 和 页缓存 大幅提升吞吐量
这种设计使其特别适合大数据场景下的实时数据流处理,成为业界高吞吐量消息系统的首选方案

消息消费 #

Consumer 订阅消息并消费消息,每个Comsumer都会属于一个 Comsumer-Group
采用不同的消息拉取模式获取主题内的消息
消费还有许多需要的场景,基于Kafka或业务搭配设计来实现

消费者组 #

Consumer-Group
Consumer-Group是一组消费者,共同协作消费一个或多个主题中的消息
每个Consumer-Group有一个唯一标识符
属于同一组的消费者会协同工作,保证一个组内消费者只会对每条消息消费一次
具体来说

  1. 每个Consumer-Group的每个消费者独立消费不同分区的数据,一个分区只能被一个消费者消费
  2. 即使多个消费者在一个组消费同一个Topic,Kafka也会确保每条消息只会被组内其中一个消费者处理,极大提升并发性和处理速度,保证消息的高效处理
  3. Consumer-Group可以实现负载均衡。当有新的消费者加入或者离开组,Kafka会自动均衡分区消费,将需要消费的分区重新分配给现存消费者

分区分配策略 #

每个消费者组会得到全量数据
由消费者组内的消费者实例负载均衡的消费,具体是按分区消费

  • 分区肯定要整个整个的交给消费者
    如果消费者组内消费者数量小于分区数量时,依赖分区分配策略而定
    如果分区数量小于组内消费者数量,就会有一部分消费者处于空闲状态,什么消息都拿不到
  1. Range 范围分配策略
    Kafka默认的分配策略
    首先,将分区按照序号排列,然后计算每个消费者平均分配到的分区数量n = 分区数量 / 消费者数量,以及剩余的分区数量m = 分区数量 % 消费者数量。
    每个消费者会消费n个分区,然后再将剩余分区给前m个消费者,直到分完
    前m个消费者会多消费一个分区,即前m个消费者每个消费n+1个分区,剩余消费者每个消费n个分区
    例如,有 10 个分区和 3 个消费者,那么n=3,m=1,则消费者C1消费 4 个分区,C2和C3各消费 3 个分区。需要注意的是,Range 范围分配策略是针对每个 Topic 的
  2. RoundRobin 轮询策略
    将消费组内所有消费者以及消费者所订阅的所有 Topic 的 Partition 按照字典序排序(Topic 和分区的 hashcode 进行排序),然后通过轮询方式逐个将分区以此分配给每个消费者
    比如有三个消费者C1、C2、C3,会被排序后组成环状结构
    然后有主题T1(三个分区P1、P2、P3)、主题T2(四个分区P1、P2、P3、P4),主题T3(两个分区P1、P2)。分区按顺序排列好。然后轮询将分区逐个交给消费者。
  3. Sticky 粘性分配策略
    从 Kafka 0.11.x 开始引入。
    该策略主要有两个目的
  • 一是使分区分配尽可能均匀
  • 二是在发生 Rebalance 的时候,分区的分配尽可能与上一次分配保持相同
    在没有发生 Rebalance 时,Sticky 粘性分配策略和 RoundRobin 分配策略类似。当发生 Rebalance 时,例如某个消费者崩溃了,该策略会尽量保留之前的分配结果,只将原本由崩溃消费者负责的分区再均匀分配给其他消费者,这样可以减少系统资源的浪费

广播消息 #

比较常见的消费模式
当一个主题下,消费者不满足仅获取某个分区的消息,而是希望每条消息每个消费者都能消费
可以考虑:

  1. 单分区主题 + 每个消费者一个消费者组
    首先让全量消息都放在一个分区内,还能保证顺序
    然后每个消费者组都能获取全量数据,从而让每个消费者都能获取全量数据
  2. 多分区主题 + 每个消费者一个消费者组
    消息可以分区存放
    但所有分区都会被分配给组内唯一的消费者,使得各个消费者都能获取全量数据,但是可能多分区无顺序
  3. 辨析
    多分区本来就是为了负载均衡存在的,包括生产者写入的负载均衡和消费者消费的负载均衡
    这种需求基本没有消费者的负载均衡,所以每个消费者一个消费者组
    如果生产者不一,且高并发,则使用多分区存放消息。否则单分区即可

消息拉取模式 #

Kafka的消费者消息获取,主要基于Pull拉取模式,而非传统的Push推送模式

  1. Push模式的问题
    Push模式是Broker将生产者消息推送到消费者
    如果消费者消费速度慢,Push会导致消费者缓冲区溢出或系统负载过高(导致背压问题)
    而Broker难以根据消费者状态动态调整推送速率
  2. Pull模式原理
    消费者主动从Broker拉取消息
// 消费者拉取消息的核心代码
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // 处理消息
        System.out.printf("offset = %d, key = %s, value = %s%n", 
                          record.offset(), record.key(), record.value());
    }
    // 手动提交偏移量
    consumer.commitSync();
}

关键机制:

  1. 消费者主动拉取
    通过poll方法定期从Broker拉取消息,拉取频率和批量大小可配置
  2. 基于offset定位
    消费者维护消费偏移量offset,可以随时指定位置拉取(比如从头消费、从特定位置恢复)
  3. 长轮询
  • 如果Broker没有新消息,poll不会立即返回,而是等待一小段时间如500ms,避免Broker频繁处理空请求
  • 通过 fetch.min.bytes和fetch.max.wait.ms 参数可以控制长轮询行为
  1. 批量传输
    Broker批量返回数据,提升吞吐量,减少网络开销

Pull模式的优势:

  1. 消费者自主控制拉取频率和批量大小,避免过载
    背压消息被存储到Kafka的磁盘中缓冲等待消费,而磁盘很便宜
  2. 故障恢复灵活
    消费者可以通过重置 offset 重新消费历史消息(如系统故障后恢复)
  3. 高效利用网络带宽
    批量拉取和长轮询,减少空请求,优化网络利用率
  4. 支持多消费模式
    每个消费者组的消费者都能自行独立控制消费进度,互不影响

如何保证消费顺序性 #

消息存储的顺序性 #

数据存储时自然在分区内使用Offset保证了消息的顺序性
但多分区的顺序性无法保证
可以通过在生产时指定Key,使得相同Key的消息被路由到同一个分区存储,从而保证顺序性
Key可以设置成业务含以上对消息的分组,如同一用户同一订单的消息等
从而使得业务含义上需要组顺序的消息保证顺序存储

消息消费的顺序性 #

分区的消息会整个交给一个消费者消息
所以消费者应该单线程消费,以保证消费顺序性(禁止并行并发消费)
避免并发消费导致顺序混乱
对顺序不敏感的分区,允许并行消费,通过业务层去重或排序补偿

如何避免重复消费(幂等性) #

生产者消息提交幂等 #

生产者有消息重试发送的机制,需要确保重试发送消息不产生重复消息
生产者实例在启动时,Kafka会分配唯一PID。重启后变更、但同一实例的PID唯一固定
每个PID对每个分区会维护一个递增序列号,发送消息会附带PID和序列号
重试发送消息时,如果发现有相同PID+分区+序列号的消息,则忽略重复请求

# 在生产者配置中启用幂等性(默认 false)
producer.properties:
  enable.idempotence = true
  • 仅保证 单会话内的幂等性(生产者重启后 PID 变更,无法避免跨会话重复)。
  • 仅作用于 单分区内的消息,无法保证跨分区或跨主题的事务性

消费者消费幂等 #

消费者通过提交偏移量标记已经消费的消息位置,避免消费到已经消费的消息
所以需要谨慎提交偏移量,以免拉取到重复消息
应关闭自动提交,改为处理完消息后手动同步提交

props.put("enable.auto.commit", "false");
// 处理完消息后提交
consumer.commitSync();

业务层幂等 #

如果Kafka无法避免重复消息(比如提交偏移量前宕机),通过业务设计实现消费幂等

  1. 消息携带唯一标识,如订单号、消息ID、请求ID等,Redis高效缓存一定时间的消息ID,重复消息忽略
  2. 状态机校验:对于状态业务,仅允许按一定顺序变更状态(创建-支付-完成),拒绝重复操作和无效操作
  3. 数据库唯一约束(类似Redis缓存)
    重复消息的写入操作被数据库丢弃或报错失效

如何实现事务消息 #

Kafka的事务性消息,指的是生产者原子性的写入,是生产者端的写入保障机制
事务的成功意味着消息被可靠的提交到Kafka集群,而消费者是否正确消费需要应用自行处理(幂等、重试等)

消息清理策略 #

每个主题多个分区,每个分区物理上存储为多个日志文件序列
可以按时间或大小分段存储
分为:多个固定大小或时间范围的分段文件(.log),以及每个日志分段对应各自的偏移量索引文件(.index)和时间戳索引文件(.timeindex)
便于快速删除过期分段、避免全量扫描

保留策略 #

可以配置保留时间或保留大小自动清理消息,支持以下两种策略(可以同时配置,满足任意一个即可触发清理)

  1. 按时间保留(默认策略)
    通过log.retention.hours配置,默认168小时=7天
    可以细分为log.retention.minutes或log.retention.ms
    可以在主题级别覆盖全局配置
    清理逻辑:定期扫描分区日志分段,删除早于保留时间的分段
  2. 按大小保留
    通过log.retention.bytes配置,默认-1表示不限制
    按分区总大小控制,当分区数据超过阈值,删除最旧的分段直到满足大小要求
    优先级比时间保留低,仅在空间紧张时作为补充
    可以与时间策略结合使用,避免单个分区无限增长
    注意:清理的删除是永久删除,需要确保消费端已经消费处理

日志压缩 #

适用于键值对场景(应该很少用),保留同一键的最新值,清理旧数据,确保仅存储最新状态
设置:
cleanup.policy=compact(创建主题时设置,或通过 alter topic 修改)
min.compcation.lag.ms 消息在日志中保留的最小时间(避免频发压缩)
工作原理

  1. 后台线程定期扫描分区,按消息Key分组
  2. 对每个键,仅保留最新Offset的消息,删除旧版本
  3. 压缩后的分段文件名 clean-xxx.log,同步更新索引文件 注意:
    压缩会破坏分区的全局顺序,同一键的消息顺序保留

清理触发机制 #

通过后台线程定期执行清理任务

  1. 定时清理
    每个Broker启动时创建Log Clean线程,默认每30秒检查一次是否需要清理

综论 #

生产者生产的一条消息会被Kafka路由到一个分区存放,各个分区内的消息不会重复,单一分区内的消息有序
每个消费者组会得到全量数据,由消费者组内的消费者实例负载均衡的消费,具体是按分区消费