0
点赞
收藏
分享

微信扫一扫

MIT6.584分布式-原MIT6.824-lab1-2023年万字从0到1小白学习笔记

伽马星系 2023-06-29 阅读 74

文章目录

前置准备

一、分布式系统知识的学习

  • 在进行学习MIT之前我们需要阅读MapReduce这篇论文、因为我们这个Lab的实现是基于这个论文的,同时教授讲课也是这样的,因此我们可以进行下载,可以再谷歌学术上进行下载就行;
  • 另一方面我们需要对Go语言有一定的了解,因为后续我们都是基于Go的,天生就适合处理高并发的事务,这里可以再B战上看视频花几个小时速学一样,主要是基本变量、数组、切片、管道协程方面!!!
  • 在一个选择合适的IDE、我这里采用的是VScode,因为简单实用,连接远程也方便

课程主页:課程主頁
参考专栏:參考专栏

1.1分布式Go语言环境安装

  • 最基本的、做项目我们需要准备好最基础的开发环境、这里可以参考我的Go语言环境配置Go在linux下的安装;由于我试过在Windows下安装进行开发、发现找不到mr这个框架,后面还是选择了在服务器上进行开发,也证明了确实需要在linux下开发可以省很多事啊!!

1.2MIT6.824课程的学习

  • 这里可以参考youtube或者B站上搜Mit6.824就可以,对于基本语法的学习可以参考Go语言栈

1.2* 前言学习

  • 按照指导手册进行测试拉取代码、配置好go环境,打开vsCode,安装对应的remote ssh
    在这里插入图片描述
    阅读简单的mrsequential.go代码,并跑起来
    在这里插入图片描述
  • 接下来在执行官网的下列命令
$ cd ~/6.5840/src/main
$ bash test-mr.sh

在这里插入图片描述
-当完成实现的时候,这个就会finished,具体如下

在这里插入图片描述

可以看到这个任务是不会完成的,在我们没有开始进行编码的时候

1.3课程需求

在这里插入图片描述

1.4怎么实现、借助课程的Hints

一种开始的方法是修改 mr/worker.go 的 Worker() 以向协调器发送请求任务的 RPC。然后 修改协调器以使用尚未启动的文件名进行响应 映射任务。然后修改工作线程以读取该文件并调用 应用程序映射函数,如mrsequential.go。

1.5首先看看其mapfunction的工作逻辑:

整个 Map 函数的工作流程可以描述为:读取输入文件 -> 通过 Map 函数生成 key-value 对 -> 将所有生成的 key-value 对追加到中间结果切片 intermediate 中。最终,Map 函数将返回中间结果切片 intermediate
在这里插入图片描述

二、Lab正文

2.1 思路

● 可以从上图大致看出MapReduce的流程:启动一个Master(Coordinator协调者)分配多个任务给worker做Map任务(Worker分为Map Worker && Reduce Worker)。
● 然后Worker完成Map任务后返回中间值一组KV,接着协调者再将这些KV分发给后继的Reduce-worker根据KV来进行Reduce任务

2.2 实现

我们要知道这个MapReduce是Master分发任务给Worker,同时Worker做完任务后会通知Master进行Reduce,那么因此我们可以从Hints中的第一条:先实现worker与coordinator之间进行交互,依托rpc;

完成worker与Coordinator之间的交互,处理map任务

首先来看看给的Rpc例子;首先运行main/mrworker.go 会进入刀mr/Worker的这个方法中。可以在这个方法中调用RPC的例子方法,也就是CallExample()
在这里插入图片描述
然后CallExample()这个方法中会有一行:

	// send the RPC request, wait for the reply.
	// the "Coordinator.Example" tells the
	// receiving server that we'd like to call
	// the Example() method of struct Coordinator.
	ok := call("Coordinator.Example", &args, &reply)

这个call就是代表了通过调用Coordinator包的Example方法。然后我们就得到了修改后的reply,得到rpc返回值。至此coordinator与worker完成了简单的交互
● 看懂了简单的Rpc交互,现在我们可以自己来实现一个Rpc做Map任务
类似于rpc包下定义类似于ExampleArgs.reply的传参,rpc的改变都是通过参数改变,因此都是用指针

# rpc.go
type Task struct{
	TaskId int  //任务id
	TaskType TaskType  //任务状态 0、1、2 对应map、reduce
	ReduceNum int //reduce数量
	FileSlice []string  //输入文件的切片
}

//TaskType 用于枚举任务的类型
type TaskType int

//Phase 对于分配任务阶段的类型
type Phase int

//State 任务的状态的父类型
type State int

// TaskArgs rpc传入的参数、实际上什么都不用传,因为worker只传一个参数
type TaskArgs struct{}
//枚举任务的类型(Map,Reduce,执行完,等待)
const (
	MapTask TaskType = iota
	ReduceTask
	WaittingTask //Waiting 任务代表此时为任务都分打完了,但是任务还为完成,阶段分为(map未完成和reduce未完成)
	ExitTask //exit
)
//枚举分配阶段的类型(分配Map,分配Reduce,分配完成)
const (
	MapPhase Phase = iota //此阶段在分发MapTask
	ReducePhase
	AllDone //任务分配完成
)
//枚举任务状态类型(工作、等待、完成)
const (
	Working State = iota //此阶段在工作
	Waiting //此阶段任务等待执行
	Done //此阶段已经做完
)

● 接着我们就来在worker里面进行构造发送rpc的方法,获取Map任务;
● 总的判断,首先是获取任务的类型,根据类型决定worker是做什么任务

