hadoop源码分析之MapReduce(一)-程序员宅基地

技术标签: 应用服务器  Hadoop  网络应用  Mapreduce  配置管理  mapreduce&parallel  

      hadoop的源码已经粗看过一遍,但每次想要了解细节的时候,还得去翻代码. 看了又是忘记. 所以我决定这些天把其中的重要的细节记下来。

声明:

1. 本文假设读者已经掌握一些MapReduce的基本概念,曾经编写过MapReduce程序。

2. 此源代码分析是基于hadoop svn的trunk之上(目前0.20.0-dev),由于hadoop正在换新的MapReduce api(org.apache.hadoop.mapreduce包), 以后很多类会弃用,很多接口会改变,这儿只能尽量保持同步。

3. 关于hdfs源代码可以参考caibinbupt的hdfs源代码分析 ,这儿就不再详述。

4. 这篇文章是基于javen 的分析之上的,感谢javen的辛勤劳动。javen的源码分析是在早期的hadoop版本上,在这儿有一些内容会不一样。

一、基本概念

1.1 MapReduce逻辑过程
1.2 MapReduce物理分布

二、实现细节

2.1 总体结构

我们在编写MapReduce程序时通常是上是这样写的:

    Configuration conf = new Configuration(); // 读取hadoop配置
    Job job = new Job(conf, "作业名称"); // 实例化一道作业
    job.setMapperClass(Mapper类型);
    job.setCombinerClass(Combiner类型);
    job.setReducerClass(Reducer类型);
    job.setOutputKeyClass(输出Key的类型);
    job.setOutputValueClass(输出Value的类型);
    FileInputFormat.addInputPath(job, new Path(输入hdfs路径));
    FileOutputFormat.setOutputPath(job, new Path(输出hdfs路径));
    // 其它初始化配置
    JobClient.runJob(job);


一道MapRedcue作业是通过JobClient.rubJob(job)向master节点的JobTracker提交的, JobTracker接到JobClient的请求后把其加入作业队列中。在这之前master节点的NameNode, SecondedNameNode,JobTracker和slaves节点的DataNode, TaskTracker都已经启动。JobTracker一直在等待JobClient通过RPC提交作业,而TaskTracker一直通过RPC向 JobTracker发送心跳heartbeat询问有没有任务可做,如果有,让其派发任务给它执行。如果JobTracker的作业队列不为空, 则TaskTracker发送的心跳将会获得JobTracker给它派发的任务。这是一道pull过程: slave主动向master拉生意。slave节点的TaskTracker接到任务后在其本地发起Task,执行任务。以下是简略示意图:


2.1.1 Mapper和Reducer

运行于Hadoop的MapReduce应用程序最基本的组成部分包括一个Mapper和一个Reducer类,以及一个创建JobConf的执行程序,在一些应用中还可以包括一个Combiner类,它实际也是Reducer的实现。

2.1.2  JobTracker和TaskTracker

它们都是由一个master服务JobTracker和多个运行于多个节点的slaver服务TaskTracker两个类提供的服务调度的。master负责调度job的每一个子任务task运行于slave上,并监控它们,如果发现有失败的task就重新运行它,slave则负责直接执行每一个task。TaskTracker都需要运行在HDFS的DataNode上,而JobTracker则不需要,一般情况应该把JobTracker部署在单独的机器上。

2.1.3 JobClient

每一个job都会在用户端通过JobClient类将应用程序以及配置参数Configuration打包成jar文件存储在HDFS,并把路径提交到JobTracker的master服务,然后由master创建每一个Task(即MapTask和ReduceTask)将它们分发到各个TaskTracker服务中去执行。

2.1.4 JobInProgress

JobClient提交job后,JobTracker会创建一个JobInProgress来跟踪和调度这个job,并把它添加到job队列里。JobInProgress会根据提交的job jar中定义的输入数据集(已分解成FileSplit)创建对应的一批TaskInProgress用于监控和调度MapTask,同时在创建指定数目的TaskInProgress用于监控和调度ReduceTask,缺省为1个ReduceTask。

2.1.5 TaskInProgress

