etcd-raft Source Code Study Notes (Linearizable Read: ReadIndex)

Posted by Masutangu on July 5, 2018

This post describes how etcd-raft implements linearizable reads. Put simply, a linearizable read never returns stale data; see the article "Strong consistency models" for details.

Section 8 of the raft paper lays out the approach:

Read-only operations can be handled without writing anything into the log. However, with no additional measures, this would run the risk of returning stale data, since the leader responding to the request might have been superseded by a newer leader of which it is unaware. Linearizable reads must not return stale data, and Raft needs two extra precautions to guarantee this without using the log. First, a leader must have the latest information on which entries are committed. The Leader Completeness Property guarantees that a leader has all committed entries, but at the start of its term, it may not know which those are. To find out, it needs to commit an entry from its term. Raft handles this by having each leader commit a blank no-op entry into the log at the start of its term. Second, a leader must check whether it has been deposed before processing a read-only request (its information may be stale if a more recent leader has been elected). Raft handles this by having the leader exchange heartbeat messages with a majority of the cluster before responding to read-only requests.

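When a read request arrives, the leader records its current commit index and sends a heartbeat to its peers. Once a quorum of responses confirms that the node is still the leader, it only has to wait until the log entry at that commit index has been applied to the state machine before returning the result to the client.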
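The three steps above can be condensed into a toy model. This is a minimal sketch, not etcd code: the `leader` type, its fields, and the `readIndex` and `canServe` helpers are hypothetical names, and the heartbeat round is simulated by a slice of booleans instead of real network messages.

```go
package main

import (
	"errors"
	"fmt"
)

// leader is a hypothetical, single-threaded model of a raft leader.
type leader struct {
	commitIndex  uint64 // highest log index known to be committed
	appliedIndex uint64 // highest log index applied to the state machine
	peers        int    // number of other nodes in the cluster
}

var errNotLeader = errors.New("leadership not confirmed by a quorum")

// readIndex records the commit index, then checks that a majority
// (counting the leader itself) acknowledged the heartbeat round.
func (l *leader) readIndex(acks []bool) (uint64, error) {
	idx := l.commitIndex // step 1: remember the current commit index
	n := 1               // the leader counts as one acknowledgment
	for _, ok := range acks {
		if ok {
			n++
		}
	}
	if n < (l.peers+1)/2+1 { // step 2: quorum check
		return 0, errNotLeader
	}
	return idx, nil
}

// canServe reports whether the state machine has caught up with idx (step 3).
func (l *leader) canServe(idx uint64) bool { return l.appliedIndex >= idx }

func main() {
	l := &leader{commitIndex: 10, appliedIndex: 8, peers: 2}
	idx, err := l.readIndex([]bool{true, false})
	fmt.Println(idx, err, l.canServe(idx)) // 10 <nil> false
	l.appliedIndex = 10
	fmt.Println(l.canServe(idx)) // true
}
```

In the sketch the read is held back until `appliedIndex` catches up with the recorded index, which is the same waiting step the etcd server code performs later in this post.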

Let's first look at the code under etcd/etcdserver to see how the application layer uses ReadIndex to guarantee linearizable reads:

// v3_server.go

type RaftKV interface {
	Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error)
	Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
	DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
	Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error)
	Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
}

func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
	var resp *pb.RangeResponse
	var err error

	if !r.Serializable {
		err = s.linearizableReadNotify(ctx) // block until linearizableReadNotify returns
		if err != nil {
			return nil, err
		}
	}
	// read logic omitted
	...
	return resp, err
}

Before serving the read, Range calls linearizableReadNotify:

func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
	s.readMu.RLock()
	nc := s.readNotifier
	s.readMu.RUnlock()

	// signal linearizable loop for current notify if it hasn't been already
	select {
	case s.readwaitc <- struct{}{}:
	default:
	}

	// wait for read state notification
	select {
	case <-nc.c:
		return nc.err
	case <-ctx.Done():
		return ctx.Err()
	case <-s.done:
		return ErrStopped
	}
}

linearizableReadNotify sends an empty struct to readwaitc and then waits on nc.c. readwaitc is received in a separate goroutine, linearizableReadLoop:


func (s *EtcdServer) linearizableReadLoop() {
	var rs raft.ReadState

	for {
		ctx := make([]byte, 8)
		binary.BigEndian.PutUint64(ctx, s.reqIDGen.Next()) // ctx is the unique request ID (reqId)

		select {
		case <-s.readwaitc: // wait for a signal on readwaitc
		case <-s.stopping:
			return
		}

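		nextnr := newNotifier()
		s.readMu.Lock() // linearizableReadNotify reads readNotifier under readMu.RLock
		nr := s.readNotifier
		s.readNotifier = nextnr
		s.readMu.Unlock()

		// Call ReadIndex, which sends a message of type pb.MsgReadIndex to the recvc channel.
		cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
		if err := s.r.ReadIndex(cctx, ctx); err != nil {
			cancel()
			if err == raft.ErrStopped {
				return
			}
			nr.notify(err)
			continue
		}
		cancel()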

		var (
			timeout bool
			done    bool
		)
		for !timeout && !done {
			select {
			// When a Ready object arrives, its ReadState is sent back on readStateC;
			// see func (r *raftNode) start(rh *raftReadyHandler) in etcd/etcdserver/raft.go.
			case rs = <-s.r.readStateC:
				done = bytes.Equal(rs.RequestCtx, ctx) // check that the reqId matches
			case <-time.After(s.Cfg.ReqTimeout()):
				nr.notify(ErrTimeout)
				timeout = true
			case <-s.stopping:
				return
			}
		}
		if !done {
			continue
		}

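		// Wait until rs.Index has been applied to the state machine. rs.Index is the
		// leader's commit index at the moment it received the pb.MsgReadIndex request.
		// nr.notify(nil) then tells the application layer that it can read from the
		// state machine without seeing stale data.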
		if ai := s.getAppliedIndex(); ai < rs.Index {
			select {
			case <-s.applyWait.Wait(rs.Index):
			case <-s.stopping:
				return
			}
		}
		// unblock all l-reads requested at indices before rs.Index
		nr.notify(nil)
	}
}

Once linearizableReadLoop calls nr.notify, linearizableReadNotify returns from its blocking select, and Range can go on to read the data and return it to the client.

The example above shows how the application layer uses the Node's ReadIndex method to implement linearizable reads. Next, let's look at this new ReadIndex interface:

// Node represents a node in a raft cluster.
type Node interface {
	// Propose proposes that data be appended to the log.
	Propose(ctx context.Context, data []byte) error

	// Ready returns a channel that returns the current point-in-time state.
	// Users of the Node must call Advance after retrieving the state returned by Ready.
	//
	// NOTE: No committed entries from the next Ready may be applied until all committed entries
	// and snapshots from the previous one have finished.
	Ready() <-chan Ready

	// Advance notifies the Node that the application has saved progress up to the last Ready.
	// It prepares the node to return the next available Ready.
	//
	// The application should generally call Advance after it applies the entries in last Ready.
	//
	// However, as an optimization, the application may call Advance while it is applying the
	// commands. For example, when the last Ready contains a snapshot, the application might take
	// a long time to apply the snapshot data. To continue receiving Ready without blocking raft
	// progress, it can call Advance before finishing applying the last ready.
	Advance()

	// ReadIndex request a read state. The read state will be set in the ready.
	// Read state has a read index. Once the application advances further than the read
	// index, any linearizable read requests issued before the read request can be
	// processed safely. The read state will have the same rctx attached.
	ReadIndex(ctx context.Context, rctx []byte) error
}

func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {
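	// rctx travels as the Data of the single entry; stepLeader reads it back via m.Entries[0].Data.
	return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})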
}

The previous post, "etcd-raft Source Code Study Notes (Overview)", mentioned that when a node is the leader, step is set to stepLeader. Let's see how stepLeader handles a readIndexReq, that is, a message of type pb.MsgReadIndex:

func stepLeader(r *raft, m pb.Message) error {
	// These message types do not require any progress for m.From.
	switch m.Type {
	case pb.MsgReadIndex:
		// safety check, see section 5.4 of the raft paper
		if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
			// Reject read only request when this leader has not committed any log entry at its term.
			return nil
		}

		// thinking: use an internally defined context instead of the user given context.
		// We can express this in terms of the term and index instead of a user-supplied value.
		// This would allow multiple reads to piggyback on the same message.
		switch r.readOnly.option {
		case ReadOnlySafe:
			r.readOnly.addRequest(r.raftLog.committed, m) // r.raftLog.committed is the current commit index
			r.bcastHeartbeatWithCtx(m.Entries[0].Data)    // broadcast a heartbeat carrying the request context
		}
		return nil
	}
	return nil
}

	

On receiving a readIndexReq, stepLeader first saves it with r.readOnly.addRequest and then broadcasts a heartbeat with bcastHeartbeatWithCtx. The ctx is the reqId that uniquely identifies the readIndexReq.
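The first guard in stepLeader above rejects the request when the entry at the commit index does not belong to the leader's current term. As the quoted passage from the paper explains, a fresh leader cannot know which entries are committed until it has committed one of its own. A minimal sketch of that check follows, using a hypothetical in-memory log; the `entry` type and `committedInCurrentTerm` are illustrative names, not etcd's.

```go
package main

import "fmt"

// entry is a hypothetical log entry; only the term matters here.
type entry struct{ Term uint64 }

// committedInCurrentTerm reports whether the entry at the commit index was
// created in the given term. log[0] is a sentinel (index 0, term 0), so
// log[i] is the entry at raft index i.
func committedInCurrentTerm(log []entry, committed, term uint64) bool {
	if committed >= uint64(len(log)) {
		return false // unknown index: treat as not yet safe
	}
	return log[committed].Term == term
}

func main() {
	// Entries 1 to 3 were written in term 1. The new leader of term 2
	// appended its no-op entry at index 4.
	log := []entry{{0}, {1}, {1}, {1}, {2}}

	fmt.Println(committedInCurrentTerm(log, 3, 2)) // false: the no-op is not committed yet
	fmt.Println(committedInCurrentTerm(log, 4, 2)) // true: safe to serve ReadIndex
}
```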

Here is how raft keeps track of readIndexReq:

// addRequest adds a read only request into readonly struct.
// `index` is the commit index of the raft state machine when it received
// the read only request.
// `m` is the original read only request message from the local or remote node.
func (ro *readOnly) addRequest(index uint64, m pb.Message) {
	ctx := string(m.Entries[0].Data) // ctx is the reqId
	if _, ok := ro.pendingReadIndex[ctx]; ok {
		return
	}
	// acks records which peers have acknowledged; it is used later to check for a quorum
	ro.pendingReadIndex[ctx] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})}
	ro.readIndexQueue = append(ro.readIndexQueue, ctx) // enqueue in arrival order
}

Now let's see how stepLeader handles heartbeat responses:

func stepLeader(r *raft, m pb.Message) error {
	// Unlike pb.MsgReadIndex, a heartbeat response needs the progress for m.From (pr).
	pr := r.getProgress(m.From)
	if pr == nil {
		return nil
	}
	switch m.Type {
	case pb.MsgHeartbeatResp:
		pr.RecentActive = true
		pr.resume()

		if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
			return nil
		}

		ackCount := r.readOnly.recvAck(m)
		if ackCount < r.quorum() { // not yet acknowledged by a quorum
			return nil
		}

		// A quorum has acknowledged the heartbeat. Append each released readIndexReq to
		// r.readStates; the next Ready object will carry r.readStates.
		rss := r.readOnly.advance(m)
		for _, rs := range rss {
			req := rs.req
			if req.From == None || req.From == r.id { // from local member
				r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
			} else {
				r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
			}
		}
		return nil
	}
	return nil
}

r.readOnly.recvAck counts the heartbeat responses received for the readIndexReq identified by the reqId:

// recvAck notifies the readonly struct that the raft state machine received
// an acknowledgment of the heartbeat that attached with the read only request
// context.
func (ro *readOnly) recvAck(m pb.Message) int {
	rs, ok := ro.pendingReadIndex[string(m.Context)]
	if !ok {
		return 0
	}

	rs.acks[m.From] = struct{}{} // record the ack from node m.From
	// add one to include an ack from local node
	return len(rs.acks) + 1
}
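The counting rule in recvAck can be tried in isolation. In the sketch below, `ackSet` and `quorum` are hypothetical stand-ins: the set deduplicates repeated acknowledgments from the same peer, the `+ 1` accounts for the leader itself, and the majority size is assumed to be n/2 + 1 for a cluster of n voters.

```go
package main

import "fmt"

// quorum returns the majority size for a cluster of n voters.
func quorum(n int) int { return n/2 + 1 }

// ackSet records which peers have acknowledged a heartbeat.
type ackSet map[uint64]struct{}

// recv records an ack from the given peer and returns the total number of
// acknowledgments, including the implicit one from the local node.
func (a ackSet) recv(from uint64) int {
	a[from] = struct{}{}
	return len(a) + 1
}

func main() {
	const clusterSize = 5
	acks := ackSet{}

	fmt.Println(quorum(clusterSize))                 // 3
	fmt.Println(acks.recv(2) >= quorum(clusterSize)) // false: 2 of 5
	fmt.Println(acks.recv(2) >= quorum(clusterSize)) // false: a duplicate ack is not counted twice
	fmt.Println(acks.recv(3) >= quorum(clusterSize)) // true: 3 of 5
}
```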

If the count reaches a quorum, the node is still the leader. r.readOnly.advance then returns the saved readIndexReq entries, which are appended to r.readStates:

// advance advances the read only request queue kept by the readonly struct.
// It dequeues the requests until it finds the read only request that has
// the same context as the given `m`.
func (ro *readOnly) advance(m pb.Message) []*readIndexStatus {
	var (
		i     int
		found bool
	)

	ctx := string(m.Context)
	rss := []*readIndexStatus{}

	for _, okctx := range ro.readIndexQueue {
		i++
		rs, ok := ro.pendingReadIndex[okctx]
		if !ok {
			panic("cannot find corresponding read state from pending map")
		}
		rss = append(rss, rs)
		if okctx == ctx {
			// take the ReadState with the matching reqId and every ReadState queued before it
			found = true
			break
		}
	}

	if found {
		ro.readIndexQueue = ro.readIndexQueue[i:]
		for _, rs := range rss {
			delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data))
		}
		return rss
	}

	return nil
}

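A later call to newReady hands r.readStates to the application layer. The application layer takes the commit index from each ReadState and allows the read once that index has been applied to the state machine.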

func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
	rd := Ready{
		Entries:          r.raftLog.unstableEntries(),
		CommittedEntries: r.raftLog.nextEnts(),
		Messages:         r.msgs,
	}
	...
	
	if len(r.readStates) != 0 {
		rd.ReadStates = r.readStates // attach r.readStates
	}
	...
	return rd
}
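The last step on the application side is waiting for the applied index to reach the read index. The server code earlier in this post does this with s.applyWait.Wait(rs.Index), whose implementation is not shown here. The sketch below is one possible way to build such a waiter, not etcd's implementation: `indexWaiter`, `wait`, `trigger`, and `isClosed` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// indexWaiter lets readers block until the applied index reaches a target.
type indexWaiter struct {
	mu      sync.Mutex
	applied uint64
	waiters map[uint64][]chan struct{}
}

func newIndexWaiter() *indexWaiter {
	return &indexWaiter{waiters: map[uint64][]chan struct{}{}}
}

// wait returns a channel that is closed once applied >= index.
func (w *indexWaiter) wait(index uint64) <-chan struct{} {
	w.mu.Lock()
	defer w.mu.Unlock()
	ch := make(chan struct{})
	if w.applied >= index {
		close(ch) // already applied: the caller does not block
		return ch
	}
	w.waiters[index] = append(w.waiters[index], ch)
	return ch
}

// trigger records a new applied index and wakes every waiter at or below it.
func (w *indexWaiter) trigger(applied uint64) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if applied > w.applied {
		w.applied = applied
	}
	for idx, chs := range w.waiters {
		if idx <= w.applied {
			for _, ch := range chs {
				close(ch)
			}
			delete(w.waiters, idx)
		}
	}
}

// isClosed checks a channel without blocking.
func isClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	w := newIndexWaiter()
	ready := w.wait(10) // read index taken from a ReadState
	w.trigger(9)
	fmt.Println(isClosed(ready)) // false
	w.trigger(10)
	fmt.Println(isClosed(ready)) // true
}
```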