您现在的位置是:首页 >学无止境 >MapReduce调优网站首页学无止境

MapReduce调优

不知落叶何时落 2024-06-16 12:01:02
简介MapReduce调优

MapReduce应用场景

优点

易于编程

固定的八股文编程模式,简单的定义即可快速实现开发分布式程序。用户往往专注于业务问题即可

易拓展

可以简单地通过增加机器来拓展MapReduce的计算能力

高容错

Hadoop会自动通过YARN完成机器级别的故障转移,如果某一台机器挂掉了,可以将上面的计算任务转移到另一个节点上运行,不至于任务失败,而且这个过程不需要人工参与,完全是由hadoop内部完成的

适用于PB级别的大数据量

理论上只要机器硬件足够多就可以出路无穷大的数据量,Hadoop设计之初就是基于链接PC机器构建大型分布式集群

缺点

处理速度很慢,不适合时效性要求较高的场景

数据存储主要使用静态数据,不适合实时数据流

MapReduce主要处理的数据来源自文件系统,所以无法向MySQL那样在毫秒或秒级别内返回数据,并且文件系统的数据是静态的,MapReduce则无法处理实时的流式数据

只有Map 和Reduce阶段,缺乏DAG设计

DAG 有向无环图

MapReduce处理数据过程中,如果需要经过多个步骤来实现,一个MapReduce就无法完成,如果通过多个MapReduce来实现,那么就必须将前一个MapReduce的结果写入磁盘,导致大量的IO小号导致MapReduce性能较差

擅长应用场景

TopN 问题

从海量数据中查询出现频率最高的前N个

Web日志访问频率统计

统计url、用户、搜索出现的频率统计

数据倒排索引

基于数据构建倒排索引,实现基于复杂条件词的数据检索

不擅长应用

迭代计算

从某个值开始,不断地由上一部结果计算或者推断出下一步的结果

机器学习
分类、聚类、关联、预测

连接计算

Join关联

MapReduce优化需求与方向

需求

基于MapReduce所存在的优化性能问题,在实际工作中可以通过优化方案来提高MapReduce整体性能,从而节约生产成本

优化方向

  1. 从文件角度考虑,通过更改二进制文件、列式存储、压缩来降低磁盘以及网络IO,进而提高性能
  2. 通过控制MapReduce过程中的资源属性,合理分配资源,提高资源利用率,提高程序运行效率

文件存储格式

行式存储、列式存储

行式存储(Row - Based) 同一行数据存储在一起

列式存储(Column-Based): 同一列数据存储在一起

image-20230510154419105

优缺点

行式存储的写入时一次性完成的,消耗的时间比列式存储少,并且能够保证数据的完整性,缺点是数据通过读取过程中会产生冗余数据,如果只有少量数据,此影响可以忽略,数量大可能会影响到数据的处理效率。行式存储适合插入,不适合查询

列式存储再写入效率、保证数据完整性上都不如行式存储,它的优点是在读取过程中不会产生冗余数据,这对数据完整性要求不高的大数据处理领域,比如说互联网来说非常重要。列式存储适合查询,不适合插入

Sequence File

SequenceFile时Hadoop提供的一种二进制文件存储格式

一条数据称之为record(记录),底层直接以<Key,Value>键值对形式序列化到文件中

image-20230510155310982

优缺点

优点
二进制格式存储 比文本文件更加紧凑

支持不同级别压缩(基于Record或者Block压缩)

文件可以拆分和并行处理,适用于MapReduce程序

局限性

二进制格式文件不方便查看

特定于Hadoop,只有Java Api可以与之交互,未提供多种语言支持

Sequence File格式

根据压缩类型,有三种不同的Sequence File格式: 未压缩格式、Record压缩格式、Block压缩格式

Sequence File有一个header和一个或者多个record组成,以上三种格式均使用相同的header结构:

前三个字节未SEQ,表示该文件是序列文件,后跟一个字节表示实际版本号(例如SEQ4或者SEQ6),Header中也包括其他key、value、class名字、压缩细节、metadata、Sync marker,Sync Marker 同步标记,用于可以读取任务位置的数据。

image-20230510155934721

Sequence File 未压缩格式

未压缩Sequence File文件由header、record、sync三部分组成,其中record包括4各部分:record length(记录长度)、key length(键长)、key、value

每隔几个record(100字节左右)就会有一个同步标记

image-20230510160133584

Sequence File 基于record压缩格式

基于record压缩的Sequence File文件由header、record、sync三个部分组成,其中record包含了4各部分:record length(记录长度)、key length(键长)、key、compressed value(被压缩的值)