JobTracker启动任务时通过每一个TaskInProgress来launchTask,这时会把Task对象(即MapTask和ReduceTask)序列化写入相应的TaskTracker服务中,TaskTracker收到后会创建对应的TaskInProgress(此TaskInProgress实现非JobTracker中使用的TaskInProgress,作用类似)用于监控和调度该Task。启动具体的Task进程是通过TaskInProgress管理的TaskRunner对象来运行的。TaskRunner会自动装载job jar,并设置好环境变量后启动一个独立的java child进程来执行Task,即MapTask或者ReduceTask,但它们不一定运行在同一个TaskTracker中。

2.1.6  MapTask和ReduceTask

一个完整的job会自动依次执行Mapper、Combiner(在JobConf指定了Combiner时执行)和Reducer,其中Mapper和Combiner是由MapTask调用执行,Reducer则由ReduceTask调用,Combiner实际也是Reducer接口类的实现。Mapper会根据job jar中定义的输入数据集按<key1,value1>对读入,处理完成生成临时的<key2,value2>对,如果定义了Combiner,MapTask会在Mapper完成调用该Combiner将相同key的值做合并处理,以减少输出结果集。MapTask的任务全完成即交给ReduceTask进程调用Reducer处理,生成最终结果<key3,value3>对。这个过程在下一部分再详细介绍。

 

2.2 JobTracker与作业处理

2.2.1 JobClient提交作业

JobClient.runJob(job)静态方法会实例化一个JobClient实例,然后用此实例的submitJob(job)方法向 master提交作业。此方法会返回一个RunningJob对象,它用来跟踪作业的状态。作业提交完毕后,JobClient会根据此对象开始轮询作业的进度,直到作业完成。
submitJob(job)内部是通过submitJobInternal(job)方法完成实质性的作业提交。  submitJobInternal(job)方法首先会向hadoop分布系统文件系统hdfs依次上传三个文件: job.jar, job.split和job.xml。
job.xml: 作业配置,例如Mapper, Combiner, Reducer的类型,输入输出格式的类型等。
job.jar: jar包,里面包含了执行此任务需要的各种类,比如 Mapper,Reducer等实现。
job.split: 文件分块的相关信息,比如有数据分多少个块,块的大小(默认64m)等。
这三个文件在hdfs上的路径由hadoop-default.xml文件中的mapreduce系统路径mapred.system.dir属性 + jobid决定。mapred.system.dir属性默认是/tmp/hadoop-user_name/mapred/system。写完这三个文件之后, 此方法会通过RPC调用master节点上的JobTracker.submitJob(job)方法,此时作业已经提交完成。关于RPC的细节,后续章节将会阐述。

2.2.2 JobTacker调度作业

JobTracker接到JobClient提交的作业后,即在JobTracker.submitJob(job)方法中,首先产生一个JobInProgress对象。此对象代表一道作业,它的作用是维护这道作业的所有信息,包括作业剖析JobProfile和最近作业状态JobStatus,并登记此作业所有Tasks进任务表中。随后JobTracker将此JobInProgress对象通过listener.jobAdded(job)方法加入到调度队列中,并用一个成员变量jobs来维护所有的作业。

下面将说明hadoop的作业调度


作业调度在hadoop-0.19.0版得到了很大的改进,原来的调度策略规定是先进先出(FIFO)的。随着hadoop的商业应用增多,各个公司对它的需求也增多。其中Facebook公司提交了一个公平调度器Fair Scheduler; Yahoo!公司提交了Capacity Scheduler。它们分别在hadoop源码树的src/contrib/fairscheduler和src/contrib/capacity- scheduler目录中。而hadoop默认的调度器是FIFO策略的JobQueueTaskScheduler,它有两个成员变量jobQueueJobInProgressListener与eagerTaskInitializationListener。

      其中eagerTaskInitializationListener负责任务Task的初始化。其具体实现是这样的: 这个listener在初始化时会开启一个JobInitThread线程,当作业通过jobAdded(job)加入到初始化队列jobInitQueue中,根据作业的优先级排序(resortInitQueue方法)后, 这个线程就会调用JobInProgress.initTasks()立即初始化作业的所有任务。

