MapReduce理解

Hadoop MapReduce是一个软件框架,该框架能够轻松编写出应用程序,这些应用程序以可靠,容错的方式在大型集群(数千节点)的商用硬件上并行处理大量数据。

一、优缺点

优点

1)易于编程。

  它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的 PC 机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得 MapReduce 编程变得非常流行。

2)良好的扩展性。

  当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。

3)高容错性。

  MapReduce 设计的初衷就是使程序能够部署在廉价的 PC 机器上,这就 要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一 个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由 Hadoop 内部完成。

4)适合PB级以上海量数据的离线处理。

  这里强调离线处理,说明它适合离线处理而不适合在线处理。比如像毫秒级别的返回一个结果,MapReduce 很难做到。

缺点

1)实时计算。

  MapReduce 无法像 Mysql 一样,在毫秒或者秒级内返回结果。

2)流式计算。

  流式计算的输入数据是动态的,而 MapReduce 的输入数据集是静态的,不能动态变化。这是因为 MapReduce 自身的设计特点决定了数据源必须是静态的。

3)DAG(有向图)计算。

  多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce 并不是不能做,而是使用后,每个 MapReduce 作业的输出结果都会写入到磁盘,会造成大量的磁盘 IO,导致性能非常的低下。(spark使用的是内存)

二、流程

客户端(Client):编写mapreduce程序,配置作业,提交作业,这就是程序员完成的工作;

JobTracker:JobTracker是一个后台服务进程,启动之后,会一直监听并接收来自各个TaskTracker发送的心跳信息,包括资源使用情况和任务运行情况等信息。

  • 作业控制:在hadoop中每个应用程序被表示成一个作业,每个作业又被分成多个任务,JobTracker的作业控制模块则负责作业的分解和状态监控。
  • 状态监控:主要包括TaskTracker状态监控、作业状态监控和任务状态监控。主要作用:容错和为任务调度提供决策依据。
  • JobTracker只有一个,他负责了任务的信息采集整理,你就把它当做包工头把,这个和采用Master/Slave结构中的Master保持一致
  • JobTracker 对应于 NameNode
  • 一般情况应该把JobTracker部署在单独的机器上

TaskTracker:TaskTracker是JobTracker和Task之间的桥梁。TaskTracker与JobTracker和Task之间采用了RPC协议进行通信。

  • 从JobTracker接收并执行各种命令:运行任务、提交任务、杀死任务等
  • 将本地节点上各个任务的状态通过心跳周期性汇报给JobTracker,节点健康情况、资源使用情况,任务执行进度、任务运行状态等,比如说map task我做完啦,你什么时候让reduce task过来拉数据啊
  • TaskTracker是运行在多个节点上的slaver服务。TaskTracker主动与JobTracker通信,接收作业,并负责直接执行每一个任务。
  • TaskTracker都需要运行在HDFS的DataNode上

HDFS:保存作业的数据、配置信息等等,最后的结果也是保存在hdfs上面

  • NameNode: 管理文件目录结构,接受用户的操作请求,管理数据节点(DataNode)
  • DataNode:是HDFS中真正存储数据的
  • Block:是hdfs读写数据的基本单位,默认128MB大小,就是说如果你有130MB数据,那就要分成两个block,一个存放128MB,后一个存放2MB数据,虽然最后一个block块是128MB,但实际上占用空间为2MB
  • Sencondary NameNode:它的目的是帮助 NameNode 合并编辑日志,减少 NameNode 启动时间,在文件系统中设置一个检查点来帮助NameNode更好的工作。它不是要取代掉NameNode也不是NameNode的备份。
    avatar

提交作业

1.在客户端启动一个作业。拿个比方说,我提交了一个hive程序。
2.向JobTracker请求一个Job ID,就像你排队买车一样,你不得摇个号啊,没有这个号你就不能买车(执行任务)。
3.将运行作业所需要的资源文件复制到HDFS上,包括MapReduce程序打包的JAR文件、配置文件和客户端计算所得的输入划分信息。
4.提交作业

初始化作业

5.JobTracker接收到作业后,将其放在一个作业队列里(默认的调度方法是FIFO),等待作业调度器对其进行调度。
6.当作业调度器根据自己的调度算法调度到该作业时,会根据输入划分信息(Split)为每个划分创建一个map任务,并将map任务分配给TaskTracker执行。

