9 minute read

MIT6.824 Lab2 - Raft


写在前面

最终我还是决定写包括整个Lab2的总结而不是分成四个部分ABCD来写。其原因也很简单,在我写完2A的时候,我认为我的代码非常好用,然后我写了一篇总结,包括我的思路、代码结构这样的,相当志得意满。结果在写2B的时候,发现原来2A写的代码还是有不少bug的,2A的三个测试点并不能很好的测试出所有的corner case,能通过2A并不代表Leader Election部分的代码就完全没有问题。在辛苦debug完2B之后呢,我的与2A相关的代码已经修改了很多了,那篇2A的总结已经没有什么参考价值了。

后面果不其然,每写完一个部分,在之前认为”天衣无缝”的代码中总能找到不足之处和bug,所以,在完全写完这个Lab后,站在回归的角度再来写一个总结我认为是比较合理的。

我写这个Lab2,前后满打满算应该是花了十二天左右,因为是寒假,所以是几乎全天并且通宵高强度写这个Lab,看视频和论文两天 + Lab2A两天 + Lab2B四天 + Lab2C两天 + Lab2D三天。每次写完一个部分的Lab,我都得休息个两三天,看看《绝命毒师》放松一下,我的性格可能比较偏执,容易被bug一卡直接卡到通宵到第二天7点以后。所以每次做完Lab,晚上躺在床上都时不时要起身看看电脑上在跑的测试脚本,看到没有红色才能安心睡觉。 PS.附上做2C的时候床上拍的照片留念一下~

image-20240214004423578

在写这个Lab之前,我从来没有写过golang,之前一直是一个Java-curd-noob,也没有什么多线程并发编程的经验,就在这个Lab中基本没有使用什么并发原语工具了,几乎都是Sleep个一段时间然后执行对应逻辑,也很少用到channel,全靠一个全局锁 XD。

一些建议

  1. 尽量少去网上找他人的代码来看

    或者说主要借鉴一下代码结构就行,不要照着一个博客之类的猛抄,就像我上面说到的,作者可能写这个Part的时候觉得没啥问题,结果在后面就会导致错误。如果在debug的时候,需要对照一下,那也要仔细找找优质的博客,网上关于这个Lab的质量良莠不齐,有的人代码都写的一塌糊涂博客也发的很带劲…

    关于debug,前两个part我十分推荐直接去看这个演示网站 https://raft.github.io/raftscope/index.html ,关于一些边界条件的判断,可以在这里找到正确的动画演示,它的正确性是值得信赖的,可以说是十分有用了!

    还是推荐一个我认为质量非常高的博客:https://github.com/OneSizeFitsQuorum/MIT6.824-2021

  2. 多看论文,严格遵循Figure 2

    我是学长推荐我论文也要看一遍下来再动工,确实,看完前5个Section再开始写代码,思路还是蛮清晰的,能够认识到下一步要做啥

  3. 学会使用助教提供的”符合人体工学”的日志工具

    https://blog.josejg.com/debugging-pretty/ 他在他的网站上面已经讲得十分详细了,基本直接复制代码,装好环境就行。

最后,放上我的四个部分合一的最终测试结果。

我的一轮测试需要花费7分多钟,其实在性能上是比较差劲的,主要表现在2D部分,比别人慢了很多,大概跟我用了很多Sleep有关吧,不过前后跑了百来趟都没出问题,我的Raft也算是相当稳固了。

image-20240209135422516


Lab2A-Leader Election

任务

  1. 完成RequestVote

  2. 完成不带日志的AppendEntries作为心跳检测。

  3. 完成ticker函数作为定时器,检测raft peer状态并作出对应操作如发起选举、发起心跳等

ticker

这个计时器部分感觉是2A中最需要考虑的部分了,我自己实现之后去网上看了好多其他人的实现方案,都各不相同,很难说谁的更好,我去看了etcd的源码,感觉有点复杂,加上我并不太熟练go的并发原语,很多看起来很优雅的操作也不太敢尝试,所以我在这部分的实现应该是不太好的,不过课的TA说不用太在意CPU的效率,我很多操作就能省则省了,程序就旨在好理解、好维护吧,效率应该是不太行的。

