您现在的位置是:首页 >技术杂谈 >带你彻底理解Spark的分区网站首页技术杂谈

带你彻底理解Spark的分区

SunnyRivers 2023-07-19 12:00:02
简介带你彻底理解Spark的分区

前言

我:什么是RDD?
面试者:RDD是被分区的,由一系列分区组成…

我:你怎么理解分区?
面试者:…
我:Spark中有哪些可以实现分区的方法?分别使用的场景是什么?
面试者…
我:Spark默认分区数是多少?如何保证一个分区对应一个文件?
面试者…
我:…谢谢您的面试,回去等通知吧!

什么是分区

Spark分区是将大型数据集划分为较小的数据块,每个数据块称为分区,分区是一个逻辑数据块,对应相应的物理块Block。每个分区都可以在集群中的不同节点上并行处理,这样可以提高Spark的并行性和性能。分区的数量可以通过设置Spark的分区数来控制,分区数越多,Spark可以并行处理的数据块就越多,从而提高性能。分区的数量应根据数据的大小和集群的资源进行调整,以充分利用集群的并行处理能力。在处理大型数据集时,Spark分区是非常重要的,因为它可以帮助Spark充分利用集群的资源和并行处理能力,从而加快数据处理速度。

有哪些分区方法,使用场景是什么

分区方法使用场景
repartition(numPartitions : scala.Int)对数据集随机打散进行范围分区,每个分区中数据量大致相同
repartition(partitionExprs : Column*)对数据集中指定列进行哈希分区
repartition(numPartitions : scala.Int, partitionExprs : Column*)partition = hash(partitionExprs) % numPartitions,使用指定列的哈希值对指定分区数进行取模
coalesce(numPartitions : scala.Int)用来减少分区数量,可以避免shuffle
repartitionByRange(partitionExprs : Column*)使用范围分区,非常使用与数字列
partitionBy(colNames : root.scala.Predef.String*)用于将数据写入磁盘中对应的子文件夹,类似于hive的分区

注意:partitionBy()是DataFrameWriter类中的一个方法,其他所有方法都来自DataFrame。

分区的几个常识

  1. 默认情况下,Spark创建的分区等于服务器中CPU内核的数量。
  2. 每个分区的数据都存在一台服务器上(这里注意和HDFS上Block的区别)。
  3. Spark为每个分区创建一个Task。
  4. Spark Shuffle操作将数据从一个分区移动到其他分区。
  5. 分区是一项昂贵的操作,因为它会造成shuffle(数据可能在节点之间移动)。
  6. 默认情况下,DataFrame shuffle操作会创建200个分区。

内存分区和磁盘分区

Spark支持内存分区(RDD/DataFrame)和磁盘分区(文件系统)。

  1. 内存分区
    您可以通过调用repartition() or coalesce()算子来对DataFrame进行分区或重分区。
  2. 磁盘分区
    在将DataFrame写回磁盘时,可以使用DataFrameWriter的partitionBy()来选择如何根据列对数据进行分区。这与Hive分区类似。

分区的优点

众所周知,Spark的处理大型数据集的速度是MapReduce处理速度的100倍,如果没有分区,这是不可能的。以下是在内存或磁盘上使用Spark分区的一些优点。

  1. 快速访问数据。
  2. 提供在较小数据集上执行算子的能力。

很多数据处理框架都在使用分区,因为它能够很快读取数据。

默认分区和配置

默认情况下,Spark会根据运行Job的模式对数据进行分区。

  1. Local模式
    在本地以独立模式运行时,Spark会将数据划分为系统上的CPU核数或创建SparkSession对象时指定的值。
        SparkSession spark = SparkSession
                .builder()
                .appName("appName")
                .master("local[5]")
                .enableHiveSupport()
                .getOrCreate();

上面的例子提供了local[5]作为master()方法的参数,这意味着用5个分区在本地运行作业。就算你系统只有2个内核,它仍然会创建5个分区任务。

package sparkdemo;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;


public class PartitionTest {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.ERROR);
        SparkSession spark = SparkSession
                .builder()
                .appName("appName")
                .master("local[5]")
                .enableHiveSupport()
                .getOrCreate();

        Dataset<Long> ds = spark.range(0, 20);
        System.out.println(ds.rdd().getNumPartitions());
        spark.stop();
    }
}

结果打印的是:5

  1. HDFS Cluster模式
    在Hadoop集群上运行Spark作业时,默认的分区数量需要考虑以下几个种情况:
  • 在HDFS集群上,默认情况下,Spark为文件的每个Block创建一个分区。
  • 在Hadoop1.X中,HDFS块大小为64MB,在Hadoop2.X中HDFS块大小为128MB。
  • 集群中所有executor核总数和2,哪个大取哪个。

