目录

Raft 协议

目录

Raft算法深度解析 - 第一部分:问题的起源

为什么需要Raft?从单机到分布式的痛苦历程

1. 从单机开始:一切都很简单

想象你在开发一个简单的计数器服务:

// 单机版本 - 完美运行
type Counter struct {
    value int
    mu    sync.Mutex
}

func (c *Counter) Increment() {
    c.mu.Lock()
    c.value++
    c.mu.Unlock()
}

func (c *Counter) Get() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

单机的美好世界

  • 数据一致性:只有一份数据,不会冲突
  • 操作原子性:锁机制保证操作不被打断
  • 故障简单:要么正常运行,要么整个服务挂掉

2. 现实的残酷:单机的局限性

但现实很快给你一巴掌:

问题1:单点故障

用户请求 → [单机服务] → 数据库
                ↑
            服务器宕机 = 整个系统不可用

问题2:性能瓶颈

1000个并发请求 → [单机服务] → 处理不过来,响应变慢

问题3:数据丢失风险

服务器硬盘损坏 → 所有数据丢失 → 业务完蛋

3. 天真的解决方案:简单复制

你想:“那我搞多台服务器不就行了?”

用户请求 → 负载均衡器 → [服务器1] [服务器2] [服务器3]
                        ↓        ↓        ↓
                      数据库1   数据库2   数据库3

看起来很美好,但问题马上出现了…

问题1:数据不一致

时刻T1: 用户A向服务器1发送 Increment() → 服务器1的计数器 = 1
时刻T2: 用户B向服务器2发送 Get() → 服务器2的计数器 = 0 (还没同步)

结果:用户B看到的是旧数据!

问题2:并发冲突

时刻T1: 服务器1和服务器2同时收到 Increment() 请求
服务器1: 读取计数器=5, 计算5+1=6, 写入6
服务器2: 读取计数器=5, 计算5+1=6, 写入6

期望结果:7
实际结果:6 (丢失了一次增量)

4. 进一步尝试:主从复制

你想:“那我设置一个主服务器,其他都是从服务器”

写请求 → [主服务器] → 同步到 → [从服务器1] [从服务器2]
读请求 → [从服务器1] [从服务器2] (分担读压力)

这看起来不错,但新问题又来了…

问题1:主服务器挂了怎么办?

主服务器宕机 → 写操作全部失败 → 系统变成只读

问题2:如何选择新的主服务器?

从服务器1: "我应该成为新主服务器!"
从服务器2: "不,我应该成为新主服务器!"
从服务器3: "你们都不行,我来!"

结果:三个主服务器同时存在 → 数据彻底混乱

问题3:网络分区问题

网络故障导致:
[主服务器] ←→ [从服务器1]    网络正常
     ↕
[从服务器2] ←→ [从服务器3]   网络断开

从服务器2和3认为主服务器挂了,选出新主服务器
现在有两个主服务器在同时工作!

5. 问题的本质:分布式系统的根本挑战

通过上面的尝试,我们发现分布式系统面临的核心问题:

5.1 一致性问题 (Consistency)

  • 多个节点的数据如何保持一致?
  • 如何处理并发更新?
  • 如何保证操作的原子性?

5.2 可用性问题 (Availability)

  • 部分节点故障时系统如何继续工作?
  • 如何快速检测和处理故障?
  • 如何避免单点故障?

5.3 分区容错问题 (Partition Tolerance)

  • 网络分区时如何处理?
  • 如何避免脑裂问题?
  • 如何在网络恢复后保持一致性?

6. CAP定理:残酷的现实

CAP定理告诉我们:在分布式系统中,一致性(C)、可用性(A)、分区容错(P)三者不可兼得,最多只能同时满足两个。

网络分区发生时,你必须选择:

选择一致性(C):
- 停止服务,等待网络恢复
- 保证数据不会不一致
- 但系统变得不可用

选择可用性(A):  
- 继续提供服务
- 但可能产生数据不一致
- 网络恢复后需要解决冲突

7. 我们需要什么样的解决方案?

经过上面的分析,我们需要一个算法能够:

  1. 自动选举领导者:当主节点故障时,能自动选出新的主节点
  2. 保证数据一致性:所有节点的数据最终保持一致
  3. 处理网络分区:网络分区时能正确处理,避免脑裂
  4. 容错能力:少数节点故障时系统仍能正常工作
  5. 简单易懂:算法逻辑清晰,便于实现和验证

这就是Raft算法要解决的问题


下一部分预告

在第二部分中,我们将看到Raft如何巧妙地解决这些问题:

  • Raft的核心思想:强领导者模型
  • 为什么选择"过半数"机制?
  • 选举过程的精妙设计
  • 如何保证日志的一致性

Raft算法深度解析 - 第二部分:Raft的核心设计思想

Raft如何巧妙解决分布式难题

1. Raft的核心洞察:强领导者模型

面对第一部分提到的混乱,Raft提出了一个简单而强大的思想:

“任何时刻,集群中最多只能有一个领导者(Leader),所有的写操作都必须通过领导者处理”

传统多主模式的混乱:
[主1] ←→ [主2] ←→ [主3]  (每个都可以接受写请求,容易冲突)

Raft的强领导者模式:
[Leader] → [Follower1] → [Follower2]  (只有Leader接受写请求)

为什么这个设计如此重要?

问题回顾:在第一部分,我们看到多个主服务器同时工作会导致数据混乱。

Raft的解决方案

  • 同一时刻只有一个Leader
  • 所有写操作都通过Leader序列化处理
  • Follower只负责复制Leader的操作