官方在代码注释中提到:

The easiest way to do this is to create a goroutine with a loop that calls time.Sleep();

Don’t use Go’s time.Timer or time.Ticker, which are difficult to use correctly.

所以我就不打算使用那些复杂的倒计时器之类的东西了,看了看ticker函数中大概的结构,就是每隔一个比较小的时间tick,简单的判断一下各个指标是否超时,超时就执行对应的操作。每个tick间隔就直接让这个ticker线程Sleep过去就行,我直接设置的15秒作为一个tick的间隔。

  • 如何判断是否超时?

我为了节省我大脑贫瘠的CPU算力,就用了我认为最好理解的一种方式了,即对于raft中三个超时分别维护两个变量,一个记录上次发生的时间,一个记录这次发生所拥有的时限timeout。

func (rf *Raft) ticker() {
	for !rf.killed() {
		rf.mu.Lock()
		if rf.state == FollowerState {
			if time.Since(rf.lastHeartbeaten) > rf.heartBeatenTimeout {
				PrettyDebug(dTimer, "S%d's heartbeaten timeout, be candidate, Term:%d", rf.me, rf.currentTerm)
				rf.state = CandidateState
			}
		}
		if rf.state == CandidateState {
			if time.Since(rf.lastElection) > rf.electionTimeout {
				PrettyDebug(dTimer, "S%d's election timeout, raising an election, Term:%d", rf.me, rf.currentTerm)
				go rf.raiseElection()
			}
		}
		if rf.state == LeaderState {
			if time.Since(rf.lastBroadcast) > rf.broadcastTimeout {
				PrettyDebug(dTimer, "S%d's broadcast timeout, broadcasting for Term:%d", rf.me, rf.currentTerm)
				go rf.raiseBroadcast(rf.currentTerm)
			}
		}
		rf.mu.Unlock()
		// sleep for a tick
		tick := 15 * time.Millisecond
		time.Sleep(tick)
	}
}
// 一次选举的超时时间
// 只有在收到来自可以确认的Leader的心跳的时候才能刷新这个时长
func (rf *Raft) resetHeartbeatenTimeout() {
	rf.lastHeartbeaten = time.Now()
	rf.heartBeatenTimeout = time.Duration(rand.Intn(150)+300) * time.Millisecond
}
// 一次选举的超时时间
func (rf *Raft) resetElectionTimeout() {
	rf.lastElection = time.Now()
	rf.electionTimeout = time.Duration(rand.Intn(200)+300) * time.Millisecond
}
// 主动发起心跳的时间间隔
func (rf *Raft) resetBroadcastTimeout() time.Duration {
	rf.broadcastTimeout = time.Duration(100) * time.Millisecond
	return rf.broadcastTimeout
}

用我这种方法,时间区间就有一点要求了,在一次心跳超时的时间中,需要能够发送三次以上的心跳,才能比较稳定的通过测试。

RequestVote

就严格按照Figure 2即可,对于第二个点后半句话关于日志至少一样新的条件,在2A里面并不会检测到,因为涉及到log,2A里面并不需要log

image-20240214023402143

有一点图中没有给出但在论文中有提到的是:

Current terms are exchanged whenever servers communicate;

if one server’s current term is smaller than the other’s, then it updates its current term to the larger value.

If a candidate or leader discovers that its term is out of date, it immediately reverts to follower state.

