多读书多实践,勤思考善领悟

大数据hadoop之 二十.Hive的模式设计和事务性

本文于1714天之前发表,文中内容可能已经过时。

一. 概述

Hive看上去以及实际行为都像一个关系型数据库.用户对如表和列这类术语比较熟悉,而且Hive提供的查询语言和用户之前使用过的SQL方言非常相似.不过Hive实现和使用的方式和传统的关系型数据库是非常不同的.通常,用户视图移植关系型数据库中的模式,而事实上Hive是反模式

1. 按天划分的表

按天划分表就是一种模式,其通常会在表中加入一个时间戳,例如表名为upply_2011_01_01等等.这种每天一张表的方式在数据库领域是反模式的一种方式,但是因为实际情况下数据集增长得很快,这种方式应用还是比较广泛的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
0: jdbc:hive2://hadoopmaster:10000/> CREATE TABLE supply_2011_01_02(id int,part string,quantity int);
OK
No rows affected (1.279 seconds)
0: jdbc:hive2://hadoopmaster:10000/> CREATE TABLE supply_2011_01_03(id int,part string,quantity int);
OK
No rows affected (0.055 seconds)
0: jdbc:hive2://hadoopmaster:10000/> CREATE TABLE supply_2011_01_04(id int,part string,quantity int);
OK
No rows affected (0.056 seconds)
0: jdbc:hive2://hadoopmaster:10000/>

0: jdbc:hive2://hadoopmaster:10000/> select part,quantity supply_2011_01_02 from supply_2011_01_02
. . . . . . . . . . . . . . . . . .> union all
. . . . . . . . . . . . . . . . . .> select part,quantity supply_2011_01_02 from supply_2011_01_03
. . . . . . . . . . . . . . . . . .> where quantity<4;

对于Hive,这种情况下应该使用分区表.Hive通过Where子句中的表达式来选择查询所需要的指定分区,这样的查询执行效率高,而且看起来清晰明了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
0: jdbc:hive2://hadoopmaster:10000/> CREATE TABLE supplybypartition (id int,part string,quantity int)
. . . . . . . . . . . . . . . . . .> partitioned by (day int);

0: jdbc:hive2://hadoopmaster:10000/> alter table supplybypartition add partition(day=20110102);
OK
No rows affected (0.088 seconds)
0: jdbc:hive2://hadoopmaster:10000/> alter table supplybypartition add partition(day=20110103);
OK
No rows affected (0.067 seconds)
0: jdbc:hive2://hadoopmaster:10000/> alter table supplybypartition add partition(day=20110104);
OK
No rows affected (0.083 seconds)

0: jdbc:hive2://hadoopmaster:10000/> select * from supplybypartition
. . . . . . . . . . . . . . . . . .> where day>=20110102 and day<20110103 and quantity<4;
OK
+-----------------------+-------------------------+-----------------------------+------------------------+--+
| supplybypartition.id | supplybypartition.part | supplybypartition.quantity | supplybypartition.day |
+-----------------------+-------------------------+-----------------------------+------------------------+--+
+-----------------------+-------------------------+-----------------------------+------------------------+--+
No rows selected (0.162 seconds)
0: jdbc:hive2://hadoopmaster:10000/>

2. 关于分区

Hive中分区的功能是非常有用的,这是因为Hive通常要对输入进行全盘扫描,来满足查询条件,通过创建很多的分区确定可以优化一些查询,但是同时可能会对其他一些重要的查询不利:

HDFS用于设计存储数百万的大文件,而非数十亿的小文件.使用过多分区可能导致的一个问题就是会创建大量的非必须的hadoop文件和文件夹.一个分区就对应着一个包含有多个文件的文件夹.如果指定的表存在数百个分区,那么可能每天都会创建好几万个文件.如果保持这样的表很多年,那么最终就会超出NameNode对系统云数据信息的处理能力.因为NameNode必须将所有系统文件的元数据保存在内存中.虽然每个文件只需要少量字节大小的元数据(大约是150字节/文件),但是这样也会限制一个HDFS实例所能管理的文件总数的上限.而其他的文件系统,比如MapR和Amazon S3就没有这个限制.