这样就从根本上避免了并发写冲突!

2. 为什么选择"过半数"机制?

你可能会问:“为什么Raft总是说’过半数’?为什么不是全部同意?”

让我们通过具体例子来理解:

场景1:要求全部节点同意

5个节点的集群:[A] [B] [C] [D] [E]

写操作流程:
1. Leader A收到写请求
2. A向B、C、D、E发送复制请求
3. 等待B、C、D、E全部确认
4. 如果E节点网络故障 → 整个写操作失败

结果:一个节点故障就导致整个系统不可用

场景2:Raft的过半数机制

5个节点的集群:[A] [B] [C] [D] [E]

写操作流程:
1. Leader A收到写请求
2. A向B、C、D、E发送复制请求  
3. 只要收到3个确认(包括自己)就成功
4. 即使D、E故障,仍然有A、B、C三个节点确认

结果:少数节点故障不影响系统可用性

过半数的数学原理

关键洞察:任意两个过半数的集合必然有交集!

5个节点:[A] [B] [C] [D] [E]

过半数集合1:{A, B, C}
过半数集合2:{A, D, E}
交集:{A}

过半数集合1:{B, C, D}  
过半数集合2:{C, D, E}
交集:{C, D}

这意味着什么?

  • 如果一个操作被过半数节点确认,那么任何后续的过半数决策都会"看到"这个操作
  • 这保证了一致性:不会出现两个不相交的过半数集合做出冲突的决策

3. 网络分区时的脑裂防护

现在我们看看Raft如何解决最棘手的脑裂问题:

场景:网络分区

原始集群:[A] [B] [C] [D] [E],A是Leader

网络分区后:
分区1:[A] [B]        (2个节点)
分区2:[C] [D] [E]    (3个节点)

传统方案的问题

分区1:A认为自己还是Leader,继续处理写请求
分区2:C、D、E选出新Leader(比如C),也处理写请求

结果:两个Leader同时工作 → 脑裂!

Raft的解决方案

分区1:[A] [B] (2个节点,不足过半数)
- A无法获得过半数确认
- A自动降级为Follower
- 分区1变为只读状态

分区2:[C] [D] [E] (3个节点,超过半数)  
- 可以选出新Leader
- 继续处理写请求
- 系统保持可用

关键点

  • 只有拥有过半数节点的分区才能选出Leader
  • 少数分区自动变为只读,避免脑裂
  • 网络恢复后,少数分区的节点会同步多数分区的数据

4. Raft的三种角色状态

Raft将每个节点设计为三种状态之一:

4.1 Follower(跟随者)

// 伪代码
type Follower struct {
    currentTerm int64
    votedFor    string
    log         []LogEntry
}

func (f *Follower) HandleMessage(msg Message) {
    switch msg.Type {
    case AppendEntries:
        // 接收Leader的日志复制
        f.appendEntries(msg)
    case RequestVote:
        // 处理投票请求
        f.handleVoteRequest(msg)
    case Timeout:
        // 超时没收到Leader心跳,转为Candidate
        f.becomeCandidate()
    }
}

Follower的职责

  • 被动接收Leader的日志复制
  • 响应投票请求
  • 检测Leader故障(心跳超时)

4.2 Candidate(候选者)

type Candidate struct {
    currentTerm int64
    votedFor    string
    log         []LogEntry
    votes       map[string]bool
}

func (c *Candidate) StartElection() {
    c.currentTerm++           // 增加任期号
    c.votedFor = c.nodeID     // 投票给自己
    c.votes[c.nodeID] = true  // 记录自己的票
    
    // 向所有其他节点发送投票请求
    for _, peer := range c.peers {
        go c.sendVoteRequest(peer)
    }
}

Candidate的职责

  • 发起选举
  • 收集投票
  • 根据结果转为Leader或Follower

4.3 Leader(领导者)

type Leader struct {
    currentTerm int64
    log         []LogEntry
    nextIndex   map[string]int64  // 每个Follower的下一个日志索引
    matchIndex  map[string]int64  // 每个Follower已复制的最高日志索引
}

func (l *Leader) HandleClientRequest(req ClientRequest) {
    // 1. 将请求添加到本地日志
    entry := LogEntry{
        Term:    l.currentTerm,
        Index:   len(l.log) + 1,
        Command: req.Command,
    }
    l.log = append(l.log, entry)
    
    // 2. 并行复制到所有Follower
    for _, peer := range l.peers {
        go l.replicateLog(peer)
    }
}

Leader的职责

  • 处理客户端请求
  • 复制日志到Follower
  • 发送心跳维持权威
  • 决定何时提交日志

5. 状态转换的精妙设计

启动时所有节点都是Follower
         ↓
    [Follower] ←─────────────┐
         │                  │
    超时/没收到心跳           │
         ↓                  │
    [Candidate] ─────────────┤
         │                  │
    ┌────┴────┐              │
    │         │              │
获得多数票   选举失败/         │
    │      发现更高任期        │
    ↓         │              │
 [Leader] ────┴──────────────┘
    │
发现更高任期/失去多数支持
    │
    ↓
[Follower]

状态转换的触发条件

Follower → Candidate

  • 选举超时时间内没收到Leader心跳
  • 说明Leader可能故障了

Candidate → Leader

  • 获得过半数投票
  • 成为新的Leader

Candidate → Follower

  • 选举失败(其他节点成为Leader)
  • 发现更高的任期号