不管peer之间以什么方式交流,当发现有更高的Term,就需要更新自己的Term,更新完之后必定是Follower状态。所以不仅在心跳的时候要更新状态,requestVote也需要。

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (2A, 2B).
	// 2A
	rf.mu.Lock()
	defer rf.mu.Unlock()

	if rf.killed() {
		reply.Term = 0
		reply.VoteGranted = false
		return
	}
	reply.VoteGranted = false
	reply.Term = rf.currentTerm
	if args.Term < rf.currentTerm {
		// just end
		return
	}
	if args.Term > rf.currentTerm {
		rf.updateTermPassively(args.Term)
	}
	reply.Term = rf.currentTerm
	// never voted OR has voted to RPC's original peer
	if rf.votedFor == -1 || rf.votedFor == args.CandidateId {
		lastLogIndex, lastLogTerm := rf.getLastLogInfo()
		if lastLogTerm < args.LastLogTerm ||
			(lastLogTerm == args.LastLogTerm && lastLogIndex <= args.LastLogIndex) {
			reply.VoteGranted = true
			rf.votedFor = args.CandidateId
			// heartbeaten only if voted for candidate
			rf.state = FollowerState
			rf.resetHeartbeatenTimeout()
			rf.persist()
		}
	} 
}

Raise Election

接收方上面已经基本做好了,接下来就是完成发送方,一般只有在发起选举的时候才会发送Request Vote,而只有当peer失去和Leader的心跳联系时,它应该认为已经没有Leader了,所以他就会成为一个Candidate,开始发起选举。

发起选举首先是对自己做一些update操作,大概就是

  1. 更新lastElection和electionTimeout
  2. currentTerm++
  3. change state to Candidate
  4. 给自己投票
  5. 给其他peer发RPC

这边有一个需要注意的地方,就是论文中提到的:

A candidate continues in this state until one of three things happens:

(a) it wins the election

(b) another server establishes itself as leader

(c) a period of time goes by with no winner

实际上我认为不需要做这么多判断

  • 首先我们不需要做超时重试这个举动,因为超时之后ticker会检测到,自己重新发起一轮选举,在论文中提到发生split之后会each candidate will time out and start a new election by incrementing its term and initiating another round of RequestVote RPCs.,所以超时重试是没有必要的,不需要自己来实现一个重试的操作。(至少我是没这么做)
  • 对于b条件,我认为没必要显式的检测它的发生,换个角度,当peer收到其他更高任期的RPC的时候(有很多可能,但如论文所说“whenever communicate”)需要回退到Follower状态,并且服从收到的高的任期Term。所以当这轮选举获胜之后,对当前状态做一个判断,判断自己是否还处于这次选举中(term == currentTerm && state == Candidate)。如果check失败,说明这次选举已经因为某种原因结束或者失败了。
func (rf *Raft) raiseElection() {

	rf.mu.Lock()
	defer rf.mu.Unlock()
	if rf.state == LeaderState {
		return
	}

	rf.resetElectionTimeout()
	rf.currentTerm++
	rf.state = CandidateState
	rf.votedFor = rf.me

	rf.persist()

	cnt := &Counter{
		counter: 0,
		mu:      sync.Mutex{},
	}
	cnt.Increment()
	need := len(rf.peers)/2 + 1
	lastLogIndex, lastLogTerm := rf.getLastLogInfo()
	args := &RequestVoteArgs{
		Term:         rf.currentTerm,
		CandidateId:  rf.me,
		LastLogIndex: lastLogIndex,
		LastLogTerm:  lastLogTerm,
	}
	var once = sync.Once{}

	// call each peer
	for peer := range rf.peers {
		if peer == rf.me {
			continue
		}
		go rf.requestVoteHandler(cnt, args, peer, need, rf.me, &once)
	}
}

func (rf *Raft) requestVoteHandler(cnt *Counter, args *RequestVoteArgs, peer int, need int, me int, once *sync.Once) {
	reply := &RequestVoteReply{}
	ok := rf.sendRequestVote(peer, args, reply)
	if ok {
		rf.mu.Lock()
		if reply.VoteGranted {
			// It should have happened only once?
			if cnt.IncrementAndGet() >= need {
				once.Do(func() {
					// become leader
					if rf.state == CandidateState && rf.currentTerm == args.Term {
						rf.state = LeaderState
						// 2B: initializeLogs when being leader
						lastLogIndex, _ := rf.getLastLogInfo()
						for peer := range rf.peers {
							rf.nextIndex[peer] = lastLogIndex + 1
							rf.matchIndex[peer] = 0
						}
						go rf.raiseBroadcast(rf.currentTerm)
					}
				})
			}
		}
		if reply.Term > args.Term {
			PrettyDebug(dVote, "S%d receive higher term(%d > %d) from S%d", me, reply.Term, args.Term, peer)
			if rf.state == CandidateState && rf.currentTerm == args.Term {
				rf.updateTermPassively(reply.Term)
				cnt.Clear()
			}
		}
		rf.mu.Unlock()
	}
}