MapReduce会将一个任务(job)转换成多个任务(task).默认情况下,每个task都是一个新的JVM实例,都需要开启和销毁的开销.对于小文件,每个文件都会对应一个task.在一些情况下,JVM开启和销毁的时间中销毁可能会比实际处理数据的时间消耗要长

因此,一个理想的分区方案不应该导致产生太多的分区和文件夹目录,并且每个目录下的文件应该足够大,应该是文件系统中块大小的若干倍.

接时间范围进行分区的一个好的策略就是按照不同的时间颗粒度来确定合适大小的数据积累量,而且安装这个时间颗粒.随着时间的推移,分区数量的增长是均匀的,而且每个分区下包含的文件大小至少是文件系统中块或块大小的数倍.

如果用户找不到好的,大小相对合适的分区方式的话,我们可以考虑使用分桶表来解决问题

3. 关于分桶表数据存储

数据分桶的适用场景:

分区提供了一个隔离数据和优化查询的便利方式,不过并非所有的数据都可形成合理的分区,

尤其是需要确定合适大小的分区划分方式,(不合理的数据分区划分方式可能导致有的分区数据过多,而某些分区没有什么数据的尴尬情况)

试试分桶是将数据集分解为更容易管理的若干部分的另一种技术。

据分桶的原理:

跟MR中的HashPartitioner的原理一模一样
MR中:按照key的hash值去模除以reductTask的个数
Hive中:按照分桶字段的hash值去模除以分桶的个数
Hive也是 针对某一列进行桶的组织。Hive采用对列值哈希,然后除以桶的个数求余的方式决定该条记录存放在哪个桶当中。

数据分桶的作用:

好处:
1、方便抽样
2、提高join查询效率
(1)获得更高的查询处理效率。桶为表加上了额外的结构,Hive 在处理有些查询时能利用这个结构。具体而言,连接两个在(包含连接列的)相同列上划分了桶的表,可以使用 Map 端连接 (Map-side join)高效的实现。比如JOIN操作。对于JOIN操作两个表有一个相同的列,如果对这两个表都进行了桶操作。那么将保存相同列值的桶进行JOIN操作就可以,可以大大较少JOIN的数据量。
(2)使取样(sampling)更高效。在处理大规模数据集时,在开发和修改查询的阶段,如果能在数据集的一小部分数据上试运行查询,会带来很多方便。

创建数据分桶表:

创建数据分桶表与普通表的表区别并不太大,如下为一个创建数据分桶表的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
use clickcube;

CREATE EXTERNAL TABLE `clickcube_mid`(
`logtype` bigint,
`date` string,
`hour` bigint,
`projectid` bigint,
`campaignid` bigint,
`templateid` bigint,
`mediaid` bigint,
`slotid` bigint,
`channeltype` bigint,
`regioncode` string,
`campclick` bigint,
`campimp` bigint,
`mediaclick` bigint,
`mediaimp` bigint,
`templateimp` bigint,
`templatecampimp` bigint,
`mediaclickcost` double,
`campclickcost` double)
PARTITIONED BY (
`day` string)
CLUSTERED BY (
`campaignid`, `mediaid` ) INTO 100 BUCKETS
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
TBLPROPERTIES (
'last_modified_by'='cloudera-scm',
'last_modified_time'='1530676367',
'transient_lastDdlTime'='1530676367')

其实主要注意的地方就如下的点:
CLUSTERED BY (campaignid, mediaid ) INTO 100 BUCKETS

如何将数据插入分桶表

将数据导入分桶表主要通过以下步骤
第一步:
从hdfs或本地磁盘中load数据,导入中间表
第二步:
通过从中间表查询的方式的完成数据导入

分桶的实质就是对 分桶的字段做了hash 然后存放到对应文件中,所以说如果原有数据没有按key hash ,

需要在插入分桶的时候hash, 也就是说向分桶表中插入数据的时候必然要执行一次MAPREDUCE,

这也就是分桶表的数据基本只能通过从结果集查询插入的方式进行导入