# worker.go
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {

	//CallExample()
	alive := true
	for alive {
		task := GetTask() //获取任务
		switch task.TaskType {
		case MapTask:
			{
				DoMapTask(mapf, &task)
				callDone()
			}

		case WaittingTask:
			{
				fmt.Println("All tasks are in progress, please wait...")
				time.Sleep(time.Second)
			}
		case ExitTask:
			{
				fmt.Println("Task about :[", task.TaskId, "] is terminated...")
				keepFlag = false
			}

		}
	}

	// uncomment to send the Example RPC to the coordinator.

}

接下来实现上方中的方法
● 调用RPC拉去Coordinator中的任务,也就是我们worker是通过rpc进行远程调用coordinator中生产任务的方法

// GetTask 获取任务(需要知道是Map任务,还是Reduce)
func GetTask() Task {

	args := TaskArgs{} //可以形式一点,为空
	reply := Task{}
	ok := call("Coordinator.PollTask", &args, &reply)

	if ok {
		fmt.Println(reply)
	} else {
		fmt.Printf("call failed!\n")
	}
	return reply

}

当我们获取任务后,任务是MapTask,然后我们Worker要执行Map任务,那么map要做的事可以参考wc.go、mrsequential.go的map方法,个人理解就是我拿到一个文件,然后进行切分成一组k,v,写到temp文件,其中这个temp文件要命名成mr-tmp-{taskId}-ihash(kv.key),调用的库文档推荐的是json库。至于为什么采用中间文件,其实也是为了后面的crash试验用的。
● 自己实现的DoMapTask

func DoMapTask(mapf  func(string, string) []KeyValue,task *Task){
	//读取文件-仿照mrsequential.go中的map写
	var intermediate[]KeyValue
	filename := task.FileSlice[0]
	file,err := os.Open(filename)
	if err != nil{
		log.Fatalf("cannot open %v", filename)
	}
	//通过io工具包获取content,作为Mapfunction的参数
	content,err :=ioutil.ReadAll(file)
	if err!=nil{
		log.Fatalf("cannot read %v", filename)
	}
	file.Close()
	//map进行切片处理
	intermediate = mapf(filename,string(content))
	reduceNum := task.ReduceNum
	//创建一个长度为nReduce的二维切片
	HashKv := make([][]KeyValue,reduceNum)
	for _,kv :=range intermediate{
		//通过给定的ihash函数,将不同的key分配到不同的reduce任务文件中
		HashKv[ihash(kv.Key)%reduceNum] = append(HashKv[ihash(kv.Key)%reduceNum], kv)
	}
	//生成一组kv,写到temp文件中
	for i := 0; i < reduceNum; i++ {
		oname := "mr-tmp-"+strconv.Itoa(task.TaskId) + "-"+strconv.Itoa(i)
		//创建一个文件
		ofile,_ := os.Create(oname)
		//调用json库
		enc := json.NewEncoder(ofile)
		for _,kv := range HashKv[i]{
			err := enc.Encode(kv)
			if err != nil{
				return
			}
		}
		ofile.Close()
	}
}

参考课件中mrsequential.go

	intermediate := []mr.KeyValue{}
	for _, filename := range os.Args[2:] {
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("cannot open %v", filename)
		}
		content, err := ioutil.ReadAll(file)
		if err != nil {
			log.Fatalf("cannot read %v", filename)
		}
		file.Close()
		kva := mapf(filename, string(content))
		intermediate = append(intermediate, kva...)
	}
  • 做完任务后也需要调用rpc在协调者中将任务状态设置为完成,以便于协调者Coorinator确认任务已完成,worker与协调者程序能正常退出。

func callDone(f *Task) Task{
	args := f
	reply := Task{}
	ok := call("Coordinator.MarkFinished", &args, &reply)

	if ok {
		//fmt.Println("worker finish :taskId[", args.TaskId, "]")
	} else {
		fmt.Printf("call failed!\n")
	}
	return reply
}

难点:结构体的定义

  • 协调者结构体定义:
type Coordinator struct {
	// Your definitions here.
	TaskId int   //用于生成Task的特殊Id
	MapChan chan*Task  //map任务队列
	ReduceChan chan*Task  //reduce任务队列
	ReduceNum int //reduce数量
	DistPhase Phase //目前框架中处于什么阶段
	taskMetaHolder TaskMetaHolder //存放task
	files []string //传入文件数组
}

其中taskMetaHolder为存放全部元信息(TaskMetaInfo)的map,当然用slice也行。

// TaskMetaHolder 保存全部任务的元数据
type TaskMetaHolder struct {
	MetaMap map[int]*TaskMetaInfo // 通过下标hash快速定位
}

TaskMetaInfo结构体的定义:

// TaskMetaInfo 保存任务的元数据
type TaskMetaInfo struct {
	state     State     // 任务的状态
	TaskAdr   *Task     // 传入任务的指针,为的是这个任务从通道中取出来后,还能通过地址标记这个任务已经完成
}

Task任务定义

type Task struct{
	TaskId int  //任务id
	TaskType TaskType  //任务状态 0、1、2 对应map、reduce
	ReduceNum int //reduce数量
	FileSlice []string  //输入文件的切片
}

mrcoordinator中初始协调者的方法(同worker)