Append Entries

在2A中,这个RPC只需要完成很简单的判断就行,就是把它当成一个心跳而已,这边是需要在成为Leader的第一时间发送AE的,才能保证很快的选出Leader。

逻辑简单,记得刷新heartbeat时间相关数据就行。遇到什么问题就再好好看看Figure 2就行。

2A测试详解

第一个测试叫initial election,三个raft,检查初始化选举,保证只有一个领导,同时选举成功后一段时间不会触发新选举,总而言之是测试raft稳态

第二个测试叫election after network failure,三个raft,会先后分别测试领导挂机、旧领导恢复上线、一个领导和一个马仔挂机、马仔恢复上线、旧领导恢复上线,总而言之是测试回合值更新和判断和选举策略

第三个测试叫multiple elections,七个raft,九次迭代,每次迭代都会随机选三个raft挂机,然后再让挂机的raft恢复连接,保证过程中最多一个领导

image-20240128185525558


Lab2B-Log Replicated

2B比较难,bug也会比较多,不能着急,慢慢调~

一步一步写吧

AppendEntries RPC

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
  
	reply.Term = rf.currentTerm
	reply.Success = false
	// 1. Reply false if term < currentTerm
	if args.Term < rf.currentTerm {
		return
	}
	// 选举的时候发现已经有Leader,此时可能是竞争选举,认为自己失败,退回Follower
	if args.Term == rf.currentTerm && rf.state == CandidateState {
		rf.updateTermPassively(rf.currentTerm)
	}

	if args.Term > rf.currentTerm {
		rf.updateTermPassively(args.Term)
		rf.leaderId = args.LeaderId
	}

	// 此时可以确认对方是可以确认的Leader,刷新
	rf.resetHeartbeatenTimeout()

	// 2B
	reply.Term = rf.currentTerm
	lastLogIndex, _ := rf.getLastLogInfo()

	if args.PrevLogIndex < rf.lastIncludedIndex {
		reply.Success = false
		reply.XTerm = -1
		reply.XLen = lastLogIndex
		return
	}

	// 2. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm
	// 先匹配上相交点
	if args.PrevLogIndex > lastLogIndex {
		reply.XLen = lastLogIndex
		reply.XTerm = -1
		return
	}
	var matchingTerm int
	if args.PrevLogIndex == rf.lastIncludedIndex {
		matchingTerm = rf.lastIncludedTerm
	} else {
		matchingTerm = rf.getLog(args.PrevLogIndex).Term
	}
	if matchingTerm != args.PrevLogTerm && args.PrevLogTerm != 0 {
		reply.XLen = lastLogIndex
		reply.XTerm = matchingTerm
		reply.XIndex, _ = rf.getLogIndexesWithTerm(reply.XTerm)
		return
	}
	reply.Success = true
	// prev log check pass, recognize each's logs before previndex are consist
	// 从PrevLogIndex开始,找出自己的日志中与Leader对应位置处不一致的日志,将此日志后续的所有日志删除,并将新的日志添加到后面
	for i, entry := range args.Entries {
		idx := args.PrevLogIndex + i + 1
		lastLogIndex, _ = rf.getLastLogInfo()
		if idx == lastLogIndex+1 {
			// 4. Append any new entries not already in the log
			rf.logs = append(rf.logs, entry)
		}
		if rf.getLog(idx).Term != entry.Term {
			// 3. If an existing entry conflicts with a new one (same index but different terms),
			//    delete the existing entry and all that follow it
			rf.logs = rf.copyLogs(rf.lastIncludedIndex+1, idx)
			// 4. Append any new entries not already in the log
			rf.logs = append(rf.logs, args.Entries[i:]...)
			break
		}
	}
	// 5. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
	if args.LeaderCommit > rf.commitIndex {
		rf.commitIndex = min(args.LeaderCommit, args.PrevLogIndex+len(args.Entries))
	}
	rf.persist()
}

