0
点赞
收藏
分享

微信扫一扫

分布式系统---MapReduce实现(Go语言)

蓝哆啦呀 2023-05-12 阅读 57

一、说明

  • 本次实验是基于MIT-6.824的课程,详情请参见官网主页
  • 下载源代码

二、MapReduce原理

2.1 经典的分布式模型

MapReduce是经典的分布式模型。通过Map函数和Reduce函数实现。

分布式计算,就是利用多台机器,完成一个任务。关于分布式计算,几个经典的例子就是单词词频统计。假设现在有1000MB的文本文件需要进行词频统计,如果只有1台机器,处理此大文件可能需要10s。

如果现在有10台机器,每台机器负责处理100MB的数据,处理完之后(并行),再进行汇总,那效率将会极大提升。

2.2 MapReduce的过程

  • MapReduce模型中,有一个master,负责分配任务,有多个worker,负责map任务和reduce任务。当map处理好任务后,输出中间结果{key, value}。一般来说,每个reduce会负责固定key的任务。reduce拿到中间结果继续处理,最后再整合输出。
    在这里插入图片描述

三. Go语言实现

3.1 Worker进行和Coordinate进行RPC通信

  • RPC的参数
type ExampleArgs struct {
	X int
}

type ExampleReply struct {
	Y int
}

需要注意,遍历要大写,因为在Go语言中大写首字母代表public。

  • RPC服务器的接口
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
	reply.Y = args.X + 1
	return nil
}

客户端能够远程调用该函数,并得到结果

  • 客户端远程调用
func CallExample() {
	args := ExampleArgs{}
	args.X = 99
	reply := ExampleReply{}

	// send the RPC request, wait for the reply.
	call("Coordinator.Example", &args, &reply)

	// reply.Y should be 100.
	fmt.Printf("reply.Y %v\n", reply.Y)
}

3.2 一些变量的定义

  • 我们这里使用了状态机的方式,设置了TaskMap,TaskReduce,TaskWait,TaskEnd几种状态,分别表示Map任务,Reduce任务,目前还有任务正在执行需要等待,任务全部完成
const (
	TaskMap      = 1  // Map任务
	TaskReduce   = 2  // Reduece任务
	TaskWait     = 3  // 暂时无任务
	TaskEnd      = 4  // 所有任务已完成
	FixedTimeOut = 15 // 等待15s,Worker没回复,就说明出错了
)

var Debug bool = false

// Map任务
type MapTask struct {
	FileName string // 需要处理的文件
	MapID    int    // 当前map的编号
	NReduce  int    // 需要分成几块
}

type ReduceTask struct {
	FileName string // 该任务属于哪一个文件
	MapID    int    // 哪个map的输出结果
	ReduceID int    // 当前reduce的编号
}

// Debug模式下才需要打印
func Dprintf(format string, data ...interface{}) {
	if Debug {
		fmt.Printf(format, data...)
	}
}

3.2 Coordinator

  • Coorinator负责整个MapReduce过程的管理,因此需要记录任务相关的信息
type Coordinator struct {
	// Your definitions here.
	mutex       sync.Mutex
	mapTaskQ    []MapTask
	redTaskQ    []ReduceTask // [{0-0}, {0-1}, {0-2}, ..., {0-9}, {1-0},...{1-9}]
	mapTaskingQ []MapTask    // 正在执行map任务
	redTaskingQ []ReduceTask // 正在执行reduce任务
	nReduce     int
	isDone      bool
}