func MakeCoordinator(files []string, nReduce int) *Coordinator {
	//初始化master
	c := Coordinator{
		files : files,
		ReduceNum :nReduce,
		DistPhase: MapPhase,
		MapChan :make(chan *Task,len(files)),
		ReduceChan :make(chan*Task,nReduce),
		taskMetaHolder: TaskMetaHolder{
			MetaMap :make(map[int]*TaskMetaInfo,len(files)+nReduce), //任务的总数
		},
	}
	// Your code here.
	c.makeMapTasks(files) //制造任务
	c.server()
    //后续crash用到的协程、这里可以先忽略
	//go c.CrashDetector()
	return &c
}

实现我们上方的makeMapTasks;将Map任务放到Map管道中,taskMetaInfo放到taskMetaHolder中。那么这里应该怎么去实现呢?我们初始Master后现在要生成Map任务,也就是代表我们将files这个文件传入后,对其进行spilt,将Task任务的信息填进去,那么这里需要生成一个任务Id,声明状态、将这些信息最后存放到mapChan中

//接受taskMetaInfo存储在metaHolder里
func (t *TaskMetaHolder)acceptMeta(TaskInfo *TaskMetaInfo)bool{
	taskId := TaskInfo.TaskAdr.TaskId
	meta, _ := t.MetaMap[taskId]
	if meta != nil{
		return false
	}else{
		t.MetaMap[taskId] = TaskInfo
	}
	return true
}
// 将Map任务放到Map管道中、taskInfo放到taskMetaHolder中
func(c * Coordinator) makeMapTasks(files[]string){
	//将该文件分成多个文件
		for _,v :=range files{
			//获取任务id
			id := c.generateTaskId()
			//生成任务Task相关信息
			task :=Task{
				TaskType: MapTask,
				TaskId: id,
				ReduceNum: c.ReduceNum,
				FileSlice: []string{v},
			}
			//保存任务的初始状态
			taskMetaInfo := TaskMetaInfo{
				state: Waiting, //任务等待被执行
				TaskAdr: &task, //保存任务的地址
			}
			//将任务存储在map中,指明taskId->taskInfo
			c.taskMetaHolder.acceptMeta(&taskMetaInfo)
			fmt.Println("make a map task :", &task)
			//写进通道
			c.MapChan <- &task
		}
	}

上方中生成id的方法(其实就是主键自增的方式)

func (c * Coordinator)generateTaskId()int{
    res := c.TaskId
    c.TaskId++;
    return res;
}

到这里我们Coorinator的相关定义就已经实现了、包括Coordinator产生map任务,将任务存放到mapChan
● 接下来实现worker中的一个调用协调者的一个rpc方法,也是我认为Coordinator比较核心的分配任务:将map从任务管道中取出来,如果取不出来,说明任务已经取尽了,那么此时任务要么已经完成、要么就是正在进行。然后需要判断任务map任务是否先完成,如果完成那么应该进入下一个任务处理阶段(ReducePhase),因此此时我们先验证map则直接跳过reduce直接allDone全部完成。

// 分发任务
func (c *Coordinator) PollTask(args *TaskArgs, reply *Task) error {
	// 分发任务应该上锁,防止多个worker竞争,并用defer回退解锁
	mu.Lock()
	defer mu.Unlock()

	// 判断任务类型存任务
	switch c.DistPhase {
	case MapPhase:
		{
			if len(c.TaskChannelMap) > 0 {
				*reply = *<-c.TaskChannelMap
				if !c.taskMetaHolder.judgeState(reply.TaskId) {
					fmt.Printf("taskid[ %d ] is running\n", reply.TaskId)
				}
			} else {
				reply.TaskType = WaittingTask // 如果map任务被分发完了但是又没完成,此时就将任务设为Waitting
				if c.taskMetaHolder.checkTaskDone() {
					c.toNextPhase()
				}
				return nil
			}
		}
	default:
		{
			reply.TaskType = ExitTask
		}

	}

	return nil
}

检查任务是否完成后的中转阶段的实现:

func (c *Coordinator) toNextPhase() {
	if c.DistPhase == MapPhase {
		//c.makeReduceTasks()

		// todo
		c.DistPhase = AllDone
	} else if c.DistPhase == ReducePhase {
		c.DistPhase = AllDone
	}
}

分配任务中检查任务是否完成的实现:这个就是之前的MetaMap :make(map[int]*TaskMetaInfo,len(files)+nReduce),通过其中的taskMetaInfo中的TaskAdr.TaskType来决定

// 检查多少个任务做了包括(map、reduce),
func (t *TaskMetaHolder) checkTaskDone() bool {

	var (
		mapDoneNum      = 0
		mapUnDoneNum    = 0
		reduceDoneNum   = 0
		reduceUnDoneNum = 0
	)

	// 遍历储存task信息的map
	for _, v := range t.MetaMap {
		// 首先判断任务的类型
		if v.TaskAdr.TaskType == MapTask {
			// 判断任务是否完成,下同
			if v.state == Done {
				mapDoneNum++
			} else {
				mapUnDoneNum++
			}
		} else if v.TaskAdr.TaskType == ReduceTask {
			if v.state == Done {
				reduceDoneNum++
			} else {
				reduceUnDoneNum++
			}
		}

	}
	//fmt.Printf("map tasks  are finished %d/%d, reduce task are finished %d/%d \n",
	//	mapDoneNum, mapDoneNum+mapUnDoneNum, reduceDoneNum, reduceDoneNum+reduceUnDoneNum)

	// 如果某一个map或者reduce全部做完了,代表需要切换下一阶段,返回true

	// R
	if (mapDoneNum > 0 && mapUnDoneNum == 0) && (reduceDoneNum == 0 && reduceUnDoneNum == 0) {
		return true
	} else {
		if reduceDoneNum > 0 && reduceUnDoneNum == 0 {
			return true
		}
	}

	return false

}