一个值得注意的地方,也是我在2D才发现的,也可能只有我会犯这种毛病。就是在填写reply的Term的时候,如果发现被动的UpdateTerm了,要把这个Term更新到reply中,我之前一直没做这一步,到了2D才发现的,导致了迟迟无法达成一致。

发送方 Leader逻辑处理

func (rf *Raft) appendEntriesHandler(peer int, term int, args *AppendEntriesArgs) {
	reply := &AppendEntriesReply{}
	ok := rf.sendAppendEntries(peer, args, reply)
	if !ok {
		return
	}
	rf.mu.Lock()
	defer rf.mu.Unlock()
	if rf.state != LeaderState {
		return
	}
	if reply.Term < rf.currentTerm {
		return
	}

	if reply.Term > rf.currentTerm {
		rf.updateTermPassively(reply.Term)
		return
	}
	// reply.Term == currentTerm
	if reply.Success {
		// Eventually nextIndex will reach a point where the leader and follower logs match.
		// the follower’s log is consistent with the leader’s
		peerNextIndex := args.PrevLogIndex + len(args.Entries) + 1
		peerMatchIndex := args.PrevLogIndex + len(args.Entries)
		rf.nextIndex[peer] = max(rf.nextIndex[peer], peerNextIndex)
		rf.matchIndex[peer] = max(rf.matchIndex[peer], peerMatchIndex)
		// update leader's commitIndex
		// by calculate the index of majority could keep up with
		sortedMatchIndex := make([]int, 0)
		lastLogIndex, _ := rf.getLastLogInfo()
		sortedMatchIndex = append(sortedMatchIndex, lastLogIndex)
		for i := 0; i < len(rf.peers); i++ {
			if i == rf.me {
				continue
			}
			sortedMatchIndex = append(sortedMatchIndex, rf.matchIndex[i])
		}
		sort.Ints(sortedMatchIndex)
		newCommitIndex := sortedMatchIndex[len(rf.peers)/2]
		if newCommitIndex >= rf.commitIndex && rf.getLog(newCommitIndex).Term == rf.currentTerm {
			rf.commitIndex = newCommitIndex
		}
	} else {
		// 2C optimize log catch up
		if reply.XTerm == -1 {
			rf.nextIndex[peer] = reply.XLen + 1
		} else {
			_, lastIndex := rf.getLogIndexesWithTerm(reply.XTerm)
			if lastIndex == -1 {
				// 没有这个term的
				rf.nextIndex[peer] = reply.XIndex
			} else if lastIndex > 0 {
				rf.nextIndex[peer] = lastIndex
			}
		}
		if rf.nextIndex[peer] > 1 {
			rf.nextIndex[peer]--
		}
	}
}

统一发送AppendEntries给所有peer

func (rf *Raft) raiseBroadcast(term int) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	if term != rf.currentTerm {
		return
	}
	if rf.state != LeaderState {
		return
	}
	for peer := range rf.peers {
		if peer == rf.me {
			continue
		}
		preLogIndex := rf.nextIndex[peer] - 1
		lastLogIndex, _ := rf.getLastLogInfo()
		if preLogIndex < rf.lastIncludedIndex { // peer落后太多了,已经在snapshot里了,只能直接发动installSnapshot了
			args := &InstallSnapshotArgs{
				Term:              rf.currentTerm,
				LeaderId:          rf.me,
				LastIncludedIndex: rf.lastIncludedIndex,
				LastIncludedTerm:  rf.lastIncludedTerm,
				Data:              rf.persister.snapshot,
			}
			go rf.installSnapshotHandler(args, peer)
		} else {
			args := &AppendEntriesArgs{
				Term:         term,
				LeaderId:     rf.me,
				PrevLogIndex: preLogIndex,
				PrevLogTerm:  rf.getLog(preLogIndex).Term,
				LeaderCommit: rf.commitIndex,
				Entries:      rf.getLogs(preLogIndex+1, lastLogIndex+1),
			}
			go rf.appendEntriesHandler(peer, term, args)
		}
	}
	rf.lastBroadcast = time.Now()
	rf.resetBroadcastTimeout()
}