mutex:因为程序涉及到多线程,因此需要加锁
mapTaskQ:记录目前的Map任务,实际上就是文件名
redTaskQ:记录当前的Reduce任务,因为每个map会输出对应nReduce的中间文件,实验给的提示是通过ihash(key)函数,将不同的key映射到不同的reduce上,因此每个reduce应该会涉及到处理多个中间文件
mapTaskingQ:正在执行的map任务,当目前还有正在执行的map任务时,但是又有闲置的map机器在请求任务,此时coordinator应该让其等待,因为可能执行任务的map机器出现故障,此时就需要找到新的map继续执行该任务。此外,如果当目前还有正在执行的map任务,也不应该进入到reduce阶段
redTaskingQ:正在执行的reduce任务,理由和上面类似,都是为了防止发生故障

  • Coordinator初始化
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{}

	// Your code here.
	c.mapTaskingQ = make([]MapTask, 0)
	c.nReduce = nReduce

	// 每个文件对应一个任务
	for i, file := range files {
		task := MapTask{
			FileName: file,
			MapID:    i,
			NReduce:  nReduce,
		}

		c.mapTaskQ = append(c.mapTaskQ, task)
	}

	c.isDone = false

	Dprintf("Master working...\n")

	go c.tasking2task()

	c.server()
	return &c
}

实际上就是将文件转换为map任务

  • AskTask Worker请求任务的RPC调用
func (c *Coordinator) AskTask(args *AskTaskArgs, reply *AskTaskReply) error {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	// 还有map任务
	if len(c.mapTaskQ) > 0 {
		reply.TaskType = TaskMap
		reply.MapTask = c.assignMapTask()

		return nil
	}

	// 有正在执行的map任务
	if len(c.mapTaskingQ) > 0 {
		reply.TaskType = TaskWait
		Dprintf("some MapTasks are not completed, please wait...\n")

		return nil
	}

	// 有reduce任务
	if len(c.redTaskQ) > 0 {
		reply.TaskType = TaskReduce
		redTasks := c.assignRedTask()
		reply.RedTasks = append(reply.RedTasks, redTasks...)

		return nil
	}

	// 有正在执行的reduce任务
	if len(c.redTaskingQ) > 0 {
		reply.TaskType = TaskWait
		Dprintf("some ReduceTasks are not completed, please wait...\n")

		return nil
	}

	// reduce任务也处理完了
	reply.TaskType = TaskEnd
	c.isDone = true

	Dprintf("ALL tasks done,msg: closing -> worker...\n")

	return nil
}
  • 分配Map任务
func (c *Coordinator) assignMapTask() MapTask {
	task := c.mapTaskQ[0]
	c.mapTaskQ = append(c.mapTaskQ[:0], c.mapTaskQ[1:]...)
	c.mapTaskingQ = append(c.mapTaskingQ, task)

	Dprintf("assign MapTask, fileName..%v, mapId..%v, mReduce..%v\n",
		task.FileName, task.MapID, task.NReduce)

	return task
}
  • 分配reduce任务
// 分配reduce任务
func (c *Coordinator) assignRedTask() []ReduceTask {
	redTasks := make([]ReduceTask, 0)
	reduceId := c.redTaskQ[0].ReduceID

	// 取第一个reduce task
	for i := 0; i < len(c.redTaskQ); {
		if c.redTaskQ[i].ReduceID != reduceId {
			i++
			continue
		}

		task := c.redTaskQ[i]
		c.redTaskingQ = append(c.redTaskingQ, task)
		redTasks = append(redTasks, task)
		c.redTaskQ = append(c.redTaskQ[:i], c.redTaskQ[i+1:]...)
		Dprintf("assign ReduceTask, fileName..%v, mapId..%v, reduceId..%v\n",
			task.FileName, task.MapID, task.ReduceID)
	}

	return redTasks
}
  • worker完成,回复coordinator
func (c *Coordinator) TaskDone(args *TaskDoneReply, reply *ExampleReply) error {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	switch args.TaskType {
	case TaskMap:
		c.mapTaskingQDeleter(args.MapTask, args.RedTasks)
		break
	case TaskReduce:
		c.redTaskingQDeleter(args.RedTasks)
		break
	default:
		break
	}

	return nil
}
  • map任务完成,从mapTaskingQ中移除,同时还应该将map处理好的中间结果,存放到redTaskQ
