您现在的位置是:首页 >技术教程 >Hadoop框架---MapReduce框架原理(上)网站首页技术教程
Hadoop框架---MapReduce框架原理(上)
目录
一.MapReduce框架原理(上)
1.1 InputFormat数据输入
1.1.1 切片与MapTask并行度决定机制
1)问题引出
MapTask 的并行度决定 Map 阶段的任务处理并发度,进而影响到整个 Job 的处理速度。
思考:1G 的数据,启动 8 个 MapTask,可以提高集群的并发处理能力。那么 1K 的数据,也启动 8 个 MapTask,会提高集群性能吗?MapTask 并行任务是否越多越好呢?哪些因素影响了 MapTask 并行度?
2)MapTask 并行度决定机制
数据块:Block 是 HDFS 物理上把数据分成一块一块。数据块是 HDFS 存储数据单位。
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是 MapReduce 程序计算输入数据的单位
,一个切片会对应启动一个 MapTask。
1.1.2 Job提交流程源码和切片源码详解
(1)Job提交流程源码详解
1)打好断点后,在Driver类中开始Debug,看到waitForCompletion
方法直接跳转到了submit
方法:
2)进入到submit方法体内后,可以看到内部又依次
先调用了setUseNewAPI方法
和connect方法
,其中前者
主要是对新旧Hadoop版本之间的map和reduce组件进行设置
,后者
是建立连接
3)connect方法
中主要开展了两项工作
:第一项是创建提交 Job 的代理new Cluster(getConfiguration())
,第二项是判断是本地运行环境还是 yarn 集群运行环境initialize(jobTrackAddr, conf)
:
4)之后开始提交Job—submitJobInternal方法
,进入后可以首先看到checkSpecs方法
,该方法主要是判断输出路径是否已存在
;之后创建给集群提交数据的 Stag 路径
:JobSubmissionFiles.getStagingDir(cluster, conf)
,该方法执行后可以看到在本地文件系统中tmphadoopmapredstaging¸¡746783080.staging
新路径已生成。
5)全速运行,获取 jobid
,并创建 Job 路径
:
注:之后会在
Stag路径
下的jobId下
生成xml文件+jar包+切片信息文件
(三件套),如果是本地模式则jar包不会产生!
6)拷贝 jar 包到集群,注意如果是本地模式则不拷贝,只有集群模式时会拷贝jar包
运行到此时,jobId路径
在本地文件系统中得以生成
:
7)计算切片,生成切片规划文件
(writeSplits方法
):
之后从本地文件系统中可以看到切片文件产生了:
8)向 Stag 路径写 XML 配置文件
(writeConf方法
):
从本地文件系统中看到xml文件也产生了:
9)提交 Job
,返回提交状态:
整个操作的流程图如下:
(2)FileInputFormat切片源码解析
1.1.3 FileInputFormat切片机制
1.1.4 TextInputFormat
1)FileInputFormat 实现类
思考:==在运行 MapReduce 程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。==那么,针对不同的数据类型,MapReduce 是如何读取这些数据的呢?
FileInputFormat 常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat CombineTextInputFormat 和自定义 InputFormat
等。
2)TextInputFormat
TextInputFormat 是默认的 FileInputFormat 实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量, LongWritable 类型。值是这行的内容,不包括任何行终止符(换行符和回车符),Text 类型。
以下是一个示例,比如,一个分片包含了如下 4 条文本记录。
Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise
每条记录表示为以下键/值对:
(0,Rich learning form)
(20,Intelligent learning engine)
(49,Learning more convenient)
(74,From the real demand for more close to the enterprise)
1.1.5 CombineTextInputFormat切片机制
框架默认的 TextInputFormat 切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个 MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。
1)应用场景:
CombineTextInputFormat 用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个 MapTask 处理。
2)虚拟存储切片最大值设置CombineTextInputFormat.setMaxInputSplitSize(job, 4194304
);// 4m
注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。
3)切片机制
生成切片过程包括:虚拟存储过程和切片过程二部分。
1.1.6 CombineTextInputFormat案例实操
1)需求
将输入的大量小文件合并成一个切片统一处理。
(1)输入数据
准备 4 个小文件:
(2)期望
期望一个切片处理 4 个文件
2)实现过程
(1)不做任何处理,运行 1.8 节的 WordCount 案例程序,观察控制台切片个数为 4。
(2)在 WordcountDriver 中增加如下代码,运行程序,并观察运行的切片个数为 3。
(a)驱动类中添加代码如下:
// 如果不设置 InputFormat,它默认用的是 TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
//虚拟存储切片最大值设置 4m
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
(b)运行如果为 3 个切片:
(3)在 WordcountDriver 中增加如下代码,运行程序,并观察运行的切片个数为 1。
(a)驱动中添加代码如下:
// 如果不设置 InputFormat,它默认用的是 TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
//虚拟存储切片最大值设置 20m
CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);
(b)运行如果为 1 个切片:
1.2 MapReduce工作流程
上面的流程是整个 MapReduce 最全工作流程,但是 Shuffle 过程
只是从第 7 步开始到第16 步
结束,具体 Shuffle 过程详解,如下:
(1)MapTask 收集我们的 map()方法输出的 kv 对,放到内存缓冲区中
(2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
(3)多个溢出文件会被合并成大的溢出文件
(4)在溢出过程及合并的过程中,都要调用 Partitioner 进行分区和针对 key 进行排序
(5)ReduceTask 根据自己的分区号,去各个 MapTask 机器上取相应的结果分区数据
(6)ReduceTask 会抓取到同一个分区的来自不同 MapTask 的结果文件,ReduceTask 会将这些文件再进行合并(归并排序)
(7)合并成大文件后,Shuffle 的过程也就结束了,后面进入 ReduceTask 的逻辑运算过程(从文件中取出一个一个的键值对 Group,调用用户自定义的 reduce()方法)
注:
(1)Shuffle 中的缓冲区大小会影响到 MapReduce 程序的执行效率,原则上说,缓冲区越大,磁盘 io 的次数越少,执行速度就越快。
(2)缓冲区的大小可以通过参数调整,参数:mapreduce.task.io.sort.mb 默认 100M。