Go实现raft协议(1)
文章目录
导读
实现思路来自于:MIT 6.824课程 Lab 2: Raft
做实验前,你应该熟读raft论文,这里是: raft论文 raft译文
推荐阅读: Students’ Guide to Raft Raft Understandable Distributed Consensus
lab内容
在本实验中,您将实现 Raft 扩展版论文中描述的大部分设计,包括保存持久状态,并在节点发生故障、重新启动后读取它。您不需要实现集群成员变更(第6节)或日志压缩/快照(第7节)。
Part 2A
实现领导人选举和心跳功能(没有日志条目的AppendEntries RPC请求)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 |
const ( LEADER = iota + 1 CANDIDATE FOLLOWER ) const ( RPC_CALL_TIMEOUT = 1 * time.Second // 1s HEARTBEAT_INTERVAL = 50 * time.Millisecond // 50ms,限制每秒 ) // LogEntry 日志条目 type LogEntry struct { LogIndex int // 日志条目的索引值 LogTerm int // 日志条目的任期号 LogCommand interface{} // 日志条目的执行指令 } // // A Go object implementing a single Raft peer. // type Raft struct { mu sync.Mutex // Lock to protect shared access to this peer's state peers []*labrpc.ClientEnd // RPC end points of all peers persister *Persister // Object to hold this peer's persisted state me int // this peer's index into peers[] rd *rand.Rand state int // 状态 // 所有服务器上持久存在的 currentTerm int64 // 服务器最后一次知道的任期号(初始化为 0,持续递增) votedFor int64 // 在当前获得选票的候选人的 Id log []LogEntry // 日志条目集;每一个条目包含一个用户状态机执行的指令,和收到时的任期号 // 所有服务器上经常变的 commitIndex int64 // 已知的最大的已经被提交的日志条目的索引值 lastApplied int64 // 最后被应用到状态机的日志条目索引值(初始化为 0,持续递增) // 在领导人里经常改变的 (选举后重新初始化) nextIndex []int64 // 对于每一个服务器,需要发送给他的下一个日志条目的索引值(初始化为领导人最后索引值加一) matchIndex []int64 // 对于每一个服务器,已经复制给他的日志的最高索引值 // timer eTimer *time.Timer // 选举超时定时器 voteCh chan struct{} // 成功投票的信号 heartCh chan struct{} // 心跳的信号 // Your data here (2A, 2B, 2C). // Look at the paper's Figure 2 for a description of what // state a Raft server must maintain. } func (rf *Raft) getLastTerm()(lastLogTerm int64){ if len(rf.log) <= 0 { return } return rf.log[len(rf.log)-1].LogTerm } func (rf *Raft) getLastIndex()(lastLogIndex int64){ if len(rf.log) <= 0 { return } return rf.log[len(rf.log)-1].LogIndex } func (rf *Raft) isLEADER() bool { return rf.state == LEADER } func (rf *Raft) resetElectTimer() { rf.eTimer.Reset(randElectionDuration(rf.rd)) } // return currentTerm and whether this server // believes it is the LEADER. func (rf *Raft) GetState() (int, bool) { return int(rf.currentTerm), rf.state == LEADER } // 请求投票 RPC type RequestVoteArgs struct { // Your data here (2A, 2B). 
Term int64 // 候选人的任期号 CandidateID int64 // 请求选票的候选人的 Id LastLogIndex int64 // 候选人的最后日志条目的索引值 LastLogTerm int64 // 候选人最后日志条目的任期号 } // // example RequestVote RPC reply structure. // field names must start with capital letters! // // 请求投票 RPC 返回值 type RequestVoteReply struct { // Your data here (2A). Term int64 // 当前任期号,以便于候选人去更新自己的任期号 VoteGranted bool // 候选人赢得了此张选票时为真 } // AppendEntriesArgs 附加日志 RPC // 由领导人负责调用来复制日志指令;也会用作heartbeat type AppendEntriesArgs struct { Term int64 // 领导人的任期号 LEADERID int64 // 领导人的 Id,以便于跟随者重定向请求 PrevLogIndex int64 // 新的日志条目紧随之前的索引值 PrevLogTerm int64 // prevLogIndex 条目的任期号 Entries []LogEntry // 准备存储的日志条目(表示心跳时为空;一次性发送多个是为了提高效率) LEADERCommit int64 // 领导人领导人已经提交的日志的索引值 } // AppendEntriesReply 附加日志 RPC 返回值 type AppendEntriesReply struct { Term int64 // 当前的任期号,用于领导人去更新自己 Success bool // 跟随者包含了匹配上 prevLogIndex 和 prevLogTerm 的日志时为真 } // // example RequestVote RPC handler. // RequestVote 处理请求投票RPC // 接收者实现: // 如果term < currentTerm返回 false (论文 5.2 节) // 如果 votedFor 为空或者为 CANDIDATEId,并且候选人的日志至少和自己一样新,那么就投票给他(论文 5.2 节,5.4 节) func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { rf.mu.Lock() defer rf.mu.Unlock() reply.Term = rf.currentTerm reply.VoteGranted = false if args.Term < rf.currentTerm { return // CANDIDATE expired } //If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower if args.Term > rf.currentTerm { rf.currentTerm = args.Term rf.state = FOLLOWER rf.votedFor = -1 } // now the term are same if rf.votedFor == -1 || rf.votedFor == args.CandidateID { newestlog := false if args.LastLogTerm > rf.getLastTerm() { newestlog = true } if args.LastLogTerm == rf.getLastTerm() && args.LastLogIndex >= rf.getLastIndex() { // at least up to date newestlog = true } if newestlog { rf.state = FOLLOWER reply.VoteGranted = true rf.votedFor = args.CandidateID } } if reply.VoteGranted { // 投票成功,重置选举定时器 rf.voteCh <- struct{}{} } } // AppendEntries 附加日志 RPC: // 由领导人负责调用来复制日志指令;也会用作heartbeat // 
接收者实现: // 如果 term < currentTerm 就返回 false (5.1 节) // 如果日志在 prevLogIndex 位置处的日志条目的任期号和 prevLogTerm 不匹配,则返回 false (5.3 节) // 如果已经存在的日志条目和新的产生冲突(索引值相同但是任期号不同),删除这一条和之后所有的 (5.3 节) // 附加日志中尚未存在的任何新条目 // 如果 LEADERCommit > commitIndex,令 commitIndex 等于 LEADERCommit 和 新日志条目索引值中较小的一个 func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) { rf.mu.Lock() defer rf.mu.Unlock() reply.Success = false if args.Term < rf.currentTerm { reply.Term = rf.currentTerm return } // 重置选举定时器 rf.heartCh <- struct{}{} if args.Term > rf.currentTerm { rf.currentTerm = args.Term reply.Term = rf.currentTerm rf.state = FOLLOWER rf.votedFor = -1 } } func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool { ok := rf.peers[server].Call("Raft.RequestVote", args, reply) return ok } func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool { ok := rf.peers[server].Call("Raft.AppendEntries", args, reply) return ok } // 选举过程 // 1.当跟随者在超过选举超时时间的情况之前都没有收到领导人的心跳,转变成候选人后就立即开始选举过程: // 自增当前的任期号(currentTerm) // 给自己投票 // 重置选举超时计时器 // 发送请求投票的 RPC 给其他所有服务器 func (rf *Raft) vote() { rf.mu.Lock() defer rf.mu.Unlock() rf.currentTerm++ rf.state = CANDIDATE args := RequestVoteArgs{ Term: rf.currentTerm, CandidateID: int64(rf.me), LastLogIndex:rf.getLastIndex(), LastLogTerm: rf.getLastTerm(), } replyCh := make(chan RequestVoteReply,len(rf.peers)-1) var wg sync.WaitGroup for server := range rf.peers { if server == rf.me { continue } wg.Add(1) go func(server int){ defer wg.Done() var reply RequestVoteReply respCh := make(chan struct{}) go func() { rf.sendRequestVote(server,&args,&reply) respCh<- struct{}{} }() select { case <-time.After(RPC_CALL_TIMEOUT): // 1s return case <-respCh: replyCh <- reply } }(server); } go func() { wg.Wait() close(replyCh) // avoid goroutine leak }() votes := 1 // vote by self needvotes := len(rf.peers)/2 + 1 for reply := range replyCh { if reply.Term > rf.currentTerm { // higher term 
LEADER rf.state = FOLLOWER rf.currentTerm = reply.Term rf.votedFor = -1 } if reply.VoteGranted { votes++ } if votes >= needvotes{ rf.state = LEADER //DPrintf("candidated:%d become leader ,votes:%d need:%d", rf.me, votes,needvotes) for i := range rf.peers { rf.nextIndex[i] = rf.getLastIndex() + 1 rf.matchIndex[i] = 0 } break } } if votes < needvotes { // 票数小于大多数,成为follower rf.state = FOLLOWER rf.votedFor = -1 } } // send heartbeat func (rf *Raft) heartbeat() { // ch := time.Tick(HEARTBEAT_INTERVAL) // for { if !rf.isLEADER() { return } for i := range rf.peers { if i == rf.me { continue } go func(server int) { rf.mu.Lock() args := AppendEntriesArgs{ Term: rf.currentTerm, LEADERID: int64(rf.me), PrevLogIndex: 0, PrevLogTerm: 0, Entries: nil, // heartbeat entries are empty } rf.mu.Unlock() var reply AppendEntriesReply rf.sendAppendEntries(server, &args, &reply) }(i) } // <-ch // } } // Make create a new Raft server instance: // peers : 所有网络节点的数组 // me : 自身在节点数组中的索引位置 func Make(peers []*labrpc.ClientEnd, me int, persister *Persister, applyCh chan ApplyMsg) *Raft { rf := &Raft{} rf.peers = peers rf.persister = persister rf.me = me rf.state = FOLLOWER rf.votedFor = -1 rf.log = append(rf.log, LogEntry{LogTerm: 0}) rf.currentTerm = 0 rf.nextIndex = make([]int64,len(rf.peers)) rf.matchIndex = make([]int64,len(rf.peers)) rf.heartCh = make(chan struct{}, 1) rf.voteCh = make(chan struct{}, 1) rf.rd = rand.New(rand.NewSource(time.Now().UnixNano())) timer := randElectionDuration(rf.rd) rf.eTimer = time.NewTimer(timer) go func() { for { switch rf.state { case FOLLOWER: select { case <-rf.voteCh: case <-rf.heartCh: case <-time.After(time.Duration(rand.Intn(300)+500) * time.Millisecond): { rf.vote() } } case LEADER: rf.heartbeat() // 心跳机制 time.Sleep(HEARTBEAT_INTERVAL) } } }() // initialize from state persisted before a crash rf.readPersist(persister.ReadRaftState()) return rf } // randElectionDuration 从一个固定的区间(例如 150-300 毫秒)随机选择超时时间 func randElectionDuration(rd *rand.Rand) 
time.Duration { //var electimemax, electimemin int64 //electimemax = 300 //electimemin = 150 //return time.Millisecond * time.Duration(rd.Int63n(electimemax-electimemin)+electimemin) return time.Duration(rand.Intn(300)+500) * time.Millisecond } |
执行测试代码
1 2 3 4 5 6 7 |
zhengxuzhangde-MacBook-Pro:raft zhengxuzhang$ go test -run 2A
Test (2A): initial election ...
  ... Passed -- 3.6 3 116 0
Test (2A): election after network failure ...
  ... Passed -- 5.5 3 196 0
PASS
ok _/Users/zhengxuzhang/6.824/src/raft 9.133s
Part 2B
实现 Start():调用它会开始把一条新操作追加到日志的过程;领导人通过 AppendEntries RPC 把新操作发送给其他服务器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 |
// start agreement on a new log entry: // start 开始将command追加到日志中的处理,必须马上返回,而不需要等待日志追加完成. func (rf *Raft) Start(command interface{}) (int, int, bool) { rf.mu.Lock() defer rf.mu.Unlock() index := -1 term, isLEADER := rf.GetState() if isLEADER { index = int(rf.getLastIndex() + 1) rf.log = append(rf.log, LogEntry{LogTerm: int(term),LogIndex:index, LogCommand: command}) } if isLEADER { //DPrintf("leader:%d start,index:%d term:%d,command:%v", rf.me, index,term,command) } return index, term, isLEADER } // AppendEntries 附加日志 RPC: // 由领导人负责调用来复制日志指令;也会用作heartbeat // 接收者实现: // 如果 term < currentTerm 就返回 false (5.1 节) // 如果日志在 prevLogIndex 位置处的日志条目的任期号和 prevLogTerm 不匹配,则返回 false (5.3 节) // 如果已经存在的日志条目和新的产生冲突(索引值相同但是任期号不同),删除这一条和之后所有的 (5.3 节) // 附加日志中尚未存在的任何新条目 // 如果 LEADERCommit > commitIndex,令 commitIndex 等于 LEADERCommit 和 新日志条目索引值中较小的一个 func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) { rf.mu.Lock() defer rf.mu.Unlock() reply.Success = false if args.Term < rf.currentTerm { reply.Term = rf.currentTerm reply.NextIndex = rf.getLastIndex() + 1 //DPrintf("follower:%d appendentries,args term :%d,cur term:%d ",rf.me, args.Term, rf.currentTerm) return } // 重置选举定时器 rf.heartCh <- struct{}{} if args.Term > rf.currentTerm { rf.currentTerm = args.Term reply.Term = rf.currentTerm rf.state = FOLLOWER rf.votedFor = -1 //DPrintf("follower:%d appendentries2,args term :%d,cur term:%d ",rf.me, args.Term, rf.currentTerm) } reply.Term = args.Term if args.PrevLogIndex > rf.getLastIndex() { reply.NextIndex = rf.getLastIndex() + 1 //DPrintf("append entries timeout,raft :%d, term:%d ", args.PrevLogIndex, rf.getLastIndex()) return } baseIndex := rf.log[0].LogIndex // 如果一个跟随者的日志和领导人不一致,那么在下一次的附加日志 RPC 时的一致性检查就会失败。 // 在被跟随者拒绝之后,领导人就会减小 nextIndex 值并进行重试。 // 最终 nextIndex 会在某个位置使得领导人和跟随者的日志达成一致。 // 当这种情况发生,附加日志 RPC 就会成功,这时就会把跟随者冲突的日志条目全部删除并且加上领导人的日志。 // 一旦附加日志 RPC 成功,那么跟随者的日志就会和领导人保持一致,并且在接下来的任期里一直继续保持。 // 如果需要的话,算法可以通过减少被拒绝的附加日志 RPCs 的次数来优化。 // 例如,当附加日志 RPC 
的请求被拒绝的时候,跟随者可以包含冲突的条目的任期号和自己存储的那个任期的最早的索引地址。 // 借助这些信息,领导人可以减小 nextIndex 越过所有那个任期冲突的所有日志条目; // 这样就变成每个任期需要一次附加条目 RPC 而不是每个条目一次。 if args.PrevLogIndex > baseIndex { term := rf.log[args.PrevLogIndex].LogTerm if args.PrevLogTerm != term { for i := args.PrevLogIndex - 1; i >= baseIndex; i-- { if rf.log[i- baseIndex].LogTerm != term { reply.NextIndex = i + 1 return } } } } // Failure case is at above reply.Success = true if args.Entries != nil { rf.log = rf.log[:args.PrevLogIndex+1] rf.log = append(rf.log, args.Entries...) reply.NextIndex = rf.getLastIndex() + 1 } //If leaderCommit > commitIndex, set commitIndex =min(leaderCommit, index of last new entry) if args.LeaderCommit > rf.commitIndex { last := rf.getLastIndex() if args.LeaderCommit > last { rf.commitIndex = last } else { rf.commitIndex = args.LeaderCommit } rf.appendCh <- struct{}{} } reply.NextIndex = rf.getLastIndex() + 1 } // 当节点选举成为leader之后,需要调用broadcastAppendEntries()用来广播发送心跳或者追加日志。 // 接收者实现: // 如果 term < currentTerm 就返回 false (5.1 节) // 如果日志在 prevLogIndex 位置处的日志条目的任期号和 prevLogTerm 不匹配,则返回 false (5.3 节) // 如果已经存在的日志条目和新的产生冲突(索引值相同但是任期号不同),删除这一条和之后所有的 (5.3 节) // 附加日志中尚未存在的任何新条目 // 如果 leaderCommit > commitIndex,令 commitIndex 等于 leaderCommit 和 新日志条目索引值中较小的一个 func (rf *Raft) broadcastAppendEntries() { if rf.state != LEADER { return } //If there exists an N such that N > commitIndex, a majority of matchIndex[i] ≥ N, and log[N].term == currentTerm: set commitIndex = N N := rf.commitIndex baseIndex := rf.log[0].LogIndex for i := rf.commitIndex + 1; i <= rf.getLastIndex(); i++ { num := 1 for j := range rf.peers { if j != rf.me && rf.matchIndex[j] >= i && rf.log[i - baseIndex].LogTerm == rf.currentTerm { num++ } } if 2 * num > len(rf.peers) { N = i } } if N != rf.commitIndex { rf.commitIndex = N rf.appendCh <- struct{}{} } for i := range rf.peers { if i == rf.me { continue } go func(server int){ rf.mu.Lock() if rf.state != LEADER { rf.mu.Unlock() return } if rf.nextIndex[server] > rf.getLastIndex() { 
rf.nextIndex[server] = rf.getLastIndex() + 1 } nextIndex := rf.nextIndex[server] entries := make([]LogEntry, 0) entries = append(entries, rf.log[nextIndex:]...) //DPrintf("----- entries:%v ", entries) args := AppendEntriesArgs{ Term: rf.currentTerm, LeaderID: rf.me, PrevLogIndex: nextIndex-1, PrevLogTerm: rf.log[nextIndex-1].LogTerm, Entries: entries, LeaderCommit: rf.commitIndex, } rf.mu.Unlock() reply := &AppendEntriesReply{} ok := rf.sendAppendEntries(server, &args, reply) if !ok { //DPrintf("sendAppendEntries fail,request args Term:%d, LeaderId:%d ", args.Term, args.LeaderID) return } rf.mu.Lock() if rf.state != LEADER || rf.currentTerm != args.Term { rf.mu.Unlock() return } rf.mu.Unlock() if reply.Success { rf.mu.Lock() rf.nextIndex[server] += int(len(args.Entries)) rf.matchIndex[server] = rf.nextIndex[server] - 1 rf.mu.Unlock() }else { if rf.currentTerm < reply.Term { rf.mu.Lock() rf.currentTerm = reply.Term rf.state = FOLLOWER rf.votedFor = -1 rf.mu.Unlock() }else { rf.nextIndex[server] = reply.NextIndex } } }(i) } } // Make create a new Raft server instance: // peers : 所有网络节点的数组 // me : 自身在节点数组中的索引位置 func Make(peers []*labrpc.ClientEnd, me int, persister *Persister, applyCh chan ApplyMsg) *Raft { rf := &Raft{} rf.peers = peers rf.persister = persister rf.me = me rf.state = FOLLOWER rf.lastApplied = 0 rf.votedFor = -1 rf.log = append(rf.log, LogEntry{LogTerm: 0}) rf.currentTerm = 0 rf.nextIndex = make([]int,len(rf.peers)) rf.matchIndex = make([]int,len(rf.peers)) rf.heartCh = make(chan struct{}, 10) rf.voteCh = make(chan struct{}, 10) rf.appendCh = make(chan struct{}, 10) go func() { for { switch rf.state { case FOLLOWER: select { case <-rf.voteCh: //DPrintf("follower:%d get vote cancle ", rf.me) case <-rf.heartCh: //DPrintf("follower:%d get herat cancle ", rf.me) case <-time.After(time.Duration(rand.Intn(300)+500) * time.Millisecond): { //DPrintf("follower:%d vote timeout,term:%d ", rf.me, rf.currentTerm) rf.vote() } } case LEADER: 
rf.broadcastAppendEntries() // 心跳或追加日志 time.Sleep(HEARTBEAT_INTERVAL) } } }() go func(){ for { select { case <-rf.appendCh: rf.mu.Lock() commitIndex := rf.commitIndex baseIndex := rf.log[0].LogIndex for i := rf.lastApplied + 1; i <= commitIndex; i++ { msg := ApplyMsg{CommandIndex: i, Command: rf.log[i - baseIndex].LogCommand,CommandValid :true} applyCh <- msg //DPrintf("follower:%d appendCh index:%d,msg:%v ",rf.me, rf.commitIndex,msg) rf.lastApplied = i } rf.mu.Unlock() } } }() // initialize from state persisted before a crash rf.readPersist(persister.ReadRaftState()) return rf } |
执行测试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
go test -run 2B
Test (2B): basic agreement ...
  ... Passed -- 0.9 5 32 3
Test (2B): agreement despite follower disconnection ...
^Csignal: interrupt
FAIL _/Users/zhengxuzhang/6.824/src/raft 4.485s
zhengxuzhangde-MacBook-Pro:raft zhengxuzhang$ go test -run 2B
Test (2B): basic agreement ...
  ... Passed -- 0.9 5 32 3
Test (2B): agreement despite follower disconnection ...
  ... Passed -- 6.1 3 198 7
Test (2B): no agreement if too many followers disconnect ...
  ... Passed -- 4.5 5 288 4
Test (2B): concurrent Start()s ...
  ... Passed -- 1.0 3 24 6
Test (2B): rejoin of partitioned leader ...
  ... Passed -- 4.7 3 262 4
Test (2B): leader backs up quickly over incorrect follower logs ...
  ... Passed -- 15.8 5 2153 102
Test (2B): RPC counts aren't too high ...
  ... Passed -- 2.7 3 86 12
PASS
ok _/Users/zhengxuzhang/6.824/src/raft 35.714s
Part 2C
实现Raft的持久状态,以便在服务器重启的情况下恢复服务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
func (rf *Raft) persist() { w := new(bytes.Buffer) e := gob.NewEncoder(w) e.Encode(rf.currentTerm) e.Encode(rf.votedFor) e.Encode(rf.log) data := w.Bytes() rf.persister.SaveRaftState(data) } // // restore previously persisted state. // func (rf *Raft) readPersist(data []byte) { if data == nil || len(data) < 1 { return } r := bytes.NewBuffer(data) d := gob.NewDecoder(r) d.Decode(&rf.currentTerm) d.Decode(&rf.votedFor) d.Decode(&rf.log) } func (rf *Raft) Start(command interface{}) (int, int, bool) { ... if isLEADER { ... rf.persist() } return index, term, isLEADER } func (rf *Raft) broadcastAppendEntries() { defer rf.persist() ... } func (rf *Raft) vote() { rf.mu.Lock() defer rf.mu.Unlock() defer rf.persist() ... } func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { rf.mu.Lock() defer rf.mu.Unlock() defer rf.persist() ... } func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) { rf.mu.Lock() defer rf.mu.Unlock() defer rf.persist() ... } func Make(peers []*labrpc.ClientEnd, me int, persister *Persister, applyCh chan ApplyMsg) *Raft { rf := &Raft{} rf.peers = peers rf.persister = persister rf.me = me rf.state = FOLLOWER rf.lastApplied = 0 rf.votedFor = -1 rf.log = append(rf.log, LogEntry{LogTerm: 0}) rf.currentTerm = 0 rf.nextIndex = make([]int,len(rf.peers)) rf.matchIndex = make([]int,len(rf.peers)) rf.heartCh = make(chan struct{}, 10) rf.voteCh = make(chan struct{}, 10) rf.appendCh = make(chan struct{}, 10) // initialize from state persisted before a crash rf.readPersist(persister.ReadRaftState()) for i := range rf.peers { rf.nextIndex[i] = len(rf.log) // initialized to leader last log index + 1 } ... } |
执行测试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
go test -run 2C
Test (2C): basic persistence ...
  ... Passed -- 4.7 3 430 6
Test (2C): more persistence ...
  ... Passed -- 35.2 5 3690 18
Test (2C): partitioned leader and one follower crash, leader restarts ...
  ... Passed -- 2.0 3 72 4
Test (2C): Figure 8 ...
  ... Passed -- 107.5 5 176193 23
Test (2C): unreliable agreement ...
  ... Passed -- 5.6 5 408 246
Test (2C): Figure 8 (unreliable) ...
  ... Passed -- 13.8 5 1827 50
Test (2C): churn ...
  ... Passed -- 16.2 5 1728 117
Test (2C): unreliable churn ...
  ... Passed -- 17.0 5 1396 189
PASS
ok _/Users/zhengxuzhang/6.824/src/raft 202.041s
文章作者 zhengxz
上次更新 2019-12-22