之前分析了使用 golang 实现的 etcd-raft,这几天再读了下 rust 实现的 raft-rs,简单说下对比。rust 版应该是基于 golang 版来实现的,所有的类、方法基本上是一致的。
从样例看起,let (sender, receiver) = mpsc::channel();
创建了 channel 用于线程之间数据传递(类似 golang 的 channel)。调用 send_propose
创建一个线程,通过 sender
发送 propose 请求。main
主线程则在 loop 循环中监听 receiver
channel 的请求。如果是 propose 请求,调用 RawNode 的 propose
方法处理(其内部调用 self.raft.step
方法),如果是其他请求,直接调用 RawNode 的 step
方法处理(其内部也是调用 self.raft.step
方法)。loop 循环最后调用 on_ready
,处理 raft 层返回的 ready 对象,这个逻辑和之前 golang 实现的 etcd-raft 是很类似的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// A simple example about how to use the Raft library in Rust.
fn main() {
// Create a storage for Raft, and here we just use a simple memory storage.
// You need to build your own persistent storage in your production.
// Please check the Storage trait in src/storage.rs to see how to implement one.
let storage = MemStorage::new();
// Create the configuration for the Raft node.
let cfg = Config {
..Default::default()
};
// Create the Raft node.
let mut r = RawNode::new(&cfg, storage, vec![]).unwrap();
let (sender, receiver) = mpsc::channel();
// Use another thread to propose a Raft request.
send_propose(sender);
// Loop forever to drive the Raft.
let mut t = Instant::now();
let mut timeout = Duration::from_millis(100);
// Use a HashMap to hold the `propose` callbacks.
let mut cbs = HashMap::new();
loop {
match receiver.recv_timeout(timeout) {
Ok(Msg::Propose { id, cb }) => {
cbs.insert(id, cb);
r.propose(vec![], vec![id]).unwrap();
}
Ok(Msg::Raft(m)) => r.step(m).unwrap(),
Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => return,
}
let d = t.elapsed();
if d >= timeout {
t = Instant::now();
timeout = Duration::from_millis(100);
// We drive Raft every 100ms.
r.tick();
} else {
timeout -= d;
}
on_ready(&mut r, &mut cbs);
}
}
留意这里,往 sender 发了个包在 Box 里的闭包 s1.send(0).unwrap()
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
fn send_propose(sender: mpsc::Sender<Msg>) {
thread::spawn(move || {
// Wait some time and send the request to the Raft.
thread::sleep(Duration::from_secs(10));
let (s1, r1) = mpsc::channel::<u8>();
println!("propose a request");
// Send a command to the Raft, wait for the Raft to apply it
// and get the result.
sender
.send(Msg::Propose {
id: 1,
// cb 为 closure
cb: Box::new(move || {
s1.send(0).unwrap();
}),
})
.unwrap();
// 当该 propose 请求被处理时,会调用 cb,往 s1 send 值,于是 r1 的 recv 会返回
let n = r1.recv().unwrap();
assert_eq!(n, 0);
println!("receive the propose callback");
});
}
1
2
3
4
5
6
7
fn on_ready(r: &mut RawNode<MemStorage>, cbs: &mut HashMap<u8, ProposeCallback>) {
let mut ready = r.ready(); // 调用 ready 方法,拿到 ready 对象
// 一波处理 忽略,处理完后也是调用了 advance 方法
...
// Advance the Raft
r.advance(ready);
}
和 golang 版最大的区别,是 golang 用了 channel 来做进行模块之间的数据通信:recvc
channel 收发请求,advancec
channel 收发 advance 消息,readyc
channel 收发 ready 对象。而 rust 中则直接调用 step
、propose
、ready
和 advance
方法来驱动状态机,没有通过 rust 的 channel 机制做消息传递。也许是考虑到效率?由于 raft-rs 提供的例子有点简单,等之后读 tikv 的代码,再来做下一步的对比。