在任务队列大于0的时候;分配任务中修改任务的状态方法:

// 判断给定任务是否在工作,并修正其目前任务信息状态
func (t *TaskMetaHolder) judgeState(taskId int) bool {
	taskInfo, ok := t.MetaMap[taskId]
	if !ok || taskInfo.state != Waiting {
		return false
	}
	taskInfo.state = Working
	return true
}

接着再来实现一个调用的RPC方法、将我们的任务标记为完成

//实现一个调用rpc的方法,将任务标记为完成
func (c *Coordinator)MarkFinished(args * Task,reply *Task)error{
	mutex.Lock()
	defer mutex.Unlock()
	switch args.TaskType {
	case MapTask:
		meta, ok := c.taskMetaHolder.MetaMap[args.TaskId]

		//prevent a duplicated work which returned from another worker
		if ok && meta.state == Working {
			meta.state = Done
			//fmt.Printf("Map task Id[%d] is finished.\n", args.TaskId)
		} else {
			fmt.Printf("Map task Id[%d] is finished,already ! ! !\n", args.TaskId)
		}
		break
	case ReduceTask:
		meta, ok := c.taskMetaHolder.MetaMap[args.TaskId]

		//prevent a duplicated work which returned from another worker
		if ok && meta.state == Working {
			meta.state = Done 
			//fmt.Printf("Reduce task Id[%d] is finished.\n", args.TaskId)
		} else {
			fmt.Printf("Reduce task Id[%d] is finished,already ! ! !\n", args.TaskId)
		}
		break
	default:
		panic("The task type undefined ! ! !")
	}
	return nil
}

最后实现在Map阶段中最后一个事情:如果map任务全部实现完,那么Done方法应该返回true,是协调者能够exit程序

//Done 主函数mr调用,如果所有task完成mr会通过此方法退出
func (c *Coordinator) Done() bool {
	mu.Lock()
	defer mu.Unlock()
	if c.DistPhase == AllDone {
		fmt.Printf("All tasks are finished,the coordinator will be exit! !")
		return true
	} else {
		return false
	}

}

至此map阶段已经能暂且构成一个循环,先运行mrcoordinator.go、再运行mrworker查看效果。
mrcoordinator.go运行效果(笔者为了测试效果只传入了两个文件):
在这里插入图片描述
mrworker.go运行效果:

在这里插入图片描述
再去查看生成的文件
在这里插入图片描述

2.2*补reduce

在map阶段上补充reduce阶段的代码。进行处理,reduce阶段就需要将map输出的kv进行整合起来,并进行shuffle后最终输出

  • 有过大概一个流程写reduce阶段还是挺快,大部分逻辑其实和map阶段是相同的的,先继续初始写reduce方法:’
// 产生对应的Reduce任务
func (c *Coordinator)makeReduceTasks(){
		for i := 0; i < c.ReduceNum; i++ {
			id := c.generateTaskId()
			task := Task{
				TaskId: id,
				TaskType: ReduceTask,
				FileSlice: selectReduceName(i), //对应的是reduce的文件
			}
			//保存任务的初始状态
			taskMetaINfo := TaskMetaInfo{
				state: Waiting,
				TaskAdr: &task,
			}
			c.taskMetaHolder.acceptMeta(&taskMetaINfo)
			c.ReduceChan <- &task
		}
	}	

通过梳理MapReduce框架就可知,输入阶段时,初始化一个map任务其实是对应一个输入文件,但是经过map过程来看,我们其实一个任务切分成了很多tmp文件,那么reduce任务输入则应该是一组哈希相同的中间文件
在这里插入图片描述
因此来补充上文的makeReduceTasks方法中挑选reduce的方法:

func selectReduceName(reduceNum int) []string {
	var s []string
	path, _ := os.Getwd()
	files, _ := ioutil.ReadDir(path)
	for _, fi := range files {
		// 匹配对应的reduce文件
		if strings.HasPrefix(fi.Name(), "mr-tmp") && strings.HasSuffix(fi.Name(), strconv.Itoa(reduceNum)) {
			s = append(s, fi.Name())
		}
	}
	return s
}

接着补充pollTask(分配任务)分发reduce任务:

func (c *Coordinator) PollTask(args *TaskArgs, reply *Task) error {
	// 分发任务应该上锁,防止多个worker竞争,并用defer回退解锁
	mu.Lock()
	defer mu.Unlock()

	// 判断任务类型存任务
	switch c.DistPhase {
	case MapPhase:
		{
			if len(c.MapTaskChannel) > 0 {
				*reply = *<-c.MapTaskChannel
				if !c.taskMetaHolder.judgeState(reply.TaskId) {
					fmt.Printf("Map-taskid[ %d ] is running\n", reply.TaskId)
				}
			} else {
				reply.TaskType = WaittingTask // 如果map任务被分发完了但是又没完成,此时就将任务设为Waitting
				if c.taskMetaHolder.checkTaskDone() {
					c.toNextPhase()
				}
				return nil
			}
		}

	case ReducePhase:
		{
			if len(c.ReduceTaskChannel) > 0 {
				*reply = *<-c.ReduceTaskChannel
				if !c.taskMetaHolder.judgeState(reply.TaskId) {
					fmt.Printf("Reduce-taskid[ %d ] is running\n", reply.TaskId)
				}
			} else {
				reply.TaskType = WaittingTask // 如果map任务被分发完了但是又没完成,此时就将任务设为Waitting
				if c.taskMetaHolder.checkTaskDone() {
					c.toNextPhase()
				}
				return nil
			}
		}

	case AllDone:
		{
			reply.TaskType = ExitTask
		}
	default:
		panic("The phase undefined ! ! !")

	}

	return nil
}

