Masutangu

长风破浪会有时 直挂云帆济沧海

也許我這一生 始終在追逐那顆九號球


etcd-raft 源码学习笔记(概览篇)

这篇文章主要整体上介绍 etcd-raft 库,包括各个类的作用,类之间的串联。不涉及 raft 算法。先来看看 etcd-raft 几个结构体的定义:

type raft struct {
	id uint64

	Term uint64
	Vote uint64

	// the log
	raftLog *raftLog

	state StateType

	// isLearner is true if the local raft node is a learner.
	isLearner bool

	votes map[uint64]bool

	msgs []pb.Message

	// the leader id
	lead uint64

	tick func()
	step stepFunc
}
type raftLog struct {
	// storage contains all stable entries since the last snapshot.
	storage Storage

	// unstable contains all unstable entries and snapshot.
	// they will be saved into storage.
	unstable unstable

	// committed is the highest log position that is known to be in
	// stable storage on a quorum of nodes.
	committed uint64
	// applied is the highest log position that the application has
	// been instructed to apply to its state machine.
	// Invariant: applied <= committed
	applied uint64
}
// unstable.entries[i] has raft log position i+unstable.offset.
// Note that unstable.offset may be less than the highest log
// position in storage; this means that the next write to storage
// might need to truncate the log before persisting unstable.entries.
type unstable struct {
	// the incoming unstable snapshot, if any.
	snapshot *pb.Snapshot
	// all entries that have not yet been written to storage.
	entries []pb.Entry
	offset  uint64
}
// node is the canonical implementation of the Node interface
type node struct {
	propc      chan msgWithResult
	recvc      chan pb.Message
	readyc     chan Ready
	advancec   chan struct{}
	tickc      chan struct{}
	done       chan struct{}
}
// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
	// The current volatile state of a Node.
	// SoftState will be nil if there is no update.
	// It is not required to consume or store SoftState.
	*SoftState

	// The current state of a Node to be saved to stable storage BEFORE
	// Messages are sent.
	// HardState will be equal to empty state if there is no update.
	pb.HardState

	// ReadStates can be used for node to serve linearizable read requests locally
	// when its applied index is greater than the index in ReadState.
	// Note that the readState will be returned when raft receives msgReadIndex.
	// The returned is only valid for the request that requested to read.
	ReadStates []ReadState

	// Entries specifies entries to be saved to stable storage BEFORE
	// Messages are sent.
	Entries []pb.Entry

	// Snapshot specifies the snapshot to be saved to stable storage.
	Snapshot pb.Snapshot

	// CommittedEntries specifies entries to be committed to a
	// store/state-machine. These have previously been committed to stable
	// store.
	CommittedEntries []pb.Entry

	// Messages specifies outbound messages to be sent AFTER Entries are
	// committed to stable storage.
	// If it contains a MsgSnap message, the application MUST report back to raft
	// when the snapshot has been received or has failed by calling ReportSnapshot.
	Messages []pb.Message

	// MustSync indicates whether the HardState and Entries must be synchronously
	// written to disk or if an asynchronous write is permissible.
	MustSync bool
}

这几个结构体的关系如下图:

RaftLog 的 Storage 和 RaftNode 的 raftStorage 都是指向同一个 Storage 对象(虚线表示指针)。Storage 在 kvstore 的示例中的实现为 MemoryStorage,可以理解为 WAL 的一个内存缓存。重启时会从 WAL 恢复 MemoryStorage 的数据。整个逻辑由 Node 的 run 方法的 for loop 驱动,从 recvc channel 接收请求,调用 raft 的 Step 函数进行处理。Step 函数会调用 stepstep 是函数指针,在节点成为 leader 时将其设置为 stepLeader,节点变成 follower 时设置为 stepFollowerstep 处理 append 请求时,会调用 raftLog 的 maybeAppend 方法,最终会把 entries append 到 unstable 中。

在 Node run 方法的 for loop 中,会定期通过 newReady 函数构造 Ready 对象。Ready 包括如下:

  • HardState 即 raft 节点的 persistent state
  • SoftState 即 raft 节点的 volatile state
  • CommittedEntries 即已经 commit 的 log entries,需要应用层 apply 到状态机
  • Entries 即 unstable 中的 log entries(未落盘的 log entries)
  • Snapshot 即需要持久化的 snapshot
  • Messages 即 mailbox,所有还未发送的消息