每隔几个record(100字节左右)就会有一个同步标记

image-20230510160604761

Sequence File基于block压缩格式

基于block压缩的Sequence File文件由header、block、sync三个部分组成

block指的是record block,可以理解为多个record记录组成的块,注意,这个block和HDFS中的分块存储不是一个概念,Block 中包括:record条数、压缩key长度、压缩的keys、压缩的value长度、压缩的values,每隔一个block就有一个同步标记

blocky阿索比record压缩提供更多的压缩率,使用Sequence File时,通常首选块压缩

image-20230510160854987

生成Sequence File文件

  1. 使用TextInputFormat读取不同文字文件
  2. Map阶段对读取文件的每一行进行输出
  3. Reduce阶段直接输出每条数据
  4. 使用SequenceFileOutPutFormat将结果进行SequenceFile
// 设置输出格式 在这里输出格式为SequenceFile格式
job.setOutputFormatClass(SequenceFileOutputFormat.class);
// 设置压缩类型 在这里选择的是BLOCK压缩格式
SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);

去查看输出的样式

image-20230510163428238

读取Sequence File文件

  1. 使用SequenceFileInputFormat读取SequenceFile
  2. Map姐u但直接读取每一条数据
  3. Reduce阶段直接输出,每一条数据
  4. 使用TextOutputFormat将结果保存为文本文件

只需要在这里进行修改即可

job.setInputFormatClass(SequenceFileInputFormat.class);

使用Sequece File合并小文件

假设HDFS某个目录下有多个小文件,这些文件虽然磁盘占用空间不大,但是内存空间中用却不少(元数据存储在内存中)

可以编写一个程序将所有的小文件写入到一个Sequence File中,即将文件名作为key,文件内容作为value序列化到Sequence File大文件中,这就是所谓的使用Sequence File合并小文件

有序二进制文件 MapFile

介绍

可以理解MapFile是排序后的SequenceFile,通过观察结构可以看到MapFile由两部分组成,分别是data和index,data为存储数据的文件,index作为文件的数据索引,主要记录了每个Record的Key值以及Record在文件中的偏移位置

image-20230511090507083

优点

在MapFile被访问的时候,索引文件会被索引映射关系可以迅速定位到指定的Record文件所在的文件位置,因此,相对于Sequence File而言,MapFile加载到内存,通过索引效率最高

缺点

会消耗一部分内存来存储index数据

案例

生成MapFile文件

生成MapFile文件和生成Sequence File的方法相同,其他地方无需改动,只需要在Driver阶段指定输出的格式就可

package MapFileTest;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;
import java.util.Random;

public class MapFileCreate extends Configured implements Tool {
    private static class MapFileMapper extends Mapper<LongWritable, Text, IntWritable,Text> {
        private IntWritable outKey =new IntWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            Random random = new Random();
            outKey.set(random.nextInt(10000));
            context.write(outKey,value);
        }
    }
    private static class MapFileReduce extends Reducer<IntWritable,Text,IntWritable,Text>{
        @Override
        protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Iterator<Text> iterator = values.iterator();
            while (iterator.hasNext()){
                context.write(key,iterator.next());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new MapFileCreate(),args);
    }
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), MapFileCreate.class.getSimpleName());
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setJarByClass(MapFileCreate.class);
        job.setMapperClass(MapFileMapper.class);
        job.setReducerClass(MapFileReduce.class);
        // todo 在这里进行设置,设置最终的输出格式是MapFile格式
        job.setOutputFormatClass(MapFileOutputFormat.class);
        FileInputFormat.addInputPath(job,new Path("E:\MapReduceTest\input"));
        FileOutputFormat.setOutputPath(job,new Path("E:\MapReduceTest\MapFileOut"));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}

运行后查看输出效果

image-20230511092456956

image-20230511092505488

查看data文件内容

image-20230511092612437

可见本质上就是Sequence 文件

读取MapFile文件生成Text文件

MapReduce中没有封装MapFile的读取输入类,工作中可以根据情况选择以下两种方案来实现:

  1. 自定义InputFormat,使用MapFileOutputFormat中的getReader方法来获取读取对象
  2. 使用SequenceFileInputFormat对MapFile文件进行解析
    1. 因为MapFile本质上就是一个Sequence + 索引优化,直接使用SequenceFileOutputFormat无非是放弃了索引优化直接来获取内容罢了

列式存储ORCFIle