分配任务

7.TaskTracker和JobTracker之间的通信和任务分配是通过心跳机制按成的。TaskTracker作为一个单独的JVM执行一个简单的循环,主要实现每隔一段时间向JobTracker发送心跳(Heartbeat),以次告诉JobTracker此TaskTracker是否存活,是否准备执行新的任务。(心跳中还携带着很多的信息,比如当前map任务完成的进度等信息)

执行任务

8.TaskTracker申请到新的任务之后就要在本地运行任务。运行任务的第一步是将任务本地化:将任务运行所必需的数据、配置信息、程序代码从HDFS复制到TaskTracker本地。
9.发布任务
10.启动新的JVM来运行每个任务。

更新任务执行进度和状态

  通过心跳机制,所有的TaskTracker的统计信息都会汇总到JobTracker处。JobTracker将这些信息合并起来,产生一个全局作业进度统计信息,用来表明正在运行的所有作业,以及其中所含任务的状态。最后,JobClient通过每秒查看JobTracker来接收作业进度的最新状态。

完成作业

  所有TaskTracker任务的执行进度信息都会汇总到JobTracker处,当JobTracker接收到最后一个任务的已完成通知后,便把作业的状态设置为“成功”。然后,JobClient也将及时得到任务已经完成,它会显示一条信息告知用户作业完成,最后从runJob()方法处返回。

三、框架原理

一个wordcount的例子:
avatar
  MapReduce大体分为Input,Split,Map,Shuffle,Reduce五个步骤,重点在Map,Shuffle,Reduce三个阶段。
下面这张图是根据官网图片翻译重画而来:
avatar
  shuffle横跨map和reduce两端,是MapReduce的核心过程。
  Shuffle描述着数据从map task输出到reduce task输入的这段过程。map task干完活了要输出数据了,然后接下来数据给哪个reduce?怎么分配?就有了shuffle过程。Shuffle阶段可以分为Map端的Shuffle和Reduce端的Shuffle。

Map端

avatar
  map端的shuffle包括环形内存缓冲区执行溢出写,partition,sort,combiner,生成溢出写文件,合并。
  Map端会处理输入数据并产生中间结果,这个中间结果会写到本地磁盘,而不是HDFS。map函数开始产生输出时并非简单地将它输出到磁盘。因为频繁的磁盘操作会导致性能严重下降。它的处理过程更复杂,数据首先写到内存中的一个缓冲区,并做一些预排序,以提升效率。

输入

  输入数据来源于HDFS的block,当然在MapReduce概念中,map task只读取Split,也就是是我们所说的分片。在进行map计算之前,mapreduce会根据输入文件计算输入分片(input split),每个输入分片(input split)针对一个map任务。

  • Block是HDFS的基本存储单元,上文也有写,Block默认大小是128M。
  • Split是map task只读的单位,存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组。
  • Split与block的对应关系可能是多对一,默认是一对一。

  每个map任务都有一个环形内存缓冲区,用于存储任务的输出(默认大小100MB,mapreduce.task.io.sort.mb调整),被缓冲的K-V对记录已经被序列化,但没有排序。而且每个K-V对都附带一些额外的审计信息。一旦缓冲内容达到阈值(mapreduce.map.io.sort.spill.percent,默认0.80,或者80%),就会创建一个溢出写文件,同时开启一个后台线程把数据溢出写(spill)到本地磁盘中。溢出写过程按轮询方式将缓冲区中的内容写到mapreduce.cluster.local.dir属性指定的目录中。

分区

  在写磁盘之前,线程首先根据数据最终要传递到的Reducer把数据划分成相应的分区(partition),输出key会经过Partitioner分组或者分桶选择不同的reduce。默认的情况下Partitioner会对map输出的key进行hash取模,比如有6个ReduceTask,它就是模6,如果key的hash值为0,就选择第0个ReduceTask(为1,选Reduce Task1)。这样不同的map对相同单词key,它的hash值取模是一样的,所以会交给同一个reduce来处理。目的是将记录划分到不同的Reducer上去,以期望能够达到负载均衡,以后的Reducer就会根据partition来读取自己对应的数据