2.2.3 JobInProgress初始化任务

 

       任务Task分两种: MapTask 和reduceTask,它们的管理对象都是TaskInProgress 。
 JobInProgress.initTasks()方法首先从JobClient上传的job.split文件中读取所有数据块的列表,然后根据这个列表创建对应数目的Map执行管理对象TaskInProgress。创建这些TaskInProgress对象完毕后,initTasks()方法会通过createCache()方法为这些对象产生一个未执行任务的Map缓存nonRunningMapCache。slave端的TaskTracker向master发送心跳时,就可以直接从这个cache中取任务去执行。createCache()方法的作用是为以上TaskInProgress对象在网络拓扑结构上分配拥有此任务数据块的节点。从近到远一层一层地寻找,首先是同一节点,然后在寻找同一机柜上的节点,接着寻找相同关换机下的节点,直到找了maxLevel层结束。这样的话,在JobTracker给TaskTracker派发任务的时候,可以迅速找到最近的TaskTracker,让它执行任务。
 其次JobInProgress会创建Reduce的监控对象,这个比较简单,根据JobConf里指定的Reduce数目创建,缺省只创建1个Reduce任务。监控和调度Reduce任务的也是TaskInProgress类,不过构造方法有所不同,TaskInProgress会根据不同参数分别创建具体的MapTask或者ReduceTask。同样地,initTasks()也会通过createCache()方法对这些TaskInProgress对象寻找maxLevel层的可行TaskTracker,进而产生nonRunningReduceCache成员。
 JobInProgress创建完TaskInProgress后,最后构造JobStatus并记录job正在执行中,然后再调用JobHistory.JobInfo.logStarted()记录job的执行日志。到这里JobTracker里初始化job的过程全部结束,执行则是通过另一异步的方式处理的,下面接着介绍它。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/coderplay/article/details/83325205

智能推荐

keepalive单实例通过脚本绑定外部服务实现主备自动切换_keepalived主备模型,当主节点宕机 vip挂载到备机时调用一个脚本如何实现-程序员宅基地

文章浏览阅读726次。keepalive的算法选择的实际操作_keepalived主备模型,当主节点宕机 vip挂载到备机时调用一个脚本如何实现

使用Maven搭建Struts2+Spring3+Hibernate4的整合开发环境_maven struts2+hibernate4-程序员宅基地

文章浏览阅读2.6k次,点赞4次,收藏11次。使用Maven搭建Struts2+Spring3+Hibernate4的整合开发环境一.新建Maven项目1.新建一个Web Project创建好的项目如下图所示:2.修改默认的JDK右键点击,选择Properties3.创建Maven标准目录    src/main/java     src/main/resources_maven struts2+hibernate4

udp一对多聊天java_用udp方式进行聊天的java实现. (转)-程序员宅基地

文章浏览阅读307次。用udp方式进行聊天的java实现. (转)[@more@]我最近也在研究用实现oicq。这是我的客户端的简单实现。主要是通过udp方式,实现间的对话。其中运用了序列化类的方法。以下是,请多提意见。//发送消息类Message.javaimport java.io.Serializable;public class Message implements Serializable{private S..._java客服一对多对话udp

图解TCPIP-ICMP_icmp不可达消息 图解tcp/ip-程序员宅基地