Leader → Follower

  • 发现更高的任期号
  • 网络分区导致失去多数支持

6. 任期号(Term):Raft的逻辑时钟

Raft引入了"任期号"概念来解决时序问题:

时间线:
Term 1: [A是Leader] ────────────→ A故障
Term 2: [选举中...] ────────────→ B成为Leader  
Term 3: [B是Leader] ────────────→ 网络分区
Term 4: [选举中...] ────────────→ C成为Leader

任期号的作用

1. 检测过期信息

if msg.Term < currentTerm {
    // 这是来自过期Leader的消息,忽略
    return
}

2. 发现更新的Leader

if msg.Term > currentTerm {
    // 发现更高任期,立即转为Follower
    currentTerm = msg.Term
    becomeFollower()
}

3. 防止重复投票

if msg.Term == currentTerm && votedFor != nil {
    // 这个任期已经投过票了
    return false
}

7. 为什么这个设计如此优雅?

回顾第一部分提到的问题,看看Raft如何解决:

问题1:如何选择新的主服务器?

Raft解决方案

  • 自动选举机制
  • 过半数投票保证唯一性
  • 任期号防止冲突

问题2:如何避免脑裂?

Raft解决方案

  • 只有过半数分区能选出Leader
  • 少数分区自动变为只读
  • 数学上保证不会有两个Leader

问题3:如何保证数据一致性?

Raft解决方案

  • 强领导者模型序列化所有写操作
  • 过半数复制保证持久性
  • 日志复制保证顺序一致性

下一部分预告

在第三部分中,我们将深入Raft的两个核心算法:

  • 领导者选举的详细过程
  • 日志复制的精妙机制
  • 如何处理各种边界情况

现在应该理解了Raft的核心设计思想 - 它通过强领导者模型和过半数机制,优雅地解决了分布式系统的根本问题。

Raft算法深度解析 - 第三部分A:领导者选举算法

选举算法:如何在混乱中产生唯一领导者

1. 选举的触发时机

选举不是随时发生的,而是有明确的触发条件:

触发条件1:启动时没有Leader

集群启动:[A] [B] [C] [D] [E]
所有节点都是Follower,等待Leader心跳
超时后发现没有Leader → 开始选举

触发条件2:Leader故障

正常运行:Leader A → [B] [C] [D] [E]
A突然宕机 → B、C、D、E等待心跳超时 → 开始选举

触发条件3:网络分区

分区前:Leader A → [B] [C] [D] [E]
分区后:[A] [B] | [C] [D] [E]
右侧分区检测不到A的心跳 → 开始选举

2. 选举超时:防止同时选举的巧妙设计

问题:如果所有Follower同时开始选举会怎样?

时刻T:A、B、C、D、E同时超时
A: "我要当Leader!" → 向B、C、D、E发送投票请求
B: "我要当Leader!" → 向A、C、D、E发送投票请求  
C: "我要当Leader!" → 向A、B、D、E发送投票请求
...

结果:每个人都投票给自己,没人能获得过半数票 → 选举失败

Raft的解决方案:随机化选举超时

// 每个节点的选举超时时间都不同
func randomElectionTimeout() time.Duration {
    // 150ms - 300ms之间的随机值
    return time.Duration(150 + rand.Intn(150)) * time.Millisecond
}

效果

时刻T:所有节点开始计时
时刻T+180ms:节点B首先超时,开始选举
时刻T+200ms:节点C超时,但发现B已经在选举,投票给B
时刻T+250ms:节点A超时,但B已经成为Leader

3. 投票请求的详细过程

当一个Follower决定开始选举时:

步骤1:转为Candidate状态

func (n *Node) becomeCandidate() {
    n.state = Candidate
    n.currentTerm++           // 增加任期号
    n.votedFor = n.nodeID     // 投票给自己
    n.voteCount = 1           // 自己的一票
    n.resetElectionTimer()    // 重置选举计时器
}

步骤2:发送投票请求

type VoteRequest struct {
    Term         int64  // 候选者的任期号
    CandidateID  string // 候选者ID
    LastLogIndex int64  // 候选者最后一条日志的索引
    LastLogTerm  int64  // 候选者最后一条日志的任期号
}

func (n *Node) sendVoteRequests() {
    req := VoteRequest{
        Term:         n.currentTerm,
        CandidateID:  n.nodeID,
        LastLogIndex: n.getLastLogIndex(),
        LastLogTerm:  n.getLastLogTerm(),
    }
    
    for _, peer := range n.peers {
        go n.sendVoteRequest(peer, req)
    }
}

4. 投票决策:不是所有投票请求都会被接受

收到投票请求的节点需要做多项检查:

检查1:任期号检查

func (n *Node) handleVoteRequest(req VoteRequest) VoteResponse {
    if req.Term < n.currentTerm {
        // 过期的候选者,拒绝投票
        return VoteResponse{
            Term:        n.currentTerm,
            VoteGranted: false,
        }
    }
    
    if req.Term > n.currentTerm {
        // 发现更高任期,更新自己的状态
        n.currentTerm = req.Term
        n.votedFor = ""
        n.becomeFollower()
    }
}

检查2:重复投票检查

if n.votedFor != "" && n.votedFor != req.CandidateID {
    // 这个任期已经投票给别人了
    return VoteResponse{
        Term:        n.currentTerm,
        VoteGranted: false,
    }
}

检查3:日志新旧检查(关键!)

// 候选者的日志必须至少和自己一样新
lastLogIndex := n.getLastLogIndex()
lastLogTerm := n.getLastLogTerm()

