etcd-raft 源码学习笔记(PreVote)

Posted by Masutangu on July 8, 2018

这篇文章介绍 etcd-raft 的 PreVote 机制,避免由于网络分区导致 candidate 的 term 不断增大。

Election timeout 之后,发送 type 为 pb.MsgHup 的请求,进入选举阶段:

1
2
3
4
5
6
7
8
9
// tickElection is run by followers and candidates after r.electionTimeout.
func (r *raft) tickElection() {
	r.electionElapsed++

	if r.promotable() && r.pastElectionTimeout() {
		r.electionElapsed = 0
		r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
	}
}

如果设置了 preVote 为 true,则先进入 prevote 阶段。调用 r.campaign 传入 type campaignPreElection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (r *raft) Step(m pb.Message) error {
    switch m.Type {
	case pb.MsgHup:
		if r.state != StateLeader {
			r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
			if r.preVote {
                // 如果 preVote 设置为 true,先发起 campaignPreElection
				r.campaign(campaignPreElection)
			} else {
				r.campaign(campaignElection)
			}
		} else {
			r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
        }
    }
    return nil
}

campaign 方法处理选举逻辑。如果是 campaignPreElection,设置节点状态为 StatePreCandidate,此时不会递增节点的 Term(避免 term 增长过快)。然后向其他 peers 发送 type 为 pb.MsgPreVote 的请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (r *raft) campaign(t CampaignType) {
	var term uint64
	var voteMsg pb.MessageType
	if t == campaignPreElection {
		r.becomePreCandidate()  // state 设置为 StatePreCandidate
		voteMsg = pb.MsgPreVote  // msg type 设置为 preVote
		// PreVote RPCs are sent for the next term before we've incremented r.Term.
		term = r.Term + 1  // preVote 不会递增 r.Term
	} else {
		...
	}
	if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
		// We won the election after voting for ourselves (which must mean that
		// this is a single-node cluster). Advance to the next state.
		if t == campaignPreElection {
			r.campaign(campaignElection)  // prevote 成功,可以发起 campaignElection 了
		} else {
			... 
		}
		return
    }
    
    // 广播 pb.MsgPreVote
	for id := range r.prs {
		if id == r.id {
			continue
		}
		var ctx []byte
		r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
	}
}

看看 stepCandidate 如何处理 pb.MsgPreVote 请求的回包。检查选票是否达到 quorum 数量,如果已经达到,prevote 成功,可以发起真正的选举了,调用 r.campaign 传入 type campaignElection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
// whether they respond to MsgVoteResp or MsgPreVoteResp.
func stepCandidate(r *raft, m pb.Message) error {
	// Only handle vote responses corresponding to our candidacy (while in
	// StateCandidate, we may get stale MsgPreVoteResp messages in this term from
	// our pre-candidate state).
	var myVoteRespType pb.MessageType
	if r.state == StatePreCandidate {
		myVoteRespType = pb.MsgPreVoteResp
	} else {
		myVoteRespType = pb.MsgVoteResp
	}
	switch m.Type {
	case myVoteRespType:
		gr := r.poll(m.From, m.Type, !m.Reject)
		switch r.quorum() {
		case gr:
			if r.state == StatePreCandidate {
				r.campaign(campaignElection)  // prevote 成功,可以发起 campaignElection
			} else {
				...
			}
		case len(r.votes) - gr:  // prevote 失败(m.Reject 为 true,此时 m.Term > r.Term),转为 follower 角色
			// pb.MsgPreVoteResp contains future term of pre-candidate
			// m.Term > r.Term; reuse r.Term
			r.becomeFollower(r.Term, None)
		}
	}
	return nil
}

如果 campaign 类型为 campaignElection,则调用 r.becomeCandidate,此时设置节点状态为 StateCandidate,递增节点的 Term,并向其他 peers 发送 pb.MsgVote 请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (r *raft) campaign(t CampaignType) {
	var term uint64
	var voteMsg pb.MessageType
	if t == campaignPreElection {
		...
	} else {
		r.becomeCandidate()  // become cancdidate,term 递增
		voteMsg = pb.MsgVote
		term = r.Term
	}
	if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
		// We won the election after voting for ourselves (which must mean that
		// this is a single-node cluster). Advance to the next state.
		if t == campaignPreElection {
			...
		} else {
			r.becomeLeader()  // 得到 quorum 的选票,选举 leader 成功,become leader
		}
		return
    }
    
    // 广播 pb.MsgVote,进行选举
	for id := range r.prs {
		if id == r.id {
			continue
		}
		var ctx []byte
		r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
	}
}

收到 pb.MsgVote 的回包后,同样检查是否选票数量是否达到 quorum,成功则该节点当选 leader:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func stepCandidate(r *raft, m pb.Message) error {
	// Only handle vote responses corresponding to our candidacy (while in
	// StateCandidate, we may get stale MsgPreVoteResp messages in this term from
	// our pre-candidate state).
	var myVoteRespType pb.MessageType
	if r.state == StatePreCandidate {
		myVoteRespType = pb.MsgPreVoteResp
	} else {
		myVoteRespType = pb.MsgVoteResp
	}
	switch m.Type {
	case myVoteRespType:
		gr := r.poll(m.From, m.Type, !m.Reject)
		r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.quorum(), gr, m.Type, len(r.votes)-gr)
		switch r.quorum() {
		case gr:
			if r.state == StatePreCandidate {
				...
			} else {
				r.becomeLeader() // 收到 quorum 选票,选举成功
				r.bcastAppend()  // 广播 append 消息
			}
		case len(r.votes) - gr:
			// pb.MsgPreVoteResp contains future term of pre-candidate
			// m.Term > r.Term; reuse r.Term
			r.becomeFollower(r.Term, None)
		}
	}
	return nil
}