Apply Log

这一步的意思是把达成共识的Log,发送到状态机中。

func (rf *Raft) applyLogHandler(applyCh chan ApplyMsg) {
	for !rf.killed() {
		time.Sleep(10 * time.Millisecond)

		appliedMsgs := []ApplyMsg{}
		rf.mu.Lock()
		for rf.commitIndex > rf.lastApplied {
			rf.lastApplied++
			if rf.lastApplied <= rf.lastIncludedIndex {
				continue
			}
			applied := ApplyMsg{
				CommandValid: true,
				Command:      rf.getLog(rf.lastApplied).Command,
				CommandIndex: rf.lastApplied,
			}
			appliedMsgs = append(appliedMsgs, applied)
		}
		rf.mu.Unlock()

		for _, msg := range appliedMsgs {
			applyCh <- msg
		}
	}
}

一个注意的点就是不要在锁里面发东西到Channel里,会有一些阻塞的问题。

image-20240129113922052

Lab2C-Persistence

最简单的一集,做好持久化就行

func (rf *Raft) persist() {
	// Your code here (2C).
	PrettyDebug(dPersist, "S%d Saving persistent state to stable storage at Term:%d.", rf.me, rf.currentTerm)
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	e.Encode(rf.currentTerm)
	e.Encode(rf.votedFor)
	e.Encode(rf.logs)
	e.Encode(rf.lastIncludedIndex)
	e.Encode(rf.lastIncludedTerm)
	data := w.Bytes()
	rf.persister.SaveRaftState(data)
}

// restore previously persisted state.
func (rf *Raft) readPersist(data []byte) {
	if data == nil || len(data) < 1 { // bootstrap without any state?
		return
	}
	// Your code here (2C).
	PrettyDebug(dPersist, "S%d Restoring previously persisted state at Term:%d.", rf.me, rf.currentTerm)
	r := bytes.NewBuffer(data)
	d := labgob.NewDecoder(r)
	rf.mu.Lock()
	defer rf.mu.Unlock()
	d.Decode(&rf.currentTerm)
	d.Decode(&rf.votedFor)
	d.Decode(&rf.logs)
	d.Decode(&rf.lastIncludedIndex)
	d.Decode(&rf.lastIncludedTerm)
}

有个点,就是在Raft的Make函数里面,不仅要ReadPersistence,还要恢复一下nextIndex[]的状态。

	// 2C
	// initialize from state persisted before a crash
	rf.readPersist(persister.ReadRaftState())
	lastLogIndex, _ := rf.getLastLogInfo()
	for peer := range rf.peers {
		rf.nextIndex[peer] = lastLogIndex + 1
	}

image-20240214032650423

Lab2D-Snapshot

日志快速追赶

加入日志压缩功能后,需要注意及时对内存中的 entries 数组进行清除。即使得废弃的 entries 切片能够被正常 gc,从而避免内存释放不掉并最终 OOM 的现象出现。

可以参考上面的代码,就是在AppendEntries相关的部分。

这边最难受的就是,需要对LogEntry[]做一个包装的操作,我之前没有做,访问指定Index的Log,就只是去切片里面直接访问,后面需要对整体代码做重构,期间出了好多好多好多好多好多的Bug T_T

Entry相关

这边感觉写的真的很烂,但是能跑就是了,能写好还是要写好的,能少很多bug。

type Entry struct {
	Term    int
	Command interface{}
}