例如,如果某个文件大小为640 MB,在Hadoop2.X上运行,则会创建5个分区,每个分区由128 MB的Block块组成(5个块*128 MB=640 MB)。如果将分区重新划分为10,那么它会为每个块创建2个分区。

  1. 通过配置设置分区
  • spark.default.parallelism
    配置默认值设置为集群中所有节点上的所有核数,在本地,它设置为系统上的核数。
  • spark.sql.shuffle.partitions
    配置默认值设置为200,当调用shuffle算子,如groupBy、join算子等,此属性仅在DataFrame API中可用,在RDD中不可用。
    在代码中设置配置属性的方式如下:

spark.conf.set("spark.sql.shuffle.partitions", "500")

也可在使用spark-submit命令提交spark应用程序的时候,增加配置属性:


./bin/spark-submit --conf spark.sql.shuffle.partitions=500 --conf spark.default.parallelism=500

动态修改分区数

创建RDD/DataFrame时,可以通过参数设定分区的数量,Spark也可以修改内存分区数量、磁盘分区数量。

  1. repartition()和coalesce()
    在处理分区数据时,可以通过reparation和coalesce来增加和减少分区。
package sparkdemo;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;


public class PartitionTest {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.ERROR);
        SparkSession spark = SparkSession
                .builder()
                .appName("appName")
                .master("local[5]")
                .enableHiveSupport()
                .getOrCreate();
        Dataset<Long> ds = spark.range(0, 10);
        System.out.println("默认分区数量:"+ds.rdd().getNumPartitions());
        Dataset<Long> repartition = ds.repartition(10);
        System.out.println("repartition增加分区数:"+repartition.rdd().getNumPartitions());
        Dataset<Long> coalesce = ds.coalesce(2);
        System.out.println("coalesce减少分区数:"+coalesce.rdd().getNumPartitions());
        spark.stop();
    }
}

默认分区数量:5
repartition增加分区数:10
coalesce减少分区数:2

注意:当你想减少分区的数量时,建议使用coalesce而不是repartition,因为能避免shuffle。

  1. partitionBy()
    Spark partitionBy()是DataFrameWriter类的一个方法,用于在将DataFrame写入磁盘/文件系统时基于一个或多个列值进行分区。
    当通过调用partitionBy()将Spark DataFrame写入磁盘时,Spark会根据分区列拆分记录,并将每个分区数据存储到一个子目录中。
    partitionBy可以对单个列或多个列进行分区。
    测试数据:
code,country
1,China
1,China
1,China
2,America
2,America
3,England
3,England
4,Japan
4,Japan
4,Japan
5,Korea
5,Korea

代码:

package sparkdemo;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;


public class PartitionTest {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.ERROR);
        SparkSession spark = SparkSession
                .builder()
                .appName("appName")
                .master("local[5]")
                .enableHiveSupport()
                .getOrCreate();
        Dataset<Row> ds = spark.read().option("header", "true").csv("./data/country.csv");
        ds.write().option("header", "true").partitionBy("country")
                .mode("overwrite").csv("./data/country");
        spark.stop();
    }
}

DataFrame中共有5个不同的country,因此,它创建了5个目录,如下图所示。子目录的名称将是分区列及其值(分区列=值)。
在这里插入图片描述

  1. repartitionByRange()
    测试数据:
id,count
1,10
2,20
3,10
4,20
5,10
6,30
7,50
8,50
9,50
10,30
11,30
12,40
13,40
14,20
15,20

代码:

package sparkdemo;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;


public class PartitionTest {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.ERROR);
        SparkSession spark = SparkSession
                .builder()
                .appName("appName")
                .master("local[5]")
                .enableHiveSupport()
                .getOrCreate();
        Dataset<Row> ds = spark.read().option("header", "true").csv("./data/num.csv");
        Dataset<Row> count = ds.repartitionByRange(5, new Column("count"));
        count.write().option("header", "true").mode("overwrite").csv("./data/range-partition");
        spark.stop();
    }
}

在这里插入图片描述
打开看其中一个分区:
在这里插入图片描述
所有count=50的在一个分区。

如何选择分区列

当使用partitionBy()时,必须非常小心它创建的分区数量,因为分区太多会在一个目录中创建太多子目录,这会给NameNode带来不必要的开销(如果你使用的是Hadoop),因为它必须将文件系统的所有元数据保存在内存中。因此,需要根据分区字段进行判断,比如使用country作为分区,就比使用city可能会更合适。

分区越多越好还是越少越少

分区可以提高作业的并行度从而提高spark的性能,那我们设置超级多分区岂不是美哉?
nonono,Spark必须为每个分区创建一个任务,如果大部分时间都用于创建、调度和管理任务,然后再执行,而不是大部分时间用来处理数据。同理,太少的分区根本不能充分利用集群资源,那简直就是对集群资源的一种浪费。因此,根据不同的数据量设置相应的分区才是最佳选择。

后记

相信看完这篇博客,你对spark的分区会有更加清晰的认识,无论在工作中,还是面试都将游刃有余。

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。