if req.LastLogTerm < lastLogTerm {
    // 候选者的最后日志任期更旧,拒绝
    return VoteResponse{VoteGranted: false}
}

if req.LastLogTerm == lastLogTerm && req.LastLogIndex < lastLogIndex {
    // 任期相同但索引更小,拒绝
    return VoteResponse{VoteGranted: false}
}

// 通过所有检查,同意投票
n.votedFor = req.CandidateID
return VoteResponse{
    Term:        n.currentTerm,
    VoteGranted: true,
}

5. 为什么需要日志新旧检查?

这是Raft最精妙的设计之一:

场景:没有日志检查会发生什么?

初始状态:
Leader A: [log1, log2, log3]  (最新)
Node B:   [log1, log2]        (落后)
Node C:   [log1, log2, log3]  (最新)

A宕机后:
B开始选举,C投票给B → B成为新Leader
但B的日志比C旧!B会覆盖C的log3 → 数据丢失!

Raft的保护机制:

B开始选举,向C发送投票请求:
B的LastLogIndex=2, LastLogTerm=1
C的LastLogIndex=3, LastLogTerm=1

C检查:B的日志比我旧 → 拒绝投票
B无法获得过半数票 → 选举失败

结果:只有拥有最新日志的节点才能成为Leader,保证数据不丢失!

6. 选举结果的三种情况

情况1:获得过半数票,成为Leader

func (n *Node) handleVoteResponse(resp VoteResponse) {
    if resp.VoteGranted {
        n.voteCount++
        if n.voteCount > len(n.peers)/2 {
            n.becomeLeader()
        }
    }
}

func (n *Node) becomeLeader() {
    n.state = Leader
    n.initializeLeaderState()
    n.sendHeartbeats() // 立即发送心跳确立权威
}

情况2:发现更高任期,转为Follower

if resp.Term > n.currentTerm {
    n.currentTerm = resp.Term
    n.becomeFollower()
}

情况3:选举超时,重新开始选举

func (n *Node) onElectionTimeout() {
    if n.state == Candidate {
        // 选举失败,重新开始
        n.becomeCandidate()
        n.sendVoteRequests()
    }
}

7. 选举的活性保证

问题:会不会出现永远选不出Leader的情况?

理论上可能

轮次1:A和B同时选举,票数相等 → 失败
轮次2:A和B又同时选举,票数相等 → 失败
...无限循环

Raft的解决方案

  1. 随机化超时:降低同时选举的概率
  2. 指数退避:选举失败后等待时间逐渐增加
  3. 数学证明:在网络最终可靠的假设下,最终一定能选出Leader

8. 选举算法的完整流程图

[Follower] 
    │
    │ 选举超时
    ▼
[Candidate]
    │
    ├─ 增加任期号
    ├─ 投票给自己  
    ├─ 重置选举计时器
    └─ 发送投票请求
    │
    ▼
等待投票响应
    │
    ├─ 获得过半数票 ──→ [Leader]
    ├─ 发现更高任期 ──→ [Follower]  
    └─ 选举超时 ────→ 重新选举

小结

选举算法的精妙之处:

  1. 随机化超时:避免同时选举的冲突
  2. 任期号机制:解决时序和冲突问题
  3. 日志新旧检查:保证新Leader拥有最完整的数据
  4. 过半数投票:数学上保证唯一性

下一步讲解日志复制算法,展示Leader如何保证所有节点的数据一致性。

Raft算法深度解析 - 第三部分B:日志复制算法

日志复制:如何保证所有节点数据一致

1. 为什么需要日志?

在分布式系统中,“日志"不是用来调试的log,而是操作序列的有序记录

传统方式的问题

客户端请求:SET x=1
Leader直接更新:x=1
然后通知Follower:x=1

问题:如果通知丢失,Follower的x还是旧值

Raft的日志方式

客户端请求:SET x=1
Leader记录日志:[Index=1, Term=1, Command="SET x=1"]
复制日志到Follower
确认过半数复制后,才执行命令

关键洞察:先记录操作,再执行操作,保证可重放和一致性。

2. 日志条目的结构

type LogEntry struct {
    Index   int64       // 日志索引(从1开始)
    Term    int64       // 创建这条日志时的任期号
    Command interface{} // 客户端命令(如"SET x=1")
}

// 示例日志序列
log := []LogEntry{
    {Index: 1, Term: 1, Command: "SET x=1"},
    {Index: 2, Term: 1, Command: "SET y=2"},
    {Index: 3, Term: 2, Command: "DELETE x"},
    {Index: 4, Term: 2, Command: "SET z=3"},
}

每个字段的作用

  • Index:确定操作顺序,保证所有节点按相同顺序执行
  • Term:标识创建时的Leader,用于检测冲突
  • Command:实际要执行的操作

3. 日志复制的基本流程

步骤1:Leader接收客户端请求

func (l *Leader) HandleClientRequest(cmd Command) {
    // 1. 创建新的日志条目
    entry := LogEntry{
        Index:   len(l.log) + 1,
        Term:    l.currentTerm,
        Command: cmd,
    }
    
    // 2. 添加到本地日志(但不执行)
    l.log = append(l.log, entry)
    
    // 3. 开始复制到Follower
    l.replicateToFollowers(entry)
}

步骤2:并行复制到所有Follower

func (l *Leader) replicateToFollowers(entry LogEntry) {
    for _, follower := range l.followers {
        go l.sendAppendEntries(follower, entry)
    }
}

