多读书多实践,勤思考善领悟

大数据hadoop之 二十一.Hive优化

本文于1713天之前发表,文中内容可能已经过时。

Hive优化内容

  • Fetch抓取
    • Hive 中对某些情况的查询可以不必使用 MapReduce 计算
  • 本地模式
    • 当数据量非常小的时候,通过设置本地模式在单台机器上处理所有任务,可提高效率
  • 表的优化
    • 小表join大表
    • 大表join大表
      • 空KEY过滤
      • 空Key转化
    • MapJoin
      • 注意:MapJoin的工作机制
      • mapjoin对分桶表join的优化
    • Group By
      • Map端的聚合
    • Count(Distinct) 去重统计
      • 一般 COUNT DISTINCT 使用先 GROUP BY 再 COUNT 的方式替换
    • 笛卡尔积
      • 避免使用笛卡尔积
    • 行列过滤
      • 避免使用select *
      • 通过子查询后,再关联表
    • 动态分区的调整
      • 注意动态分区的语法
    • 分桶
      • 分桶的理解以及插入数据时注意事项
    • 分区
      • 分区的理解
    • left semi join
      • 对 in/exists的子查询提供了更高效的优化方式
    • insert into 代替 union all
  • 数据倾斜
    • 合理设置Map数量
    • 小文件合并
    • 复杂文件增加Map数
    • 合理设置Reduce数量
  • 并行执行
  • 严格模式
  • JVM重用(同一个job中的tasks)
  • 推测执行
  • 压缩
  • 执行计划

一、Fetch抓取

  • 概念:

    • Fetch 抓取是指,Hive 中对某些情况的查询可以不必使用 MapReduce 计算。
    • 例如:
      • SELECT * FROM emp;
      • 在这种情况下,Hive 可以简单地读取 emp 对应的存储目录下的文件,然后输出查询结果到控制台。
  • 操作

    • 在 hive-default.xml.template 文件中 hive.fetch.task.conversion属性值设置

    • 修改属性值:

      • hive.fetch.task.conversion

        • CDH5.3.6中对应的hive 0.13.1中,该属性值默认为minimal

        • 之后新版中该属性值默认为more

          1
          2
          3
          4
          5
          6
          7
          8
          9
           * none : disable hive.fetch.task.conversion

          * minimal : SELECT STAR, FILTER on partition columns, LIMIT only

          * minimal格式下:只有select * ; filter分区列过滤 下不走MR

          * more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)

          * more格式下:select;filter;limit不走MR

二、本地模式(小任务)

  • 概念
    • 当数据量非常小时,在这种情况下,为查询触发执行任务时消耗可能会比实际 job 的执行时间要多的多。
    • 可以联系到hadoop运行MapReduce job时候:
      • 当job很小时,application master会在自己的本地虚拟机中运行tasks,这时相比较于allocating and running in new containers的成本而言的,也就是说相比较于开启一个新的container并耗费资源拉取数据的时间比job执行时间都长。
    • 满足以下条件时,可以使Hive 通过本地模式在单台机器上处理所有的任务。
      • job的输入数据大小必须小于参数:hive.exec.mode.local.auto.inputbytes.max(默认128MB)
      • 输入文件的个数必须小于参数hive.exec.mode.local.auto.input.files.max,默认为4
      • job的reduce数必须为0或者1
  • 操作
    • set hive.exec.mode.local.auto=true; //开启本地 mr
    • set hive.exec.mode.local.auto.inputbytes.max=50000000;
      • 设置 local mr 的最大输入数据量,当输入数据量小于这个值时采用 local mr 的方式,默认为 134217728,即 128M
    • set hive.exec.mode.local.auto.input.files.max=10;
      • 设置 local mr 的最大输入文件个数,当输入文件个数小于这个值时采用 local mr 的方式,默认为 4

三、表的优化

