MapReduce 原理及实现

您所在的位置:网站首页 mapreduce论文解读 MapReduce 原理及实现

MapReduce 原理及实现

2023-11-19 13:50| 来源: 网络整理| 查看: 265

MapReduce 是 Google 的 Jeff Dean 等人在 2004 年提出的大数据处理框架。它将数据拆分为多块,在多个机器上并行处理。原本需要高性能计算资源的数据处理任务,通过 MapReduce 可以放到多个廉价的机器上并行处理。

在 MIT 6.824 分布式系统课程中,第一个实验便是用 Go 实现 MapReduce。

MapReduce 原理MapReduce 程序分成一个 master 和多个 worker,master 负责分配和调度任务,而 worker 负责执行被分配的 Map 或 Reduce 任务MapReduce 的输入为 M 个文件或数据块,数据之间没有依赖性;输出为 R 个 Reduce 结果文件在 Map 任务阶段,每一个 worker 被分配一个文件,对文件内容执行用户自定义的 Map 函数,输出若干 (key, value) 对,并保存至中间文件当 Map 任务全部执行完成后,分配 R 个 Reduce 任务,每个 worker 对应一个 Reduce 任务,收集该 Reduce 任务对应的所有中间文件,读取 (key, value) 对,将它们排序后执行 Reduce 函数,最后将结果输出

下图描绘了 MapReduce 的过程:

以计算单词个数为例,它的 Map 和 Reduce 函数可以这么表示:

1 2 3 4 5 6 7 8 Example: word count input is thousands of text files Map(k, v) split v into words for each word w emit(w, "1") Reduce(k, v) emit(len(v))

更多的细节在论文 MapReduce: Simplified Data Processing on Large Cluster 中,对 MapReduce 有着很详细的解释。

MapReduce 实现

下面将基于 MIT 6.824 的实验部分,讲述一下我是如何实现基本的 MapReduce 的。现在网上大多数的实验版本还停留在几年前,但 2020 Spring 的实验代码结构有了很大的变化,相对更加自由。

首先直接克隆 MIT 6.824 的项目:

1 $ git clone git://g.csail.mit.edu/6.824-golabs-2020 6.824

项目概览:所有的文件都在 src 文件夹下面。按照实验页面描述的可以执行已经给好的 sequential 的 MapReduce。示例是执行 wc.go 的统计单词数的任务。

1 2 3 4 $ cd 6.824/src/main $ go build -buildmode=plugin ../mrapps/wc.go $ rm mr-out* $ go run mrsequential.go wc.so pg*.txt

这个 Lab 是实现一个分布式的 MapReduce,由 master 和 worker 组成,它们将通过 RPC 进行通讯。

在这个 Lab 中,我们只需要修改 src/mr 目录下的 master.go、rpc.go 和 worker.go。

需求分析只有一个 master 实例,来分配调度 MapReduce 的任务worker 实例需要不断向 master 请求任务,在完成任务之后向 master 报告任务完成master 先分配 Map 任务,在所有 Map 任务结束后,再分配 reduce 任务master 在完成所有任务之后退出,此时如果还有 worker 向 master 请求任务,则会因为 RPC 请求失败而直接退出如果 worker 意外退出,或者长时间未响应,则 master 需要重新分配该任务实现

这个 Lab 的切入口是 worker.go,我们在 Worker 函数中,需要不断请求任务,master 根据当前状态分配任务。因此 worker 并不需要管理状态,只需要根据 RPC 返回的结果执行相应的任务。

对于 master,我们需要所有输入文件名、Reduce 任务数量 R。因为 Map 任务的数量等于输入文件的数量 M,所以可以维护一个长为 M 的数列来管理所有 Map 任务的状态。同理也可以用长为 R 的数列管理 Reduce 任务。因而主要的 master 数据结构可以如下构造:

1 2 3 4 5 6 7 8 9 10 11 12 type Master struct { files []string nReduce int mapTasks []MapReduceTask reduceTasks []MapReduceTask mapFinished bool reduceFinished bool mutex sync.Mutex }

对于每个任务,自定义了一个 MapReduceTask 的结构体封装对应的数据。根据上面的需求分析,每个任务需要有:

类型 Type:Map|Reduce|Wait任务的状态 Status:Unassigned|Assigned|Finished该任务在原来任务列表里的 Index(方便定位和更新对应的任务)开始时间 Timestamp,便于判断任务超时Map 任务文件 MapFile,以及 Reduce 任务中间文件列表 ReduceFiles1 2 3 4 5 6 7 8 type MapReduceTask struct { Type string // "Map", "Reduce", "Wait" Status int // "Unassigned", "Assigned", "Finished" Index int // Index of the task Timestamp time.Time // Start time MapFile string // File for map task ReduceFiles []string // List of files for reduce task }

RPC 消息的定义非常简单,worker 向 master 请求时有两种情况:

请求新的任务 RequestTask,不需要额外数据任务完成 FinishTask,需要将完成的任务 Task 传给 master 以便更新任务列表

而 master 回复时则只需要指定 Task 以及告诉总的 Reduce 任务数量。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 const ( RequestTask = iota FinishTask ) type MapReduceArgs struct { MessageType int Task MapReduceTask } type MapReduceReply struct { Task MapReduceTask NReduce int }

因此 master 的逻辑便是:

Master 结构初始化时,得到所有待处理文件 files 以及 Reduce 结果数量 nReduce。

将待处理文件顺序加入到 mapTasks 中,并初始化每个任务状态为 Unassigned。同样的,将 nReduce 个 Task 加入 reduceTasks 中

定义 RPC 处理函数 WorkerCallHandler(args *MapReduceArgs, reply *MapReduceReply) error,worker 的 RPC 请求将由这个函数处理

判断请求类型 RequestTask/FinishTask

如果是请求任务,先通过 mapFinished 判断是否 Map 阶段已完成,若未完成,则在 mapTasks 中选择一个 Unassigned 或者超时的任务;对于 Reduce 阶段,也是一样的。如果所有 Map 任务都已经被分配,并且没有完全结束,则返回一个 Wait 任务让 worker 等待

如果是完成任务,则首先判断 worker 返回的任务 Timestamp 是否一致(因为会有任务超时被重新分配,所以时间戳会不一致)。接着将对应的任务标记为 Finished,并判断是否所有的任务都已经 Finished。

需要注意的:

因为可能有多个 worker 同时请求,所以需要用 Mutex,确保同时只有一个线程在修改 master 变量因为要求能够处理 worker 不响应或异常退出的情况,所以 worker 需要先将中间文件写入临时文件,将临时文件名返回 master,若成功返回了便在 master 将其重命名,确保只有成功完成的任务才有合法的文件名。如 Map 的中间结果为 mr--,而 Reduce 的结果为 mr-out-

对于 worker,只需要根据分配的任务执行相应的函数。具体的 Map 和 Reduce 任务可以参考 mrsequential.go:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) { for true { args := MapReduceArgs{MessageType: RequestTask} reply := MapReduceReply{} res := call("Master.WorkerCallHandler", &args, &reply) if !res { break } switch reply.Task.Type { case "Map": doMap(&reply, mapf) case "Reduce": doReduce(&reply, reducef) case "Wait": time.Sleep(1 * time.Second) } } }

在实现好之后通过 src/main/test-mr.sh 执行所有的测试:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 $ sh test-mr.sh *** Starting wc test. 2020/09/06 19:39:30 rpc.Register: method "Done" has 1 input parameters; needs exactly three --- wc test: PASS *** Starting indexer test. 2020/09/06 19:39:36 rpc.Register: method "Done" has 1 input parameters; needs exactly three --- indexer test: PASS *** Starting map parallelism test. 2020/09/06 19:39:39 rpc.Register: method "Done" has 1 input parameters; needs exactly three --- map parallelism test: PASS *** Starting reduce parallelism test. 2020/09/06 19:39:47 rpc.Register: method "Done" has 1 input parameters; needs exactly three --- reduce parallelism test: PASS *** Starting crash test. 2020/09/06 19:39:55 rpc.Register: method "Done" has 1 input parameters; needs exactly three --- crash test: PASS *** PASSED ALL TESTS

参考:

MapReduce: Simplified Data Processing on Large Clustershttps://www.cs.rutgers.edu/~pxk/417/notes/content/mapreduce.html


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3