您现在的位置是:首页 >其他 >SparkSQL倾斜处理网站首页其他

SparkSQL倾斜处理

爱弥儿er 2023-05-28 08:00:03
简介SparkSQL倾斜处理
  1. 过滤倾斜的key,比如业务中爬虫异常key,null
    可以使用df.sample(n)抽样找出倾斜异常key,再df.filter(func)过滤掉倾斜key如果key中的null倾斜需要保留,我们可以将null数据单独拿出来进行处理,比如:
-- 假设a.mobile存在大量的null倾斜
select a.id
  ,a.mobile
  ,b.fields
from table_a
left join table_b
on nvl(a.mobile,rand()) = b.mobile  

-- 或者使用union all改写,单独摘出来再拼接上去
select a.id
  ,a.mobile
  ,b.fields_name
from table_a a 
left join table_b b
on a.mobile=b.mobile  
where a.mobile is not null 
union all 
select a.id,a.mobile,null as fields_name
from table_a a
where a.mobile is null 

-- 过滤异常key
select groupby_field
  ,count(1) as cnt 
from table_a
where groupby_field <> '异常key'
group by groupby_field
  1. 增加shuffle并行度,如果各个key的数据量整体差异不大,task < executor_num(executor个数) * executor_cores(每个executor的核心数),我们可以考虑增加task数量,来充分利用计算资源;spark.sql.shuffle.partitions参数可以设置并行度(默认是200),一般设置每核跑1-3个task,磁盘io时可以充分利用计算资源。
    spark中有很多算子有指定并行度参数,比如:
    textFile(xx,minnumpartition)
    sc.parallelize(xx,num)
    sc.makeRDDD(xx,num)
    sc.parallelizePairs(List[Tuple2],num)
    redduceByKey(xx,num)groupByKey(xx,num),join,distinct
    repartition,coalesce
    spark.default.parallelism在sparksql中并行度由spark.sql.shuffle.partitions决定

  2. 双重聚合,类似于hive中的groupby倾斜参数set hive.groupby.skewindata=true,用两个mr计算作业,第一个mr中的key随机分发聚合,第二个mr做全局聚合;比如:

select groupby_field
  ,sum(cnt) as cnt   -- 全局聚合
from 
(  -- key打散聚合
  select ceiling(rand() * 10) as rnd  -- 添加随机数打散
    ,groupby_field   -- 分组字段
    ,count(1) as cnt  
  from table_name
  group by ceiling(rand() * 10),groupby_field
) t 
group by groupby_field
  1. reduce的joion改写成mapjoin,如果存在小表情况下,可以使用mapjoin,将小表回收到driver端,再广播到各个执行的executor上应用map关联;此场景使用于大表join小表的情况;
    这里需要注意,在外连接时,比如left join或者right join,小表是主表,mapjoin不生效
select /*+mapjoin(b)*/ a.id
  ,b.fields_name
from table_a a
join table_b b  -- b小表
on a.id=b.id
  1. join中倾斜key单独摘出来操作。在hive中会有join倾斜参数,hive.optimize.skewjoin=true;它会将join作业拆分成两个MR,对于倾斜健部分单独摘出来使用mapjoin处理,非倾斜键走正常reduce的join。在spark中,如果倾斜键数据符合大表+小表原则,也可以使用该策略。如果倾斜健两个表的数据都比较大,大表倾斜部分同一个key添加n种前缀,小表膨胀倾斜健部分膨胀n倍,倾斜部分join,再union 非倾斜部分join
select  a.id,a.field_a,b.field_b
from 
( -- 加入随机数
  select id,field_a,ceiling(rand()*10) as rnd_name
  from table_a
  where  a.id in ('倾斜健')
) a 
join 
( -- 数据膨胀
  select id,subview.rnd_name,field_b
  from table_b b
  lateral view explode(array(1,2,3,4,5,6,7,8,9,10))  subview as rnd_name
  where b.id in ('倾斜健')
) b 
on a.id=b.id  and  a.rnd_name=b.rnd_name
union all   -- 拼接非倾斜部分的join
select  a.id,a.field_a,b.field_b
from table_a a 
join table_b b 
on a.id=b.id 
where a.id not in ('倾斜健') and b.id  not in ('倾斜健')

对于rdd计算优化,在代码层面,
如果rdd多次使用使用cache(),persist()持久化
尽量避免shuffle类算子,尽量使用有map端聚合算子,比如reduceByKey,aggregateByKey(可以自定义map端聚合函数,自定义初始记录),combineByKey(类同aggregateByKey,初始记录为rdd数据行):可以减少shuffle write数据量,shuffle读数据量,redduce端聚合次数;
尽量使用高性能算子,比如用reduceByKey取代groupByKey;使用mapPartitions取代map;数据filter过滤后使用coalse减少分区
使用广播变量,比如mapjoin

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。