ORC(OptimizedRC File)文件格式是一种Hadoop生态圈中的类是存储格式,他被生产自Hive用来降低hadoop数据存储空间和加速Hive查询速度。ORC并不是单纯的列式存储,仍然是首先根据Stripe(条纹、组行)分割整个表,在每一个Stripe内进行按列存储。

image-20230511093712170

ORC 文件时子描述的,他的元数据使用Protocol Buffers序列化,并且文件中的数据尽可能的压缩以降低存储空间的小号,目前也被Spark SQL、Presto等查询引擎支持

ORC文件是以二进制方式存储的,所以是不可以直接读取的

ORC与MapReduce继承

关联Maven依赖

<!--ORC 关联MapReduce-->
<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-shims</artifactId>
    <version>1.8.3</version>
</dependency>
<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-core</artifactId>
    <version>1.8.3</version>
</dependency>
<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-mapreduce</artifactId>
    <version>1.8.3</version>
</dependency>

关键的两个类OrcOutputFormat: 用于生成ORC文件OrcInputFormat:用于实现读取ORC文件类型


数据压缩优化

压缩设计与压缩算法

优点

  1. 减少文件存储所占的空间
  2. 加快文件传输效率,减少网络传输带宽
  3. 降低IO读写次数

缺点

使用数据是需要先对文件进行解压,加重了CPU负荷,压缩算法越复杂,解压时间越长

image-20230511103500591

压缩的位置

image-20230511103804578

压缩配置

在Hadoop中配置压缩

Input:MapReduce输入通过文件后名进行判断,自动识别读取压缩类型,不需要做任何配置

Map output:需要配置以下参数

image-20230511104234661

Reduce Output:需要配置以下参数

image-20230511104240450

Gzip压缩

优缺点

压缩算法优点缺点
Gzip压缩比较高;
hadoop本身支持,在应用中处理gzip格式文件就和直接处理文本文件一样
有hadoop native库
大部分linux系统都自带gzip命令,使用起来比较方便
不支持split

编码器

org.apache.hadoop.io.compress.GzipCodec

读取普通文本文件,将普通文本文件压缩为Gzip格式

在进行压缩这个过程,整体上不需要进行大的改动,无非是在最后的输入阶段进行一个修改

Java文件

package Zip;

import MapFileTest.MapFileCreate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;

/**
 * @author wxk
 * @date 2023/05/11/10:52
 */
public class GzipTest extends Configured implements Tool {
    private static class MRMapper extends Mapper<LongWritable, Text, NullWritable,Text> {
        private NullWritable outKey= NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            context.write(outKey,value);
        }
    }
    private static class MRReduce extends Reducer<NullWritable,Text, NullWritable,Text> {
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Iterator<Text> iterator = values.iterator();
            while (iterator.hasNext()){
                context.write(key,iterator.next());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration cfg =new Configuration();
        // 配置输出结果压缩为gzip格式
        cfg.set("mapreduce.output.fileoutputformat.compress","true");
        cfg.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.GzipCodec");
        // 提交job
        final int run = ToolRunner.run(cfg, new GzipTest(), args);
        System.exit(run);
    }
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), MapFileCreate.class.getSimpleName());
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setJarByClass(GzipTest.class);
        job.setMapperClass(MRMapper.class);
        job.setReducerClass(MRReduce.class);

        FileInputFormat.addInputPath(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : -1;
    }
}

运行jar包之后查看运行结果

image-20230511114441069

原来的文件大小

image-20230511114500014

读取Gzip文件 还原普通文本文件

在上文中我们讲过hadoop在读取过程中会根据文件后缀自动的进行解压缩,所以我们在读取的过程中不需要任何的操作,我们使用原来的代码但是仅仅注掉压缩的两行代码,其余无需修改,直接打成jar包直接运行

        // cfg.set("mapreduce.output.fileoutputformat.compress","true");
        // cfg.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.GzipCodec");

运行结果如下

image-20230511115401209

image-20230511115730199

文件也可以正常读,和以前的一样

Snappy压缩

优缺点

压缩算法优点缺点
Snappy压缩速度快
支持Hadoop native库
不支持spilt
压缩比低
hadoop本身不支持
需要安装
linux系统下没有对应的位置

编码器 org.apache.hadoop.io.compress.SynappyCodec

Lzo压缩

优缺点

压缩算法优点缺点
Lzo压缩/解压速度比较快,合理的压缩率
支持spilt,时hadoop中最流行的压缩格式
支持hadoop native库
需要Linux 系统下自行安装lzop命令,使用方便
压缩率比gzip要低
hadoop本身不支持,需要安装
lzo支持split,但是需要对lzo文件建索引,否则hadoop也会将lzo文件看成一个普通文件(为了支持split需要建立索引,需要指定inputformat为lzo格式)