1. 小表join大表

  • 概念
    • 大表放在后面
      • hive在处理每一个MR阶段的join时,the last table in the sequence is streamed through the reducers whereas the others are buffered。即最后一张表是通过reducer拉取数据得到的,而前面的表均是缓存到内存中的,因此,为了减少reducer内存的使用,选择将数据量大的表放在最后边,其他表缓存到reducer中)
      • 因此通常需要将小表放前面,或者标记哪张表是大表:/streamtable(table_name) /
      • 将 key 相对分散,并且数据量小的表放在 join 的左边,这样可以有效减少内存溢出错误发生的几率;再进一步,可以使用 Group 让小的维度表(1000 条以下的记录条数)先进内存。在 map 端完成 reduce。(也就是开启map端的聚合)、
    • 使用相同连接键
      • 首先要明白: hive是将joins转化为MR任务去执行的,当多张表的连接条件为同一个join clause则开启一个MR任务,当不同的join clauses时,则开启多个MR任务。
      • 因此,当对3个或者更多个表进行join连接时,如果每个on子句都使用相同的连接键的话,那么只会产生一个MapReduce job。
    • 实际测试发现:新版的 hive 已经对小表 JOIN 大表和大表 JOIN 小表进行了优化。小表放在左边和右边已经没有明显区别。
    • 其实开启map端join后,还可以避免数据倾斜问题
  • 操作
    • 设置属性值hive.auto.convert.join,默认为true

2. 大表join大表

2.1 空KEY过滤

  • 概念

    • 有时 join 超时是因为某些 key 对应的数据太多,而相同 key 对应的数据都会发送到相同的 reducer 上,从而导致内存不够,

    • 此时我们应该仔细分析这些异常的 key,很多情况下,这些 key 对应的数据是异常数据,我们需要在 SQL 语句中进行过滤。例如 key 对应的字段为空

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      insert overwrite table jointable 
      select
      n.*
      from (
      select
      *
      from
      nullidtable
      where id is not null ) n
      left join ori o on n.id = o.id;

2.2 空KEY转化

  • 概念

    • 有时虽然某个 key 为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在join 的结果中,此时我们可以表 a 中 key 为空的字段赋一个随机的值,使得数据随机均匀地分不到不同的 reducer 上。
    • 此时注意:空key表full join表,防止数据丢失
  • 操作

    • 设置reduce个数

      set mapreduce.job.reduces = 5;

    • join(空KEY转化)

      1
      2
      3
      4
      5
      6
      7
      insert overwrite table jointable
      select
      n.*
      from
      nullidtable n
      full join ori o
      on case when n.id is null then concat('hive', rand()) else n.id end = o.id;

3. MapJoin

  • 概念

    • 如果不指定 MapJoin 或者不符合 MapJoin 的条件,那么 Hive 解析器会将 Join 操作转换,成 Common Join
    • 即:在 Reduce 阶段完成 join。容易发生数据倾斜。可以用 MapJoin 把小表全部加载到内存在 map 端进行 join,避免 reducer 处理。
  • 参数设置

    • hive.auto.convert.join 设置mapjoin打开关闭;默认为true
    • hive.mapjoin.smalltable.filesize 设置小表阈值,默认为25000000;25M
  • 深入理解

    • 当满足mapjoin条件时,会自动更改为mapJoin

    • 也可以手动指定为MapJoin

    • But the mapjoin hint should only be used for the following query

      • if all the inputs are bucketed or sorted,and the join should be converted to a bucketed map-side join or bucketized sort-merge join.

      • 当两张join的表根据同一列进行分桶,且表的分桶数相同或者成倍数,则两表连接时会对应join

        1
        select /* + MAPJOIN(b)*/ a.key,a.value from a join b on a.key = b.key
        • join can be done on the mapper only
        • Instead of fetching B completely for each mapper of A,only the required buckets are fetched.
        • the mapper processing bucket 1 for A will only fetch bucket 1 of B
        • 需要设置属性:set hive.optimize.bucketmapjoin = true;
      • 当两张join的表根据同一列进行分桶且桶内根据此列有序,且表的分桶数相同,a sort-merge join can be performed. The corresponding buckets are joined with each other at the mapper.