步骤3:等待过半数确认

func (l *Leader) waitForMajority(entry LogEntry) {
    confirmCount := 1 // Leader自己算一票
    
    for response := range l.responseChan {
        if response.Success {
            confirmCount++
            if confirmCount > len(l.cluster)/2 {
                // 过半数确认,可以提交了
                l.commitEntry(entry)
                break
            }
        }
    }
}

步骤4:提交并执行

func (l *Leader) commitEntry(entry LogEntry) {
    // 1. 标记为已提交
    l.commitIndex = entry.Index
    
    // 2. 执行命令
    result := l.stateMachine.Apply(entry.Command)
    
    // 3. 返回结果给客户端
    l.respondToClient(result)
    
    // 4. 通知Follower可以提交了
    l.notifyFollowersToCommit(entry.Index)
}

4. AppendEntries RPC:日志复制的核心

这是Raft中最重要的RPC调用:

AppendEntries请求结构

type AppendEntriesRequest struct {
    Term         int64      // Leader的任期号
    LeaderID     string     // Leader的ID
    PrevLogIndex int64      // 新日志条目前一条的索引
    PrevLogTerm  int64      // 新日志条目前一条的任期号
    Entries      []LogEntry // 要复制的日志条目(心跳时为空)
    LeaderCommit int64      // Leader的提交索引
}

为什么需要PrevLogIndex和PrevLogTerm?

关键问题:如何保证日志的连续性?

错误的复制方式:
Leader: [1, 2, 3, 4, 5]
Follower: [1, 2, ?, ?, ?]

Leader直接发送条目4和5 → Follower变成[1, 2, 4, 5]
缺少条目3,日志不连续!

Raft的解决方案

// Leader发送条目4时
req := AppendEntriesRequest{
    PrevLogIndex: 3,        // 条目4的前一条是3
    PrevLogTerm:  2,        // 条目3的任期号是2
    Entries:      [entry4], // 要添加的条目
}

// Follower检查
if follower.log[3].Term != req.PrevLogTerm {
    // 条目3不匹配,拒绝添加条目4
    return AppendEntriesResponse{Success: false}
}

5. Follower的日志一致性检查

Follower收到AppendEntries请求时的处理逻辑:

func (f *Follower) handleAppendEntries(req AppendEntriesRequest) AppendEntriesResponse {
    // 1. 任期检查
    if req.Term < f.currentTerm {
        return AppendEntriesResponse{
            Term:    f.currentTerm,
            Success: false,
        }
    }
    
    // 2. 更新任期和Leader信息
    if req.Term > f.currentTerm {
        f.currentTerm = req.Term
        f.votedFor = ""
    }
    f.leaderID = req.LeaderID
    f.resetElectionTimer() // 收到Leader消息,重置选举计时器
    
    // 3. 日志一致性检查
    if req.PrevLogIndex > 0 {
        if len(f.log) < req.PrevLogIndex {
            // 日志太短,缺少前置条目
            return AppendEntriesResponse{Success: false}
        }
        
        if f.log[req.PrevLogIndex-1].Term != req.PrevLogTerm {
            // 前置条目的任期不匹配
            return AppendEntriesResponse{Success: false}
        }
    }
    
    // 4. 添加新条目
    for i, entry := range req.Entries {
        index := req.PrevLogIndex + i + 1
        
        if len(f.log) >= index {
            // 检查是否有冲突
            if f.log[index-1].Term != entry.Term {
                // 有冲突,删除这个位置及之后的所有条目
                f.log = f.log[:index-1]
            }
        }
        
        if len(f.log) < index {
            // 添加新条目
            f.log = append(f.log, entry)
        }
    }
    
    // 5. 更新提交索引
    if req.LeaderCommit > f.commitIndex {
        f.commitIndex = min(req.LeaderCommit, len(f.log))
        f.applyCommittedEntries()
    }
    
    return AppendEntriesResponse{
        Term:    f.currentTerm,
        Success: true,
    }
}

6. 日志修复:处理不一致的情况

当Follower的日志与Leader不一致时,Raft如何修复?

场景:网络分区导致的日志分歧

分区前的状态:
Leader A: [1, 2, 3]
Node B:   [1, 2, 3]
Node C:   [1, 2, 3]

网络分区:A | B,C
A继续接收请求:[1, 2, 3, 4, 5]
B成为新Leader,接收请求:[1, 2, 3, 6, 7]

分区恢复后:
A: [1, 2, 3, 4, 5]  (旧Leader)
B: [1, 2, 3, 6, 7]  (新Leader)

Raft的修复过程

步骤1:B成为Leader后,尝试复制日志

// B向A发送AppendEntries
req := AppendEntriesRequest{
    PrevLogIndex: 5,     // B认为A应该有5条日志
    PrevLogTerm:  2,     // 第5条的任期应该是2
    Entries:      [],    // 先发心跳探测
}

// A检查发现:我的第5条日志任期是1,不是2
// A回复:Success: false

步骤2:B递减索引,寻找一致点

// B收到失败响应,递减nextIndex[A]
B.nextIndex[A] = 4

// 再次尝试
req := AppendEntriesRequest{
    PrevLogIndex: 4,
    PrevLogTerm:  1,     // 第4条的任期是1
    Entries:      [],
}

// A检查:我的第4条日志任期也是1,但我没有第4条!
// A回复:Success: false

步骤3:继续递减直到找到一致点

// B继续递减
B.nextIndex[A] = 3