补充之前切换状态的函数:

func (c *Coordinator) toNextPhase() {
	if c.DistPhase == MapPhase {
		c.makeReduceTasks()
		c.DistPhase = ReducePhase
	} else if c.DistPhase == ReducePhase {
		c.DistPhase = AllDone
	}
}

回头补充woker里的:

func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {

	//CallExample()
	keepFlag := true
	for keepFlag {
		task := GetTask()
		switch task.TaskType {
		case MapTask:
			{
				DoMapTask(mapf, &task)
				callDone()
			}

		case WaittingTask:
			{
				//fmt.Println("All tasks are in progress, please wait...")
				time.Sleep(time.Second)
			}

		case ReduceTask:
			{
				DoReduceTask(reducef, &task)
				callDone()
			}

		case ExitTask:
			{
				//fmt.Println("Task about :[", task.TaskId, "] is terminated...")
				keepFlag = false
			}

		}
	}

	// uncomment to send the Example RPC to the coordinator.

}

分配reduce任务,跟map一样参考wc.go、mrsequential.go方法,有些代码可以直接拿来用,这里还是讲一下大概思路:对之前的tmp文件进行洗牌(shuffle),得到一组排序好的kv数组,并根据重排序好kv数组重定向输出文件

func DoReduceTask(reducef func(string, []string) string	, response *Task) {
	reduceFileNum := response.TaskId
	intermediate := shuffle(response.FileSlice)
	dir, _ := os.Getwd()
	//tempFile, err := ioutil.TempFile(dir, "mr-tmp-*")
	tempFile, err := ioutil.TempFile(dir, "mr-tmp-*")
	if err != nil {
		log.Fatal("Failed to create temp file", err)
	}
	i := 0
	for i < len(intermediate) {
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		var values []string
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		output := reducef(intermediate[i].Key, values)
		fmt.Fprintf(tempFile, "%v %v\n", intermediate[i].Key, output)
		i = j
	}
	tempFile.Close()
	fn := fmt.Sprintf("mr-out-%d", reduceFileNum)
	os.Rename(tempFile.Name(), fn)
}

  • shuffle

// 洗牌方法,得到一组排序好的kv数组
func shuffle(files []string) []KeyValue {
	var kva []KeyValue
	for _, filepath := range files {
		file, _ := os.Open(filepath)
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			kva = append(kva, kv)
		}
		file.Close()
	}
	sort.Sort(SortedKey(kva))
	return kva
}

至此reduce阶段也基本完成,最后得到的结果应有:
在这里插入图片描述
在这里插入图片描述

2.3 Crash实现

从课程文档来看,测试阶段还有一个crash测试
● 从文档中看以下crash的定义

简单的翻译过来就是:如果你选择去实现一个备份任务来容错,请注意我们测试你的代码时候不会安排无关的任务(个人认为就是假死任务)让worker去执行,也因此不会崩溃。备份任务只有在一些任务很久没有得到响应后,才会被安排(例如10s)。

因此我们对crash的情况可以大概这样处理:先给在工作信息补充一个记录时间的开始状态,然后在初始化协调者的时候同步开启一个crash探测协程,将超过10s的任务都放回chan中,等待任务重新读取

// TaskMetaInfo 保存任务的元数据
type TaskMetaInfo struct {
	state     State     // 任务的状态
	StartTime time.Time // 任务的开始时间,为crash做准备
	TaskAdr   *Task     // 传入任务的指针,为的是这个任务从通道中取出来后,还能通过地址标记这个任务已经完成
}
  • 初始化补充时间:
// 判断给定任务是否在工作,并修正其目前任务信息状态
func (t *TaskMetaHolder) judgeState(taskId int) bool {
	taskInfo, ok := t.MetaMap[taskId]
	if !ok || taskInfo.state != Waiting {
		return false
	}
	taskInfo.state = Working
	taskInfo.StartTime = time.Now()
	return true
}


  • 在协调者中补充开启crash协程
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{
		JobChannelMap:    make(chan *Job, len(files)),
		JobChannelReduce: make(chan *Job, nReduce),
		jobMetaHolder: JobMetaHolder{
			MetaMap: make(map[int]*JobMetaInfo, len(files)+nReduce),
		},
		CoordinatorCondition: MapPhase,
		ReducerNum:           nReduce,
		MapNum:               len(files),
		uniqueJobId:          0,
	}

	c.makeMapJobs(files)

	c.server()

	go c.CrashHandler()

	return &c
}

crash协程实现


func (c *Coordinator) CrashDetector() {
	for {
		time.Sleep(time.Second * 2)
		mu.Lock()
		if c.DistPhase == AllDone {
			mu.Unlock()
			break
		}

		for _, v := range c.taskMetaHolder.MetaMap {
			if v.state == Working {
				//fmt.Println("task[", v.TaskAdr.TaskId, "] is working: ", time.Since(v.StartTime), "s")
			}

			if v.state == Working && time.Since(v.StartTime) > 9*time.Second {
				fmt.Printf("the task[ %d ] is crash,take [%d] s\n", v.TaskAdr.TaskId, time.Since(v.StartTime))

				switch v.TaskAdr.TaskType {
				case MapTask:
					c.MapTaskChannel <- v.TaskAdr
					v.state = Waiting
				case ReduceTask:
					c.ReduceTaskChannel <- v.TaskAdr
					v.state = Waiting

				}
			}
		}
		mu.Unlock()
	}

}