4. Group By

  • 概念

    • 默认情况下,Map 阶段同一 Key 数据分发给一个 reduce,当一个 key 数据过大时就倾斜了。
    • 并不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端进行部分聚合,最后在 Reduce 端得出最终结果。
    • 可对应于hadoop的MapReduce中的combine操作,与reduce实现相同业务逻辑,运行每一个map task中,减轻shuffle中从map到reduce的传输。
    • 不是所有的聚合都需要进行此项优化。当group by 的字段没有相同的时,则无效
      • select * from emp group by empno;
      • 因为empno没有重复的,因此map聚合没有太大意义,并且浪费资源。
  • 参数设置

    • 是否在 Map 端进行聚合,默认为 True

      1
      hive.map.aggr = true
    • 在 Map 端进行聚合操作的条目数目

      1
      hive.groupby.mapaggr.checkinterval = 100000
    • 有数据倾斜的时候进行负载均衡(默认是 false)

      1
      2
      3
      4
      5
      6
      7
      hive.groupby.skewindata = true

      * 当选项设定为 true,生成的查询计划会有两个 MR Job。

      * 第一个 MR Job 中,Map 的输出结果会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;、

      * 第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

5. count(distinct) 去重统计

  • 概念

    • 数据量小的时候无所谓,数据量大的情况下,由于 COUNT DISTINCT 操作需要用一个Reduce Task 来完成,这一个 Reduce 需要处理的数据量太大,就会导致整个 Job 很难完成,
    • 一般 COUNT DISTINCT 使用先 GROUP BY 再 COUNT 的方式替换:
  • 参数设置

    • set mapreduce.job.reduces = xxxx; 对reduce个数进行设置

      1
      2
      3
      4
      5
      6
      7
      8
      9
      select 
      count(id)
      from (
      select
      id
      from
      bigtable
      group by
      id) a;
    • 先根据id进行部分聚合,然后统计个数

6. 笛卡尔积

1
* 尽量避免笛卡尔积,join 的时候不加 on 条件,或者无效的 on 条件,Hive 只能使用 1个 reducer 来完成笛卡尔积

7. 行列过滤

  • 列处理:

    • 在 SELECT 中,只拿需要的列,如果有,尽量使用分区过滤,少用 SELECT *。
  • 行处理:

    • 在分区剪裁中,当使用外关联时,如果将副表的过滤条件写在 Where 后面那么就会先全表关联,之后再过滤,应该通过子查询后,再关联表

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      select 
      b.id
      from
      bigtable b
      join (
      select
      id
      from
      ori
      where id <= 10 ) o
      on b.id = o.id;

8. 动态分区调整

  • 概念

    • 关系型数据库中,对分区表 Insert 数据时候,数据库自动会根据分区字段的值,将数据插入到相应的分区中
    • Hive 中也提供了类似的机制,即动态分区(Dynamic Partition),只不过,使用 Hive 的动态分区,需要进行相应的配置。
  • 参数设置

    • 开启动态分区功能(默认 true,开启)

      1
      hive.exec.dynamic.partition=true
    • 设置为非严格模式(动态分区的模式,默认 strict,表示必须指定至少一个分区为静态分区,nonstrict 模式表示允许所有的分区字段都可以使用动态分区。)

      1
      hive.exec.dynamic.partition.mode=nonstrict
    • 在所有执行 MR 的节点上,最大一共可以创建多少个动态分区。

      1
      hive.exec.max.dynamic.partitions=1000
    • 在每个执行 MR 的节点上,最大可以创建多少个动态分区。该参数需要根据实际的数据来设定。比如:源数据中包含了一年的数据,即 day 字段有 365 个值,那么该参数就需要设置成大于 365,如果使用默认值 100,则会报错。

      1
      hive.exec.max.dynamic.partitions.pernode=100
    • 整个 MR Job 中,最大可以创建多少个 HDFS 文件。

      1
      hive.exec.max.created.files=100000
    • 当有空分区生成时,是否抛出异常。一般不需要设置。

      1
      hive.error.on.empty.partition=false
  • 操作

    1
    2
    3
    4
    5
    6
    insert overwrite table ori_partitioned_target
    partition (p_time) //指定分区列
    select
    id, time, uid, keyword, url_rank, click_num, click_url, p_time //分区列也要作为一个字段查出
    from
    ori_partitioned;