req := AppendEntriesRequest{
    PrevLogIndex: 3,
    PrevLogTerm:  1,     // 第3条的任期是1
    Entries:      [],
}

// A检查:我的第3条日志任期确实是1
// A回复:Success: true

步骤4:从一致点开始覆盖

// B找到一致点,开始发送正确的日志
req := AppendEntriesRequest{
    PrevLogIndex: 3,
    PrevLogTerm:  1,
    Entries:      [entry6, entry7], // B的第4、5条日志
}

// A接收后:
// 删除自己的第4、5条日志(冲突的4、5)
// 添加B发送的第4、5条日志(6、7)
// 最终:A的日志变成[1, 2, 3, 6, 7]

7. 提交规则:什么时候可以执行命令?

关键原则:只有被过半数节点复制的日志条目才能被提交执行。

提交的条件

func (l *Leader) canCommit(index int64) bool {
    // 1. 必须是当前任期的日志
    if l.log[index-1].Term != l.currentTerm {
        return false
    }
    
    // 2. 必须被过半数节点复制
    replicaCount := 1 // Leader自己
    for _, follower := range l.followers {
        if l.matchIndex[follower] >= index {
            replicaCount++
        }
    }
    
    return replicaCount > len(l.cluster)/2
}

为什么不能提交旧任期的日志?

危险场景

初始:Leader A,日志[1, 2]
A宕机,B成为Leader,添加日志3:[1, 2, 3]
B宕机,A恢复成为Leader,日志还是[1, 2]

如果A直接提交日志2,然后宕机
C成为Leader,可能会覆盖已提交的日志2!

Raft的安全规则

  • Leader只能提交当前任期的日志
  • 提交当前任期日志时,会间接提交之前的所有日志

8. 日志复制的性能优化

优化1:批量发送

// 不是一条一条发送,而是批量发送
func (l *Leader) batchAppendEntries(follower string) {
    entries := l.getPendingEntries(follower)
    if len(entries) > 0 {
        l.sendAppendEntries(follower, entries)
    }
}

优化2:并行复制

// 并行向所有Follower发送
func (l *Leader) replicateInParallel() {
    var wg sync.WaitGroup
    for _, follower := range l.followers {
        wg.Add(1)
        go func(f string) {
            defer wg.Done()
            l.replicateToFollower(f)
        }(follower)
    }
    wg.Wait()
}

优化3:快速回退

// 当发现不一致时,快速定位一致点
type AppendEntriesResponse struct {
    Success      bool
    Term         int64
    ConflictTerm int64  // 冲突条目的任期
    FirstIndex   int64  // 该任期的第一个索引
}

小结

日志复制算法的精妙之处:

  1. 日志条目结构:Index保证顺序,Term检测冲突
  2. 一致性检查:PrevLogIndex/PrevLogTerm确保连续性
  3. 冲突解决:自动回退到一致点,然后覆盖
  4. 提交规则:只提交当前任期的过半数复制日志

这套机制保证了所有节点最终拥有相同的日志序列,从而保证状态机的一致性。

下一步将讲解第四部分:安全性证明和边界情况处理。

Raft算法深度解析 - 第四部分A:安全性保证

Raft的安全性:为什么它能保证数据不丢失不错乱

1. Raft的安全性承诺

Raft算法做出了几个关键的安全性保证:

保证1:选举安全性 (Election Safety)

承诺:任何给定任期内,最多只有一个Leader被选出

为什么重要

如果同时有两个Leader:
Leader A: 处理 SET x=1
Leader B: 处理 SET x=2
结果:数据冲突,系统状态不确定

保证2:Leader只追加 (Leader Append-Only)

承诺:Leader永远不会覆盖或删除自己日志中的条目,只会追加新条目

保证3:日志匹配 (Log Matching)

承诺:如果两个日志在某个索引位置的条目有相同的任期号,那么它们在该位置及之前的所有条目都相同

保证4:Leader完整性 (Leader Completeness)

承诺:如果一个日志条目在某个任期被提交,那么该条目会出现在所有更高任期的Leader日志中

保证5:状态机安全性 (State Machine Safety)

承诺:如果一个服务器在某个索引位置应用了日志条目,那么其他服务器在该索引位置不会应用不同的日志条目

2. 选举安全性的数学证明

定理:在任何给定任期T内,最多只有一个Leader

证明

假设在任期T内有两个Leader:A和B

要成为Leader,A必须获得过半数投票:|votes_A| > n/2
要成为Leader,B必须获得过半数投票:|votes_B| > n/2

因此:|votes_A| + |votes_B| > n

但是,每个节点在任期T内最多只能投票一次
所以:|votes_A ∩ votes_B| = ∅

这意味着:|votes_A| + |votes_B| ≤ n

矛盾!因此任期T内最多只有一个Leader。

3. Leader完整性的证明

定理:如果日志条目在任期T被提交,那么该条目会出现在所有任期>T的Leader日志中

证明思路

  1. 日志条目被提交意味着被过半数节点复制
  2. 要成为新Leader必须获得过半数投票
  3. 任意两个过半数集合必有交集
  4. 投票时的日志新旧检查保证新Leader拥有最完整的日志

4. 为什么不能提交旧任期的日志?

危险场景

S1: [1, 2] (Term=2, Leader)
S2: [1, 2]
S3: [1]

S1宕机,S3成为Leader (Term=3),添加条目3:
S3: [1, 3] (Term=3, Leader)

S1恢复成为Leader (Term=4):
如果S1直接提交索引2的条目,然后宕机
S3重新成为Leader,会覆盖已提交的条目2!

