您现在的位置是:首页 >技术杂谈 >带你彻底理解Spark的分区网站首页技术杂谈
带你彻底理解Spark的分区
前言
我:什么是RDD?
面试者:RDD是被分区的,由一系列分区组成…
…
我:你怎么理解分区?
面试者:…
我:Spark中有哪些可以实现分区的方法?分别使用的场景是什么?
面试者…
我:Spark默认分区数是多少?如何保证一个分区对应一个文件?
面试者…
我:…谢谢您的面试,回去等通知吧!
什么是分区
Spark分区是将大型数据集划分为较小的数据块,每个数据块称为分区,分区是一个逻辑数据块,对应相应的物理块Block。每个分区都可以在集群中的不同节点上并行处理,这样可以提高Spark的并行性和性能。分区的数量可以通过设置Spark的分区数来控制,分区数越多,Spark可以并行处理的数据块就越多,从而提高性能。分区的数量应根据数据的大小和集群的资源进行调整,以充分利用集群的并行处理能力。在处理大型数据集时,Spark分区是非常重要的,因为它可以帮助Spark充分利用集群的资源和并行处理能力,从而加快数据处理速度。
有哪些分区方法,使用场景是什么
分区方法 | 使用场景 |
---|---|
repartition(numPartitions : scala.Int) | 对数据集随机打散进行范围分区,每个分区中数据量大致相同 |
repartition(partitionExprs : Column*) | 对数据集中指定列进行哈希分区 |
repartition(numPartitions : scala.Int, partitionExprs : Column*) | partition = hash(partitionExprs) % numPartitions,使用指定列的哈希值对指定分区数进行取模 |
coalesce(numPartitions : scala.Int) | 用来减少分区数量,可以避免shuffle |
repartitionByRange(partitionExprs : Column*) | 使用范围分区,非常使用与数字列 |
partitionBy(colNames : root.scala.Predef.String*) | 用于将数据写入磁盘中对应的子文件夹,类似于hive的分区 |
注意:partitionBy()是DataFrameWriter类中的一个方法,其他所有方法都来自DataFrame。
分区的几个常识
- 默认情况下,Spark创建的分区等于服务器中CPU内核的数量。
- 每个分区的数据都存在一台服务器上(这里注意和HDFS上Block的区别)。
- Spark为每个分区创建一个Task。
- Spark Shuffle操作将数据从一个分区移动到其他分区。
- 分区是一项昂贵的操作,因为它会造成shuffle(数据可能在节点之间移动)。
- 默认情况下,DataFrame shuffle操作会创建200个分区。
内存分区和磁盘分区
Spark支持内存分区(RDD/DataFrame)和磁盘分区(文件系统)。
- 内存分区
您可以通过调用repartition() or coalesce()算子来对DataFrame进行分区或重分区。 - 磁盘分区
在将DataFrame写回磁盘时,可以使用DataFrameWriter的partitionBy()来选择如何根据列对数据进行分区。这与Hive分区类似。
分区的优点
众所周知,Spark的处理大型数据集的速度是MapReduce处理速度的100倍,如果没有分区,这是不可能的。以下是在内存或磁盘上使用Spark分区的一些优点。
- 快速访问数据。
- 提供在较小数据集上执行算子的能力。
很多数据处理框架都在使用分区,因为它能够很快读取数据。
默认分区和配置
默认情况下,Spark会根据运行Job的模式对数据进行分区。
- Local模式
在本地以独立模式运行时,Spark会将数据划分为系统上的CPU核数或创建SparkSession对象时指定的值。
SparkSession spark = SparkSession
.builder()
.appName("appName")
.master("local[5]")
.enableHiveSupport()
.getOrCreate();
上面的例子提供了local[5]作为master()方法的参数,这意味着用5个分区在本地运行作业。就算你系统只有2个内核,它仍然会创建5个分区任务。
package sparkdemo;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
public class PartitionTest {
public static void main(String[] args) {
Logger.getLogger("org").setLevel(Level.ERROR);
SparkSession spark = SparkSession
.builder()
.appName("appName")
.master("local[5]")
.enableHiveSupport()
.getOrCreate();
Dataset<Long> ds = spark.range(0, 20);
System.out.println(ds.rdd().getNumPartitions());
spark.stop();
}
}
结果打印的是:5
- HDFS Cluster模式
在Hadoop集群上运行Spark作业时,默认的分区数量需要考虑以下几个种情况:
- 在HDFS集群上,默认情况下,Spark为文件的每个Block创建一个分区。
- 在Hadoop1.X中,HDFS块大小为64MB,在Hadoop2.X中HDFS块大小为128MB。
- 集群中所有executor核总数和2,哪个大取哪个。
例如,如果某个文件大小为640 MB,在Hadoop2.X上运行,则会创建5个分区,每个分区由128 MB的Block块组成(5个块*128 MB=640 MB)。如果将分区重新划分为10,那么它会为每个块创建2个分区。
- 通过配置设置分区
- spark.default.parallelism
配置默认值设置为集群中所有节点上的所有核数,在本地,它设置为系统上的核数。 - spark.sql.shuffle.partitions
配置默认值设置为200,当调用shuffle算子,如groupBy、join算子等,此属性仅在DataFrame API中可用,在RDD中不可用。
在代码中设置配置属性的方式如下:
spark.conf.set("spark.sql.shuffle.partitions", "500")
也可在使用spark-submit命令提交spark应用程序的时候,增加配置属性:
./bin/spark-submit --conf spark.sql.shuffle.partitions=500 --conf spark.default.parallelism=500
动态修改分区数
创建RDD/DataFrame时,可以通过参数设定分区的数量,Spark也可以修改内存分区数量、磁盘分区数量。
- repartition()和coalesce()
在处理分区数据时,可以通过reparation和coalesce来增加和减少分区。
package sparkdemo;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
public class PartitionTest {
public static void main(String[] args) {
Logger.getLogger("org").setLevel(Level.ERROR);
SparkSession spark = SparkSession
.builder()
.appName("appName")
.master("local[5]")
.enableHiveSupport()
.getOrCreate();
Dataset<Long> ds = spark.range(0, 10);
System.out.println("默认分区数量:"+ds.rdd().getNumPartitions());
Dataset<Long> repartition = ds.repartition(10);
System.out.println("repartition增加分区数:"+repartition.rdd().getNumPartitions());
Dataset<Long> coalesce = ds.coalesce(2);
System.out.println("coalesce减少分区数:"+coalesce.rdd().getNumPartitions());
spark.stop();
}
}
默认分区数量:5
repartition增加分区数:10
coalesce减少分区数:2
注意:当你想减少分区的数量时,建议使用coalesce而不是repartition,因为能避免shuffle。
- partitionBy()
Spark partitionBy()是DataFrameWriter类的一个方法,用于在将DataFrame写入磁盘/文件系统时基于一个或多个列值进行分区。
当通过调用partitionBy()将Spark DataFrame写入磁盘时,Spark会根据分区列拆分记录,并将每个分区数据存储到一个子目录中。
partitionBy可以对单个列或多个列进行分区。
测试数据:
code,country
1,China
1,China
1,China
2,America
2,America
3,England
3,England
4,Japan
4,Japan
4,Japan
5,Korea
5,Korea
代码:
package sparkdemo;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class PartitionTest {
public static void main(String[] args) {
Logger.getLogger("org").setLevel(Level.ERROR);
SparkSession spark = SparkSession
.builder()
.appName("appName")
.master("local[5]")
.enableHiveSupport()
.getOrCreate();
Dataset<Row> ds = spark.read().option("header", "true").csv("./data/country.csv");
ds.write().option("header", "true").partitionBy("country")
.mode("overwrite").csv("./data/country");
spark.stop();
}
}
DataFrame中共有5个不同的country,因此,它创建了5个目录,如下图所示。子目录的名称将是分区列及其值(分区列=值)。
- repartitionByRange()
测试数据:
id,count
1,10
2,20
3,10
4,20
5,10
6,30
7,50
8,50
9,50
10,30
11,30
12,40
13,40
14,20
15,20
代码:
package sparkdemo;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class PartitionTest {
public static void main(String[] args) {
Logger.getLogger("org").setLevel(Level.ERROR);
SparkSession spark = SparkSession
.builder()
.appName("appName")
.master("local[5]")
.enableHiveSupport()
.getOrCreate();
Dataset<Row> ds = spark.read().option("header", "true").csv("./data/num.csv");
Dataset<Row> count = ds.repartitionByRange(5, new Column("count"));
count.write().option("header", "true").mode("overwrite").csv("./data/range-partition");
spark.stop();
}
}
打开看其中一个分区:
所有count=50的在一个分区。
如何选择分区列
当使用partitionBy()时,必须非常小心它创建的分区数量,因为分区太多会在一个目录中创建太多子目录,这会给NameNode带来不必要的开销(如果你使用的是Hadoop),因为它必须将文件系统的所有元数据保存在内存中。因此,需要根据分区字段进行判断,比如使用country作为分区,就比使用city可能会更合适。
分区越多越好还是越少越少
分区可以提高作业的并行度从而提高spark的性能,那我们设置超级多分区岂不是美哉?
nonono,Spark必须为每个分区创建一个任务,如果大部分时间都用于创建、调度和管理任务,然后再执行,而不是大部分时间用来处理数据。同理,太少的分区根本不能充分利用集群资源,那简直就是对集群资源的一种浪费。因此,根据不同的数据量设置相应的分区才是最佳选择。
后记
相信看完这篇博客,你对spark的分区会有更加清晰的认识,无论在工作中,还是面试都将游刃有余。