2.4 测试

在整个lab测试过程我也是不出意外的出了两个test-fail,分别是early_exit,和carsh测试,在这就简单的分享一下心路历程。
在这里插入图片描述
首先那个总体的test其实内容还是挺多,我建议先总体的跑一次,然后将没过的test单独,设为一个sh脚本,进行单独测试,例如early_exit一样:

#!/usr/bin/env bash

#
# map-reduce tests
#

# comment this out to run the tests without the Go race detector.
RACE=-race

if [[ "$OSTYPE" = "darwin"* ]]
then
  if go version | grep 'go1.17.[012345]'
  then
    # -race with plug-ins on x86 MacOS 12 with
    # go1.17 before 1.17.6 sometimes crash.
    RACE=
    echo '*** Turning off -race since it may not work on a Mac'
    echo '    with ' `go version`
  fi
fi

TIMEOUT=timeout
if timeout 2s sleep 1 > /dev/null 2>&1
then
  :
else
  if gtimeout 2s sleep 1 > /dev/null 2>&1
  then
    TIMEOUT=gtimeout
  else
    # no timeout command
    TIMEOUT=
    echo '*** Cannot find timeout command; proceeding without timeouts.'
  fi
fi
if [ "$TIMEOUT" != "" ]
then
  TIMEOUT+=" -k 2s 180s "
fi

# run the test in a fresh sub-directory.
rm -rf mr-tmp
mkdir mr-tmp || exit 1
cd mr-tmp || exit 1
rm -f mr-*

# make sure software is freshly built.
(cd ../../mrapps && go clean)
(cd .. && go clean)
(cd ../../mrapps && go build $RACE -buildmode=plugin wc.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin indexer.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin mtiming.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin rtiming.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin jobcount.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin early_exit.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin crash.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin nocrash.go) || exit 1
(cd .. && go build $RACE mrcoordinator.go) || exit 1
(cd .. && go build $RACE mrworker.go) || exit 1
(cd .. && go build $RACE mrsequential.go) || exit 1

failed_any=0

echo '***' Starting crash test.

# generate the correct output
../mrsequential ../../mrapps/nocrash.so ../pg*txt || exit 1
sort mr-out-0 > mr-correct-crash.txt
rm -f mr-out*

rm -f mr-done
($TIMEOUT ../mrcoordinator ../pg*txt ; touch mr-done ) &
sleep 1

# start multiple workers
$TIMEOUT ../mrworker ../../mrapps/crash.so &

# mimic rpc.go's coordinatorSock()
SOCKNAME=/var/tmp/824-mr-`id -u`

( while [ -e $SOCKNAME -a ! -f mr-done ]
  do
    $TIMEOUT ../mrworker ../../mrapps/crash.so
    sleep 1
  done ) &

( while [ -e $SOCKNAME -a ! -f mr-done ]
  do
    $TIMEOUT ../mrworker ../../mrapps/crash.so
    sleep 1
  done ) &

while [ -e $SOCKNAME -a ! -f mr-done ]
do
  $TIMEOUT ../mrworker ../../mrapps/crash.so
  sleep 1
done

wait

rm $SOCKNAME
sort mr-out* | grep . > mr-crash-all
if cmp mr-crash-all mr-correct-crash.txt
then
  echo '---' crash test: PASS
else
  echo '---' crash output is not the same as mr-correct-crash.txt
  echo '---' crash test: FAIL
  failed_any=1
fi

对于crash测试我其实那个协程一开始,死活测试不起,并且只卡到第一行第一个字节,我就想到是不是哪里阻塞住了,遂直接加入crash协程后直接对协调者进行golang的调试运行,发现rpc调用到polltask直接卡住了,后面觉得是crash协程疯狂获取锁,倒是PollTask的全局锁获取不到。于是在crash探测的协程里放松的对锁的获取(sleep 2s)有点像时间片轮转,遂成功了。
因为写这篇博客的时候其实是边敲边写的,所以最后的bug我想起来的其实都在上面改了的,如果有少量问题可以私信我更正

2.5结果

