===========本文需要有 Go 的基础,并且知道 6.824 Lab 的相关内容作为预备知识===========

最开始做这个 Lab 是去年,所以使用的是 2015 年的(现在已经有 2016 年的了),地址Distributed System

第一个 Lab 是阅读 MapReduce 的论文,然后在提供的框架下实现一个简单版的 MapReduce 程序,论文地址:MapReduce

Part I

在提供的框架下,自己实现 Map 和 Reduce 函数,从而实现单机版的 MapReduce 程序,用来统计单词的数据,类似分布式程序的 Hello World。

根据提供的代码,以及我们在 Part I 执行的语句可知,在 wc.go 中我们找到如下的语句

mapreduce.RunSingle(5, 3, os.Args[2], Map, Reduce)

我们最终执行的是 mapreduce.RunSingle 这个函数,在 RunSingle 函数中,可以分为如下几步

  1. InitMapReduce
  2. Split
  3. DoMap
  4. DoReduce
    其中 InitMapReduce,初始化一个 mapreduce 结构体,在后面使用,Split 则将输入的文件进行,然后顺序调用 DoMap,这里面会调用我们写的 map 函数,DoMap 都做完之后,再继续执行 DoReduce,这个函数会调用我们写的 reduce 函数。然后根据论文中的伪代码,差不多就可以完成这两个函数了
    map(String key, String value):
    // key: document name
    // value: document contents
    for each word w in value:
    EmitIntermediate(w, “1”);
    reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
    result += ParseInt(v);
    Emit(AsString(result));

    Part IIIII

首先查看 test_test.go 中的所有的 test 函数,看是如何实现测试的,大致顺序会形成一张如下的图,从上到下形成调用间的层次,同一层次间的函数执行顺序是从左往右顺序执行,其中绿色的表示是通过 go func()(另起一个线程)来执行的,我们只需要完成 mapreduce.go#RunMaster 函数即可。

part2

从 test 中的代码以及 Part I 中相关代码可以得知,我们需要写的代码(RunMaster 函数),实际上就是把所有的任务(map 或者 reduce)分配给具体的 worker 来执行。

首先,如果我们不考虑 worker 这个概念,那么怎么实现 RunMaster 函数呢,我们只需要把 Part I 中 RunSingle 中的两个 for 循环改成 goroutine 的,也就是在函数 DoMap 和 DoReduce 之前加关键字 go 即可,当然到这里我们还需要考虑,如何做到所有的 map 都完成之后才处理 reduce?reduce 都处理完成才算 RunMaster 函数处理完成?这就变成了 goroutine 的的同步问题了,可以参考 channel buffering。

到这里,如果我们不考虑 worker 的话,所有的 test case 都可以通过了,但是发现 TestBasic 函数的起的 worker 我们根本没有用到(后面几个 test case 还有 worker fail 的情况),那么就变成了,如何将我们上面的代码改写为,使用 worker 来执行,而不是直接通过 go DoMap() 以及 go DoReduce 来执行,通过阅读 worker.go 发现有一个 RPC 接口 DoJob,刚好满足我们的需要,阅读整个项目的其他代码(mapreduce.go#CleanupRegistration()),发现通过调用 common.go#call() 来统一进行 RPC 调用.

这里我们需要知道,从哪知道一个 worker 准备就绪,以及如何知道一个 woker 从忙状态(处理任务)—> 闲状态(任务处理完成),我们可以看到在 worker.go#RunWoker 里面有一句

Register(MasterAddress, me)

我们发现 Register 函数如下

// Tell the master we exist and ready to work
func Register(master string, me string) {
args := RegisterArgs{}
args.Worker = me
var reply RegisterReply
ok := call(master, “MapReduce.Register”, args, reply)
if ok == false {
fmt.Printf(“Register: RPC %s register error\n”, master)
}
}

其中第6行调用 MapReduce.Register 这个 RPC 接口,继续看,发现 mapreduce.go#Register 这个函数中有下面一句话

mr.registerChannel <- args.Worker

发现 registerChannel 是 mapreduce 这个结构体中的一个 channel,也就是在 RunWorker 的时候,我们能从 mr.registerChannel 得到一个标识 worker 的字符串(可以理解为这个 worker 的名字),而这个字符串,后续我们传给 common.go#call 函数,调用相关的 RPC 接口。

好,至少我们知道什么时候会得到通知有 worker 注册了,那么如何知道 worker 从忙变成闲呢,通过上面的流程,我们可以复用 registerChannel,也就是如果一个 worker 处理完任务的时候,我们也往这个 channel 发送 args.Worker 这个字段,这里就需要更改 registerChannel 的定义,因为我们不知道注册 worker 和分配任务给 worker 谁先谁后,在这里我们只需要把 registerChannel 变成带 buffer 的就行了。最终需要处理 worker 中途挂掉的情况,只需要在外层起一个死循环,直到 call 这个 函数返回 true 的时候才退出即可。

总结:

梳理一下:我们在 RunMaster 中需要并行的执行 Map,所有 Map 操作执行完成之后,并行的执行 Reduce 操作,这些操作需要通过分配给 worker 来执行,通过 channel 可以知道什么时候有空闲的 worker(注册或者由忙变闲),然后在调用 Worker.DoJob 的外层用死循环包装一层,知道 RPC 返回成功才退出即可

 

Comments