Raft的解决方案

// 只能提交当前任期的日志
if l.log[index-1].Term != l.currentTerm {
    return false
}

5. 网络分区时的安全性

分区:[A,B] | [C,D,E]

分区1 (少数):
- A无法获得过半数确认
- 变为只读状态

分区2 (多数):
- 可以选出新Leader
- 继续处理写请求

网络恢复:
- A自动降级为Follower
- 同步新Leader的日志

小结

Raft通过严格的数学证明保证安全性:

  • 过半数机制防止脑裂
  • 日志检查保证完整性
  • 提交规则保证一致性

下一步将讲解边界情况处理。

Raft算法深度解析 - 第四部分B:边界情况和实现细节

魔鬼在细节:Raft实现中的边界情况

1. 时序问题:选举超时的精妙设计

问题:如何避免选举活锁?

场景:两个节点同时开始选举

时刻T:A和B同时超时,开始选举
A → C: 请求投票 (Term=5)
B → C: 请求投票 (Term=5)

C只能投票给其中一个,假设投给A
A获得2票(自己+C),B获得1票(自己)
都没有过半数 → 选举失败

下一轮:A和B又可能同时超时...

解决方案:随机化超时时间

func (n *Node) resetElectionTimer() {
    // 150-300ms的随机超时
    timeout := time.Duration(150+rand.Intn(150)) * time.Millisecond
    n.electionTimer.Reset(timeout)
}

为什么这样有效?

  • 数学上,随机化打破了同步性
  • 即使偶尔同时超时,下次超时时间也不同
  • 最终总有一个节点会"抢跑"成功

超时时间的选择原则

选举超时 >> 心跳间隔 >> 网络往返时间

典型配置:
- 心跳间隔:50ms
- 选举超时:150-300ms  
- 网络RTT:1-5ms

原因:
- 避免网络抖动导致误选举
- 给Leader足够时间发送心跳
- 故障检测要快于选举超时

2. 日志压缩:快照机制

问题:日志无限增长怎么办?

随着系统运行,日志越来越长:
[1, 2, 3, 4, 5, ..., 1000000]

问题:
- 内存占用过大
- 新节点同步时间过长
- 重启恢复时间过长

解决方案:快照(Snapshot)机制

type Snapshot struct {
    LastIncludedIndex int64  // 快照包含的最后一条日志索引
    LastIncludedTerm  int64  // 快照包含的最后一条日志任期
    Data              []byte // 状态机快照数据
}

func (n *Node) createSnapshot() {
    // 1. 创建状态机快照
    snapshotData := n.stateMachine.CreateSnapshot()
    
    // 2. 记录快照元信息
    snapshot := Snapshot{
        LastIncludedIndex: n.lastApplied,
        LastIncludedTerm:  n.log[n.lastApplied-1].Term,
        Data:              snapshotData,
    }
    
    // 3. 保存快照
    n.saveSnapshot(snapshot)
    
    // 4. 删除已快照的日志
    n.log = n.log[n.lastApplied:]
    
    // 5. 调整索引
    n.adjustIndices(n.lastApplied)
}

快照的发送和安装

// Leader向落后的Follower发送快照
type InstallSnapshotRequest struct {
    Term              int64
    LeaderID          string
    LastIncludedIndex int64
    LastIncludedTerm  int64
    Data              []byte
}

func (f *Follower) handleInstallSnapshot(req InstallSnapshotRequest) {
    // 1. 检查任期
    if req.Term < f.currentTerm {
        return InstallSnapshotResponse{Term: f.currentTerm}
    }
    
    // 2. 安装快照
    f.stateMachine.InstallSnapshot(req.Data)
    
    // 3. 更新日志状态
    f.log = []LogEntry{} // 清空日志
    f.lastApplied = req.LastIncludedIndex
    f.commitIndex = req.LastIncludedIndex
    
    // 4. 保存快照元信息
    f.lastIncludedIndex = req.LastIncludedIndex
    f.lastIncludedTerm = req.LastIncludedTerm
}

3. 成员变更:动态调整集群大小

问题:如何安全地添加/删除节点?

天真的方法:直接切换配置

旧配置:[A, B, C] (3个节点)
新配置:[A, B, C, D, E] (5个节点)

问题:不同节点可能在不同时间看到配置变更
A,B看到旧配置:需要2票成为Leader
C,D,E看到新配置:需要3票成为Leader
可能同时有两个Leader!

Raft的解决方案:联合共识(Joint Consensus)

阶段1:进入联合配置 C_old,new
- 需要在C_old和C_new中都获得过半数
- 例如:C_old=[A,B,C], C_new=[A,B,C,D,E]
- 成为Leader需要:C_old中2票 AND C_new中3票

阶段2:切换到新配置 C_new
- 只需要在C_new中获得过半数
- 安全地完成配置变更
type Configuration struct {
    Old []string // 旧配置节点列表
    New []string // 新配置节点列表
}

func (l *Leader) addServer(newServer string) {
    // 1. 创建联合配置
    jointConfig := Configuration{
        Old: l.config.Servers,
        New: append(l.config.Servers, newServer),
    }
    
    // 2. 提交联合配置
    l.proposeConfigChange(jointConfig)
    
    // 3. 等待联合配置提交后,提交新配置
    go func() {
        <-l.jointConfigCommitted
        newConfig := Configuration{
            Old: nil,
            New: jointConfig.New,
        }
        l.proposeConfigChange(newConfig)
    }()
}

4. 网络分区恢复:数据同步

