03、Consul arch 一致性协议 raft

Raft 是一种强一致性协议。注意是 consul 中 raft 指 servers 间的协议,与 client 无关。

当consul server 收到 consul client 转发的请求或者同步过来的信息时,consul servers 内部需要保持一致,这就是 raft 的作用。

Raft 的几个关键概念:

Log - 行为序列(ordered sequence of entries)。Entry 包括所有的集群行为:添加节点、注册服务、写kv对等等。

Peer set - 参与 log replication 的所有成员,即所有 servers。

Quorum - 法定人数。对于 N 来说,quorum = (N / 2) + 1,即刚刚超过半数的 servers 数量。

即当 servers 只有 2 个时,quorum 也为 2,则当其中一个不可用时,整个系统则无法达到 quorum 数,将无法更新集群状态,同时,考虑同步性能,推荐 server 数为 3 或者 5 个。为什么不是不推荐 4 个?因为 4 个和 3 个一样,都只能容错一台节点错误。

Committed Entry - 达到 quorum 数的 server 收到了某个 entry,则称这个 entry 为 committed entry。

Term - 任期。每个 Term 都是一个连续递增的编号,每一轮选举都是一个 Term 周期,在一个 Term 中只能产生一个 leader。

Raft 的节点一共有三种角色:leader、follower、candidate。所有的 server 一开始都是 follower 。

正常情况下 follower 从 leader 处接收 entry,如果超过一定时间没有接收到 entry,则暗示着 leader 有可能出问题,则该 follower 自动成为 candidate。注意,当 follower 接收到 client 的 RPC 请求,follower 不进行处理,而是转发给 leader 进行处理。

在candidate 状态下,可以请求其他的节点给自己投票。当票数达到 quorum 时,就转变为 leader 状态。

在leader 状态下,需要处理 log。如果是读请求,则 leader 根据当前状态返回所需结果,如果是写请求,则生成一个新的 log 并且开始同步,直到 这个 log 被 committed(达到 quorum 数节点收到) 并且 applied(写入状态机)则写完成。

一致性模式

default - 只允许 leader 节点响应请求,但是当 leader 节点变更时,可能会出现两个 leader ,在这种情况下,老 leader 响应的可能是脏数据。

consistent - 强一致性但是最慢,leader 在响应读请求时,需要遍历其他节点确保自己是 leader。

stale - 允许非 leader 节点响应读请求。速度最快,但是值有可能是过期的(通常不会超过50ms过期)。

关键过程

raft 主要包括 Leader ElectionLog Replication 两个关键步骤。

Leader Election

心跳超时(heartbeat timeout) - follower 与 leader 进行通信的最大时间间隔。

选举超时(election timeout) - candidate 收到每张选票的最大时间间隔。

当follower 心跳超时,会成为 candidate 进行选举开始选举。candidate 会向每个节点请求投票(request votes),若出现选举超时,则重新开始请求投票;若收到法定人数的计票则成为 leader。注意 candidate 会在开始请求投票之前给自己投一票,即只需(法定人数 - 1)张票即可。

注意当candidate 和 leader 接收到新 term 的请求时,都会退化为 follower。

Log Replication

leader 需要向其他节点同步操作,即 log replication 过程,该过程通过 append entry 指令发出。当有法定数节点(包括 leader 自身)感知到状态更新后,即称为该 log 为 committed。

所以一个有 3 个和 4 个 servers 的 raft 集群,容错节点数都为 1 个节点。

一次完整 Log Replication 如下进行:

1、 client请求leader更新状态;
2、 leader向其他节点发出appendentry指令;
3、 其他节点相应指令并成功返回给leader;
4、 当leader发现log已committed,即返回给client;

实现

当agent 运行在 server mode 下时会初始化 raft 实例:

https://github.com/hashicorp/raft

 func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, trans Transport) (*Raft, error) {

    // ...

    // 初始化为 follower
    r.setState(Follower)

    // Start as leader if specified. This should only be used
    // for testing purposes.
    if conf.StartAsLeader {

        r.setState(Leader)
        r.setLeader(r.localAddr)
    }

    // ...

    // Start the background work.
    r.goFunc(r.run)
    r.goFunc(r.runFSM)
    r.goFunc(r.runSnapshots)
    return r, nil
}

重点关注 r.goFunc(r.run) 中的 run 方法:

 // run is a long running goroutine that runs the Raft FSM.