9. 分区

  • 分区的理解:

    • 分区是一种根据“分区列”的值对表进行粗略划分的机制

    • Hive中的每个分区对应着数据库中相应分区列的一个索引;

      • 每个分区对应着表下的一个目录
      • 分区在HDFS上的表现形式与表在 HDFS上的表现形式相同,都是以子目录的形式存在
    • 查询时通过where选择查询的指定分区,可提高查询效率。

      • 当只需要遍历小范围内的数据时或一定条件下的数据,可有效减少扫描数据的数量
    • 一个表可在多个维度上进行分区,且分区可以嵌套使用,建表时通过partitioned by 来创建分区

      • partitioned by (year string)
      • partitioned by (year string,month string)
    • 将分区加载到表内之前,需要指定添加分区列,否则报错。

      • 此时联系到动分区的调整

      • 动态分区需要:

        ①开启动态分区

        1
        hive.exec.dynamic.partition=true

        ②设置为非严格模式

        1
        hive.exec.dynamic.partition.mode=nonstrict
    • partitioned by 子句中的列是表中正式的列(分区列,通过select * 可查到),但是表数据中并不包含这些列

10. 分桶

  • 分桶的理解
    • 桶为表提供了额外的结构,hive在处理某些查询时利用这个结构,能提高查询效率
    • 桶通过对指定列进行哈希计算来实现的,通过对哈希值将一个列名下的数据切分为一组桶,并使每个桶对应于该列名下的存储文件
    • 建立桶之前,需要设置hive.enforce.bucketing = true;
    • 分区和分桶其中一个区别在于
      • 分区中的指定列在表数据中不存在,即数据文件中不存在
      • 分桶中的指定列在表数据中存在
    • 向桶中插入数据,若分为4个桶,则在插入数据时,对应于4个reduce操作,输出4个文件
  • 分区中的分桶
    • 注意分区中分桶的数据插入
    • clustered by (id) sorted by (age) into 4 buckets
      • clustered by 和 sorted by 不会影响数据的导入,插入的数据若需要排序,则需要手动定义

11. left semi join

  • 概念

    • left semi join是对in/exists子查询提供了更高效的方式

    • 下面两个语句等价

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      select 
      a.key,a.value
      from
      a
      where
      a.key in(
      select b.key from B);

      等价于

      select
      a.key,a.value
      from
      a
      left semi join b
      on (a.key = b.key)
    • The restrictions of using left semi join are that the right-side join table should only be referened in the join condition, but not in where or select clause etc.(left semi join中右边的表只能在on 中被引用,不可在where,select中等使用)

12. insert into 代替 union all

  • 不同表的union all相当于multiple inputs,同一个表的union all,相当map一次输出多条。

  • 如果union all的部分个数大于2,或者每个union部分数据量大,应该拆成多个insert into 语句,实际测试过程中,执行时间能提升50%

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    insert overwite table tablename partition (dt= ....)    
    select ..... from ( select ... from A
    union all  
    select ... from B  union all select ... from C ) R  
    where ...;

    可以改写为:

    insert into table tablename partition (dt= ....) select .... from A WHERE ...;

    insert into table tablename partition (dt= ....) select .... from B  WHERE ...;

    insert into table tablename partition (dt= ....) select .... from C WHERE ...;

四、数据倾斜