func (c *Coordinator) mapTaskingQDeleter(mapTask MapTask, redTasks []ReduceTask) {
	Dprintf("MapTask done, fileName..%v, mapId..%v, nReduce..%v\n",
		mapTask.FileName, mapTask.MapID, mapTask.NReduce)

	// 找到 mapTask.MapID的任务,然后剔除
	for i := 0; i < len(c.mapTaskingQ); i++ {
		if c.mapTaskingQ[i].MapID == mapTask.MapID {
			c.mapTaskingQ = append(c.mapTaskingQ[:i], c.mapTaskingQ[i+1:]...)
			break
		}
	}

	// 将该map的结果放到reduceTask
	// reTasks = [{i-0},...,{i-9}]
	for _, v := range redTasks {
		c.redTaskQ = append(c.redTaskQ, v)
		Dprintf("add ReduceTask, fileName..%v, mapId..%v, reduceId..%v\n",
			v.FileName, v.MapID, v.ReduceID)
	}
}
  • reduce任务完成,从redTaskingQ移除
func (c *Coordinator) redTaskingQDeleter(redTasks []ReduceTask) {
	for i := 0; i < len(redTasks); i++ {
		task := redTasks[i]
		Dprintf("ReduceTask done, fileName..%v, mapId..%v, reduceId..%v\n",
			task.FileName, task.MapID, task.ReduceID)
	
		for j := 0; j < len(c.redTaskingQ); {
			if c.redTaskingQ[j].ReduceID == task.ReduceID {
				c.redTaskingQ = append(c.redTaskingQ[:j], c.redTaskingQ[j+1:]...)
				continue
			}
			j++
		}
	}
}
  • coordinator还需要定期检查是否有map或者reduce宕机,如果有宕机,应该重新由空闲的机器处理。这里我们采用设置一个等待时间FixedTimeOut=15,如果超过这个时间正在执行的任务还没有完成,应该将他们移动到任务队列中
func (c *Coordinator) tasking2task() {
	for {
		time.Sleep(FixedTimeOut * time.Second)
		c.mutex.Lock()
		// defer c.mutex.Unlock()

		// 还有正在执行的map任务,将它添加到待完成任务队列 mapTaskQ
		if len(c.mapTaskingQ) != 0 {
			for i, _ := range c.mapTaskingQ {
				c.mapTaskQ = append(c.mapTaskQ, c.mapTaskingQ[i])
			}

			c.mapTaskingQ = []MapTask{}
		}

		if len(c.redTaskingQ) != 0 {
			Dprintf("redTaskingQ..%v\n", c.redTaskingQ)
			for i, _ := range c.redTaskingQ {
				c.redTaskQ = append(c.redTaskQ, c.redTaskingQ[i])
			}

			c.redTaskingQ = []ReduceTask{}
		}
		c.mutex.Unlock()
	}
}

3.3 Worker

  • 因为我们是单机,所以这里需要模拟多机,我们会让Worker一直请求任务,知道所有任务处理完成
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	for {
		reply := CallAskTask()

		switch reply.TaskType {
		case TaskMap: // Map任务
			doMapTask(mapf, reply.MapTask)
			break
		case TaskReduce: // Reduce任务
			doReduceTask(reducef, reply.RedTasks)
			break
		case TaskWait: // 此时没有任务
			time.Sleep(1 * time.Second)
			break
		case TaskEnd: // 所有任务全部完成
			Dprintf("任务全部完成,关机...\n")
			return
		default:
			fmt.Println("reply.TaskType: ", reply.TaskType)
			Dprintf("Unknown fault\n")
			break
		}
	}
}
  • 请求任务
func CallAskTask() AskTaskReply {
	args := AskTaskArgs{}
	reply := AskTaskReply{}

	call("Coordinator.AskTask", &args, &reply)

	return reply
}
  • 处理map任务