*** Starting wc test.
2023/06/22 11:15:35 rpc.Register: method "CrashDetector" has 1 input parameters; needs exactly three
2023/06/22 11:15:35 rpc.Register: method "Done" has 1 input parameters; needs exactly three
All tasks are finished,the coordinator will be exit! !--- wc test: PASS
2023/06/22 11:15:57 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
2023/06/22 11:15:57 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
2023/06/22 11:15:57 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
*** Starting indexer test.
2023/06/22 11:15:59 rpc.Register: method "CrashDetector" has 1 input parameters; needs exactly three
2023/06/22 11:15:59 rpc.Register: method "Done" has 1 input parameters; needs exactly three
All tasks are finished,the coordinator will be exit! !2023/06/22 11:16:12 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
--- indexer test: PASS
2023/06/22 11:16:12 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
*** Starting map parallelism test.
2023/06/22 11:16:12 rpc.Register: method "CrashDetector" has 1 input parameters; needs exactly three
2023/06/22 11:16:12 rpc.Register: method "Done" has 1 input parameters; needs exactly three
All tasks are finished,the coordinator will be exit! !2023/06/22 11:16:27 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
2023/06/22 11:16:27 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
--- map parallelism test: PASS
*** Starting reduce parallelism test.
2023/06/22 11:16:27 rpc.Register: method "CrashDetector" has 1 input parameters; needs exactly three
2023/06/22 11:16:27 rpc.Register: method "Done" has 1 input parameters; needs exactly three
All tasks are finished,the coordinator will be exit! !2023/06/22 11:16:43 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
--- reduce parallelism test: PASS
2023/06/22 11:16:43 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
*** Starting job count test.
2023/06/22 11:16:43 rpc.Register: method "CrashDetector" has 1 input parameters; needs exactly three
2023/06/22 11:16:43 rpc.Register: method "Done" has 1 input parameters; needs exactly three
All tasks are finished,the coordinator will be exit! !2023/06/22 11:17:03 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
2023/06/22 11:17:04 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
2023/06/22 11:17:04 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
2023/06/22 11:17:04 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
--- job count test: PASS
*** Starting early exit test.
2023/06/22 11:17:04 rpc.Register: method "CrashDetector" has 1 input parameters; needs exactly three
2023/06/22 11:17:04 rpc.Register: method "Done" has 1 input parameters; needs exactly three
test-mr.sh: line 246: wait: -n: invalid option
wait: usage: wait [id]
sort: cannot read: mr-out*: No such file or directory
All tasks are finished,the coordinator will be exit! !All tasks are Done ,will be exiting...
2023/06/22 11:17:10 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
2023/06/22 11:17:13 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
cmp: EOF on mr-wc-all-initial
--- output changed after first worker exited
--- early exit test: FAIL
*** Starting crash test.
2023/06/22 11:17:13 rpc.Register: method "CrashDetector" has 1 input parameters; needs exactly three
2023/06/22 11:17:13 rpc.Register: method "Done" has 1 input parameters; needs exactly three
the task[ 1 ] is crash,take [10952100597] s
the task[ 7 ] is crash,take [9871407319] s
the task[ 2 ] is crash,take [10951694013] s
the task[ 5 ] is crash,take [9898542154] s
the task[ 4 ] is crash,take [10926119449] s
the task[ 3 ] is crash,take [10942430906] s
the task[ 2 ] is crash,take [10822523589] s
the task[ 4 ] is crash,take [10580679965] s
the task[ 10 ] is crash,take [10456960439] s
the task[ 13 ] is crash,take [9406698246] s
All tasks are finished,the coordinator will be exit! !2023/06/22 11:17:56 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
2023/06/22 11:17:57 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
2023/06/22 11:17:58 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
--- crash test: PASS
*** FAILED SOME TESTS

之前rpc跑失败的结果可以看看

go: finding module for package 6.5840/lockservice
go: finding module for package 6.5840/diskv
go: finding module for package 6.5840/pbservice
go: finding module for package 6.5840/viewservice
*** Starting wc test.
make a map task: &{0 0 10 [../pg-being_ernest.txt]}
make a map task: &{1 0 10 [../pg-dorian_gray.txt]}
make a map task: &{2 0 10 [../pg-frankenstein.txt]}
make a map task: &{3 0 10 [../pg-grimm.txt]}
make a map task: &{4 0 10 [../pg-huckleberry_finn.txt]}
make a map task: &{5 0 10 [../pg-metamorphosis.txt]}
make a map task: &{6 0 10 [../pg-sherlock_holmes.txt]}
make a map task: &{7 0 10 [../pg-tom_sawyer.txt]}
2023/06/18 18:57:10 rpc.Register: method "Done" has 1 input parameters; needs exactly three
new worker  5577006791947779410
new worker  5577006791947779410
rpc: can't find service Coordinate.DistrubtionJob
call faild!
get response &{0 0 0 []}
task &{Id:0 Type:0 ReduceNum:0 FileName:[]}
new worker  5577006791947779410
rpc: can't find service Coordinate.DistrubtionJob
call faild!
get response &{0 0 0 []}
task &{Id:0 Type:0 ReduceNum:0 FileName:[]}
rpc: can't find service Coordinate.DistrubtionJob
call faild!
get response &{0 0 0 []}
task &{Id:0 Type:0 ReduceNum:0 FileName:[]}
panic: runtime error: index out of range [0] with length 0

goroutine 1 [running]:
6.5840/mr.DoMapTask(0x4d65822107fcfd52, 0xc0001180b8, 0xc00010e150)
        /root/6.5840/src/mr/worker.go:88 +0x97c
6.5840/mr.Worker(0xc0001180b8, 0xc0001180c0)
        /root/6.5840/src/mr/worker.go:56 +0x225
main.main()
        /root/6.5840/src/main/mrworker.go:27 +0xbd
panic: runtime error: index out of range [0] with length 0

goroutine 1 [running]:
6.5840/mr.DoMapTask(0x4d65822107fcfd52, 0xc0000e40c0, 0xc0000f6090)
        /root/6.5840/src/mr/worker.go:88 +0x97c
6.5840/mr.Worker(0xc0000e40c0, 0xc0000e40c8)
        /root/6.5840/src/mr/worker.go:56 +0x225
main.main()
        /root/6.5840/src/main/mrworker.go:27 +0xbd
panic: runtime error: index out of range [0] with length 0

goroutine 1 [running]:
6.5840/mr.DoMapTask(0x4d65822107fcfd52, 0xc0000e60c8, 0xc0000dc150)
        /root/6.5840/src/mr/worker.go:88 +0x97c
6.5840/mr.Worker(0xc0000e60c8, 0xc0000e60d0)
        /root/6.5840/src/mr/worker.go:56 +0x225
main.main()
        /root/6.5840/src/main/mrworker.go:27 +0xbd