概述

  • 数据倾斜表现:

    • 任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大。
  • 数据倾斜原因:

    • key分布不均匀

    • 业务数据本身的特性

    • 建表时考虑不周

    • 某些SQL语句本身就有数据倾斜

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      关键词                  情形                                          后果
      -----------------|----------------------------------------------|--------------------------------------------------
      join 其中一个表较小,但是key集中 分发到某一个或几个Reduce上的数据远高于平均值
      -----------------|----------------------------------------------|-------------------------------------------------
      join 大表与大表,但是分桶的判断字段0值或空值过多 这些空值都由一个reduce处理,非常慢
      -----------------|----------------------------------------------|------------------------------------------------
      group by group by 维度过小,某值的数量过多 处理某值的reduce非常耗时
      -----------------|----------------------------------------------|-------------------------------------------------
      count distinct 某特殊值过多 处理此特殊值reduce耗时
      -----------------|----------------------------------------------|-------------------------------------------------

1. 合理设置map数量

  • 概念
    • 通常情况下,作业会通过 input 的目录产生一个或者多个 map 任务
      • 主要的决定因素有:input 的文件总个数,input 的文件大小,集群设置的文件块大小。
      • 涉及到split切片机制
        • size = Math.max(minSize,Math.min(maxSize,blockSize));
      • 如果集群性能好的话,可设置blockSize
      • 开启map端的聚合
    • 也可以适当调整环形缓冲区的大小以提高效率

2. 小文件进行合并

  • 在 map 执行前合并小文件,减少 map 数:

    • CombineHiveInputFormat 具有对小文件进行合并的功能(系统默认的格式)。

    • HiveInputFormat 没有对小文件合并功能。

      1
      set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

3. 复杂文件增加 Map 数

  • 当 input 的文件都很大,任务逻辑复杂,map 执行非常慢的时候,可以考虑增加 Map数,来使得每个 map 处理的数据量减少,从而提高任务的执行效率。

  • 增加 map 的方法为:根据

    1
    computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M 公式
  • 调整 maxSize 最大值。让 maxSize 最大值低于 blocksize 就可以增加 map 的个数。

4. 合理设置 Reduce 数

4.1 调整 reduce 个数方法一

  • 每个 Reduce 处理的数据量默认是 256MB

    1
    hive.exec.reducers.bytes.per.reducer=256000000
  • 每个任务最大的 reduce 数,默认为 1009

    1
    hive.exec.reducers.max=1009
  • 计算 reducer 数的公式

    1
    N=min(参数 2,总输入数据量/参数 1)

4.2 调整 reduce 个数方法二

  • 在 hadoop 的 mapred-default.xml 文件中修改

    1
    2
    设置每个 job 的 Reduce 个数
    set mapreduce.job.reduces = 15;
  • 具体的个数需要经过测试来得知

五、并行执行

  • 概念

    • Hive 会将一个查询转化成一个或者多个阶段。这样的阶段可以是 MapReduce 阶段、抽样阶段、合并阶段、limit 阶段。或者 Hive 执行过程中可能需要的其他阶段。
    • 默认情况下,Hive 一次只会执行一个阶段。
    • 不过,如果某些阶段不是互相依赖,是可以并行执行的。
  • 参数设置

    1
    2
    set hive.exec.parallel=true; //打开任务并行执行
    set hive.exec.parallel.thread.number=16; //同一个 sql 允许最大并行度,默认为 8。
  • 不过,在共享集群中,需要注意下,如果 job 中并行阶段增多,那么集群利用率就会增加。当然,得是在系统资源比较空闲的时候才有优势,否则,没资源,并行也起不来

六、严格模式

开启严格模式可以禁止 3 种类型的查询。

1. 分区表——where过滤

  • 对于分区表,除非 where 语句中含有分区字段过滤条件来限制范围,否则不允许执行。
  • 就是用户不允许扫描所有分区。进行这个限制的原因是,通常分区表都拥有非常大的数据集,而且数据增加迅速。没有进行分区限制的查询可能会消耗令人不可接受的巨大资源来处理这个表。