场景:长时间分区后的恢复

分区前:[A, B, C, D, E], A是Leader

长时间分区:
分区1:[A, B] (处理了100个请求)
分区2:[C, D, E] (C成为Leader,处理了200个请求)

分区恢复:如何同步数据?

恢复过程

func (f *Follower) handleAppendEntries(req AppendEntriesRequest) {
    // 1. 发现更高任期,立即转为Follower
    if req.Term > f.currentTerm {
        f.currentTerm = req.Term
        f.votedFor = ""
        f.state = Follower
    }
    
    // 2. 日志一致性检查失败
    if req.PrevLogIndex > len(f.log) {
        return AppendEntriesResponse{
            Success:      false,
            ConflictIndex: len(f.log) + 1,
        }
    }
    
    // 3. 发现冲突,删除冲突及之后的所有日志
    if f.log[req.PrevLogIndex-1].Term != req.PrevLogTerm {
        conflictTerm := f.log[req.PrevLogIndex-1].Term
        conflictIndex := req.PrevLogIndex
        
        // 找到冲突任期的第一个索引
        for i := req.PrevLogIndex - 1; i >= 0; i-- {
            if f.log[i].Term != conflictTerm {
                break
            }
            conflictIndex = i + 1
        }
        
        // 删除冲突的日志
        f.log = f.log[:conflictIndex-1]
        
        return AppendEntriesResponse{
            Success:       false,
            ConflictTerm:  conflictTerm,
            ConflictIndex: conflictIndex,
        }
    }
}

5. 客户端交互:幂等性和重复检测

问题:网络重传导致的重复执行

客户端发送:SET x=1
Leader执行:x=1
响应丢失,客户端重传:SET x=1
Leader再次执行:x=1 (重复执行)

解决方案:客户端会话和序列号

type ClientRequest struct {
    ClientID   string // 客户端唯一标识
    SequenceID int64  // 请求序列号
    Command    interface{}
}

type ClientSession struct {
    LastSequenceID int64      // 最后处理的序列号
    LastResponse   interface{} // 最后的响应结果
}

func (l *Leader) handleClientRequest(req ClientRequest) {
    session := l.clientSessions[req.ClientID]
    
    // 检查是否是重复请求
    if req.SequenceID <= session.LastSequenceID {
        // 返回缓存的响应
        return session.LastResponse
    }
    
    // 处理新请求
    entry := LogEntry{
        Index:      len(l.log) + 1,
        Term:       l.currentTerm,
        Command:    req,
        ClientID:   req.ClientID,
        SequenceID: req.SequenceID,
    }
    
    l.log = append(l.log, entry)
    l.replicateToFollowers(entry)
}

6. 性能优化:批处理和流水线

批处理:减少网络开销

func (l *Leader) batchReplication() {
    ticker := time.NewTicker(10 * time.Millisecond)
    
    for {
        select {
        case <-ticker.C:
            for _, follower := range l.followers {
                entries := l.getPendingEntries(follower)
                if len(entries) > 0 {
                    l.sendBatchAppendEntries(follower, entries)
                }
            }
        }
    }
}

流水线:并行处理多个请求

func (l *Leader) pipelineReplication(follower string) {
    // 不等待前一个请求的响应,直接发送下一个
    for {
        entries := l.getNextBatch(follower)
        go l.sendAppendEntries(follower, entries)
        
        // 根据网络状况调整发送速率
        time.Sleep(l.calculateDelay(follower))
    }
}

7. 故障检测:心跳和超时

精确的故障检测

type FailureDetector struct {
    heartbeatInterval time.Duration
    timeoutThreshold  time.Duration
    lastHeartbeat     map[string]time.Time
}

func (fd *FailureDetector) isNodeAlive(nodeID string) bool {
    lastSeen := fd.lastHeartbeat[nodeID]
    return time.Since(lastSeen) < fd.timeoutThreshold
}

// 自适应超时:根据网络状况调整
func (fd *FailureDetector) updateTimeout(nodeID string, rtt time.Duration) {
    // 超时 = 平均RTT * 倍数 + 安全边界
    fd.timeoutThreshold = rtt*4 + 50*time.Millisecond
}

8. 持久化:WAL和状态恢复

Write-Ahead Log (WAL)

func (n *Node) persistState() error {
    state := PersistentState{
        CurrentTerm: n.currentTerm,
        VotedFor:    n.votedFor,
        Log:         n.log,
    }
    
    // 原子写入:先写临时文件,再重命名
    tempFile := n.dataDir + "/state.tmp"
    if err := writeToFile(tempFile, state); err != nil {
        return err
    }
    
    return os.Rename(tempFile, n.dataDir+"/state.dat")
}

func (n *Node) recoverState() error {
    data, err := ioutil.ReadFile(n.dataDir + "/state.dat")
    if err != nil {
        return err
    }
    
    var state PersistentState
    if err := json.Unmarshal(data, &state); err != nil {
        return err
    }
    
    n.currentTerm = state.CurrentTerm
    n.votedFor = state.VotedFor
    n.log = state.Log
    
    return nil
}

小结

Raft的实现细节体现了分布式系统的复杂性:

  1. 时序控制:随机化解决同步问题
  2. 资源管理:快照机制控制内存增长
  3. 动态配置:联合共识保证安全变更
  4. 容错恢复:智能的日志修复机制
  5. 性能优化:批处理和流水线技术
  6. 持久化:WAL保证数据不丢失

这些细节让Raft从理论算法变成可用的工程实现。