func (r *Raft) run() {

    for {

        // Check if we are doing a shutdown
        select {

        case <-r.shutdownCh:
            r.setLeader("")
            return
        default:
        }

        // 根据节点当前角色运行对应的处理函数
        switch r.getState() {

        case Follower:
            r.runFollower()
        case Candidate:
            r.runCandidate()
        case Leader:
            r.runLeader()
        }
    }
}

对于follower 来说需要处理心跳超时:

 // runFollower runs the FSM for a follower.
func (r *Raft) runFollower() {

    didWarn := false
    r.logger.Info("entering follower state", "follower", r, "leader", r.Leader())
    metrics.IncrCounter([]string{

     "raft", "state", "follower"}, 1)
    heartbeatTimer := randomTimeout(r.conf.HeartbeatTimeout)

    for r.getState() == Follower {

        select {

        case rpc := <-r.rpcCh:
            r.processRPC(rpc) // setLastContact

        // ...

        case a := <-r.applyCh:
            // Reject any operations since we are not the leader
            a.respond(ErrNotLeader)

        case <-heartbeatTimer: // 心跳超时
            // 重置随机 timeout,避免同时进行选举
            heartbeatTimer = randomTimeout(r.conf.HeartbeatTimeout) 

            // Check if we have had a successful contact
            lastContact := r.LastContact() // 上一次和 leader 通信的时刻
            if time.Now().Sub(lastContact) < r.conf.HeartbeatTimeout {

                continue
            }

            r.setLeader("") // 至此, leader 通信失败,置空 leader

            r.setState(Candidate) // 转为 candidate 角色,跳出循环

        case <-r.shutdownCh:
            return
        }
    }
}

其中的processRPC 方法会执行 append entry 等指令:

 func (r *Raft) processRPC(rpc RPC) {

    if err := r.checkRPCHeader(rpc); err != nil {

        rpc.Respond(nil, err)
        return
    }

    switch cmd := rpc.Command.(type) {

    case *AppendEntriesRequest: // 适用于 follower 的 append entry
        r.appendEntries(rpc, cmd)
    case *RequestVoteRequest: // candidate 请求投票
        r.requestVote(rpc, cmd)
    case *InstallSnapshotRequest:
        r.installSnapshot(rpc, cmd)
    case *TimeoutNowRequest:
        r.timeoutNow(rpc, cmd)
    default:
        r.logger.Error("got unexpected command",
            "command", hclog.Fmt("%#v", rpc.Command))
        rpc.Respond(nil, fmt.Errorf("unexpected command"))
    }
}

针对candidate 来说需要处理选举超时:

 // runCandidate runs the FSM for a candidate.
func (r *Raft) runCandidate() {

    r.logger.Info("entering candidate state", "node", r, "term", r.getCurrentTerm()+1)
    metrics.IncrCounter([]string{

     "raft", "state", "candidate"}, 1)

    // 开始请求其他节点给自己投票,注意自己会给自己投一票
    voteCh := r.electSelf()

    electionTimer := randomTimeout(r.conf.ElectionTimeout)

    // Tally the votes, need a simple majority
    grantedVotes := 0
    votesNeeded := r.quorumSize() // 法定人数即超过半数节点数
    r.logger.Debug("votes", "needed", votesNeeded)

    for r.getState() == Candidate {

        select {

        case rpc := <-r.rpcCh:
            r.processRPC(rpc)

        case vote := <-voteCh:
            // 如果发现有更新的 term 则退化为 follower
            if vote.Term > r.getCurrentTerm() {

                r.logger.Debug("newer term discovered, fallback to follower")
                r.setState(Follower)
                r.setCurrentTerm(vote.Term)
                return
            }

            // Check if the vote is granted
            if vote.Granted {

                grantedVotes++
                r.logger.Debug("vote granted", "from", vote.voterID, "term", vote.Term, "tally", grantedVotes)
            }

            // 成功当选为 leader
            if grantedVotes >= votesNeeded {

                r.logger.Info("election won", "tally", grantedVotes)
                r.setState(Leader)
                r.setLeader(r.localAddr)
                return
            }

        case a := <-r.applyCh:
            a.respond(ErrNotLeader)
        case <-electionTimer: // 选举超时
            r.logger.Warn("Election timeout reached, restarting election")
            return

        case <-r.shutdownCh:
            return
        }
    }
}

参考:

  • https://www.consul.io/docs/architecture/consensus
  • http://thesecretlivesofdata.com/raft
  • https://raft.github.io/

来源:https://blog.csdn.net/jikunk8/category_10622611.html