Masutangu

长风破浪会有时 直挂云帆济沧海

也許我這一生 始終在追逐那顆九號球


raft-rust 初体验

之前分析了使用 golang 实现的 etcd-raft,这几天再读了下 rust 实现的 raft-rs,简单说下对比。rust 版应该是基于 golang 版来实现的,所有的类、方法基本上是一致的。

从样例看起,let (sender, receiver) = mpsc::channel(); 创建了 channel 用于线程之间数据传递(类似 golang 的 channel)。调用 send_propose 创建一个线程,通过 sender 发送 propose 请求。main 主线程则在 loop 循环中监听 receiver channel 的请求。如果是 propose 请求,调用 RawNode 的 propose 方法处理(其内部调用 self.raft.step 方法),如果是其他请求,直接调用 RawNode 的 step 方法处理(其内部也是调用 self.raft.step 方法)。loop 循环最后调用 on_ready,处理 raft 层返回的 ready 对象,这个逻辑和之前 golang 实现的 etcd-raft 是很类似的:

// A simple example about how to use the Raft library in Rust.
fn main() {
    // Create a storage for Raft, and here we just use a simple memory storage.
    // You need to build your own persistent storage in your production.
    // Please check the Storage trait in src/storage.rs to see how to implement one.
    let storage = MemStorage::new();

    // Create the configuration for the Raft node.
    let cfg = Config {
        ..Default::default()
    };

    // Create the Raft node.
    let mut r = RawNode::new(&cfg, storage, vec![]).unwrap();

    let (sender, receiver) = mpsc::channel();

    // Use another thread to propose a Raft request.
    send_propose(sender);

    // Loop forever to drive the Raft.
    let mut t = Instant::now();
    let mut timeout = Duration::from_millis(100);

    // Use a HashMap to hold the `propose` callbacks.
    let mut cbs = HashMap::new();

    loop {
        match receiver.recv_timeout(timeout) {
            Ok(Msg::Propose { id, cb }) => {
                cbs.insert(id, cb);
                r.propose(vec![], vec![id]).unwrap();
            }
            Ok(Msg::Raft(m)) => r.step(m).unwrap(),
            Err(RecvTimeoutError::Timeout) => (),
            Err(RecvTimeoutError::Disconnected) => return,
        }

        let d = t.elapsed();
        if d >= timeout {
            t = Instant::now();
            timeout = Duration::from_millis(100);
            // We drive Raft every 100ms.
            r.tick();
        } else {
            timeout -= d;
        }

        on_ready(&mut r, &mut cbs);
    }
}

留意这里,往 sender 发了个包在 Box 里的闭包 s1.send(0).unwrap()

fn send_propose(sender: mpsc::Sender<Msg>) {
    thread::spawn(move || {
        // Wait some time and send the request to the Raft.
        thread::sleep(Duration::from_secs(10));

        let (s1, r1) = mpsc::channel::<u8>();

        println!("propose a request");

        // Send a command to the Raft, wait for the Raft to apply it
        // and get the result.
        sender
            .send(Msg::Propose {
                id: 1,
				// cb 为 closure
                cb: Box::new(move || {
                    s1.send(0).unwrap();
                }),
            })
            .unwrap();

		// 当该 propose 请求被处理时,会调用 cb,往 s1 send 值,于是 r1 的 recv 会返回
        let n = r1.recv().unwrap();
        assert_eq!(n, 0);

        println!("receive the propose callback");
    });
}
fn on_ready(r: &mut RawNode<MemStorage>, cbs: &mut HashMap<u8, ProposeCallback>) {
    let mut ready = r.ready(); // 调用 ready 方法,拿到 ready 对象
    // 一波处理 忽略,处理完后也是调用了 advance 方法
	...
    // Advance the Raft
    r.advance(ready);
}

和 golang 版最大的区别,是 golang 用了 channel 来做进行模块之间的数据通信:recvc channel 收发请求,advancec channel 收发 advance 消息,readyc channel 收发 ready 对象。而 rust 中则直接调用 stepproposereadyadvance 方法来驱动状态机,没有通过 rust 的 channel 机制做消息传递。也许是考虑到效率?由于 raft-rs 提供的例子有点简单,等之后读 tikv 的代码,再来做下一步的对比。

最近的文章

TiKV 源码阅读(未完成)

TiKV 使用 RocksDB 做持久化存储引擎。将 key 分 range,每一段称为 Region。Region 分散在多台机器上以实现存储的水平扩展。每个 Region 会存放多个副本在不同机器上,使用 raft 算法管理:PD 负责整个 TiKV 集群的调度。TiKV 使用 version 的方式进行多版本控制(MVCC):Key1-Version3 -> ValueKey1-Version2 -> ValueKey1-Version1 -> Value...Ke...…

源码阅读继续阅读
更早的文章

etcd-raft 源码学习笔记(PreVote)

这篇文章介绍 etcd-raft 的 PreVote 机制,避免由于网络分区导致 candidate 的 term 不断增大。Election timeout 之后,发送 type 为 pb.MsgHup 的请求,进入选举阶段:// tickElection is run by followers and candidates after r.electionTimeout.func (r *raft) tickElection() { r.electionElapsed++ if r.p...…

源码阅读继续阅读