这里我们主要讲解第二步:

主要的过程我们写为一个SQL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
use clickcube;

set hive.enforce.bucketing = true;

INSERT OVERWRITE TABLE clickcube_mid_bucket
PARTITION( day = '2018-07-03' )
SELECT
clickcube_mid.logtype,
clickcube_mid.`date`,
clickcube_mid.`hour`,
clickcube_mid.projectid,
clickcube_mid.campaignid,
clickcube_mid.templateid,
clickcube_mid.mediaid,
clickcube_mid.slotid,
clickcube_mid.channeltype,
clickcube_mid.regioncode,
clickcube_mid.campclick,
clickcube_mid.campimp,
clickcube_mid.mediaclick,
clickcube_mid.mediaimp,
clickcube_mid.templateimp,
clickcube_mid.templatecampimp,
clickcube_mid.mediaclickcost,
clickcube_mid.campclickcost
FROM clickcube_mid
WHERE day = '2018-07-03'

这里我们需要注意几点
我们需要确保reduce 的数量与表中的bucket 数量一致,为此有两种做法
1.让hive强制分桶,自动按照分桶表的bucket 进行分桶。(推荐)

1
set  hive.enforce.bucketing = true;

2.手动指定reduce数量

1
2
3
set mapreduce.job.reduces = num;
/
set mapreduce.reduce.tasks = num;

并在 SELECT 后增加CLUSTER BY 语句
下面展示下整体的数据导入脚本
主要分为3个文件:

-rw-r–r–. 1 root root 637 7月 4 20:37 insert_into_bucket.hql
-rw-r–r–. 1 root root 37 7月 4 20:26 insert_into_bucket.init
-rwxr-xr-x. 1 root root 1788 7月 4 20:27 insert_into_bucket.sh

insert_into_bucket.hql 数据导入HQL
insert_into_bucket.init 设置初始环境
insert_into_bucket.sh 主体执行脚本
insert_into_bucket.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#! /bin/bash

set -o errexit

source /etc/profile
source ~/.bashrc

ROOT_PATH=$(dirname $(readlink -f $0))
echo $ROOT_PATH

date_pattern_old='^[0-9]{4}-[0-9]{1,2}-[0-9]{1,2}$'
date_pattern='^[0-9]{4}-((0([1-9]{1}))|(1[1|2]))-(([0-2]([0-9]{1}))|(3[0|1]))$'

#参数数量
argsnum=$#

#一些默认值
curDate=`date +%Y%m%d`
partitionDate=`date -d '-1 day' +%Y-%m-%d`
fileLocDate=`date -d '-1 day' +%Y-%m-%d`

#日志存放位置
logdir=insert_bucket_logs

function tips() {
echo "Usage : insert_into_bucket.sh [date]"
echo "Args :"
echo "date"
echo " date use this format yyyy-MM-dd , ex : 2018-06-02"
echo "============================================================"
echo "Example :"
echo " example1 : sh insert_into_bucket.sh"
echo " example2 : sh insert_into_bucket.sh 2018-06-02"
}

