优化常用手段
- 1.好的模型设计
- 对大表进行分区和分桶
NULL
值问题- 字段类型的一致性
- 2.解决数据倾斜问题
- 3.减少job数
- 4.在数据量大的情况下,慎用
count(distinct xxx)
,group by
,容易产生数据倾斜 - 5.小文件合并
- 6.单个job最优不如整体最优
- 7.合理利用文件存储格式(ORC),进行数据压缩
排序选择
cluster by
: 对同一字段分桶并排序,不能和 sort by 连用distribute by + sort by
: 分桶,保证同一字段值只存在一个结果文件当中,结合sort by
保证 每个reduceTask
结果有序sort by
: 单机排序,单个reduce
结果有序order by
: 全局排序,缺陷是只能使用一个reduce
任务花费时间很短,又要多次启动JVM
的情况下,通过重用JVM
来解决
set mapred.job.reuse.jvm.num.tasks=5
小文件合并
set hive.merge.mapfiles = true ##在 map only 的任务结束时合并小文件
set hive.merge.mapredfiles = false ## true 时在 MapReduce 的任务结束时合并小文件
set hive.merge.size.per.task = 256*1000*1000 ##合并文件的大小
set mapred.max.split.size=256000000; ##每个 Map 最大分割大小
set mapred.min.split.size.per.node=1; ##一个节点上 split 的最少值
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; ##执行 Map 前进行小文件合并
Join 优化
总体原则:
- 1.优先过滤后再进行
Join
操作,最大限度的减少参与join
的数据量 - 2.小表
join
大表,最好启动mapjoin
- 3.
Join on
的条件相同的话,最好放入同一个job
,并且join
表的排列顺序从小到大
场景说明:
在map端产生join
- 1.当链接的两个表是一个比较小的表和一个特别大的表的时候,我们把比较小的table直接放到内存中去,然后再对比较大的表格进行map操作
- 2.join就发生在map操作的时候,每当扫描一个大的table中的数据,就要去去查看小表的数据,哪条与之相符,继而进行连接
- 3.这里的join并不会涉及reduce操作,优势就是在于没有shuffle
- 4.在本质上mapjoin根本就没有运行MR进程,仅仅是在内存就进行了两个表的联合
set hive.auto.convert.join=true; select count(*) from a join b on a.id=b.id;
common join
/shuffle join
两个table的大小相当,但是又不是很大的情况下使用
SMBJoin
1.smb是sort merge bucket操作,首先进行排序,继而合并,然后放到所对应的bucket中去
2.bucket是hive中和分区表类似的技术,就是按照key进行hash,相同的hash值都放到相同的buck中去
3.在进行联合的时候,是table1中的一小部分和table1中的一小部分进行联合,table联合都是等值连接,相同的key都放到了同一个bucket中去了,那么在联合的时候就会大幅度的减小无关项的扫描set hive.auto.convert.sortmerge.join=true; set hive.optimize.bucketmapjoin = true; set hive.optimize.bucketmapjoin.sortedmerge = true; set hive.auto.convert.sortmerge.join.noconditionaltask=true; set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat; set hive.optimize.bucketmapjoin=true; set hive.optimize.bucketmapjoin.sortedmerge=true;
如果明确知道由于
join
出现的数据倾斜,设置skew join
set hive.optimize.skewjoin = true; # 如果是join过程出现倾斜应该设置为true set hive.skewjoin.key = skew_key_threshold; # 这个是join的键对应的记录条数超过这个值则会进行 分拆,值根据具体数据量设置 select category_id ,goods_id ,count(distinct order_id) order_cnt from dw.dw_t_trade_order_item group by category_id,goods_id order by category_id,order_cnt desc limit 5 -- by category_id ;
并行化处理
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8; # 同一个 sql 允许并行任务的最大线程数
partition by
执行很慢解决思路
当列ca发生偏差时,cast(rand()*20 as int)
在第一阶段添加了更多的并行性。
在此之后,将执行谓词where rn=1
,从而显著减少稍后要处理的数据量。
with as(
select
ca,
cb,
row_number() over(partition by ca, cast(rand()*20 as int) order by cb desc) as rn1
from test
)
select * from (
select
ca,
cb,
row_number() over(partition by ca order by cb desc) as rn2
from tt1
where rn1=1
) tmp
where rn2=1;