func doMapTask(mapf func(string, string) []KeyValue, mapTask MapTask) {
	Dprintf("doing MapTask, fileName..%v, mapId..%v, nReduce..%v\n",
		mapTask.FileName, mapTask.MapID, mapTask.NReduce)

	fileName := mapTask.FileName

	file, err := os.Open(fileName)
	if err != nil {
		log.Fatalf("cannot open %v", fileName)
	}

	content, err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot open %v", fileName)
	}

	file.Close()

	// 通过map函数,统计每个单词
	kvs := mapf(fileName, string(content))
	sort.Sort(ByKey(kvs))

	// 将所有的单词都放到对应的 reduces[idx]中 idx = [0,...,NReduce)
	reduces := make([][]KeyValue, mapTask.NReduce)
	for _, kv := range kvs {
		idx := ihash(kv.Key) % mapTask.NReduce
		reduces[idx] = append(reduces[idx], kv)
	}

	// reduce任务
	redTasks := []ReduceTask{} // [{i-0}, ... ,{i-9}]

	for idx, reduce := range reduces {

		// reduce任务
		redTask := ReduceTask{
			FileName: mapTask.FileName,
			MapID:    mapTask.MapID,
			ReduceID: idx,
		}

		redTasks = append(redTasks, redTask)

		output := "mr-" + strconv.Itoa(redTask.MapID) + "-" + strconv.Itoa(redTask.ReduceID)
		file, err = os.Create(output)
		if err != nil {
			log.Fatalf("cannot create %v", output)
		}
		defer file.Close()

		enc := json.NewEncoder(file)

		for _, kv := range reduce {
			enc.Encode(&kv)
		}

	}

	Dprintf("MapTask to ReduceTask done, fileName..%v, mapId..%v\n", mapTask.FileName, mapTask.MapID)

	CallTaskDone(TaskMap, mapTask, redTasks)
}
  • 处理reduce任务
func doReduceTask(reducef func(string, []string) string, redTasks []ReduceTask) {

	intermediate := []KeyValue{}

	for _, v := range redTasks {
		Dprintf("doing ReduceTask, fileName..%v, mapId..%v, reduceId..%v\n",
			v.FileName, v.MapID, v.ReduceID)
		fileName := "mr-" + strconv.Itoa(v.MapID) + "-" + strconv.Itoa(v.ReduceID)
		file, err := os.Open(fileName)
		if err != nil {
			log.Fatalf("cannot open %v", fileName)
		}
		defer file.Close()

		// 反序列化JSON格式文件
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			err := dec.Decode(&kv)
			if err != nil {
				//log.Print("read file done...")
				break
			}

			intermediate = append(intermediate, KeyValue{kv.Key, kv.Value})
		}
	}

	sort.Sort(ByKey(intermediate))

	oname := "mr-out-" + strconv.Itoa(redTasks[0].ReduceID)
	ofile, _ := os.Create(oname)
	defer ofile.Close()

	i := 0
	for i < len(intermediate) {
		// 找到每一组相同的key,统一放到一个slice
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}

		// ["1","1",...,"1"]
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}

		// 执行Reduce函数,计算key出现的次数
		output := reducef(intermediate[i].Key, values)

		// this is the correct format for each line of Reduce output.
		fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)

		i = j
	}

	Dprintf("ReduceTask done, fileName..%v, mapId..%v\n", redTasks[0].FileName, redTasks[0].MapID)

	CallTaskDone(TaskReduce, MapTask{}, redTasks)
}
  • 任务处理完,需要回应coordinator
func CallTaskDone(taskType int, mapTask MapTask, redTasks []ReduceTask) {
	args := TaskDoneReply{}
	args.TaskType = taskType
	args.MapTask = mapTask
	args.RedTasks = redTasks
	reply := ExampleReply{}
	call("Coordinator.TaskDone", &args, &reply)
}
举报

相关推荐

0 条评论