// index = lastIncludedIndex + index
func (rf *Raft) getLog(index int) *Entry {
	i := index - rf.lastIncludedIndex
	return &rf.logs[i]
}

func (rf *Raft) getLogs(startIndex, endIndex int) []Entry {
	slice := []Entry(nil)
	for i := startIndex; i < endIndex; i++ {
		slice = append(slice, rf.logs[i-rf.lastIncludedIndex])
	}
	return slice
}

func (rf *Raft) copyLogs(startIndex, endIndex int) []Entry {
	slice := []Entry(nil)
	slice = append(slice, Entry{0, nil})
	for i := startIndex; i < endIndex; i++ {
		slice = append(slice, rf.logs[i-rf.lastIncludedIndex])
	}
	return slice
}

// Need Lock
// return last's index and term
// index = 0 means no logs
func (rf *Raft) getLastLogInfo() (index int, term int) {
	if len(rf.logs) <= 1 {
		return rf.lastIncludedIndex, rf.lastIncludedTerm
	}
	lastLogIndex := len(rf.logs) - 1 + rf.lastIncludedIndex
	lastLogTerm := rf.logs[len(rf.logs)-1].Term
	return lastLogIndex, lastLogTerm
}

func (rf *Raft) getLogIndexesWithTerm(term int) (firstIdx, lastIdx int) {
	if term == 0 {
		return 0, 0
	}
	first, last := math.MaxInt, -1
	if rf.lastIncludedTerm == term {
		first = min(first, rf.lastIncludedIndex)
		last = max(last, rf.lastIncludedIndex)
	}
	for i := 1; i < len(rf.logs); i++ {
		if rf.logs[i].Term == term {
			first = min(first, i+rf.lastIncludedIndex)
			last = max(last, i+rf.lastIncludedIndex)
		}
	}
	if last == -1 {
		return -1, -1
	}
	return first, last
}

Snapshot

func (rf *Raft) Snapshot(index int, snapshot []byte) {
	// Your code here (2D).
	rf.mu.Lock()
	defer rf.mu.Unlock()

	lastLogIndex, _ := rf.getLastLogInfo()
	if rf.lastIncludedIndex >= index {
		return
	}

	rf.lastIncludedTerm = rf.getLog(index).Term
	rf.logs = rf.copyLogs(index+1, lastLogIndex+1)
	rf.lastIncludedIndex = index

	rf.persistAndSnapshot(snapshot)
}

Install Snapshot RPC

func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	if args.Term < rf.currentTerm {
		reply.Term = rf.currentTerm
		return
	}

	if args.Term > rf.currentTerm {
		rf.updateTermPassively(args.Term)
	}
	reply.Term = rf.currentTerm
	rf.resetHeartbeatenTimeout()

	if rf.commitIndex >= args.LastIncludedIndex {
		return
	}

	// 将快照后的logs切割,快照前的直接applied
	index := args.LastIncludedIndex
	tempLogs := make([]Entry, 0)
	tempLogs = append(tempLogs, Entry{0, nil})
	lastLogIndex, _ := rf.getLastLogInfo()

	if index < lastLogIndex {
		// 快照并不能包含接收者的所有的日志条目
		// 把快照包含的条目都裁剪掉,留下快照不包含的
		tempLogs = append(tempLogs, rf.getLogs(index+1, lastLogIndex+1)...)
	}

	rf.lastIncludedIndex = index
	rf.lastIncludedTerm = args.LastIncludedTerm
	rf.logs = tempLogs
	rf.lastApplied = rf.lastIncludedIndex
	rf.commitIndex = rf.lastIncludedIndex
	rf.persistAndSnapshot(args.Data)

	msg := ApplyMsg{
		SnapshotValid: true,
		Snapshot:      args.Data,
		SnapshotTerm:  rf.lastIncludedTerm,
		SnapshotIndex: rf.lastIncludedIndex,
	}
	go func(msg ApplyMsg) {
		rf.applyChannel <- msg
	}(msg)
}

Handler

