【MIT6.824】lab 1 MapReduce实现总结

您所在的位置:网站首页 mapreduce论文总结 【MIT6.824】lab 1 MapReduce实现总结

【MIT6.824】lab 1 MapReduce实现总结

2024-04-10 03:57| 来源: 网络整理| 查看: 265

MIT6.824是一门经典的分布式课程,课程链接:https://pdos.csail.mit.edu/6.824/labs/lab-mr.html,对于lab 1我们需要在提供的代码框架的基础上补充coordinator和worker的代码,以实现分布式的MapReduce程序。

本人在借鉴了部分其他人的设计思想的基础上,独立完成了所有的代码,最后设计的实现能够通过所有的测试脚本。

实现的代码厂库:https://github.com/slipegg/MIT6.824/tree/main/6.5840

实现目标

在给定的代码框架中实现一个单词计数的MapReduce程序。原本的框架中已经给了一个在本地串行执行单词计数的独立程序,并提供了一个通过UNIX-domain sockets实现的RPC(RPC介绍),我们需要完成的部分有:

设计coordinator和worker之间交流的流程和格式,以方便worker向coordinator申请任务,coordinator将taks发送给worker,worker把task的完成情况返回给coordinator coordinator对Map类型的task和Reduce类型的task进行管理,需要初始化这些任务,需要记录任务完成的情况,并生成新的任务,直到全部完成 worker如何完成Map类型以及Reduce类型task 总体设计

worker会不断向coordinator发送心跳,申请任务,拿到任务后进行map或者renduce类型的task的执行,在执行完毕后发送请求给coordinator以表示该任务完成了。当coordinator告诉其所有任务都完成时,他会结束运行

coordinator只维护task的状态不维护各个worker的状态。worker向其发送心跳申请任务时,coordinator会去遍历任务,取出还没有发送的任务或者过了太长时间都没有完成的任务返回回去,如果没有,就返回一个等待任务。coordinator接收到worker的某个任务完成的请求时会改变这个任务的状态,如果当前阶段所有的任务都完成了就转向下一个阶段,知道转到了所有MapReduce任务都完成的阶段。

整体流程如下图所示:

rpc信息传递设计Heartbeat

worker通过rpc向coordinator发送心跳(Heartbeat)来申请任务。如下:

关键结构体定义如下,HeartbeatRequest是个空结构,HeartbeatResponse承载了coordinator返回给worker的信息,这里的信息实际上是运行map类型和reduce类型的task所必须的信息的集合。所有的返回都需要JobType来标明其类型,需要id来标明其是哪个作业,对于map类型作业,其额外需要FilePath来获取任务的输入,还需要NReduce来决定输出的数量,对于reduce类型作业,其额外需要NMap来辅助获取map类型的中间输出。

12345678910type HeartbeatRequest struct {}

type HeartbeatResponse struct { FilePath string JobType JobType NReduce int NMap int Id int}

调用请求如下,它会调用coordinator的heartbeat函数来处理,并将任务返回到response中。

1call("Coordinator.Heartbeat", &HeartbeatRequest{}, &response) Report

worker完成任务后通过rpc向coordinator发送回复。如下:

关键结构体设计如下。ReportRequest通过phase和id来联合表示是哪个任务完成了。

12345678type ReportRequest struct { Id int Phase SchedulePhase}

type ReportResponse struct {}

调用请求如下,它会调用coordinator的Report函数来处理,来将该任务标记为运行结束。

1call("Coordinator.Report", &ReportRequest{Id: id, Phase: phase}, &ReportResponse{}) coordinator设计

coordinator会衍生出2个额外的协程,一个负责给rpc注册,并响应rpc传来的函数调用请求,一个负责给worker选择task生成resopnse

rpc函数调用处理

给rpc注册的程序就是原本框架提供的代码,具体代码如下:

1234567891011121314// start a thread that listens for RPCs from worker.gofunc (c *Coordinator) server() { rpc.Register(c) rpc.HandleHTTP() //l, e := net.Listen("tcp", ":1234") sockname := coordinatorSock() os.Remove(sockname) l, e := net.Listen("unix", sockname) if e != nil { log.Fatal("listen error:", e) } go http.Serve(l, nil)}

其相应的也是上面提到的hearbeat和report事件。比较有go特色的的地方在于如何等待另一个进程生成对应的回应,采用的是如下的代码:

1234567891011type heartbeatMsg struct { response *HeartbeatResponse ok chan struct{}}

func (c *Coordinator) Heartbeat(request *HeartbeatRequest, response *HeartbeatResponse) error { msg := heartbeatMsg{response, make(chan struct{})} c.heartbeatCh



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3