Prerequisites
1. Learning about distributed systems
- Before starting the MIT labs we need to read the MapReduce paper, since the lab is based on it and the lectures follow it as well; it can be downloaded from Google Scholar.
- We also need some familiarity with Go, since everything that follows is written in it and the language is well suited to highly concurrent work. A few hours of videos on Bilibili are enough to pick up the basics: variables, arrays, slices, channels, and goroutines.
- Finally, pick a suitable IDE. I use VS Code because it is simple, practical, and easy to connect to a remote machine.
Course homepage: course homepage
Reference column: reference column
1.1 Installing the Go environment
- First of all, we need a working development environment; you can refer to my post on installing Go on Linux. I originally tried developing on Windows but the mr package could not be found, so I switched to developing on a server. It turns out that working on Linux really does save a lot of trouble.
1.2 Studying the MIT 6.824 course
- Search for MIT 6.824 on YouTube or Bilibili. For basic Go syntax, see the Go language column.
1.2* Preliminary work
- Follow the lab handout: pull the code, set up the Go environment, open VS Code, and install the Remote SSH extension.
Read the simple mrsequential.go code and run it.
- Then run the following commands from the lab page:
$ cd ~/6.5840/src/main
$ bash test-mr.sh
- Once the implementation is complete, these tests will all finish, as shown below.
As you can see, before any code has been written, the job never completes.
1.3 Lab requirements
1.4 How to implement it: leaning on the course Hints
One way to get started is to modify mr/worker.go's Worker() to send an RPC to the coordinator asking for a task. Then modify the coordinator to respond with the file name of an as-yet-unstarted map task. Then modify the worker to read that file and call the application Map function, as in mrsequential.go.
1.5 First, a look at how the Map function works:
The workflow of the Map function is: read the input file -> generate key-value pairs via the Map function -> append all of the generated key-value pairs to the intermediate slice. Finally, the Map function returns intermediate.
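To make this concrete, the word-count application's Map function in mrapps/wc.go looks roughly like this (a sketch quoted from memory of the course code; it needs strings, unicode, and the mr package):

// Map splits the file contents into words and emits a ("word", "1") pair for each word.
func Map(filename string, contents string) []mr.KeyValue {
	// function to detect word separators
	ff := func(r rune) bool { return !unicode.IsLetter(r) }
	// split contents into an array of words
	words := strings.FieldsFunc(contents, ff)
	kva := []mr.KeyValue{}
	for _, w := range words {
		kva = append(kva, mr.KeyValue{Key: w, Value: "1"})
	}
	return kva
}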
2. The Lab itself
2.1 Approach
● The figure above gives a rough picture of the MapReduce flow: a Master (the Coordinator) is started and hands out tasks to workers, first the Map tasks (workers act as both Map workers and Reduce workers).
● When a worker finishes a Map task it produces a set of intermediate key-value pairs; the coordinator then hands these KV groups to the subsequent reduce workers, which run the Reduce tasks over them.
2.2 Implementation
We know that in MapReduce the Master hands out tasks to Workers, and a Worker reports back to the Master when its task is done so the Reduce stage can proceed. So, following the first Hint, we start by implementing the interaction between worker and coordinator, built on RPC.
Implement the worker/Coordinator interaction and handle the map tasks.
First, look at the RPC example that is provided. Running main/mrworker.go enters the Worker method in mr/worker.go; inside it you can call the example RPC method, CallExample().
CallExample() contains the following line:
// send the RPC request, wait for the reply.
// the "Coordinator.Example" tells the
// receiving server that we'd like to call
// the Example() method of struct Coordinator.
ok := call("Coordinator.Example", &args, &reply)
This call invokes the Example method of the Coordinator. We then receive the modified reply, i.e. the RPC return value. With that, the coordinator and the worker have completed a simple interaction.
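(For reference, the call helper used here is provided by the lab skeleton in mr/worker.go; quoted approximately, it looks like this, with coordinatorSock() defined in mr/rpc.go:)

// call sends an RPC request to the coordinator over its unix socket and waits for the reply.
// It returns false if something went wrong.
func call(rpcname string, args interface{}, reply interface{}) bool {
	sockname := coordinatorSock()
	c, err := rpc.DialHTTP("unix", sockname)
	if err != nil {
		log.Fatal("dialing:", err)
	}
	defer c.Close()

	err = c.Call(rpcname, args, reply)
	if err == nil {
		return true
	}
	fmt.Println(err)
	return false
}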
● Having understood the simple RPC example, we can now implement an RPC of our own for handing out Map tasks.
Similar to ExampleArgs/ExampleReply in the rpc package, we define the request and reply structs there; since all changes made by an RPC come back through the reply, everything is passed by pointer.
// rpc.go
type Task struct {
	TaskId    int      // task id
	TaskType  TaskType // task type: map, reduce, waiting, or exit
	ReduceNum int      // number of reduce tasks
	FileSlice []string // slice of input file names
}

// TaskType enumerates the kind of task
type TaskType int

// Phase is the type for the task-distribution phase
type Phase int

// State is the type for a task's state
type State int

// TaskArgs is the RPC argument; the worker does not actually need to send anything
type TaskArgs struct{}

// Task types (Map, Reduce, all done, waiting)
const (
	MapTask TaskType = iota
	ReduceTask
	WaittingTask // all tasks have been handed out but some are still unfinished (either the map or the reduce phase is incomplete)
	ExitTask     // exit
)

// Distribution phases (handing out Map, handing out Reduce, everything done)
const (
	MapPhase Phase = iota // currently handing out MapTasks
	ReducePhase
	AllDone // all tasks have been completed
)

// Task states (working, waiting, done)
const (
	Working State = iota // the task is currently being executed
	Waiting              // the task is waiting to be executed
	Done                 // the task has finished
)
● Next we build the method in the worker that sends the RPC and fetches a Map task.
● The overall flow: first get the task's type, and based on that type decide what the worker should do.
// worker.go
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
	//CallExample()
	keepFlag := true
	for keepFlag {
		task := GetTask() // fetch a task from the coordinator
		switch task.TaskType {
		case MapTask:
			{
				DoMapTask(mapf, &task)
				callDone(&task)
			}
		case WaittingTask:
			{
				fmt.Println("All tasks are in progress, please wait...")
				time.Sleep(time.Second)
			}
		case ExitTask:
			{
				fmt.Println("Task about :[", task.TaskId, "] is terminated...")
				keepFlag = false
			}
		}
	}
	// uncomment to send the Example RPC to the coordinator.
}
Next, implement the methods used above.
● GetTask calls an RPC to pull a task from the Coordinator; in other words, the worker remotely invokes the coordinator's task-producing method via RPC.
// GetTask fetches a task (which may be a Map task or a Reduce task)
func GetTask() Task {
	args := TaskArgs{} // purely a formality; nothing needs to be sent
	reply := Task{}
	ok := call("Coordinator.PollTask", &args, &reply)
	if ok {
		fmt.Println(reply)
	} else {
		fmt.Printf("call failed!\n")
	}
	return reply
}
Once we have received a task and it is a MapTask, the worker has to execute the map work. What map needs to do follows the map logic in wc.go and mrsequential.go: take one file, split it into a set of key-value pairs, and write them to temp files named mr-tmp-{taskId}-{ihash(kv.Key)}; the handout recommends the encoding/json library for this. The reason for writing intermediate files at all is that they are needed later for the crash test.
● My own DoMapTask:
func DoMapTask(mapf func(string, string) []KeyValue, task *Task) {
	// read the file, following the map logic in mrsequential.go
	var intermediate []KeyValue
	filename := task.FileSlice[0]
	file, err := os.Open(filename)
	if err != nil {
		log.Fatalf("cannot open %v", filename)
	}
	// read the whole content via the io utility package and pass it to the map function
	content, err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v", filename)
	}
	file.Close()
	// let the map function split the content into key-value pairs
	intermediate = mapf(filename, string(content))
	reduceNum := task.ReduceNum
	// create a two-dimensional slice of length nReduce
	HashKv := make([][]KeyValue, reduceNum)
	for _, kv := range intermediate {
		// the given ihash function assigns different keys to different reduce buckets
		HashKv[ihash(kv.Key)%reduceNum] = append(HashKv[ihash(kv.Key)%reduceNum], kv)
	}
	// write each bucket of key-value pairs to its own temp file
	for i := 0; i < reduceNum; i++ {
		oname := "mr-tmp-" + strconv.Itoa(task.TaskId) + "-" + strconv.Itoa(i)
		// create the intermediate file
		ofile, _ := os.Create(oname)
		// encode the pairs as JSON
		enc := json.NewEncoder(ofile)
		for _, kv := range HashKv[i] {
			err := enc.Encode(kv)
			if err != nil {
				return
			}
		}
		ofile.Close()
	}
}
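DoMapTask relies on the KeyValue type and the ihash function; both are already provided by the lab skeleton in mr/worker.go, roughly as below (ihash needs hash/fnv), so there is nothing extra to write for them:

// KeyValue is the pair type emitted by Map and consumed by Reduce (lab skeleton).
type KeyValue struct {
	Key   string
	Value string
}

// ihash(key) % NReduce picks the reduce bucket for a key (lab skeleton).
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}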
For reference, the corresponding loop in mrsequential.go from the course code:
intermediate := []mr.KeyValue{}
for _, filename := range os.Args[2:] {
	file, err := os.Open(filename)
	if err != nil {
		log.Fatalf("cannot open %v", filename)
	}
	content, err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v", filename)
	}
	file.Close()
	kva := mapf(filename, string(content))
	intermediate = append(intermediate, kva...)
}
- After finishing a task, the worker also needs to call an RPC that marks the task as done on the coordinator's side, so that the Coordinator knows the task is complete and both the worker and the coordinator can exit normally.
func callDone(f *Task) Task {
	args := f
	reply := Task{}
	ok := call("Coordinator.MarkFinished", args, &reply)
	if ok {
		//fmt.Println("worker finish :taskId[", args.TaskId, "]")
	} else {
		fmt.Printf("call failed!\n")
	}
	return reply
}
The tricky part: defining the structs.
- The coordinator struct:
type Coordinator struct {
	// Your definitions here.
	TaskId         int            // used to generate unique task ids
	MapChan        chan *Task     // queue of map tasks
	ReduceChan     chan *Task     // queue of reduce tasks
	ReduceNum      int            // number of reduce tasks
	DistPhase      Phase          // which phase the framework is currently in
	taskMetaHolder TaskMetaHolder // holds the metadata of every task
	files          []string       // input files
}
Here taskMetaHolder is a map that stores the metadata (TaskMetaInfo) of every task; a slice would work just as well.
// TaskMetaHolder holds the metadata of all tasks
type TaskMetaHolder struct {
	MetaMap map[int]*TaskMetaInfo // indexed by task id for quick lookup
}
The TaskMetaInfo struct:
// TaskMetaInfo holds the metadata of a single task
type TaskMetaInfo struct {
	state   State // the task's state
	TaskAdr *Task // pointer to the task, so that after it is taken out of the channel it can still be marked as done through this address
}
The Task definition:
type Task struct {
	TaskId    int      // task id
	TaskType  TaskType // task type: map, reduce, waiting, or exit
	ReduceNum int      // number of reduce tasks
	FileSlice []string // slice of input file names
}
The coordinator-initialization method (invoked from main/mrcoordinator.go, just as Worker is invoked from main/mrworker.go):
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	// initialize the master
	c := Coordinator{
		files:      files,
		ReduceNum:  nReduce,
		DistPhase:  MapPhase,
		MapChan:    make(chan *Task, len(files)),
		ReduceChan: make(chan *Task, nReduce),
		taskMetaHolder: TaskMetaHolder{
			MetaMap: make(map[int]*TaskMetaInfo, len(files)+nReduce), // total number of tasks
		},
	}
	// Your code here.
	c.makeMapTasks(files) // create the map tasks
	c.server()
	// goroutine used later for the crash test; ignore it for now
	//go c.CrashDetector()
	return &c
}
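c.server() is part of the lab skeleton (mr/coordinator.go): it registers the coordinator with net/rpc and listens on the unix socket named by coordinatorSock() from mr/rpc.go, roughly as below. Incidentally, this is where the "rpc.Register: method ... has 1 input parameters" warnings in the test logs come from: rpc.Register(c) registers every exported method and warns about those that do not match the RPC signature, which is harmless.

// server starts a thread that listens for RPCs from worker.go (lab skeleton, quoted approximately;
// needs net, net/http, net/rpc, os, and log).
func (c *Coordinator) server() {
	rpc.Register(c)
	rpc.HandleHTTP()
	sockname := coordinatorSock()
	os.Remove(sockname)
	l, e := net.Listen("unix", sockname)
	if e != nil {
		log.Fatal("listen error:", e)
	}
	go http.Serve(l, nil)
}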
Now implement the makeMapTasks used above: push the Map tasks into the map channel and their taskMetaInfo into the taskMetaHolder. How do we do that? After the Master is initialized we need to create the map tasks: after files is passed in we split it up, fill in each Task's information, generate a task id, set the initial state, and finally push everything into the map channel.
// acceptMeta stores a TaskMetaInfo in the metaHolder
func (t *TaskMetaHolder) acceptMeta(TaskInfo *TaskMetaInfo) bool {
	taskId := TaskInfo.TaskAdr.TaskId
	meta := t.MetaMap[taskId]
	if meta != nil {
		return false
	} else {
		t.MetaMap[taskId] = TaskInfo
	}
	return true
}
// makeMapTasks pushes map tasks into the map channel and their taskInfo into the taskMetaHolder
func (c *Coordinator) makeMapTasks(files []string) {
	// one map task per input file
	for _, v := range files {
		// generate a task id
		id := c.generateTaskId()
		// fill in the task's information
		task := Task{
			TaskType:  MapTask,
			TaskId:    id,
			ReduceNum: c.ReduceNum,
			FileSlice: []string{v},
		}
		// record the task's initial state
		taskMetaInfo := TaskMetaInfo{
			state:   Waiting, // the task is waiting to be executed
			TaskAdr: &task,   // keep the task's address
		}
		// store it in the map, keyed taskId -> taskInfo
		c.taskMetaHolder.acceptMeta(&taskMetaInfo)
		fmt.Println("make a map task :", &task)
		// push it into the channel
		c.MapChan <- &task
	}
}
The id-generation method used above (essentially an auto-incrementing primary key):
func (c *Coordinator) generateTaskId() int {
	res := c.TaskId
	c.TaskId++
	return res
}
At this point the Coordinator-side definitions are in place, including producing the map tasks and putting them into the map channel.
● Next comes the coordinator RPC method that the worker calls, and in my view the core of the Coordinator: handing out tasks. We take a map task out of the task channel; if nothing can be taken, the tasks have all been handed out, which means every task is either finished or still in progress. We then need to check whether all map tasks are done, and if so move on to the next phase (ReducePhase). Since for now we are only verifying the map phase, we skip reduce and jump straight to AllDone.
// PollTask hands out tasks to workers
func (c *Coordinator) PollTask(args *TaskArgs, reply *Task) error {
	// distributing tasks must be protected by a lock to prevent workers racing; defer the unlock
	mu.Lock()
	defer mu.Unlock()
	// decide what to hand out based on the current phase
	switch c.DistPhase {
	case MapPhase:
		{
			if len(c.MapChan) > 0 {
				*reply = *<-c.MapChan
				if !c.taskMetaHolder.judgeState(reply.TaskId) {
					fmt.Printf("taskid[ %d ] is running\n", reply.TaskId)
				}
			} else {
				reply.TaskType = WaittingTask // all map tasks have been handed out but not finished yet, so tell the worker to wait
				if c.taskMetaHolder.checkTaskDone() {
					c.toNextPhase()
				}
				return nil
			}
		}
	default:
		{
			reply.TaskType = ExitTask
		}
	}
	return nil
}
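PollTask and the other handlers below lock mu. In this write-up I assume a package-level mutex declared in mr/coordinator.go (it could equally be a field of the Coordinator struct):

// mu protects the coordinator's shared state (channels, phase, task metadata)
// against concurrent RPC handlers and, later, the crash-detection goroutine.
var mu sync.Mutex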
The phase-transition helper used after checking whether the tasks are done:
func (c *Coordinator) toNextPhase() {
	if c.DistPhase == MapPhase {
		//c.makeReduceTasks()
		// todo: switch to ReducePhase once reduce is implemented
		c.DistPhase = AllDone
	} else if c.DistPhase == ReducePhase {
		c.DistPhase = AllDone
	}
}
The check for whether the current batch of tasks is finished. It works off the MetaMap built earlier (make(map[int]*TaskMetaInfo, len(files)+nReduce)), using each taskMetaInfo's TaskAdr.TaskType to decide which counter to update.
// checkTaskDone counts how many tasks (map and reduce) are finished
func (t *TaskMetaHolder) checkTaskDone() bool {
	var (
		mapDoneNum      = 0
		mapUnDoneNum    = 0
		reduceDoneNum   = 0
		reduceUnDoneNum = 0
	)
	// walk over the map that stores the task metadata
	for _, v := range t.MetaMap {
		// first check the task type
		if v.TaskAdr.TaskType == MapTask {
			// then check whether the task is done; same below
			if v.state == Done {
				mapDoneNum++
			} else {
				mapUnDoneNum++
			}
		} else if v.TaskAdr.TaskType == ReduceTask {
			if v.state == Done {
				reduceDoneNum++
			} else {
				reduceUnDoneNum++
			}
		}
	}
	//fmt.Printf("map tasks are finished %d/%d, reduce task are finished %d/%d \n",
	//	mapDoneNum, mapDoneNum+mapUnDoneNum, reduceDoneNum, reduceDoneNum+reduceUnDoneNum)

	// if all map tasks (or all reduce tasks) are done, the phase needs to switch, so return true
	if (mapDoneNum > 0 && mapUnDoneNum == 0) && (reduceDoneNum == 0 && reduceUnDoneNum == 0) {
		return true
	} else {
		if reduceDoneNum > 0 && reduceUnDoneNum == 0 {
			return true
		}
	}
	return false
}
And the method, used when a task is taken from the queue, that marks the task as running:
// judgeState checks whether the given task is waiting to run, and if so marks it as working
func (t *TaskMetaHolder) judgeState(taskId int) bool {
	taskInfo, ok := t.MetaMap[taskId]
	if !ok || taskInfo.state != Waiting {
		return false
	}
	taskInfo.state = Working
	return true
}
Next, the RPC method that marks a task as finished:
// MarkFinished is the RPC method that marks a task as done
func (c *Coordinator) MarkFinished(args *Task, reply *Task) error {
	mu.Lock()
	defer mu.Unlock()
	switch args.TaskType {
	case MapTask:
		meta, ok := c.taskMetaHolder.MetaMap[args.TaskId]
		// prevent a duplicated report returned from another worker
		if ok && meta.state == Working {
			meta.state = Done
			//fmt.Printf("Map task Id[%d] is finished.\n", args.TaskId)
		} else {
			fmt.Printf("Map task Id[%d] is finished,already ! ! !\n", args.TaskId)
		}
	case ReduceTask:
		meta, ok := c.taskMetaHolder.MetaMap[args.TaskId]
		// prevent a duplicated report returned from another worker
		if ok && meta.state == Working {
			meta.state = Done
			//fmt.Printf("Reduce task Id[%d] is finished.\n", args.TaskId)
		} else {
			fmt.Printf("Reduce task Id[%d] is finished,already ! ! !\n", args.TaskId)
		}
	default:
		panic("The task type undefined ! ! !")
	}
	return nil
}
Finally, the last piece of the map phase: once all map tasks are done, the Done method should return true so that the coordinator can exit.
// Done is called periodically by main/mrcoordinator.go; when all tasks are finished it returns true and mr exits
func (c *Coordinator) Done() bool {
	mu.Lock()
	defer mu.Unlock()
	if c.DistPhase == AllDone {
		fmt.Printf("All tasks are finished,the coordinator will be exit! !")
		return true
	} else {
		return false
	}
}
At this point the map phase already forms a complete loop on its own: run mrcoordinator.go first, then mrworker.go, and watch what happens.
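For reference, the commands for trying this out (taken from the lab page; I use the wc plugin here) are roughly:

$ cd ~/6.5840/src/main
$ go build -race -buildmode=plugin ../mrapps/wc.go
$ rm mr-out*
$ go run -race mrcoordinator.go pg-*.txt
# then, in one or more other windows:
$ go run -race mrworker.go wc.so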
Output of mrcoordinator.go (I passed in only two files for testing):
Output of mrworker.go:
Then take a look at the generated files.
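With two input files and nReduce = 10, the intermediate files follow the mr-tmp-{taskId}-{reduceId} naming scheme described above, so you should see something like:

$ ls mr-tmp-*
mr-tmp-0-0  mr-tmp-0-1  ...  mr-tmp-0-9
mr-tmp-1-0  mr-tmp-1-1  ...  mr-tmp-1-9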
2.2* Adding reduce
On top of the map phase we now add the reduce phase. Reduce has to gather the key-value pairs produced by map, shuffle them, and write the final output.
- With the overall flow already in place, writing the reduce phase goes quickly; most of the logic mirrors the map phase. Start with the method that creates the reduce tasks:
// makeReduceTasks creates the reduce tasks
func (c *Coordinator) makeReduceTasks() {
	for i := 0; i < c.ReduceNum; i++ {
		id := c.generateTaskId()
		task := Task{
			TaskId:    id,
			TaskType:  ReduceTask,
			FileSlice: selectReduceName(i), // the intermediate files belonging to this reduce task
		}
		// record the task's initial state
		taskMetaInfo := TaskMetaInfo{
			state:   Waiting,
			TaskAdr: &task,
		}
		c.taskMetaHolder.acceptMeta(&taskMetaInfo)
		c.ReduceChan <- &task
	}
}
From the structure of the MapReduce framework we know that, at input time, one map task corresponds to one input file; but the map phase splits each task's output into many tmp files, so a reduce task's input is the group of intermediate files that hash to the same reduce bucket.
So we complete makeReduceTasks with the helper that selects the files for a given reduce task:
func selectReduceName(reduceNum int) []string {
	var s []string
	path, _ := os.Getwd()
	files, _ := ioutil.ReadDir(path)
	for _, fi := range files {
		// pick the intermediate files that belong to this reduce task
		if strings.HasPrefix(fi.Name(), "mr-tmp") && strings.HasSuffix(fi.Name(), strconv.Itoa(reduceNum)) {
			s = append(s, fi.Name())
		}
	}
	return s
}
Then extend PollTask (task distribution) to also hand out reduce tasks:
func (c *Coordinator) PollTask(args *TaskArgs, reply *Task) error {
	// distributing tasks must be protected by a lock to prevent workers racing; defer the unlock
	mu.Lock()
	defer mu.Unlock()
	// decide what to hand out based on the current phase
	switch c.DistPhase {
	case MapPhase:
		{
			if len(c.MapChan) > 0 {
				*reply = *<-c.MapChan
				if !c.taskMetaHolder.judgeState(reply.TaskId) {
					fmt.Printf("Map-taskid[ %d ] is running\n", reply.TaskId)
				}
			} else {
				reply.TaskType = WaittingTask // all map tasks have been handed out but not finished yet, so tell the worker to wait
				if c.taskMetaHolder.checkTaskDone() {
					c.toNextPhase()
				}
				return nil
			}
		}
	case ReducePhase:
		{
			if len(c.ReduceChan) > 0 {
				*reply = *<-c.ReduceChan
				if !c.taskMetaHolder.judgeState(reply.TaskId) {
					fmt.Printf("Reduce-taskid[ %d ] is running\n", reply.TaskId)
				}
			} else {
				reply.TaskType = WaittingTask // all reduce tasks have been handed out but not finished yet, so tell the worker to wait
				if c.taskMetaHolder.checkTaskDone() {
					c.toNextPhase()
				}
				return nil
			}
		}
	case AllDone:
		{
			reply.TaskType = ExitTask
		}
	default:
		panic("The phase undefined ! ! !")
	}
	return nil
}
And complete the phase-switching function from before:
func (c *Coordinator) toNextPhase() {
	if c.DistPhase == MapPhase {
		c.makeReduceTasks()
		c.DistPhase = ReducePhase
	} else if c.DistPhase == ReducePhase {
		c.DistPhase = AllDone
	}
}
Go back and complete the Worker loop:
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
	//CallExample()
	keepFlag := true
	for keepFlag {
		task := GetTask()
		switch task.TaskType {
		case MapTask:
			{
				DoMapTask(mapf, &task)
				callDone(&task)
			}
		case WaittingTask:
			{
				//fmt.Println("All tasks are in progress, please wait...")
				time.Sleep(time.Second)
			}
		case ReduceTask:
			{
				DoReduceTask(reducef, &task)
				callDone(&task)
			}
		case ExitTask:
			{
				//fmt.Println("Task about :[", task.TaskId, "] is terminated...")
				keepFlag = false
			}
		}
	}
	// uncomment to send the Example RPC to the coordinator.
}
Handling a reduce task: just like map, refer to wc.go and mrsequential.go; some of that code can be reused directly. The rough idea: shuffle the tmp files produced earlier into one slice of key-value pairs sorted by key, then, walking that sorted slice, write the reduced output to the final output file.
func DoReduceTask(reducef func(string, []string) string, response *Task) {
	reduceFileNum := response.TaskId
	intermediate := shuffle(response.FileSlice)
	dir, _ := os.Getwd()
	tempFile, err := ioutil.TempFile(dir, "mr-tmp-*")
	if err != nil {
		log.Fatal("Failed to create temp file", err)
	}
	i := 0
	for i < len(intermediate) {
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		var values []string
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		output := reducef(intermediate[i].Key, values)
		fmt.Fprintf(tempFile, "%v %v\n", intermediate[i].Key, output)
		i = j
	}
	tempFile.Close()
	fn := fmt.Sprintf("mr-out-%d", reduceFileNum)
	os.Rename(tempFile.Name(), fn)
}
- shuffle
// shuffle merges the intermediate files and returns the key-value pairs sorted by key
func shuffle(files []string) []KeyValue {
	var kva []KeyValue
	for _, filepath := range files {
		file, _ := os.Open(filepath)
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			kva = append(kva, kv)
		}
		file.Close()
	}
	sort.Sort(SortedKey(kva))
	return kva
}
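shuffle sorts with a SortedKey type that is not shown above; it simply implements sort.Interface over []KeyValue, ordered by key, mirroring the ByKey type in mrsequential.go:

// SortedKey lets sort.Sort order intermediate key-value pairs by key.
type SortedKey []KeyValue

func (k SortedKey) Len() int           { return len(k) }
func (k SortedKey) Swap(i, j int)      { k[i], k[j] = k[j], k[i] }
func (k SortedKey) Less(i, j int) bool { return k[i].Key < k[j].Key }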
With that, the reduce phase is essentially done; the final result should look like this:
2.3 Handling crashes
Looking at the course documentation, the test suite also includes a crash test.
● The relevant passage from the handout:
Roughly translated: if you choose to implement backup tasks for fault tolerance, note that the tests do not schedule extraneous tasks (in my understanding, tasks that merely appear dead) for workers to execute, so workers will not crash because of them; a backup task should only be scheduled after some relatively long period without a response (e.g. 10 seconds).
So we can handle crashes roughly like this: add a start-time field to the task metadata, then, when initializing the coordinator, also start a crash-detection goroutine that puts any task running for more than 10 seconds back into its channel, to be handed out again.
// TaskMetaInfo holds the metadata of a single task
type TaskMetaInfo struct {
	state     State     // the task's state
	StartTime time.Time // the time the task started running, used for crash detection
	TaskAdr   *Task     // pointer to the task, so that after it is taken out of the channel it can still be marked as done through this address
}
- Record the start time when a task starts working:
// judgeState checks whether the given task is waiting to run, and if so marks it as working and records the start time
func (t *TaskMetaHolder) judgeState(taskId int) bool {
	taskInfo, ok := t.MetaMap[taskId]
	if !ok || taskInfo.state != Waiting {
		return false
	}
	taskInfo.state = Working
	taskInfo.StartTime = time.Now()
	return true
}
- Start the crash-detection goroutine when creating the coordinator:
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{
		files:      files,
		ReduceNum:  nReduce,
		DistPhase:  MapPhase,
		MapChan:    make(chan *Task, len(files)),
		ReduceChan: make(chan *Task, nReduce),
		taskMetaHolder: TaskMetaHolder{
			MetaMap: make(map[int]*TaskMetaInfo, len(files)+nReduce),
		},
	}
	c.makeMapTasks(files)
	c.server()
	go c.CrashDetector()
	return &c
}
The crash-detection goroutine:
func (c *Coordinator) CrashDetector() {
	for {
		time.Sleep(time.Second * 2)
		mu.Lock()
		if c.DistPhase == AllDone {
			mu.Unlock()
			break
		}
		for _, v := range c.taskMetaHolder.MetaMap {
			if v.state == Working {
				//fmt.Println("task[", v.TaskAdr.TaskId, "] is working: ", time.Since(v.StartTime), "s")
			}
			if v.state == Working && time.Since(v.StartTime) > 9*time.Second {
				fmt.Printf("the task[ %d ] is crash,take [%d] s\n", v.TaskAdr.TaskId, time.Since(v.StartTime))
				switch v.TaskAdr.TaskType {
				case MapTask:
					c.MapChan <- v.TaskAdr
					v.state = Waiting
				case ReduceTask:
					c.ReduceChan <- v.TaskAdr
					v.state = Waiting
				}
			}
		}
		mu.Unlock()
	}
}
2.4 Testing
During the full lab test run I, unsurprisingly, failed two tests: early_exit and crash. Here is a quick account of how that went.
The overall test script covers quite a lot, so I suggest running it in full once and then pulling each failing test out into its own shell script so it can be run on its own, as I did for early_exit and the crash test (the extracted crash test is shown below):
#!/usr/bin/env bash
#
# map-reduce tests
#
# comment this out to run the tests without the Go race detector.
RACE=-race
if [[ "$OSTYPE" = "darwin"* ]]
then
if go version | grep 'go1.17.[012345]'
then
# -race with plug-ins on x86 MacOS 12 with
# go1.17 before 1.17.6 sometimes crash.
RACE=
echo '*** Turning off -race since it may not work on a Mac'
echo ' with ' `go version`
fi
fi
TIMEOUT=timeout
if timeout 2s sleep 1 > /dev/null 2>&1
then
:
else
if gtimeout 2s sleep 1 > /dev/null 2>&1
then
TIMEOUT=gtimeout
else
# no timeout command
TIMEOUT=
echo '*** Cannot find timeout command; proceeding without timeouts.'
fi
fi
if [ "$TIMEOUT" != "" ]
then
TIMEOUT+=" -k 2s 180s "
fi
# run the test in a fresh sub-directory.
rm -rf mr-tmp
mkdir mr-tmp || exit 1
cd mr-tmp || exit 1
rm -f mr-*
# make sure software is freshly built.
(cd ../../mrapps && go clean)
(cd .. && go clean)
(cd ../../mrapps && go build $RACE -buildmode=plugin wc.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin indexer.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin mtiming.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin rtiming.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin jobcount.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin early_exit.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin crash.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin nocrash.go) || exit 1
(cd .. && go build $RACE mrcoordinator.go) || exit 1
(cd .. && go build $RACE mrworker.go) || exit 1
(cd .. && go build $RACE mrsequential.go) || exit 1
failed_any=0
echo '***' Starting crash test.
# generate the correct output
../mrsequential ../../mrapps/nocrash.so ../pg*txt || exit 1
sort mr-out-0 > mr-correct-crash.txt
rm -f mr-out*
rm -f mr-done
($TIMEOUT ../mrcoordinator ../pg*txt ; touch mr-done ) &
sleep 1
# start multiple workers
$TIMEOUT ../mrworker ../../mrapps/crash.so &
# mimic rpc.go's coordinatorSock()
SOCKNAME=/var/tmp/824-mr-`id -u`
( while [ -e $SOCKNAME -a ! -f mr-done ]
do
$TIMEOUT ../mrworker ../../mrapps/crash.so
sleep 1
done ) &
( while [ -e $SOCKNAME -a ! -f mr-done ]
do
$TIMEOUT ../mrworker ../../mrapps/crash.so
sleep 1
done ) &
while [ -e $SOCKNAME -a ! -f mr-done ]
do
$TIMEOUT ../mrworker ../../mrapps/crash.so
sleep 1
done
wait
rm $SOCKNAME
sort mr-out* | grep . > mr-crash-all
if cmp mr-crash-all mr-correct-crash.txt
then
echo '---' crash test: PASS
else
echo '---' crash output is not the same as mr-correct-crash.txt
echo '---' crash test: FAIL
failed_any=1
fi
For the crash test, once I added the crash goroutine the test would not run at all and hung after printing the very first byte. I suspected something was blocking, so right after adding the crash goroutine I ran the coordinator under the Go debugger and found that the PollTask RPC was stuck. It turned out the crash goroutine was acquiring the global lock in a tight loop, starving PollTask of it. After making the crash-detection goroutine back off between acquisitions (sleeping 2 seconds, a bit like time-slice round-robin), the test passed.
Since I wrote this post while coding, the final bugs I can remember have already been fixed above; if small problems remain, feel free to message me and I will correct them.
2.5 Results
*** Starting wc test.
2023/06/22 11:15:35 rpc.Register: method "CrashDetector" has 1 input parameters; needs exactly three
2023/06/22 11:15:35 rpc.Register: method "Done" has 1 input parameters; needs exactly three
All tasks are finished,the coordinator will be exit! !--- wc test: PASS
2023/06/22 11:15:57 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
2023/06/22 11:15:57 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
2023/06/22 11:15:57 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
*** Starting indexer test.
2023/06/22 11:15:59 rpc.Register: method "CrashDetector" has 1 input parameters; needs exactly three
2023/06/22 11:15:59 rpc.Register: method "Done" has 1 input parameters; needs exactly three
All tasks are finished,the coordinator will be exit! !2023/06/22 11:16:12 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
--- indexer test: PASS
2023/06/22 11:16:12 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
*** Starting map parallelism test.
2023/06/22 11:16:12 rpc.Register: method "CrashDetector" has 1 input parameters; needs exactly three
2023/06/22 11:16:12 rpc.Register: method "Done" has 1 input parameters; needs exactly three
All tasks are finished,the coordinator will be exit! !2023/06/22 11:16:27 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
2023/06/22 11:16:27 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
--- map parallelism test: PASS
*** Starting reduce parallelism test.
2023/06/22 11:16:27 rpc.Register: method "CrashDetector" has 1 input parameters; needs exactly three
2023/06/22 11:16:27 rpc.Register: method "Done" has 1 input parameters; needs exactly three
All tasks are finished,the coordinator will be exit! !2023/06/22 11:16:43 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
--- reduce parallelism test: PASS
2023/06/22 11:16:43 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
*** Starting job count test.
2023/06/22 11:16:43 rpc.Register: method "CrashDetector" has 1 input parameters; needs exactly three
2023/06/22 11:16:43 rpc.Register: method "Done" has 1 input parameters; needs exactly three
All tasks are finished,the coordinator will be exit! !2023/06/22 11:17:03 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
2023/06/22 11:17:04 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
2023/06/22 11:17:04 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
2023/06/22 11:17:04 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
--- job count test: PASS
*** Starting early exit test.
2023/06/22 11:17:04 rpc.Register: method "CrashDetector" has 1 input parameters; needs exactly three
2023/06/22 11:17:04 rpc.Register: method "Done" has 1 input parameters; needs exactly three
test-mr.sh: line 246: wait: -n: invalid option
wait: usage: wait [id]
sort: cannot read: mr-out*: No such file or directory
All tasks are finished,the coordinator will be exit! !All tasks are Done ,will be exiting...
2023/06/22 11:17:10 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
2023/06/22 11:17:13 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
cmp: EOF on mr-wc-all-initial
--- output changed after first worker exited
--- early exit test: FAIL
*** Starting crash test.
2023/06/22 11:17:13 rpc.Register: method "CrashDetector" has 1 input parameters; needs exactly three
2023/06/22 11:17:13 rpc.Register: method "Done" has 1 input parameters; needs exactly three
the task[ 1 ] is crash,take [10952100597] s
the task[ 7 ] is crash,take [9871407319] s
the task[ 2 ] is crash,take [10951694013] s
the task[ 5 ] is crash,take [9898542154] s
the task[ 4 ] is crash,take [10926119449] s
the task[ 3 ] is crash,take [10942430906] s
the task[ 2 ] is crash,take [10822523589] s
the task[ 4 ] is crash,take [10580679965] s
the task[ 10 ] is crash,take [10456960439] s
the task[ 13 ] is crash,take [9406698246] s
All tasks are finished,the coordinator will be exit! !2023/06/22 11:17:56 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
2023/06/22 11:17:57 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
2023/06/22 11:17:58 dialing:dial unix /var/tmp/824-mr-0: connect: connection refused
--- crash test: PASS
*** FAILED SOME TESTS
For comparison, here is the output of an earlier failed run, when the RPC calls were wrong:
go: finding module for package 6.5840/lockservice
go: finding module for package 6.5840/diskv
go: finding module for package 6.5840/pbservice
go: finding module for package 6.5840/viewservice
*** Starting wc test.
make a map task: &{0 0 10 [../pg-being_ernest.txt]}
make a map task: &{1 0 10 [../pg-dorian_gray.txt]}
make a map task: &{2 0 10 [../pg-frankenstein.txt]}
make a map task: &{3 0 10 [../pg-grimm.txt]}
make a map task: &{4 0 10 [../pg-huckleberry_finn.txt]}
make a map task: &{5 0 10 [../pg-metamorphosis.txt]}
make a map task: &{6 0 10 [../pg-sherlock_holmes.txt]}
make a map task: &{7 0 10 [../pg-tom_sawyer.txt]}
2023/06/18 18:57:10 rpc.Register: method "Done" has 1 input parameters; needs exactly three
new worker 5577006791947779410
new worker 5577006791947779410
rpc: can't find service Coordinate.DistrubtionJob
call faild!
get response &{0 0 0 []}
task &{Id:0 Type:0 ReduceNum:0 FileName:[]}
new worker 5577006791947779410
rpc: can't find service Coordinate.DistrubtionJob
call faild!
get response &{0 0 0 []}
task &{Id:0 Type:0 ReduceNum:0 FileName:[]}
rpc: can't find service Coordinate.DistrubtionJob
call faild!
get response &{0 0 0 []}
task &{Id:0 Type:0 ReduceNum:0 FileName:[]}
panic: runtime error: index out of range [0] with length 0
goroutine 1 [running]:
6.5840/mr.DoMapTask(0x4d65822107fcfd52, 0xc0001180b8, 0xc00010e150)
/root/6.5840/src/mr/worker.go:88 +0x97c
6.5840/mr.Worker(0xc0001180b8, 0xc0001180c0)
/root/6.5840/src/mr/worker.go:56 +0x225
main.main()
/root/6.5840/src/main/mrworker.go:27 +0xbd
panic: runtime error: index out of range [0] with length 0
goroutine 1 [running]:
6.5840/mr.DoMapTask(0x4d65822107fcfd52, 0xc0000e40c0, 0xc0000f6090)
/root/6.5840/src/mr/worker.go:88 +0x97c
6.5840/mr.Worker(0xc0000e40c0, 0xc0000e40c8)
/root/6.5840/src/mr/worker.go:56 +0x225
main.main()
/root/6.5840/src/main/mrworker.go:27 +0xbd
panic: runtime error: index out of range [0] with length 0
goroutine 1 [running]:
6.5840/mr.DoMapTask(0x4d65822107fcfd52, 0xc0000e60c8, 0xc0000dc150)
/root/6.5840/src/mr/worker.go:88 +0x97c
6.5840/mr.Worker(0xc0000e60c8, 0xc0000e60d0)
/root/6.5840/src/mr/worker.go:56 +0x225
main.main()
/root/6.5840/src/main/mrworker.go:27 +0xbd
sort: cannot read: mr-out*: No such file or directory
cmp: EOF on mr-wc-all
--- wc output is not the same as mr-correct-wc.txt
--- wc test: FAIL
*** Starting indexer test.
make a map task: &{0 0 10 [../pg-being_ernest.txt]}
make a map task: &{1 0 10 [../pg-dorian_gray.txt]}
make a map task: &{2 0 10 [../pg-frankenstein.txt]}
make a map task: &{3 0 10 [../pg-grimm.txt]}
make a map task: &{4 0 10 [../pg-huckleberry_finn.txt]}
make a map task: &{5 0 10 [../pg-metamorphosis.txt]}
make a map task: &{6 0 10 [../pg-sherlock_holmes.txt]}
make a map task: &{7 0 10 [../pg-tom_sawyer.txt]}
2023/06/18 18:57:57 rpc.Register: method "Done" has 1 input parameters; needs exactly three
new worker 5577006791947779410
new worker 5577006791947779410
rpc: can't find service Coordinate.DistrubtionJob
call faild!
rpc: can't find service Coordinate.DistrubtionJob
get response &{0 0 0 []}
task &{Id:0 Type:0 ReduceNum:0 FileName:[]}
call faild!
get response &{0 0 0 []}
task &{Id:0 Type:0 ReduceNum:0 FileName:[]}
panic: runtime error: index out of range [0] with length 0
goroutine 1 [running]:
6.5840/mr.DoMapTask(0x4d65822107fcfd52, 0xc00007bb48, 0xc00006a1e0)
/root/6.5840/src/mr/worker.go:88 +0x97c
6.5840/mr.Worker(0xc00007bb48, 0xc00007bb50)
/root/6.5840/src/mr/worker.go:56 +0x225
main.main()
/root/6.5840/src/main/mrworker.go:27 +0xbd
panic: runtime error: index out of range [0] with length 0
goroutine 1 [running]:
6.5840/mr.DoMapTask(0x4d65822107fcfd52, 0xc0001180b8, 0xc00010e150)
/root/6.5840/src/mr/worker.go:88 +0x97c
6.5840/mr.Worker(0xc0001180b8, 0xc0001180c0)
/root/6.5840/src/mr/worker.go:56 +0x225
main.main()
/root/6.5840/src/main/mrworker.go:27 +0xbd
sort: cannot read: mr-out*: No such file or directory
cmp: EOF on mr-indexer-all
--- indexer output is not the same as mr-correct-indexer.txt
--- indexer test: FAIL
*** Starting map parallelism test.
make a map task: &{0 0 10 [../pg-being_ernest.txt]}
make a map task: &{1 0 10 [../pg-dorian_gray.txt]}
make a map task: &{2 0 10 [../pg-frankenstein.txt]}
make a map task: &{3 0 10 [../pg-grimm.txt]}
make a map task: &{4 0 10 [../pg-huckleberry_finn.txt]}
make a map task: &{5 0 10 [../pg-metamorphosis.txt]}
make a map task: &{6 0 10 [../pg-sherlock_holmes.txt]}
make a map task: &{7 0 10 [../pg-tom_sawyer.txt]}
2023/06/18 18:58:42 rpc.Register: method "Done" has 1 input parameters; needs exactly three
new worker 5577006791947779410
new worker 5577006791947779410
rpc: can't find service Coordinate.DistrubtionJob
rpc: can't find service Coordinate.DistrubtionJob
call faild!
call faild!
get response &{0 0 0 []}
get response &{0 0 0 []}
task &{Id:0 Type:0 ReduceNum:0 FileName:[]}
task &{Id:0 Type:0 ReduceNum:0 FileName:[]}
panic: runtime error: index out of range [0] with length 0
goroutine 1 [running]:
6.5840/mr.DoMapTask(0x4d65822107fcfd52, 0xc0000e20c8, 0xc0000e40c0)
/root/6.5840/src/mr/worker.go:88 +0x97c
6.5840/mr.Worker(0xc0000e20c8, 0xc0000e20d0)
/root/6.5840/src/mr/worker.go:56 +0x225
main.main()
/root/6.5840/src/main/mrworker.go:27 +0xbd
panic: runtime error: index out of range [0] with length 0
goroutine 1 [running]:
6.5840/mr.DoMapTask(0x4d65822107fcfd52, 0xc0001180b8, 0xc00010e150)
/root/6.5840/src/mr/worker.go:88 +0x97c
6.5840/mr.Worker(0xc0001180b8, 0xc0001180c0)
/root/6.5840/src/mr/worker.go:56 +0x225
main.main()
/root/6.5840/src/main/mrworker.go:27 +0xbd
cat: mr-out*: No such file or directory
--- saw 0 workers rather than 2
--- map parallelism test: FAIL
cat: mr-out*: No such file or directory
--- map workers did not run in parallel
--- map parallelism test: FAIL
Painful memories: at the time I did not realize I was calling the wrong RPC name, and I even went and asked GPT about it.
Result:
3. Takeaways
I started this lab after landing an internship offer. Counting learning Go, reading the paper, watching the lectures, and writing this post, it took me more than half a month, quite a bit longer than the one week the course suggests (embarrassing, orz...). But working through it solidly by myself was genuinely rewarding, and I hope to find time to push on to the following labs. Although I have tried to describe my approach in as much detail as possible, I hope later readers will still do their own thinking and write their own implementation. This post was typed out entirely by hand, so corrections are welcome if you spot mistakes. Since the course asks people not to publish source code, I will not be posting a GitHub link, but following the steps above you can absolutely get it to pass.