您现在的位置是:首页 >其他 >SparkSQL倾斜处理网站首页其他
SparkSQL倾斜处理
- 过滤倾斜的key,比如业务中爬虫异常key,null
可以使用df.sample(n)
抽样找出倾斜异常key,再df.filter(func)
过滤掉倾斜key如果key中的null倾斜需要保留,我们可以将null数据单独拿出来进行处理,比如:
-- 假设a.mobile存在大量的null倾斜
select a.id
,a.mobile
,b.fields
from table_a
left join table_b
on nvl(a.mobile,rand()) = b.mobile
-- 或者使用union all改写,单独摘出来再拼接上去
select a.id
,a.mobile
,b.fields_name
from table_a a
left join table_b b
on a.mobile=b.mobile
where a.mobile is not null
union all
select a.id,a.mobile,null as fields_name
from table_a a
where a.mobile is null
-- 过滤异常key
select groupby_field
,count(1) as cnt
from table_a
where groupby_field <> '异常key'
group by groupby_field
-
增加shuffle并行度,如果各个key的数据量整体差异不大,task < executor_num(executor个数) * executor_cores(每个executor的核心数),我们可以考虑增加task数量,来充分利用计算资源;
spark.sql.shuffle.partitions
参数可以设置并行度(默认是200),一般设置每核跑1-3个task,磁盘io时可以充分利用计算资源。
spark中有很多算子有指定并行度参数,比如:
textFile(xx,minnumpartition)
sc.parallelize(xx,num)
sc.makeRDDD(xx,num)
sc.parallelizePairs(List[Tuple2],num)
redduceByKey(xx,num)
,groupByKey(xx,num)
,join
,distinct
repartition
,coalesce
spark.default.parallelism
在sparksql中并行度由spark.sql.shuffle.partitions
决定 -
双重聚合,类似于hive中的groupby倾斜参数
set hive.groupby.skewindata=true
,用两个mr计算作业,第一个mr中的key随机分发聚合,第二个mr做全局聚合;比如:
select groupby_field
,sum(cnt) as cnt -- 全局聚合
from
( -- key打散聚合
select ceiling(rand() * 10) as rnd -- 添加随机数打散
,groupby_field -- 分组字段
,count(1) as cnt
from table_name
group by ceiling(rand() * 10),groupby_field
) t
group by groupby_field
- reduce的joion改写成mapjoin,如果存在小表情况下,可以使用mapjoin,将小表回收到driver端,再广播到各个执行的executor上应用map关联;此场景使用于大表join小表的情况;
这里需要注意,在外连接时,比如left join
或者right join
,小表是主表,mapjoin不生效
select /*+mapjoin(b)*/ a.id
,b.fields_name
from table_a a
join table_b b -- b小表
on a.id=b.id
- join中倾斜key单独摘出来操作。在hive中会有join倾斜参数,
hive.optimize.skewjoin=true;
它会将join作业拆分成两个MR,对于倾斜健部分单独摘出来使用mapjoin处理,非倾斜键走正常reduce的join。在spark中,如果倾斜键数据符合大表+小表原则,也可以使用该策略。如果倾斜健两个表的数据都比较大,大表倾斜部分同一个key添加n种前缀,小表膨胀倾斜健部分膨胀n倍,倾斜部分join,再union 非倾斜部分join
select a.id,a.field_a,b.field_b
from
( -- 加入随机数
select id,field_a,ceiling(rand()*10) as rnd_name
from table_a
where a.id in ('倾斜健')
) a
join
( -- 数据膨胀
select id,subview.rnd_name,field_b
from table_b b
lateral view explode(array(1,2,3,4,5,6,7,8,9,10)) subview as rnd_name
where b.id in ('倾斜健')
) b
on a.id=b.id and a.rnd_name=b.rnd_name
union all -- 拼接非倾斜部分的join
select a.id,a.field_a,b.field_b
from table_a a
join table_b b
on a.id=b.id
where a.id not in ('倾斜健') and b.id not in ('倾斜健')
对于rdd计算优化,在代码层面,
如果rdd多次使用使用cache()
,persist()
持久化
尽量避免shuffle类算子,尽量使用有map端聚合算子,比如reduceByKey,aggregateByKey(可以自定义map端聚合函数,自定义初始记录),combineByKey(类同aggregateByKey,初始记录为rdd数据行):可以减少shuffle write数据量,shuffle读数据量,redduce端聚合次数;
尽量使用高性能算子,比如用reduceByKey取代groupByKey;使用mapPartitions取代map;数据filter过滤后使用coalse减少分区
使用广播变量,比如mapjoin