您现在的位置是:首页 >技术杂谈 >【大数据】Hadoop总结网站首页技术杂谈
【大数据】Hadoop总结
本文对于Hadoop中的HDFS和MapReduce的相关面试重点进行了总结,下篇将介绍调优、数据倾斜等进阶知识。
一、概述
1. Hadoop特性
(1)高可靠性
采用冗余数据存储方式,即使一个副本发生故障,其他副本也可以保证正常对外提供服务。
(2)高效性
作为并行分布式计算平台,Hadoop采用分布式存储和分布式处理两大核心技术,能够高效地处理PB级别数据。
(3)高容错性
采用冗余数据存储方式,自动保存数据的多个副本,并且能够自动将失败的任务重新进行分配。
(4)高可扩展性
Hadoop的设计目标是可以高效稳定地运行在廉价的计算机集群上,可以扩展到数以千计的计算机节点上。
(5)成本低
Hadoop采用廉价的计算机集群,成本比较低,普通用户也很容易用自己的PC搭建Hadoop运行环境。
(6)运行在Linux平台上
Hadoop是基于Java语言开发的,可以较好地运行在Linux平台上。
(7)支持多种编程语言
Hadoop上的应用程序也可以使用其他语言编写,如C++。
2. HDFS结构
- Hadoop主要由HDFS架构(分布式文件系统)和MapReduce架构(并行计算框架)组成。
HDFS 架构
(1)主从结构
- 主节点,只有一个: namenode
- 从节点,有很多个: datanodes
(2)namenode负责:
- 接收用户操作请求
- 维护文件系统的目录结构
- 管理文件与block之间关系
(3)datanode负责:
- 存储文件,文件被分成block存储在磁盘上(每个块128M),为保证数据安全,文件会有多个副本
(4)在Hadoop中,一个文件被划分成大小固定的多个文件块,分布的存储在集群中的节点中。如下图
同一个文件块在不同的节点中有多个副本。如下图
我们需要一个集中的地方(Namenode)保存文件的分块信息。
二、HDFS分布式文件系统
1 概述
- Hadoop Distributed File System -Hadoop分布式文件存储系统
- HDFS为了保证数据存储的可靠性(副本)和读取性能(切块),对数据进行切块后进行复制(保证副本的数量)并存储在集群的多个节点中。
- HDFS中存在一个名字节点NameNode和多个数据节点DataNode
2. HDFS存储数据架构图
NameNode
1.接收用户操作请求
2. 存储元数据信息,元数据保存在内存(为了保证读写效率)以及磁盘(用于崩溃恢复)中
3.保存文件、block、datanode之间的映射关系
DataNode
1.存储block内容
2.存储在磁盘中
3.保存block id与文件的映射关系
3 HDFS优点
1.适合大数据处理
处理数据达到GB、TB甚至PB级别,能够处理百万规模以上的文件数量,数量相当之大。
2.检测和快速应对硬件故障
基于心跳机制
3.流式数据访问
HDFS的数据处理规模比较大,应用一次需要访问大量的数据,同时这些应用一般都是批量处理,而不是用户交互式处理。应用程序能以流的形式访问数据集。主要的是数据的吞吐量,而不是访问速度。
4.简化的一致性模型
大部分HDFS操作文件时,需要一次写入,多次读取。在HDFS中,一个文件一旦经过创建、写入、关闭后,一般就不需要修改了。这样简单的一致性模型,有助于提高吞吐量。
5.高容错性
数据自动保存多个副本,副本丢失后自动恢复。
6.可构建在廉价机器上
构建在廉价机器上可以轻松的通过扩展机器数量来近乎线性的提高集群存储能力。
4 HDFS缺点(不适用HDFS的场景)
1.不适合低延迟数据访问
由于Hadoop针对海量数据的吞吐量做了优化,牺牲了获取数据的延迟,所以对于低延迟来说,不适合用Hadoop来做。如和用户进行交互的应用,需要数据在毫秒或秒的范围内得到响应等。
2.不适合小文件存储
HDFS支持超大的文件,是通过数据分布在DataNode,数据的元数据保存在NameNode上。NameNode的内存大小,决定了HDFS文件系统可保存的文件数量。虽然现在的系统内存都比较大,但大量的小文件还是会影响NameNode的性能。且小文件存储的寻道时间会超过读取时间,它违反了HDFS的设计目标。
3.多用户写入文件、修改文件
hdfs的文件只能有一次写入,不支持修改和追加写入(2.0版本以后支持追加)。
4.不支持超强的事务
没有像关系型数据库那样,对事务有强有力的支持。
5 HDFS技术细节
(hdfs fsck path
查看块信息)
Block
-
数据块(Block)是HDFS中存储文件的最基本的存储单位。
-
在HDFS中存储的超大数据的文件以一个标准分成几块,分别存储到不同的磁盘上,这个标准就称为block。block的默认大小为64M(1.0版本)(2.0版本为128M)。
-
对于文件内容而言,一个文件的长度大小是size,那么从文件的0偏移开始,按照固定的大小,顺序对文件进行划分并编号,划分好的每一个块称一个Block。HDFS默认Block大小是128MB,以一个256MB文件,共有256/128=2个Block.
-
不同于普通文件系统的是,HDFS中,如果一个文件(30M)小于一个数据块的大小(128M),并不占用整个数据块存储空间(实际还是30M)。
NameNode
- NameNode维护元数据信息
NameNode中维护着HDFS中的元数据信息,包括文件和Block之间关系的信息、Block数量信息、Block和DataNode之间的关系信息,数据格式参照如下:
例如: /test/a.log,3,{b1,b2},[{b1:[h0,h1,h3]},{b2:[h0,h2,h4]}]
FileName(文件名)replicas(副本数)block-ids(块id)id-host(id所在主机)
- NameNode磁盘中的文件
NameNode中的元数据信息存储在内存以及文件中,内存中为实时信息,文件中为数据镜像作为持久化存储使用。文件包括:
- fsimage:元数据镜像文件。存储某NameNode元数据信息,并不是实时同步内存中的数据。
- edits:操作日志文件,记录了NameNode所要执行的操作。
- fstime:保存最近一次checkpoint的时间。(上一次更新时间)
- NameNode更新数据细节
合并流程如下图所示:
- 每隔一段时间,Secondary NameNode会和NameNode通信,请求其停止使用Editlog文件,暂时将新到达的写操作添加到一个新的文件edtis.new中。
- Secondary NameNode把NameNode中的FsImage文件和EditLog文件拉回到本地,加载到内存中,对二者执行合并操作,合并为一个新的fsimage,即在内存中逐条执行EditLog中的操作,使得FsImage保持最新。合并结束后,Secondary NameNode会把合并后最新的FsImage文件发送到NameNode。NameNode收到后,会用最新的FsImage文件去替换旧的FsImage文件,同时NameNode将edtis.new改为edits,从未减小EditLog文件的大小。
DataNode
- 在hadoop中,数据是存放在DataNode上面的,以Block的形式存储。
- DataNode节点会不断向NameNode节点发送心跳报告。(状态信息和数据信息)
6 HDFS 副本放置策略
- 默认副本放置策略:
在默认情况下副本数量是3个,所有的DN都是在同一个机架下,此时写block时,三个DN机器的选择是完全随机的。
- 配置机架感知后的副本放置策略:
配置机架感知后,HDFS在选择三个DN时,副本放置策略如下:
(1)把第一副本放在和客户端同一个节点上,如果客户端不在集群中,那么就会随即选一个节点存放。
(2)第二个副本会在和第一个副本不同的机架上随机选一个。
(3)第三个副本会在第二个副本相同的机架上随机选一个不同的节点。
7 何为机架感知
- 定义:在分布式系统中,能够识别不同的机架(rack)和它们之间的拓扑关系,从而更加有效地管理数据和资源。
- hadoop能在系统内部建立一套服务器和机架的位置拓扑图,并且能识别系统节点的拓扑位置。告诉 Hadoop 集群中哪台机器属于哪个机架。
- 为什么要设置机架感知:
(1)开启机架感知,NN可以知道DN所处的网络位置
(2)根据网络拓扑图可以计算出rackid,通过rackid信息可以计算出任意两台DN之间的距离
(3)在HDFS写入block时,会根据距离,调整副本放置策略
(4)写入策略会将副本写入到不同的机架上,防止某一机架挂掉,副本丢失的情况。同时可以降低在读取时候的网络I/O。但是会增加写操作的成本。
8 namenode 被格式化之后产生了哪些文件(edits,fsimage,seen_txid,VERSION)?各自的功能是什么?
- 格式化文件存放路径:
/usr/local/hadoop2.7.1/data/hdfs/name/current
- 格式化文件:
edits 、fsimage、seen_txid、VERSION
(1) fsimage文件:HDFS文件系统元数据的一个永久性检查点,包含HDFS文件系统的所有目录和文件idnode的序列化信息。
(2)edits文件:存放HDFS文件系统的所有更新操作的路径,文件系统客户端所有的写操作首先会被记录到edits文件中。
(3)seen_txid文件保存的是一个数字,即最后一个edits的数字。每次NameNode启动时都会将fsimage文件读入内存,并从0001开始到seen_txid中记录的数字,依次执行每个edits里面更新操作,保证内存中的元数据信息是最新的,同步的,可以看成namenode启动时,将fsimage和edits文件进行合并。
9 Secondary NameNode的功能
(1)首先,可以完成EditLog与FsImage的合并操作,减小EditLog文件大小,缩短名称节点重启时间。
- 每隔一段时间,Secondary NameNode会和NameNode通信,请求其停止使用Editlog文件,暂时将新到达的写操作添加到一个新的文件edtis.new中。
- Secondary NameNode把NameNode中的FsImage文件和EditLog文件拉回到本地,加载到内存中,对二者执行合并操作,合并为一个新的fsimage,即在内存中逐条执行EditLog中的操作,使得FsImage保持最新。合并结束后,Secondary NameNode会把合并后最新的FsImage文件发送到NameNode。NameNode收到后,会用最新的FsImage文件去替换旧的FsImage文件,同时NameNode将edtis.new改为edits,从未减小EditLog文件的大小。
(2)可以作为NameNode的“检查点”,保存NameNode中的元数据信息。
从上面的合并过程可以看出,Secondary NameNode会定期和NameNode通信,从NameNode获取FsImage文件和Edits文件,执行合并操作得到最新的FsImage文件。从这个角度来讲,Secondary NameNode相当于为NameNode设置了一个检查点,周期性地备份NameNode中的元数据信息,当NameNode发生故障,就可以用Secondary NameNode中记录的元数据信息进行系统恢复。
在HDFS设计中,并不支持把系统直接切换到Secondary NameNode,因此,从这个角度来讲,Secondary NameNode只是起到了NameNode的“检查点”作用,并不起到热备份的作用。
10 HDFS执行流程(重要)
HDFS读流程
(1)客户端Client向远程的NameNode发起RPC读请求;
(2)NameNode返回有该block的DataNode地址;客户端Client会选取离客户端最接近的DataNode来读取block;如果客户端本身就是DataNode,那么将从本地直接获取数据;
(3)客户端调用read函数开始读取数据。数据从该数据节点读到客户端,当该数据块读取完毕时,关闭和该数据节点的连接。
(4)输入流查找下一个数据块,找到该数据块的最佳位置节点,读取数据。
(6)当客户端读取完毕数据的时候,关闭输入流。
HDFS写流程
1、客户端与namenode通信请求上传文件,namenode检查目标文件是否已存在,父目录是否存在
2、namenode返回是否可以上传
3、client请求第一个 block该传输到哪些datanode服务器上
4、namenode返回3个datanode服务器ABC
5、client请求3台dn中的一台A上传数据(本质上是一个RPC调用,建立pipeline),A收到请求会继续调用B,然后B调用C,将pipeline建立完成,逐级返回客户端
6、client开始往A上传第一个block(先从磁盘读取数据放到一个本地内存缓存),以packet为单位,A收到一个packet就会传给B,B传给C;A每传一个packet会放入一个应答队列等待应答
7、当一个block传输完成之后,client再次请求namenode上传第二个block的服务器。
8、当所有数据块都写入成功后,客户端向NameNode发送完成写入的请求,NameNode则更新元数据信息,标记该文件已经可用。
HDFS删除流程
1)先在NameNode上执行节点名字的删除。
2)当NameNode执行delete方法,它只标记操作涉及的要被删除的数据块,而不会主动联系这些数据块所在的DataNode节点。
3)当保存着这些数据块的DataNode节点向NameNode节点发送心跳时,在心跳应答里,NameNode节点会向DataNode发出指令,从而把数据删除掉。
4)所以在执行完delete方法后的一段时间,数据块才能被真正的删除掉。
- 注意:在读写过程中,NameNode只负责地址的记录和查询,所有的数据的读写都是客户端和DataNode直接联系,这种形式的好处在于能够提高NameNode的应答速度,同时提供HDFS的线程并发的能力。
11 何为RPC,RPC的调用过程
(1) 何为RPC
-
RPC——远程过程调用协议
它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
-
hadoop的整个体系结构构建在RPC之上(org.apache.hadoop.ipc)
(2) RPC的调用过程
先启动服务端,再运行客户端,可以分析查看服务端和客户端的输出信息。
- 定义RPC协议(创建接口类)
RPC协议是客户端和服务器端之间的通信接口,它定义了服务器端对外的服务接口。 - 实现RPC协议(创建服务类)
Hadoop RPC 协议通常是一个java接口,用户需要实现该接口。 - 构造并启动RPC Server(创建服务器类)
RPC协议定义完毕,可以介于这个协议创建RPC服务器端了,可以直接使用new RPC.Bulder(conf),来构造一个RPC Server,并调用函数start()启动该服务。 - 构造RPC Client并发送RPC请求(创建客户端类)
使用静态方法getProxy()构造Client代理对象,直接通过代理对象调用远程端口的方法。
12 何为安全模式
在分布式文件系统启动的时候,开始的时候会有安全模式,当分布式文件系统处于安全模式的情况下,文件系统中的内容不允许修改也不允许删除,直到安全模式结束。安全模式主要是为了系统启动的时候检查各个DataNode上数据块的有效性,同时根据策略必要的复制或者删除部分数据块。运行期通过命令也可以进入安全模式。在实践过程中,系统启动的时候去修改和删除文件也会有安全模式不允许修改的出错提示,只需要等待一会儿即可。
- 安全模式时间段:系统刚刚启动,datanode的数据节点位置信息要上报给namenode,这时候还不能写、重命名和删除。这个时间段处于安全模式下。
- NameNode在启动的时候首先进入安全模式,如果datanode丢失的block达到一定的比例(1- dfs.safemode.threshold.pct),则系统会一直处于安全模式状态即只读状态。
三、MapReduce和Yarn基本介绍
1. MapReduce概述
MapReduce基于Google发布的MapReduce论文设计开发,基于分而治之的思想,用于大规模数据集(大于1TB) 的并行计算和离线计算,具有如下特点:
- 高度抽象的编程思想:程序员仅需描述做什么,具体怎么做交由系统的执行框架处理。
- 良好的扩展性:可通过添加节点以扩展集群能力。
- 高容错性:通过计算迁移或数据迁移等策略提高集群的可用性与容错性。
2. 资源调度与分配—Yarn的引入
在Hadoop1.0版本中,只有HDFS和MapReduce, 而资源调度通过MRv1来进行,在2.0中,资源调度由Yarn负责,而MapReduce负责并行计算任务。
Apache Hadoop YARN (Yet Another Resource Negotiator), 中文名为“另一 种资源协调者"。它是一种新的Hadoop资源管理器,它是一个通用资源管理系统,可为上层应用提供统一的资源管理和调度,它的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。
引入了Yarn之后,它可以支持了多种计算模式,比如说离线计算、实时计算以及迭代计算。
因此,我们可以这样理解,HDFS可以理解为分布式硬盘-分布式文件管理系统,YANR可以理解为分布式的操作系统-分布式资源(cpu, memory, disk, network)管理和分布式的进程调度,MapReduce是运行在分布式操作系统YARN上的应用程序。
3. Yarn基本架构
主从结构
- 主节点,只有一个: ResourceManager
- 从节点,有很多个: NodeManager
YARN总体上仍然是master/slave结构,在整个资源管理框架中,resourcemanager为master,nodemanager是slave。
Resourcemanager负责对各个nademanger上资源进行统一管理和调度。当用户提交一个应用程序时,需要提供一个用以跟踪和管理这个程序的ApplicationMaster,它负责向ResourceManager申请资源,并要求NodeManger启动可以占用一定资源的任务。由于不同的ApplicationMaster被分布到不同的节点上,因此它们之间不会相互影响。
(1) ResourceManager
整个集群只有一个,负责集群资源的统一管理和调度
(2) NodeManager
整个集群有多个,负责单节点资源管理和使用
(3) ApplicationMaster
每个应用有一个,负责应用程序的管理
(4) Container
对任务运行环境的抽象(资源分配的基本单位)描述一系列信息
MapReduce on Yarn工作机制
MapReduce2.0运行在YARN之上。YARN由ResourceManager(RM) 和NodeManager(NM)两大块组成
(1)Job(Application)提交
第0步:MR Client端调用job.waitForCompletion方法,向整个集群提交MapReduce作业。
第1步:Client向RM申请一个Application id。
第2步:RM给Client返回该job资源的提交路径和Application id。
第3步:Client提交jar包、切片信息和配置文件到指定的资源提交路径(HDFS)。
第4步:Client提交完资源后,向RM申请运行MrAppMaster(适配YARN的ApplicationMaster)。
(2)Job(Application)初始化
第5步:当RM收到Client的请求后,将该job添加到容量调度器中(任务队列)。
第6步:某一个空闲的NM领取到该job。
第7步:该NM创建Container(资源抽象),并通过命令启动运行MrAppMaster。
第8步:下载Client提交的资源到本地。
(3)资源分配
第9步:MrAppMaster向RM申请运行多个maptask任务资源。
第10步:RM将运行maptask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。
(4)资源调度
第11步:MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动maptask,maptask对数据分区排序。
第12步:MrAppMaster等待所有maptask运行完毕后,向RM申请容器,运行reduce task。
第13步:reduce task向maptask获取相应分区的数据。
第14步:程序运行完毕后,MR会向RM申请注销自己。
(5)进度和状态更新
YARN中的任务将其进度和状态(包括counter)返回给应用管理器, 客户端每秒(通过mapreduce.client.progressmonitor.pollinterval设置)向应用管理器请求进度更新, 展示给用户。
(6)作业完成
除了向应用管理器请求作业进度外, 客户端每5分钟都会通过调用waitForCompletion()来检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval来设置。作业完成之后, 应用管理器和Container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。
4. Yarn的工作流程
步骤1 用户向YARN中提交应用程序,其中包括ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等。
步骤2 ResourceManager为该应用程序分配第一个Container(这里可以理解为一种资源比如内存),并与对应的Node-Manager通信,要求它在这个Container中启动应用程序的ApplicationMaster。
步骤3 ApplicationMaster首先向ResourceManager注册,这样用户可以直接通过ResourceManage查看应用程序的运行状态,然后它将为各个任务申请资源,并监控它的运行状态,直到运行结束,即重复步骤4~7。
步骤4 ApplicationMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源。
步骤5 一旦ApplicationMaster申请到资源后,便与对应的NodeManager通信,要求它启动任务。
步骤6 NodeManager为任务设置好运行环境(包括环境变量、JAR包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务。
步骤7 各个任务通过某个RPC协议向ApplicationMaster汇报自己的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。
在应用程序运行过程中,用户可随时通过RPC向ApplicationMaster查询应用程序的当前运行状态。
步骤8 应用程序运行完成后,ApplicationMaster向ResourceManager注销并关闭自己。
5. MapReduce详细执行流程
mapreduce计算框架一般分为三个阶段:map、shuffle和reduce
- 在map阶段,将输入文件划分为split,一般map进程的数量为split分片的数量,然后map将之解析为key,value,做自己的逻辑处理转换,输出新的key,value。
- map结果输出之后,将key,value,计算出的partition写入内存环形缓冲区中,当达到一定阈值后,将之锁定,并启动溢写线程,在写入磁盘之前根据快速排序使得partition有序,partition内部key有序,如果有combiner就合并。没有就将该内容写入本地磁盘。
- 写入时不影响map写入缓存,写入结束会产生很多溢写文件,我们需要对这些文件进行merge,利用归并排序,将多个文件合并为一个文件。
- 此时map shuffle结束,进行reduce shuffle,一个reduce从不同map拉取同一个分区的数据,然后进行reduce端的merge操作,这个和map merge很像,如果内存资源足够就放内存,然后merge,如果不够就放磁盘merge。这里也会排序,将相同序号的分区进行合并排序。
- 将结果输入到reduce端,输入形式:<key, {value list}>,写逻辑,输出key,value。
- 将reduce输出保存到文件中。
内存环形缓冲区作用:批量收集map结果,减少磁盘IO的影响。
以下是涉及到的排序算法:
5.1 快速排序
当数据量较小时(当前为小于13)采用插入排序,否则采用快速排序;当快速排序递归到一定深度时,采用堆排序。这种算法也称为introsort,避免了快速排序最坏情况的发生。
5.2 归并排序
基本思想:对于给定的一组集合,利用递归与分治技术将数据序列划分成为越来越小的子集合,再对子集合排序,最后再用递归方法将排好序的子集合合并成为越来越大的有序序列。
经过第一轮比较后得到最小的记录,然后将该记录的位置与第一个记录的位置交换;接着对不包括第一个记录以外的其他记录进行第二次比较,得到最小记录并与第二个位置记录交换;重复该过程,知道进行比较的记录只剩下一个为止。
5.3 MapReduce过程中的几次排序
在MapReduce的shuffle过程中通常会执行三次排序,分别是:
- Map的溢写阶段:根据分区以及key进行快速排序
- Map的合并溢写文件:将同一个分区的多个溢写文件进行归并排序,合成大的溢写文件
- Reduce输入阶段:将同一分区,来自不同Map task的数据文件进行归并排序
此外,在MapReduce整个过程中,默认是会对输出的KV对按照key进行排序的,而且是使用快速排序。
Map输出的排序,其实也就是上面的溢写过程中的排序。
Reduce输出的排序,即Reduce处理完数据后,MapReduce内部会自动对输出的KV按照key进行排序。
6 hadoop的shuffle过程详解
6.1 Map端的shuffle
Map端会处理输入数据并产生中间结果,这个中间结果会写到本地磁盘,而不是HDFS。每个Map的输出会先写到内存缓冲区中,当写入的数据达到设定的阈值时,系统将会启动一个线程将缓冲区的数据写到磁盘,这个过程叫做spill。 在spill写入之前,会先进行二次排序,首先根据数据所属的partition进行排序,然后每个partition中的数据再按key来排序。partition的目是将记录划分到不同的Reducer上去,以期望能够达到负载均衡,以后的Reducer就会根据partition来读取自己对应的数据。接着运行combiner(如果设置了的话),combiner的本质也是一个Reducer,其目的是对将要写入到磁盘上的文件先进行一次处理,这样,写入到磁盘的数据量就会减少。最后将数据写到本地磁盘产生spill文件(spill文件保存在{mapred.local.dir}指定的目录中,Map任务结束后就会被删除)。
最后,每个Map任务可能产生多个spill文件,在每个Map任务完成前,会通过多路归并算法将这些spill文件归并成一个文件。至此,Map的shuffle过程就结束了。
6.2 Reduce端的shuffle
Reduce端的shuffle主要包括三个阶段,copy、sort(merge)和reduce。 首先要将Map端产生的输出文件拷贝到Reduce端,但每个Reducer如何知道自己应该处理哪些数据呢?因为Map端进行partition的时候,实际上就相当于指定了每个Reducer要处理的数据(partition就对应了Reducer),所以Reducer在拷贝数据的时候只需拷贝与自己对应的partition中的数据即可。每个Reducer会处理一个或者多个partition,但需要先将自己对应的partition中的数据从每个Map的输出结果中拷贝过来。 接下来就是sort阶段,也成为merge阶段,因为这个阶段的主要工作是执行了归并排序。从Map端拷贝到Reduce端的数据都是有序的,所以很适合归并排序。最终在Reduce端生成一个较大的文件作为Reduce的输入。
最后就是Reduce过程了,在这个过程中产生了最终的输出结果,并将其写到HDFS上。
6.3 shuffle 总结
(1) 分区(partitioner):按照key的不同将数据分区处理,以便将来reduce取用(此步在缓冲区中进行)。分区时可以自定义的,如果不自定义的话,默认所有数据都在一个分区。
(2) 排序(sort):此步透明,按照key值进行排序,此步在缓冲区中进行。
(3) 合并(combine):一次reduce的预演,减少将来reduce的工作量(此步可做可不做,注意特殊场景)。
(4) copy:此步透明,数据的拉取,不同的reduce按照partition的分区,拉取不同的数据。
(5) 归并(merge):此步透明,因为拉取的是文件,需要将不同节点的数据合并写入文件。
6.4 小知识点
Split分片默认大小,分片调用的方法?
1.split的大小是多少
切分成多少个Split,那么就有多少个Map任务执行,一个Map任务只处理一个Split(默认)。
一个数据分片就是一个块,默认大小:128M
2.分片调用的方法
FileInputFormat类:getsplits方法
四、Hadoop补充知识
1. Hadoop序列化作用,要实现的接口,key必须要实现的接口
序列化在分布式环境的两大作用:
(1)进程间通信
(2)永久存储。
(3)为了更高效地处理数据。当处理大量数据时,序列化可以减少数据传输的大小,并且可以更快地将数据从磁盘加载到内存中。
要实现的接口:Writable
Key必须要实现的接口:WritableComparable
2. 为什么Key必须要实现WritableComparable接口呢?原因有以下几点:
a. 序列化和反序列化:Hadoop需要将Key进行序列化和反序列化,以便在MapReduce任务之间进行传递和存储。因此,Key必须实现Writable接口。
b. 排序:在MapReduce中,Map阶段产生的Key需要按照某种方式进行排序,才能保证Reduce阶段得到的数据是按照预期的方式分组的。为了实现排序,Key必须实现Comparable接口。
c. 分组:在MapReduce中,Reduce阶段的数据是按照Map阶段产生的Key进行分组的。因此,如果需要自定义分组方式,Key也必须实现Comparable接口。
综上所述,为了实现在Hadoop中高效地排序和分组,以及在MapReduce任务之间进行传递和存储,Key必须实现WritableComparable接口。
3. 手写WordCount
/*
* LongWritable对应输入的key类型,默认是行的偏移量LongWritable
* Text,对应上输入的value类型,默认行数据Text
* Text:对应输出的key类型,不能使用默认值,需要根据需求更改
* Text:对应输出的value类型,根据需求修改
* @author lesie
* 要求输出的格式(key,1)
* 单词计数输出的key类型为Text
* 输出的value类型为IntWritable
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
/*
* KEYIN
* VALUEIN
* context--环境对象,输出结果
* @see org.apach.hadoop.mapreduce.Mapper#map(KEYIN,VALUEIN,...)
*/
public void map(LongWritable ikey,Text ivalue,Context context) throws IOException, InterruptedException
{
//获取一行数据
String line=ivalue.toString();
//按空格切片
String []arrs=line.split(" ");
for(String arr:arrs)
{
context.write(new Text(arr),new IntWritable(1));
}
}
}
/*
* reducer的数输入key用公式mapper输出的key类型
* valuein:reducer的输入value应该是mapper输出的value类型
* keyout:根据业务而定
* valueout:根据业务而定
* @author lesie
* 工作机制:
* 1.将key相同的value进行合并,形成一个Iterable,交给程序
* eg:(hello,<1,1,1,1,1,1>)
* 2.reduce方法执行的次数取决于mapper输出的key,有多个不同的key执行多少次
* 3.默认的排序,对key进行排序,先按照数字进行排再按照字典顺序
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text _key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// process values
//定义计数变量
int sum=0;
//进行累加操作
for (IntWritable val : values) {
//通过get方法取出其中的值
sum+=val.get();
}
//输出数据,最终结果,key是单词Text,value是单词出现的总次数
context.write(_key, new IntWritable(sum));
}
}
public class WordCountDriver {
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
//获取当前配置
Configuration conf=new Configuration();
//获取一个表示当前Mapreduce作业的Job对象,向ahdoop申请一个job任务执行逻辑
Job job=Job.getInstance();
//指定程序入口
job.setJarByClass(WordCountDriver.class);
//设置需要执行的Mapper类
job.setMapperClass(WordCountMapper.class);
//设置Reducer类
job.setReducerClass(WordCountReducer.class);
//设置Mapper的输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置Reducer的输出结果类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置输入路径
FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.140.128:9000/wc/words.txt"));
//设置输出路径
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.140.128:9000/wc/result6"));
//任务的提交
job.waitForCompletion(true);
}
}
4. 手写Top-K
public class TopKMapper extends Mapper<LongWritable, Text, NullWritable, IntWritable> {
private int k = 10; // TopK值
private PriorityQueue<Integer> heap; // 堆
@Override
public void setup(Context context) {
k = context.getConfiguration().getInt("k", 10); // 从Configuration中获取TopK值
heap = new PriorityQueue<>(k, Collections.reverseOrder()); // 初始化堆
}
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
int num = Integer.parseInt(value.toString()); // 获取输入数据
if (heap.size() < k) { // 如果堆未满,则将数据加入堆中
heap.offer(num);
} else {
int smallest = heap.peek(); // 获取当前堆中的最小值
if (num > smallest) { // 如果输入数据大于最小值,则替换堆中最小值
heap.poll();
heap.offer(num);
}
}
}
@Override
public void cleanup(Context context) throws IOException, InterruptedException {
for (int num : heap) { // 将堆中的TopK数据输出
context.write(NullWritable.get(), new IntWritable(num));
}
}
}
public class TopKReducer extends Reducer<NullWritable, IntWritable, NullWritable, IntWritable> {
private int k = 10; // TopK值
private PriorityQueue<Integer> heap; // 堆
@Override
public void setup(Context context) {
k = context.getConfiguration().getInt("k", 10); // 从Configuration中获取TopK值
heap = new PriorityQueue<>(k, Collections.reverseOrder()); // 初始化堆
}
@Override
public void reduce(NullWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
for (IntWritable value : values) { // 遍历输入数据
int num = value.get();
if (heap.size() < k) { // 如果堆未满,则将数据加入堆中
heap.offer(num);
} else {
int smallest = heap.peek(); // 获取当前堆中的最小值
if (num > smallest) { // 如果输入数据大于最小值,则替换堆中最小值
heap.poll();
heap.offer(num);
}
}
}
}
@Override
public void cleanup(Context context) throws IOException, InterruptedException {
for (int num : heap) { // 将堆中的TopK数据输出
context.write(NullWritable.get(), new IntWritable(num));
}
}
}
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class TopKApp {
// 定义输入路径和输出路径
private static final String INPUT_PATH = "hdfs:/xxx/topk_input";
private static final String OUTPUT_PATH = "hdfs://xxx/topk_output";
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
FileSystem fileSystem = FileSystem.get(conf);
Path outputPath = new Path(OUTPUT_PATH);
// 如果输出路径已经存在,则先删除
if (fileSystem.exists(outputPath)) {
fileSystem.delete(outputPath, true);
}
// 创建一个MapReduce任务,并设置相关的配置信息
Job job = Job.getInstance(conf, "TopKApp");
job.setJarByClass(TopKApp.class);
// 设置Mapper和Reducer的类
job.setMapperClass(TopKMapper.class);
job.setReducerClass(TopKReducer.class);
// 设置Mapper的输出类型
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
// 设置Reducer的输出类型
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
// 设置输出文件格式
job.setOutputFormatClass(TextOutputFormat.class);
// 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
FileOutputFormat.setOutputPath(job, outputPath);
// 等待任务完成
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}