2. order by 搭配limit使用

  • 对于使用了 order by 语句的查询,要求必须使用 limit 语句。
  • 因为 order by 为了执行排序过程会将所有的结果数据分发到同一个 Reducer 中进行处理,强制要求用户增加这个 LIMIT语句可以防止 Reducer 额外执行很长一段时间。

3. 限制笛卡尔积的查询。

  • 多表join时,指定join条件

七、JVM重用(针对同一个job的tasks而言的)

  • 概念
    • JVM 重用是 Hadoop 调优参数的内容,其对 Hive 的性能具有非常大的影响,特别是对于很难避免小文件的场景或 task 特别多的场景,这类场景大多数执行时间都很短。
    • Hadoop中有个参数是mapred.job.reuse.jvm.num.tasks,默认是1,表示一个JVM上最多可以顺序执行的task数目(属于同一个Job)是1。也就是说一个task启一个JVM。
    • 为每个task启动一个新的JVM将耗时1秒左右,对于运行时间较长(比如1分钟以上)的job影响不大,但如果都是时间很短的task,那么频繁启停JVM会有开销。
  • 参数设置
    • 如果我们想使用JVM重用技术来提高性能,那么可以将mapred.job.reuse.jvm.num.tasks设置成大于1的数。这表示属于同一job的顺序执行的task可以共享一个JVM,也就是说第二轮的map可以重用前一轮的JVM,而不是第一轮结束后关闭JVM,第二轮再启动新的JVM。
    • 那么最多一个JVM能顺序执行多少个task才关闭呢?
      • 这个值就是mapred.job.reuse.jvm.num.tasks。如果设置成-1,那么只要是同一个job的task(无所谓多少个),都可以按顺序在一个JVM上连续执行
    • 如果task属于不同的job,那么JVM重用机制无效,不同job的task需要不同的JVM来运行
  • 注意事项
    • JVM重用技术不是指同一Job的两个或两个以上的task可以同时运行于同一JVM上,而是排队按顺序执行。
    • 一个tasktracker最多可以同时运行的task数目由mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum决定,并且这两个参数在mapred-site.xml中设置。
    • 其他方法,如在JobClient端通过命令行-Dmapred.tasktracker.map.tasks.maximum=number或者conf.set(“mapred.tasktracker.map.tasks.maximum”,”number”)设置都是无效的。
    • 这个功能的缺点是,开启 JVM 重用将一直占用使用到的 task 插槽,以便进行重用,直到任务完成后才能释放。如果某个“不平衡的”job 中有某几个 reduce task 执行的时间要比其他 Reduce task 消耗的时间多的多的话,那么保留的插槽就会一直空闲着却无法被其他的 job使用,直到所有的 task 都结束了才会释放。

八、推测执行

  • 概念

    • 在分布式集群环境下,因为程序 Bug(包括 Hadoop 本身的 bug),负载不均衡或者资源分布不均等原因,会造成同一个作业的多个任务之间运行速度不一致,有些任务的运行速度可能明显慢于其他任务(比如一个作业的某个任务进度只有 50%,而其他所有任务已经运行完毕),则这些任务会拖慢作业的整体执行进度。
    • 为了避免这种情况发生,Hadoop 采用了推测执行(Speculative Execution)机制,它根据一定的法则推测出“拖后腿”的任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理同一份数据,并最终选用最先成功运行完成任务的计算结果作为最终结果。
  • 参数设置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    <property>
    <name>mapreduce.map.speculative</name>
    <value>true</value>
    <description>If true, then multiple instances of some map tasks may be executed in parallel.</description>
    </property>

    <property>
    <name>mapreduce.reduce.speculative</name>
    <value>true</value>
    <description>If true, then multiple instances of some reduce tasks may be executed in parallel.</description>
    </property>

    <property>
    <name>hive.mapred.reduce.tasks.speculative.execution</name>
    <value>true</value>
    <description>Whether speculative execution for reducers should be turned on.</description>
    </property>
  • 注意事项

    • 如果用户因为输入数据量很大而需要执行长时间的 map 或者 Reduce task 的话,那么启动推测执行造成的浪费是非常巨大大。