sort: cannot read: mr-out*: No such file or directory
cmp: EOF on mr-wc-all
--- wc output is not the same as mr-correct-wc.txt
--- wc test: FAIL
*** Starting indexer test.
make a map task: &{0 0 10 [../pg-being_ernest.txt]}
make a map task: &{1 0 10 [../pg-dorian_gray.txt]}
make a map task: &{2 0 10 [../pg-frankenstein.txt]}
make a map task: &{3 0 10 [../pg-grimm.txt]}
make a map task: &{4 0 10 [../pg-huckleberry_finn.txt]}
make a map task: &{5 0 10 [../pg-metamorphosis.txt]}
make a map task: &{6 0 10 [../pg-sherlock_holmes.txt]}
make a map task: &{7 0 10 [../pg-tom_sawyer.txt]}
2023/06/18 18:57:57 rpc.Register: method "Done" has 1 input parameters; needs exactly three
new worker  5577006791947779410
new worker  5577006791947779410
rpc: can't find service Coordinate.DistrubtionJob
call faild!
rpc: can't find service Coordinate.DistrubtionJob
get response &{0 0 0 []}
task &{Id:0 Type:0 ReduceNum:0 FileName:[]}
call faild!
get response &{0 0 0 []}
task &{Id:0 Type:0 ReduceNum:0 FileName:[]}
panic: runtime error: index out of range [0] with length 0

goroutine 1 [running]:
6.5840/mr.DoMapTask(0x4d65822107fcfd52, 0xc00007bb48, 0xc00006a1e0)
        /root/6.5840/src/mr/worker.go:88 +0x97c
6.5840/mr.Worker(0xc00007bb48, 0xc00007bb50)
        /root/6.5840/src/mr/worker.go:56 +0x225
main.main()
        /root/6.5840/src/main/mrworker.go:27 +0xbd
panic: runtime error: index out of range [0] with length 0

goroutine 1 [running]:
6.5840/mr.DoMapTask(0x4d65822107fcfd52, 0xc0001180b8, 0xc00010e150)
        /root/6.5840/src/mr/worker.go:88 +0x97c
6.5840/mr.Worker(0xc0001180b8, 0xc0001180c0)
        /root/6.5840/src/mr/worker.go:56 +0x225
main.main()
        /root/6.5840/src/main/mrworker.go:27 +0xbd
sort: cannot read: mr-out*: No such file or directory
cmp: EOF on mr-indexer-all
--- indexer output is not the same as mr-correct-indexer.txt
--- indexer test: FAIL
*** Starting map parallelism test.
make a map task: &{0 0 10 [../pg-being_ernest.txt]}
make a map task: &{1 0 10 [../pg-dorian_gray.txt]}
make a map task: &{2 0 10 [../pg-frankenstein.txt]}
make a map task: &{3 0 10 [../pg-grimm.txt]}
make a map task: &{4 0 10 [../pg-huckleberry_finn.txt]}
make a map task: &{5 0 10 [../pg-metamorphosis.txt]}
make a map task: &{6 0 10 [../pg-sherlock_holmes.txt]}
make a map task: &{7 0 10 [../pg-tom_sawyer.txt]}
2023/06/18 18:58:42 rpc.Register: method "Done" has 1 input parameters; needs exactly three
new worker  5577006791947779410
new worker  5577006791947779410
rpc: can't find service Coordinate.DistrubtionJob
rpc: can't find service Coordinate.DistrubtionJob
call faild!
call faild!
get response &{0 0 0 []}
get response &{0 0 0 []}
task &{Id:0 Type:0 ReduceNum:0 FileName:[]}
task &{Id:0 Type:0 ReduceNum:0 FileName:[]}
panic: runtime error: index out of range [0] with length 0

goroutine 1 [running]:
6.5840/mr.DoMapTask(0x4d65822107fcfd52, 0xc0000e20c8, 0xc0000e40c0)
        /root/6.5840/src/mr/worker.go:88 +0x97c
6.5840/mr.Worker(0xc0000e20c8, 0xc0000e20d0)
        /root/6.5840/src/mr/worker.go:56 +0x225
main.main()
        /root/6.5840/src/main/mrworker.go:27 +0xbd
panic: runtime error: index out of range [0] with length 0

goroutine 1 [running]:
6.5840/mr.DoMapTask(0x4d65822107fcfd52, 0xc0001180b8, 0xc00010e150)
        /root/6.5840/src/mr/worker.go:88 +0x97c
6.5840/mr.Worker(0xc0001180b8, 0xc0001180c0)
        /root/6.5840/src/mr/worker.go:56 +0x225
main.main()
        /root/6.5840/src/main/mrworker.go:27 +0xbd
cat: mr-out*: No such file or directory
--- saw 0 workers rather than 2
--- map parallelism test: FAIL
cat: mr-out*: No such file or directory
--- map workers did not run in parallel
--- map parallelism test: FAIL

都是泪,当初不知道调用错了;还特地问了GPT
在这里插入图片描述

结果

在这里插入图片描述

三、收获

这次lab1其实是找到实习上岸过后开始肝的,加上go的学习、读paper、和看课程视频,写完这篇博客,总的还是花了半个多月和课程要求的一周还是有出入(实在是废物了orz…),但是扎扎实实自己写下来我觉得还是真的有非常大的收获了,后面有时间也想继续冲接下来的lab。这篇虽然我尽可能的详细的介绍了我的思路,但是还是希望后来者能有自己的思考和实现。这篇也是纯手码的,希望有错误的地方欢迎指正。由于课程说过尽量不公布源代码,因此就不公布Github啦!!!当然按上面的也是完全可以跑通

举报

相关推荐

0 条评论