构造好的 Ready 对象发送到 readyc channel,RaftNode 取出后会做如下处理:

  • 持久化 HardState、Entries、Snapshot 到 Storage 和 WAL (raftStorage.ApplySnapshot()raftStorage.Append()wal.Save(rd.HardState, rd.Entries) 可以看出 memoryStorage 是 wal 的缓存,写 wal 的同时也写 memoryStorage)
  • apply CommittedEntries 到状态机
  • 广播 Messages

处理完后调用 Node.Advance() 通知 Node Ready 对象处理完毕,准备好接收下一个。

最后看看驱动整个逻辑的 run 方法:

func (n *node) run(r *raft) {
	var propc chan msgWithResult
	var readyc chan Ready
	var advancec chan struct{}
	var prevLastUnstablei, prevLastUnstablet uint64
	var havePrevLastUnstablei bool
	var prevSnapi uint64
	var rd Ready

	lead := None
	prevSoftSt := r.softState()
	prevHardSt := emptyState

	for {
		if advancec != nil {
			readyc = nil
		} else {
			// 应用层通知上一个 ready 对象已经处理完毕了 此时 advancec 为 nil 
			rd = newReady(r, prevSoftSt, prevHardSt)
			if rd.containsUpdates() { // 有更新才把 readyc 设为 非空
				readyc = n.readyc
			} else {
				readyc = nil
			}
		}

		select {
		case m := <-n.recvc:
			// filter out response message from unknown From.
			if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
				r.Step(m)
			}
		case <-n.tickc:
			r.tick()
		case readyc <- rd:
			if rd.SoftState != nil {
				prevSoftSt = rd.SoftState
			}
			if len(rd.Entries) > 0 {
				prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
				prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
				havePrevLastUnstablei = true
			}
			if !IsEmptyHardState(rd.HardState) {
				prevHardSt = rd.HardState
			}
			if !IsEmptySnap(rd.Snapshot) {
				prevSnapi = rd.Snapshot.Metadata.Index
			}

			r.msgs = nil
			r.readStates = nil
			advancec = n.advancec
		case <-advancec:
			if prevHardSt.Commit != 0 {
				r.raftLog.appliedTo(prevHardSt.Commit)
			}
			// 应用层处理完了 表示 unstable 的东西不需要了 该清理就清理
			if havePrevLastUnstablei {
				r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
				havePrevLastUnstablei = false
			}
			r.raftLog.stableSnapTo(prevSnapi)
			advancec = nil
		}
	}
}

还有构造 Ready 对象的 newReady 函数:

func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
	rd := Ready{
		Entries:          r.raftLog.unstableEntries(),
		CommittedEntries: r.raftLog.nextEnts(),
		Messages:         r.msgs,
	}
	if softSt := r.softState(); !softSt.equal(prevSoftSt) {
		rd.SoftState = softSt
	}
	if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
		rd.HardState = hardSt
	}
	if r.raftLog.unstable.snapshot != nil {
		rd.Snapshot = *r.raftLog.unstable.snapshot
	}
	if len(r.readStates) != 0 {
		rd.ReadStates = r.readStates
	}
	rd.MustSync = MustSync(rd.HardState, prevHardSt, len(rd.Entries))
	return rd
}
最近的文章

etcd-raft 源码学习笔记(Linearizable Read 之 ReadIndx)

这篇文章介绍 etcd-raft 如何实现 linearizable read(linearizable read 简单的说就是不返回 stale 数据,具体可以看这篇文章 《Strong consistency models》)。raft 论文第 8 节阐述了思路: Read-only operations can be handled without writing anything into the log. However, with no additional measures...…

源码阅读继续阅读
更早的文章

etcd-raft 源码学习笔记(示例篇)

本系列文章为 etcd-raft 源码阅读笔记,采用自顶向下的方式。这篇是开篇,首先来看看 etcd 提供的基于 raft 库实现的 kv store 示例,代码目录位于 contrib/raftexample。从 main 函数开始读起:func main() { ... proposeC := make(chan string) defer close(proposeC) var kvs *kvstore getSnapshot := func() ([]by...…

源码阅读继续阅读