func (rf *Raft) installSnapshotHandler(args *InstallSnapshotArgs, peer int) {
	reply := &InstallSnapshotReply{}
	ok := rf.sendInstallSnapshot(peer, args, reply)
	if ok {
		rf.mu.Lock()
		defer rf.mu.Unlock()

		if rf.currentTerm != args.Term {
			return
		}
		if rf.currentTerm < reply.Term {
			rf.updateTermPassively(reply.Term)
			return
		}

		rf.matchIndex[peer] = max(args.LastIncludedIndex, rf.matchIndex[peer])
		rf.nextIndex[peer] = max(args.LastIncludedIndex+1, rf.nextIndex[peer])
	}
}

func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
	return true
}

在2022的版本中,Lab官方已经不推荐实现CondInstallSnapshot了,只要简单返回true即可。


2024-02-14

就先这样吧,后面有时间再补充了。

把Lab2写完,确实感觉收获不少,但是已经2月中旬了,马上也要开始准备三月份的面试了,算法也没刷多少,八股也没准备,感慨自己醒悟太晚,大一大二浪费了很多时间,心里还是很迷茫的。希望能好运吧。

补充

幽灵复现问题

大概就是下面图片表示的,可能在某些场景下长时间无法提供正确的读服务。如果你有仔细观察过raft运行的时候的日志,就能发现这个问题了。

大概描述起来就是,Leader在一个高的Term时有很多低Term的Log还没有被commit,因为raft不允许这个操作。

image.png

一个解决方法就是在Leader当选的时候记录一条no-op的日志,让之前的Log都被commit,使读服务快速恢复正常。

if rf.state == CandidateState && rf.currentTerm == args.Term {
						rf.state = LeaderState
						rf.logs = append(rf.logs, Entry{rf.currentTerm, 0})
}

比如这样,并且这样也确实很大提升了我Lab2D的执行效率,减少了将近100s的测试时长!不过我的Lab2B全挂了,应该是官方还没有考虑这个情况吧,大佬的博客中也是这么认为的。所以我就把2B里面校验Index的所有assert都注释掉了哈哈哈

image-20240214162340090

原本是需要470-490s的,现在舒服多啦!

Bug-report - Lab3B丢失消息

在Lab3的TestSnapshotUnreliableRecoverConcurrentPartition3B测试中总是存在丢失消息的问题,这个问题似乎全网只有我会出现,这是由于我的log实现方式不够理想导致的,卡了我三天,花了整整三天看了我几十万行日志才debug出来这个问题……非常想死。

bug复现:

五台服务器0、1、2、3、4,存在各种可能性的网络分区,其中一种就是两个分区,一个两台一个三台。

2作为三台分区的Leader,可以正常进行共识,3作为两台分区的Leader,落后了很多很多log。

然后分区形势变了,2能和3连上了,这时候2会向3开始进行日志复制。

第一次rpc,3发现自己的log长度小于2发来的prevLogIndex,就返回了自己的长度XIndex=192,这时候2下一次rpc就会从3的最后一个log的index开始尝试复制日志。

第二次rpc,2把192和之后的log一起发过去,prevLogIndex和Term就应该取的191的,ok问题就出在这,191很不巧就是上一次Snapshot的index,也就是说getLog(191)会正好访问到那个物理坐标为0的dummy,而它的Term是0,所以在AppendEntries里面发现args.prevLogTerm == 0,好,直接当作心跳了,直接把191之前的都当成对的了。

这个bug在Lab2里面是测不出来的,又或者说可能全世界就我会写出这种bug…所以说不要像我这样用魔法数字来做一些边界测试,代码会越来越难维护,bug会越来越多

Bug-report - Lab3B错误的日志追赶

这一块我一直做的很差,做的很赶,很多细节都有问题。

如果使用了XLen XTerm这种参数形式的追赶方式,就不要再留着nextIndex[peer]–这种方式了,很容易导致被Snapshot卡住。导致完全没办法复制日志。任务直接超时120s的时限。

Comments