文章浏览阅读231次。ICMP1.确认IP包是否成功送达目标地址2.通知IP包被废弃的具体原因3.改善网络设置4.类型(0回送应答echo reply;3目标不可达Destination Unreachable;)常见类型:3.Destination Unreachable Message5.ICMP Redirect Message路由器发现发送端主机使用了次优路径发送数据,就会返回ICMP重定向(..._icmp不可达消息 图解tcp/ip

合并两个DBC文件的方法_dbc合并-程序员宅基地

文章浏览阅读4.5k次,点赞3次,收藏9次。合并两个DBC文件的方法工具步骤实例配置库及canconvert的其他使用参考工具首先准备好合并文件的工具1.利用python的pip安装canmatrix库。语句:pip install canmatrix2.安装完成后会在pip.exe的附近找到canconvert.exe利用该脚本即可完成合并。(根据提示用pip安装需要的其他库)接下来跟大家讲讲具体步骤,很简单!步骤直接在命令窗中敲出语句:canconvert --merge=second.dbc source.dbc ta_dbc合并

SSA-LSSVM分类预测 | Matlab 麻雀优化最小二乘支持向量机分类预测-程序员宅基地

文章浏览阅读18次。在机器学习领域,数据分类是一个非常重要的任务,它可以帮助我们将数据分成不同的类别,从而更好地理解和分析数据。支持向量机(Support Vector Machine,SVM)是一种常用的分类算法,它通过构建一个最优的超平面来实现数据分类。然而,传统的SVM算法在处理大规模数据集时存在一些问题,例如计算复杂度高、模型泛化能力不足等。为了解决这些问题,研究人员提出了一种基于麻雀算法优化的最小二乘支持向量机(SSA-LSSVM)。

随便推点

wsl实现图形界面显示_wsl打开图片-程序员宅基地

文章浏览阅读969次。1、ubuntu安装安装 VcXsrvsudo apt-get install ubuntu-desktop unity compizconfig-settings-manager2、配置显示方式export DISPLAY=localhost:03、安装ccsmsudo apt install compizconfig-settings-manager4、打开ccsmccsm5、按照图片勾选好了以后,点击close按钮就好了。..._wsl打开图片

iOS圆角和阴影并存的两种实现方法_ios 圆角加阴影-程序员宅基地

文章浏览阅读6.7k次。圆角和阴影无法共存的原因就是因为这句代码。Because shadow is an effect done outside the View, and that masksToBounds set to YES will tell the UIView not to draw everything that is outside itself.这句话的意思就是,圆角都是我给你割出来的,圆角外面的..._ios 圆角加阴影

RecycleView+卡片+下拉刷新_cf8833 recycleview-程序员宅基地

文章浏览阅读323次。1.添加依赖 implementation 'androidx.cardview:cardview:1.0.0' implementation 'androidx.recyclerview:recyclerview:1.0.0' implementation "androidx.swiperefreshlayout:swiperefreshlayout:1.1.0"2.主界面package com.example.myapplication3import androi_cf8833 recycleview

CTO说:怎么成为并做好CTO-程序员宅基地

文章浏览阅读7.3k次,点赞2次,收藏14次。CTO 可能是大多数开发者心中的梦想,但这些问题:到底什么样的人适合做 CTO ?CTO 要做哪些事情?CTO 持有公司多少股权合适?恐怕多数开发者都搞不清楚……包括我自己,也是五迷三道!直到我看了下面这本书:这本《CTO说》,汇聚了 30 余位 CTO 导师(360副总裁、知乎CTO、京东商城总架构师等等)的经验,可以让我们习得 CTO 成长的道与术。全书共分六篇:CTO的大格局创业平台CTO的_cto说

三分钟教你读懂支票是什么_支票的原理是什么-程序员宅基地

文章浏览阅读8.6k次。三分钟教你读懂支票是什么支票1、支票的概念及特点支票:出票人签发的,委托办理支票存款业务的银行或其他金融机构在见票时无条件支付确定金额给收款人或持票人的票据。支票必填项:支票字样、确定的金额、出票日期、无条件支付委托、付款人名称、出票人签章。支票选填项:付款地、出票地。支票结算特点:(1)简便,手续_支票的原理是什么

POJ 3686 最小费用最大流(拆点建图)_poj3686-程序员宅基地

文章浏览阅读879次。思路:这题还挺难的。刚开始看错题目意思了,然后建图错了得不出答案,然后看了下别人的题解,原来每个矩阵的点还在拆成n个点才得。然后昨晚看了1个小时多小时没理解,今早过来再看,然后用别人的代码运行了一下我自己想出的样例,然后才慢慢理解。解题分析:《参考博客:http://blog.csdn.net/weiguang_123/article/details/7881799》 假设某个_poj3686