if [ $argsnum -eq 0 ] ; then
echo "No argument, use default value"
elif [ $argsnum -eq 1 ] ; then
echo "One argument, check date pattern"
arg1=$1
if ! [[ "$arg1" =~ $date_pattern ]] ; then
echo -e "\033[31m Please specify valid date in format like 2018-06-02"
echo -e "\033[0m"
tips
exit 1
fi
dateArr=($(echo $arg1 |tr "-" " "))
echo "dateArr length is "${#dateArr[@]}
partitionDate=${dateArr[0]}-${dateArr[1]}-${dateArr[2]}
else
echo -e "\033[31m Not valid num of arguments"
echo -e "\033[0m"
tips
exit 1
fi


if [ ! -d "$logdir" ]; then
mkdir -p $logdir
fi


cd $ROOT_PATH

#nohup hive -hivevar p_date=${partitionDate} -hivevar f_date=${fileLocDate} -f hdfs_add_partition_dmp_clearlog.hql >> $logdir/load_${curDate}.log

nohup beeline -u jdbc:hive2://master:10000 -n root --color=true --silent=false --hivevar p_date=${partitionDate} -i insert_into_bucket.init -f insert_into_bucket.hql >> $logdir/insert_bucket_${curDate}.log

insert_into_bucket.init

1
set hive.enforce.bucketing = true;

insert_into_bucket.hql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
use clickcube;

INSERT OVERWRITE TABLE clickcube_mid_bucket
PARTITION( day = '${hivevar:p_date}' )
SELECT
clickcube_mid.logtype,
clickcube_mid.`date`,
clickcube_mid.`hour`,
clickcube_mid.projectid,
clickcube_mid.campaignid,
clickcube_mid.templateid,
clickcube_mid.mediaid,
clickcube_mid.slotid,
clickcube_mid.channeltype,
clickcube_mid.regioncode,
clickcube_mid.campclick,
clickcube_mid.campimp,
clickcube_mid.mediaclick,
clickcube_mid.mediaimp,
clickcube_mid.templateimp,
clickcube_mid.templatecampimp,
clickcube_mid.mediaclickcost,
clickcube_mid.campclickcost
FROM clickcube_mid
WHERE day = '${hivevar:p_date}'

针对于分桶表的数据抽样:

分桶的一个主要优势就是数据抽样,
主要有两种方式:基于桶抽样、基于百分比抽样。

1)基于桶抽样:

1
2
3
4
hive> SELECT * FROMbucketed_users 
> TABLESAMPLE(BUCKET 1 OUT OF 4 ON id);
0 Nat
4 Ann

桶的个数从1开始计数。因此,前面的查询从4个桶的第一个中获取所有的用户。 对于一个大规模的、均匀分布的数据集,这会返回表中约四分之一的数据行。我们 也可以用其他比例对若干个桶进行取样(因为取样并不是一个精确的操作,因此这个 比例不一定要是桶数的整数倍)。

说法一:

注:tablesample是抽样语句,语法:TABLESAMPLE(BUCKET x OUTOF y)

  • y必须是table总bucket数的倍数或者因子。hive根据y的大小,决定抽样的比例。例如,table总共分了64份,当y=32时,抽取(64/32=)2个bucket的数据,当y=128时,抽取(64/128=)1/2个bucket的数据。

  • x表示从哪个bucket开始抽取。例如,table总bucket数为32,tablesample(bucket 3 out of 16),表示总共抽取(32/16=)2个bucket的数据,分别为第3个bucket和第(3+16=)19个bucket的数据。

说法二:

分桶语句中的分母表示的是数据将会被散列的桶的个数,

分子表示将会选择的桶的个数。
示例:

1
2
3
4
SELECT COUNT(1
FROM clickcube_mid_bucket 
TABLESAMPLE(BUCKET 10 OUT OF 100 ON rand()) 
WHERE day='2018-07-03';


2)基于百分比抽样:

hive另外一种按照抽样百分比进行抽样的方式,该种方式基于行数,按照输入路径下的数据块的百分比进行抽样。

这种抽样的最小单元是一个hdfs数据块,如果表的数据大小小于普通块大小128M,将返回所有行。

基于百分比的抽样方式提供了一个变量,用于控制基于数据块的调优种子信息:

1
2
3
4
5
6
7
<property>

    <name>hive.sample.seednumber</name>

    <value>0</value>

</property>

A number userd for percentage sampling. By changing this number, user will change the subsets of data sampled.

数据分桶存在的一些缺陷:

  • 如果通过数据文件LOAD 到分桶表中,会存在额外的MR负担。
  • 实际生产中分桶策略使用频率较低,更常见的还是使用数据分区。

二. 事务

1. 建表

1
2
3
4
5
6
7
8
9
hive> create table test_trancaction
> (user_id Int,name String)
> clustered by (user_id) into 3 buckets
> stored as orc TBLPROPERTIES ('transactional'='true');
OK
Time taken: 0.813 seconds
hive> create table test_insert_test(id int,name string) row format delimited fields TERMINATED BY ',';
OK
Time taken: 0.11 seconds

