`
hugh.wangp
  • 浏览: 288033 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

HIVE表数据量和数据记录数的矛与盾

阅读更多

 

HIVE作为在Hadoop分布式框架下的数据仓库技术,处理大数据量是最基本的诉求,这种海量处理是基于分布式框架,利用分布式存储,分布式计算,利用大集群的资源并行处理海量数据。但是一旦我们不能利用这种分布式并行处理,那么海量数据只能是低效处理了。再往细处说,就是一份海量数据需要多少map来处理,一个map能处理多少数据,这些都制约着数据处理的效率。

HIVE的执行效率问题可以大概归结:
1.如何使大数据量利用合适的map数
2.如何使单个map处理合适的数据记录数

这两点是相辅相成的,在保证单个map处理合适的记录数后,就能确定整个数据量需要多少map数。貌似好像解决了,但是这是个矛与盾的问题,不是那么好解决的,因为hive是根据数据量和文件数来确定map数,不会去考虑一个map处理多少记录数。我们要先去影响map数,然后再调整单个map处理的记录数,在特定情况下是要两手都要抓,两手都要硬,在矛与盾之间找到平衡。

1.如何使大数据量利用合适的map数
HIVE利用的默认split策略,基本上是使一个map处理近似于dfs.block.size大小的数据量,一般利用此策略基本可以完成大部分的数据处理。也就是说,通常情况下,能使我们的数据按照默认的split策略切分数据,得到的map数就是比较合适的,但是在使用HIVE中,我们不可能避免HQL不产生小文件(数据量远小于dfs.block.size),因为是否是小文件跟HQL的逻辑,数据分布,数据量等等都一定程度的关系,但小文件会浪费掉很多map,一个文件就需要一个map,并且一个文件可能就只包含几千,几万条记录,对map资源是个浪费。而且每个HQL如果占用太多map,在整个集群中说不定分配资源就成个问题,看似给了很多map,但是根本就抢不到资源。所以我们要合并小文件,那么使用数据时可以分配比较合理的map数。

合并小文件的方式不少,但各有利弊,不同的数据需求可以采用不同的方式:
1.设置HIVE的参数,使HIVE在数据生成时自动进行小文件合并,方法请参看:http://hugh-wangp.iteye.com/blog/1508506
2.设置HIVE的参数,使HIVE在获取数据是先进行小文件合并,set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
3.自己开发小文件合并工具,这个是自由度最高的方法
合并小文件是会消耗资源,何时合并小文件,哪些数据需要合并小文件一定要考虑全面点。

合并小文件的目的基本上要达到是一个map处理的数据量近似等于dfs.block.size。这样从整个namenode的quota,整个系统map/reduce资源利用率上能达到较好的效果。但如果单单追求消除小文件,针对于有些数据仓库应用是不利的。矛盾就产生了。例如HIVE上的一张表,数据量为240MB,那么hive应该给他一个文件,占用一个block(dfs.block.size=268435456),但是这张表只有2个字段,记录长度平均24B,那么这张表就有超过1千万的记录数,但是这么多记录数只有一个map来处理,一个map的效率是可想而知的。所以我们要在数据处理的map数和单map处理的记录数上做到平衡,这种平衡的依据更多是要考虑此数据处理在整个数据仓库模型中的地位,如果是个核心的表,会被N个数据处理所用到,在不更改数据模型的前提下,我们要更偏向于这张表的数据存储分散在多个小文件中,这样利用此表时就能利用更多的map,并且单个map处理记录数少一点,利用效率会更高。


2.如何使单个map/reduce处理合适的数据记录数
为了解决矛与盾的平衡,在合并小文件的同时,对特殊的数据处理,我们是要一定程度地拆分成小文件,但拆分的标准是:保证单个map处理的记录数是合适的,单个map处理的数据量在100W-300W之间是比较好的,同时也要考虑数据处理的复杂度。
简单的办法:数据冗余
接着上面提到的240MB表的例子,如果我们冗余一个没有任何意义的字段,使记录的长度增长到100B,表大小增长到1GB,那么需要4个map处理这1千万的数据,平均1个map处理250W左右的记录数,相较1个map处理1000W的记录数,效率是提升很多的。
但这种方法应用到数据处理的临时表刷新和使用时没问题的,但在最终的事实表或者汇总表冗余这么个个没有意义的字段就太不应该了。

另一种办法:
1.set mapred.reduce.tasks(预计一个文件包含的记录数,确定计划要输出的文件数)
2.利用distribute by rand(123)把数据分散到各个reduce里

举例说明,如何调整一个map处理的记录数。
hive> select count(1) from shaka01;
OK
29022319
Time taken: 92.263 seconds
--HDFS的block.size
hive>  set dfs.block.size;
dfs.block.size=268435456
--shaka01对应的文件,每个文件大于block.size,利用此表时会有4个map执行,平均1个map处理750W条记录
336495295 /group/hive/shaka01/attempt_201206121523_2432943_m_000000_0
338594924 /group/hive/shaka01/attempt_201206121523_2432943_m_000001_0
处理这种复杂程度的如下SQL,按现在的数据分布是够呛的,一个job半个小时都没有执行完
--在第一个job(number of mappers: 4; number of reducers: 1)就执行将近30分钟,但是还处于map = 71%,  reduce = 0%
hive>     create table shaka02 as
    > select  '2012-06-26'   col1
    >         ,a.col3
    >         ,a.col4
    >         ,a.col5
    >         ,count(distinct case when  date_add(to_date(col2),7) > '2012-06-26'   then  a.col7 else null END) as col6
    >         ,count(distinct case when  date_add(to_date(col2),30)> '2012-06-26'   then  a.col7 else null END ) as col7
    >         ,count(distinct case when  date_add(to_date(col2),90)> '2012-06-26'   then  a.col7 else null END ) as col8
    >         ,count(distinct case when  date_add(to_date(col2),7) >'2012-06-26' and  date_add(to_date(col2),7)>to_date(a.col8)  and to_date(a.col8)<='2012-06-26'  then a.col7 else null END) col9 
    >         ,count(distinct case when  date_add(to_date(col2),30)>'2012-06-26' and  date_add(to_date(col2),7)>to_date(a.col8)  and to_date(a.col8)<='2012-06-26'  then a.col7 else null END) col10
    >         ,count(distinct case when  date_add(to_date(col2),90)>'2012-06-26' and  date_add(to_date(col2),7)>to_date(a.col8)  and to_date(a.col8)<='2012-06-26'  then a.col7 else null END) col11
    > from shaka01 a
    > group by a.col3
    >          ,a.col4
    >          ,a.col5;
改变数据存储的文件数量,为了使1个map处理的记录数减少
hive> set mapred.reduce.tasks = 10;
hive> set mapred.reduce.tasks;
mapred.reduce.tasks=10
--这样就能使shaka01的数据打散到10个文件中
hive> create table shaka01
    > as
    > select  stat_date                       col1               
    >         ,mc_sent_time                   col2               
    >         ,receiver_admin_member_id       col3
    >         ,receiver_company_id            col4       
    >         ,category_id                    col5               
    >         ,sender_country_id              col6       
    >         ,sender_email                   col7               
    >         ,first_reply_time               col8       
    >         ,dw_ins_date                    col9
    > from    test
    > distribute by rand(123);  
--每个文件都少于256MB,所以利用这张表的时候会起10个map
88469251 /group/hive/shaka01/attempt_201206121523_2440189_r_000000_0
89634660 /group/hive/shaka01/attempt_201206121523_2440189_r_000001_0
88117390 /group/hive/shaka01/attempt_201206121523_2440189_r_000002_0
87820171 /group/hive/shaka01/attempt_201206121523_2440189_r_000003_0
89219537 /group/hive/shaka01/attempt_201206121523_2440189_r_000004_0
90928398 /group/hive/shaka01/attempt_201206121523_2440189_r_000005_0
86772252 /group/hive/shaka01/attempt_201206121523_2440189_r_000006_0
87524942 /group/hive/shaka01/attempt_201206121523_2440189_r_000007_0
88125909 /group/hive/shaka01/attempt_201206121523_2440189_r_000008_0
86613799 /group/hive/shaka01/attempt_201206121523_2440189_r_000009_0
执行刚提到的复杂SQL

--第一个job多了6个map,达到10个map资源,26分钟就执行完毕整个SQL
hive>     create table shaka02 as
    > select  '2012-06-26'   col1
    >         ,a.col3
    >         ,a.col4
    >         ,a.col5
    >         ,count(distinct case when  date_add(to_date(col2),7) > '2012-06-26'   then  a.col7 else null END) as col6
    >         ,count(distinct case when  date_add(to_date(col2),30)> '2012-06-26'   then  a.col7 else null END ) as col7
    >         ,count(distinct case when  date_add(to_date(col2),90)> '2012-06-26'   then  a.col7 else null END ) as col8
    >         ,count(distinct case when  date_add(to_date(col2),7) >'2012-06-26' and  date_add(to_date(col2),7)>to_date(a.col8)  and to_date(a.col8)<='2012-06-26'  then a.col7 else null END) col9 
    >         ,count(distinct case when  date_add(to_date(col2),30)>'2012-06-26' and  date_add(to_date(col2),7)>to_date(a.col8)  and to_date(a.col8)<='2012-06-26'  then a.col7 else null END) col10
    >         ,count(distinct case when  date_add(to_date(col2),90)>'2012-06-26' and  date_add(to_date(col2),7)>to_date(a.col8)  and to_date(a.col8)<='2012-06-26'  then a.col7 else null END) col11
    > from shaka01 a
    > group by a.col3
    >          ,a.col4
    >          ,a.col5;
OK
Time taken: 1580.314 seconds

 



PS:

补充一种方法,工作中遇到的,而且已经作为我们的最终方案。

set dfs.block.size

不管是32MB,64MB,128MB或者256MB,只要保证一个block处理的记录数是自身环境所适应的就OK了。



2
6
分享到:
评论
1 楼 superlxw1234 2012-07-06  
好文!
如何合理的控制map/reduce数量,一直是hive优化中最头疼的问题。

相关推荐

Global site tag (gtag.js) - Google Analytics