编码器1 : org.apache.hadoop.io.compress.LzoCodec [结尾为.lzo_deflate] 不能够建索引 不兼容lzop

编码器2:com.hadoop.compression.lzo.LzopCodec [结尾.lzo 可以构建索引,兼容lzo]


MapReduce属性优化

MapReduce的核心优化在于修改文件类型、合并小文件、使用压缩等方式,通过降低Io 开销来提升MapReduce过程中Task的执行效率;除此之外,MapReduce中也可以通过调节一些参数来从整体上提升MapReduce的性能,可以通过基准测试来测试MapReduce集群对应的性能,观察实施了优化以后MapReduce的性能是否得到了提升等

基准测试概述

yarn jar hadoop安装路径/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-hadoop版本号-tests.jar

例如我的hadoop时3.3.1 ,安装路径为/opt/module/hadoop-3.3.1

image-20230511151204391

测试基准 -MR Bench

功能:用于指定生成文件,MapTask、ReduceTask的个数,并且可以指定执行的次数

例如:生成每隔文件10000行、20个mapper、5个reducer、执行两次

yarn jar hadoop-mapreduce-client-jobclient-3.3.1-tests.jar mrbench -numRuns 2 -inputLines 10000 -maps 20 -reduces 5

image-20230511152149292

最后会输出相应的数据,可以看到我们这个需要平均需要1分多钟

基准测试 -Load Gen

功能: 指定对某个数据进行加载、处理、测试性能耗时,可以调整Map和Reduce个数

例如: 对150M的数据进行测试,10个Map、1个Reduce

yarn jar hadoop-mapreduce-client-jobclient-3.3.1-tests.jar loadgen -m 10 -r 1 -indir 数据位置 -outdir 输出位置 

image-20230511152757570

大概80多秒

Uber模式

Uber运行模式对小作业进行优化,不会给每隔认为u分配Container资源,这些小任务将统一在一个Container中按照先执行map任务后执行reduce任务的顺序串行执行

开启

mapreduce.job.unbertask.enable = true,默认为false可以在mapred-site.xml中修改

限制条件
map任务的数量不大于mapreduce.job.unbertask.maxmaps参数(默认值为9)

reduce 任务的数量不大于mapreduce.job.unbertask.maxreduces参数(默认值是1)

输入文件大小不大于mapreduce.job.unbertask.maxbytes参数(默认为一个块的大小128MB)

map任务和reduce任务需要的资源不能大于MRAppMaster可用的资源总量

重试机制

功能:如果出现MapTask或者ReduceTask,由于网络、资源等外部因素导致TGask失败,AppMaster会检测到Task的任务失败,会立即重分配资源,如果重试以后人没有运行成功,那么整个Job会终止,程序运行失败。

每隔task的最大尝试次数,换句话说,框架将在放弃之前多次尝试执行Task

mapreduce.map.maxattempts=4
mapreduce.reduce.maxattempts=4

关闭推测执行

功能
推测执行是指在一个Task任务执行比预期慢时,程序会尽量检测并启动一个相同的任务作为备份,这就是推测执行,但是如果同时启动两个相同的任务,他们就会相互竞争,导致推测执行无法正常工作,这对资源是一种良妃,默认开启,实际中基于性能可以考虑选择关闭

配置

mapreduce.map.speculative =true
mapreduce.reduce.speculative=true

小文件优化

针对于小文件处理场景,默认每个小文件都会构建一个切片,启动一个maptask处理,可以使用CombineTextInputFormat代替TextInputFormat,将多个小文件合并成为一个切片

CombineTextInputFormat切片机制包括:虚拟存储过程和切片过程两个部分

虚拟过程

将输入目录下所有的文件大小,一次和设置的setMaxInputSplitSize值进行比较,如果不大于设置的最大值,逻辑上划分一个块,如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块,当剩余数据大小超过了设置的最大值但是不大于两倍,那么旧文件平均分成两个虚拟存储块(防止出现太小的切片)

切片过程

判断虚拟存储的文件是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片,如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片

//设置输入类
job.setInputFormatClass(CombinTextInputFormat.class);
CombineTextinputFormat.setMaxInputSplitSize(job,4194304);//4MB

减少Shuffle时Spill和Merge次数

默认每隔缓冲区大小为100MB,每次达到80%开始Spill,如果调大这两个值,可以减少数据spill的次数,从而减少磁盘IO,默认每次生成10个小文件开始进行合并,如果增大文件个数,可以减少merge的次数,从而减少磁盘IO

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。