2. 导入数据

1
2
3
4
hive> insert into test_insert_test values(3,"ma");

hive> delete from test_insert_test where id=1;
FAILED: SemanticException [Error 10294]: Attempt to do update or delete using transaction manager that does not support these operations.

修改配置文件hive-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
<!--start for trancaction -->

<property>

<name>hive.support.concurrency</name>

<value>true</value>

</property>

<property>

<name>hive.enforce.bucketing</name>

<value>true</value>

</property>

<property>

<name>hive.exec.dynamic.partition.mode</name>

<value>nonstrict</value>

</property>

<property>

<name>hive.txn.manager</name>

<value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>

</property>

<property>

<name>hive.compactor.initiator.on</name>

<value>true</value>

</property>

<property>

<name>hive.compactor.worker.threads</name>

<value>1</value>

</property>

查看分桶

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
hadoop@hadoopmaster:/usr/local/hive/conf$ hdfs dfs -ls /user/hive/warehouse/test_insert_test
Found 3 items
-rwxrwxr-x 2 hadoop supergroup 6 2016-08-10 10:39 /user/hive/warehouse/test_insert_test/000000_0
-rwxrwxr-x 2 hadoop supergroup 5 2016-08-10 10:40 /user/hive/warehouse/test_insert_test/000000_0_copy_1
-rwxrwxr-x 2 hadoop supergroup 5 2016-08-10 10:40 /user/hive/warehouse/test_insert_test/000000_0_copy_2

hive> hadoop@hadoopmaster:/usr/local/hive/conf$ hdfs dfs -ls /user/hive/warehouse/test_trancaction
Found 3 items
drwxr-xr-x - hadoop supergroup 0 2016-08-10 10:45 /user/hive/warehouse/test_trancaction/delta_0000001_0000001_0000
drwxr-xr-x - hadoop supergroup 0 2016-08-10 10:46 /user/hive/warehouse/test_trancaction/delta_0000002_0000002_0000
drwxr-xr-x - hadoop supergroup 0 2016-08-10 10:46 /user/hive/warehouse/test_trancaction/delta_0000003_0000003_0000


hive> delete from test_trancaction where user_id=1;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. tez, spark) or using Hive 1.X releases.
Query ID = hadoop_20160810104829_0e78e0cd-2bc9-4741-89c1-7a8d1f384682
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 3
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1470228460967_0010, Tracking URL = http://hadoopmaster:8088/proxy/application_1470228460967_0010/
Kill Command = /usr/local/hadoop/bin/hadoop job -kill job_1470228460967_0010
Hadoop job information for Stage-1: number of mappers: 3; number of reducers: 3
2016-08-10 10:48:36,463 Stage-1 map = 0%, reduce = 0%
2016-08-10 10:48:41,784 Stage-1 map = 33%, reduce = 0%, Cumulative CPU 0.97 sec
2016-08-10 10:48:46,913 Stage-1 map = 67%, reduce = 0%, Cumulative CPU 2.0 sec
2016-08-10 10:48:48,970 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 3.0 sec
2016-08-10 10:48:50,020 Stage-1 map = 100%, reduce = 33%, Cumulative CPU 4.1 sec
2016-08-10 10:48:54,117 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 5.76 sec
MapReduce Total cumulative CPU time: 5 seconds 760 msec
Ended Job = job_1470228460967_0010
Loading data to table default.test_trancaction
MapReduce Jobs Launched:
Stage-Stage-1: Map: 3 Reduce: 3 Cumulative CPU: 5.76 sec HDFS Read: 32745 HDFS Write: 701 SUCCESS
Total MapReduce CPU Time Spent: 5 seconds 760 msec
OK
Time taken: 26.074 seconds

最后总结一下,做Hive的Transaction其实不合适,资源耗用量大,意义不大,本身hive做离线查询还是可以的.ACID支持你饶了我吧….二点:1)需要分桶表 2)需要修改hive-site.xml文件,剩余的还是很简单的.

参考链接

ACID and Transactions in Hive

1
2