按key排序

  在每个分区中,后台线程将数据按Key进行排序(排序方式为快速排序)。接着运行combiner在本地节点内存中将每个Map任务输出的中间结果按键做本地聚合(如果设置了的话),可以减少传递给Reducer的数据量。可以通过setCombinerClass()方法来指定一个作业的combiner。当溢出写文件生成数至少为3时(mapreduce.map.combine.minspills属性设置),combiner函数就会在它排序后的输出文件写到磁盘之前运行。

合并

  在写磁盘过程中,另外的20%内存可以继续写入数据,两种操作互不干扰,但如果在此期间缓冲区被填满,那么map就会阻塞写入内存的操作,让写入磁盘操作完成后再执行写入内存。 在map任务写完其最后一个输出记录之后,可能产生多个spill文件,在每个Map任务完成前,溢出写文件被合并成一个索引文件和数据文件(多路归并排序)(Sort阶段)。一次最多合并多少流由io.sort.factor控制,默认为10。至此,Map端的shuffle过程就结束了。
  溢出写文件归并完毕后,Map将删除所有的临时溢出写文件,并告知NodeManager任务已完成,只要其中一个MapTask完成,ReduceTask就开始复制它的输出(Copy阶段分区输出文件通过http的方式提供给reducer)

combiner

如果指定了Combiner,可能在两个地方被调用:
1.当为作业设置Combiner类后,缓存溢出线程将缓存存放到磁盘时,就会调用;
2.缓存溢出的数量超过mapreduce.map.combine.minspills(默认3)时,在缓存溢出文件合并的时候会调用
  combiner可以在输入上反复运行,但并不影响最终结果。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量,combiner简单说就是map端的reduce。其目的是对将要写入到磁盘上的文件先进行一次处理,这样使得map输出结果更加紧凑,减少写到磁盘的数据。如果只有1或2个溢出写文件,那么由于map输出规模减少,因此不会为该map输出再次运行combiner。
  combiner操作是有风险的,使用它的原则是combiner的输入不会影响到reduce计算的最终输入
例如:如果计算只是求总数,最大最小值可以使用combiner,但是做平均值计算使用combiner的话,最终的reduce计算结果就会出错。

压缩

   写磁盘时压缩map端的输出,因为这样会让写磁盘的速度更快,节约磁盘空间,并减少传给reducer的数据量。默认情况下,输出是不压缩的(将mapreduce.map.output.compress设置为true即可启动)

Reduce端

avatar
avatar

1.Copy过程:

  Reduce会接收到不同map任务传来的数据,并且每个map传来的数据都是有序的。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求map task所在的TaskTracker获取map task的输出文件。因为map task早已结束,这些文件就归TaskTracker管理在本地磁盘中。
有人可能会问:分区中的数据怎么知道它对应的reduce是哪个呢?其实map任务一直和其父TaskTracker保持联系,而TaskTracker又一直和JobTracker保持心跳。所以JobTracker中保存了整个集群中的宏观信息。只要reduce任务向JobTracker获取对应的map输出位置就ok了哦。

2.Merge(合并):

  这里的merge和map端的merge动作类似,只是数组中存放的是不同map端copy来的数值。CCopy过来的数据会先放入内存缓冲区中,然后当使用内存达到一定量的时候才spill磁盘。这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置。
  merge有三种形式:1)内存到内存 2)内存到磁盘 3)磁盘到磁盘。默认第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。

3.Reducer的输入文件:

  不断地merge后,最后会生成一个“最终文件”。为什么加引号?因为这个文件可能存在于磁盘上,也可能存在于内存中。对我们来说,当然希望它存放于内存中,直接作为Reducer的输入,但默认情况下,这个文件是存放于磁盘中的。当Reducer的输入文件已定,整个Shuffle才最终结束。

4.reduce阶段:

  和map函数一样也是程序员编写的,最终结果是存储在hdfs上的。

四、二次排序

为什么需要二次排序?

  在MapReduce操作时,我们知道传递的<key,value>会按照key的大小进行排序,最后输出的结果是按照key排过序的。有的时候我们在key排序的基础上,对value也进行排序。这种需求就是二次排序。

方法:

将map端输出的<key,value>中的key和value组合成一个新的key(称为newKey),value值不变。这里就变成<(key,value),value>,在针对newKey排序的时候,如果key相同,就再对value进行排序。

PS:一开始觉得既然都要经过reduce端归约,那二次排序有什么意义?其实不同的业务场景,需求也不一样,reduce阶段也不是必须要执行。