一、Hive基本概念
Hive 是基于 Hadoop 的一个数据仓库工具,可以将结构化的数据文件映射为一张表,并提供类 SQL 查询功能。
本质是:将 HQL 转化成 MapReduce 程序

架构原理

- 用户接口(Client):CLI(hive shell)、JDBC/ODBC(java 访问 hive)、WebUI(浏览器访问 hive)
- 元数据(Metastore):包括表名、表所属的数据库(默认是 default)、表的拥有者、列/分区字段、表的类型(是否是外部表)、表的数据所在目录等;默认存储在自带的 derby 数据库中,推荐使用 MySQL 存储 Metastore
- Hadoop:使用 HDFS 进行存储,使用 MapReduce 进行计算。
- 驱动器(Driver):
(1)解析器(SQL Parser):将 SQL 字符串转换成抽象语法树 AST,这一步一般都用第三方工具库完成,比如 antlr;对 AST 进行语法分析,比如表是否存在、字段是否存在、SQL 语义是否有误。
(2)编译器(Physical Plan):将 AST 编译生成逻辑执行计划。
(3)优化器(Query Optimizer):对逻辑执行计划进行优化。
(4)执行器(Execution):把逻辑执行计划转换成可以运行的物理计划。对于 Hive 来说,就是 MR/Spark。

Hive 通过给用户提供的一系列交互接口,接收到用户的指令(SQL),使用自己的 Driver,
结合元数据(MetaStore),将这些指令翻译成 MapReduce,提交到 Hadoop 中执行,最后,将
执行返回的结果输出到用户交互接口。
二、Hive基本操作
2.1 启动hive
启动配置的MySQL,不然会报错
启动metastore和hiveserver2nohup ./hive --service metastore &
nohup ./hive --service hiveserver2 &
可以通过/bin/hive
访问hive客户端,也可以通过beeline远程访问bin/beeline
beeline> !connect jdbc:hive2://bigdata1:10000
或直接远程登陆
[hxr@bigdata1 bi_hr]$ beeline -u jdbc:hive2://bigdata1:10000 -n hxr -p hxr --showHeader=false --outputformat=utf-8
或直接远程执行命令
beeline -u jdbc:hive2://bigdata1:10000 -n hxr -p hxr --showHeader=false --outputformat=utf-8 -e 'show databases;'
hive常用的交互命令
- -e 不进入 hive 的交互窗口执行 sql 语句
bin/hive -e "select id from student;"
- -f 执行脚本中 sql 语句
bin/hive -f /opt/module/datas/hivef.sql
hive基本数据类型
Hive数据类型 | Java数据类型 | 长度 | 例子 |
---|---|---|---|
TINYINT | byte | 1byte有符号整数 | 20 |
SMALLINT | short | 2byte有符号整数 | 20 |
INT | int | 4byte有符号整数 | 20 |
BIGINT | long | 8byte有符号整数 | 20 |
BOOLEAN | boolean | 布尔类型 | TRUE,FALSE |
FLOAT | float | 单精度浮点数 | 3.14159 |
DOUBLE | double | 双精度浮点数 | 3.14159 |
STRING | string | 字符 | “hello hive” |
TIMESTAMP | 时间类型 | ||
BINARY | 字节数组 |
NOTE:对于 Hive 的 String 类型相当于数据库的 varchar 类型,该类型是一个可变的字符串,不过它不能声明其中最多能存储多少个字符,理论上它可以存储 2GB 的字符数。
集合数据类型
数据类型 | 描述 | 语法示例 |
---|---|---|
STRUCT | struct() | |
MAP | map() | |
ARRAY | struct() |
Hive 有三种复杂数据类型 ARRAY、MAP 和 STRUCT。ARRAY 和 MAP 与 Java 中的Array 和 Map 类似,而 STRUCT 与 C 语言中的 Struct 类似,它封装了一个命名字段集合,复杂数据类型允许任意层次的嵌套。
案例实操
1) 假设某表有如下一行,我们用 JSON 格式来表示其数据结构。在 Hive 下访问的格式为
{
"name": "songsong",
"friends": ["bingbing" , "lili"] , //列表 Array,
"children": { //键值 Map,
"xiao song": 18 ,
"xiaoxiao song": 19
}
"address": { //结构 Struct,
"street": "hui long guan" ,
"city": "beijing"
} }
2)基于上述数据结构,我们在 Hive 里创建对应的表,并导入数据。
创建本地测试文件 test.txt
songsong,bingbing_lili,xiao song:18_xiaoxiao song:19,hui long
guan_beijing
yangyang,caicai_susu,xiao yang:18_xiaoxiao yang:19,chao
yang_beijing
注意:MAP,STRUCT 和 ARRAY 里的元素间关系都可以用同一个字符表示,这里用“_”。
3)Hive 上创建测试表 test
create table test(
name string,
friends array,
children map,
address struct
)
row format delimited
fields terminated by ','
collection items terminated by '_'
map keys terminated by ':'
lines terminated by 'n';
字段解释:
row format delimited fields terminated by ‘,’ — 列分隔符
collection items terminated by ‘_’ –MAP STRUCT 和 ARRAY 的>>分隔符(数据分割符号)
map keys terminated by ‘:’ — MAP 中的 key 与 value 的分隔符
lines terminated by ‘n’; — 行分隔符
4)导入文本数据到测试表
hive (default)> load data local inpath
“/opt/module/datas/test.txt” into table test; 5)访问三种集合列里的数据,以下分别是 ARRAY,MAP,STRUCT 的访问方式
hive (default)> select friends[1],children['xiao
song'],address.city from test
where name="songsong";
OK
_c0 _c1 city
lili 18 beijing
Time taken: 0.076 seconds, Fetched: 1 row(s)
类型转化
Hive 的原子数据类型是可以进行隐式转换的,类似于 Java 的类型转换,例如某表达式使用 INT 类型,TINYINT 会自动转换为 INT 类型,但是 Hive 不会进行反向转化,例如,某表达式使用 TINYINT 类型,INT 不会自动转换为 TINYINT 类型,它会返回错误,除非使用 CAST 操作。
隐式类型转换规则如下
- 任何整数类型都可以隐式地转换为一个范围更广的类型,如 TINYINT 可以转换 成 INT,INT 可以转换成 BIGINT。
- 所有整数类型、FLOAT 和 STRING 类型都可以隐式地转换成 DOUBLE。
- TINYINT、SMALLINT、INT 都可以转换为 FLOAT。
- BOOLEAN 类型不可以转换为任何其它的类型。
可以使用 CAST 操作显示进行数据类型转换CAST('1' AS INT)
如果转换失败,返回null。
2.2 DDL
2.2.1 数据库操作
查看数据库show databases;
show databases like 'db_hive*';
查看数据库详情
desc database db_hive;
desc database extended db_hive;
创建数据库
CREATE DATABASE [IF NOT EXISTS] database_name
[COMMENT database_comment]
[LOCATION hdfs_path]
[WITH DBPROPERTIES (property_name=property_value, ...)];
使用数据库use [database_name];
修改数据库alter database db_hive set dbproperties('createtime'='20170830');
删除空数据库drop database [IF NOT EXISTS] db_hive;
数据库不为空,强制删除数据库drop database db_hive cascade;
2.2.2 表操作
查看表 show tables;
创建表
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
[(col_name data_type [COMMENT col_comment], ...)]
[COMMENT table_comment]
[PARTITIONED BY (col_name data_type `创建分区表`
[COMMENT col_comment], ...)]
[CLUSTERED BY (col_name, col_name, ...) `创建分桶表`
[SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS] `对桶中的一个或多个列另外排序`
[ROW FORMAT row_format]
[STORED AS file_format]
[LOCATION hdfs_path] `指定表在HDFS上的存储位置`
[TBLPROPERTIES (property_name=property_value, ...)]
[AS select_statement]
`LIKE允许用户复制现有的表结构,但是不复制数据`
删除表 drop table student;
查看表的结构 desc student;
查看表的详细信息desc formatted student;
或 describe extended student;
查看分区表有多少分区show partitions dept_partition;
修改内部表为外部表 alter table xxx set tblproperties('external'='true')
修改外部表为内部表 alter table xxx set tblproperties('external'='false')
增加单个分区 alter table ods_q6_log add partition(dt='2020-06-15');
增加多个分区 alter table ods_q6_log add partition(dt='2020-06-15'), partition(dt='2020-06-16');
删除单个分区 alter table ods_q6_log drop if exists partition(dt='2020-06-15');
删除多个分区 alter table ods_q6_log drop if exists partition(dt='2020-06-15'), partition(dt='2020-06-16');
重命名表 alter table xxxx rename to yyyy
修改列(修改列类型可能不生效) alter table xxx change column [col_old_name] [col_new_name] [column_type] comment [col_comment]
增加列/替换alter table xxx add/replace columns ([col_name] [data_type] comment [col_comment], ......)
注:ADD 是代表新增一字段,字段位置在所有列后面(partition 列前),REPLACE 则是表示替换表中所有字段。
交换列名(只是交换了元数据的名字,数据对应关系不变)
alter table student change name name string after nickname;
增加分区
hive (default)> alter table dept_partition add partition(month=’201705′) partition(month=’201704′);
二级分区表
# 创建二级分区表
create table dept_partition2(
deptno int, dname string, loc string
)
partitioned by (month string, day string)
row format delimited fields terminated by 't';
# 导入数据
load data local inpath '/opt/module/datas/dept.txt' into table default.dept_partition2 partition(month='201709', day='13');
2.3 DCL
2.4 DML
2.4.1 数据导入
- 向表中装载数据(Load)
# 创建student表, 并声明文件分隔符’t’(hive默认的分隔符01 )
hive> create table student(id int, name string) ROW FORMAT DELIMITED FIELDS TERMINATED BY 't';
hive> load data local inpath '/opt/module/datas/student.txt' into table student partition(dt='xxxx');
- local:表示从本地加载数据到 hive 表;否则从 HDFS 加载数据到 hive 表
- inpath:表示加载数据的路径
- overwrite:表示覆盖表中已有数据,否则表示追加
- partition:表示上传到指定分区
-
Import 数据到指定 Hive 表中
注意:先用 export 导出后,再将数据导入。import table student2 partition(month='201709') from '/user/hive/warehouse/export/student';
2.4.2 数据导出
①Insert 导出
导出到本地insert overwrite local directory '/opt/module/datas/export/student1' ROW FORMAT DELIMITED FIELDS TERMINATED BY 't' select * from student;
导出到hdfsinsert overwrite directory '/user/atguigu/student2' ROW FORMAT DELIMITED FIELDS TERMINATED BY 't' select * from student;
②Hadoop 命令导出到本地dfs -get /user/hive/warehouse/student/month=201709/000000_0 /opt/module/datas/export/student3.txt;
③Hive Shell 命令导出bin/hive -e 'select * from default.student;' > /opt/module/datas/export/student4.txt;
④Export 导出到 HDFS 上export table default.student to '/user/hive/warehouse/export/student';
⑤Sqoop 导出sqoop export xxx
2.4.3 一般操作
插入数据insert into student values(1000,"ss");
清除表中数据(Truncate)truncate table student;
查询语句中创建表并加载数据(As Select)create table if not exists xxxx as select id, name from yyyy;
查询表记录并插入原表insert into table xxxxx partition(dt='xxxx') select * from xxx;
查询表记录并覆盖回原表,实现表的更新insert overwrite table xxxxx partition(dt='xxxx') select * from xxx;
2.5 DQL
查询语句
SELECT [ALL | DISTINCT] select_expr, select_expr, ...
FROM table_reference
[WHERE where_condition]
[GROUP BY col_list]
[ORDER BY col_list]
[CLUSTER BY col_list
| [DISTRIBUTE BY col_list] [SORT BY col_list]
]
[LIMIT number]
算术运算符select sal+1 from emp;
注:包括+
、-
、*
、/
、%
、&
、|
、^
、~
等运算符
常用函数select count(*)/max(sal)/min(sal)/sum(sal)/avg(sal)/round(xxx,n) from emp;
比较运算符select * from emp where sal RLIKE '[2]';
注:包括=
、 (如果都为null,返回true;如果任一为null,返回null)、
/!=
(任一为null返回null)、、
、
>
、>=
、between and
(包括两边边界)、is null
、is not null
、in
、like
(同mysql)、rlike/regexp
(正则匹配)
分组
- Group By 语句
select t.deptno, avg(t.sal) avg_sal from emp t group by t.deptno;
- Having 语句 (聚合后进行筛选)
select deptno, avg(sal) avg_sal from emp group by deptno having avg_sal > 2000;
Join 语句
- 内连接
select e.empno, e.ename, d.deptno from emp e join dept d on e.deptno = d.deptno;
- 左外连接
select e.empno, e.ename, d.deptno from emp e left join dept d on e.deptno = d.deptno;
- 右外连接
select e.empno, e.ename, d.deptno from emp e right join dept d on e.deptno = d.deptno;
- 满外连接
select e.empno, e.ename, d.deptno from emp e full join dept d on e.deptno = d.deptno;
排序
- Order by
select ename, deptno, sal from emp order by deptno, sal desc;
- 按照别名排序
select ename, sal*2 twosal from emp order by twosal;
- 每个 MapReduce 内部排序(Sort By)
Sort By:每个 Reducer 内部进行排序,对全局结果集来说不是排序。
# 设置 reduce 个数
set mapreduce.job.reduces=3;
# 根据部门编号降序查看员工信息(每个reduce中的部门编号是降序的,汇总后的结果是部分有序的)
select * from emp sort by empno desc
# 将查询结果导入到文件中,不同于上个命令结果是杂乱无章的,导出后有三个文件,保存了每个reduce的排序结果
insert overwrite local directory '/opt/module/datas/sortby-result' select * from emp sort by deptno desc;
- 分区排序(Distribute By)
Distribute By:需要配合sort by使用,即对记录进行分区,在每个分区中进行排序。注意,Hive 要求 DISTRIBUTE BY 语句要写在 SORT BY 语句之前。
# 设置 reduce 个数
set mapreduce.job.reduces=3;
# 先按照部门编号分区,再按照员工编号降序排序
insert overwrite local directory '/opt/module/datas/distribute-result' select * from emp distribute by deptno sort by empno desc;
- Cluster By
当 distribute by 和 sorts by 字段相同时,可以使用 cluster by 代替,但是排序只能是升序排序,不能指定排序规则为 ASC 或者 DESC。
即select * from emp cluster by deptno;
等价于select * from emp distribute by deptno sort by deptno;
注意:分区时,可能多个key会进入一个分区中,如可能 20 号和 30 号部门分到一个分区里面去。
分桶及抽样查询
- 分桶表数据存储
分区针对的是数据的存储路径;分桶针对的是数据文件。
# 设置属性
set hive.enforce.bucketing=true;
# 创建分桶表
create table stu_buck(id int, name string)
clustered by(id) into 4 buckets
row format delimited fields terminated by 't';
# 查看表结构
desc formatted stu_buck;
# 导入数据
insert into table stu_buck select id, name from stu;
- 分桶抽样查询
对于非常大的数据集,有时用户需要使用的是一个具有代表性的查询结果而不是全部结果。Hive 可以通过对表进行抽样来满足这个需求。select * from stu_buck tablesample(bucket 1 out of 2 on id)
解释:table 总 bucket 数为 4,tablesample(bucket 1 out of 2),表示总共抽取(4/2=)2 个bucket 的数据,抽取第 1(x)个和第 3(x+y)个 bucket 的数据。
根据结果可知:Hive的分桶采用对分桶字段的值进行哈希,然后除以桶的个数求余的方式决定该条记录存放在哪个桶当中。
三、函数
查看系统自带的函数,支持模糊查询和正则查询show functions;
show functions like “nvl”;
显示自带的函数的用法desc function upper;
详细显示自带的函数的用法desc function extended upper;
3.1 常用函数
数据类型转换
select '1'+2, cast('1' as int) + 2;
format_number
规定数字的小数位数并输出string类型format_number(x,n)
NVL
NVL( string1, string2)
如果string1不为null,则结果为null;如果string1为null,则结果为string2,如果string1为null且string2为null,则返回null。
COALESCE
NVL的强化版,可以出入任意个参数。如果第一个参数为null则判断第二个,依次类推,如果全是null则返回null;
IF
if(boolean testCondition, T valueTrue, F valueFalseOrNull) 如果表达式结果为true则值为T ,如果表达式结果为false则值为F。
时间类
①日期格式化:date_format、date_add、next_day、last_day等函数只能识别”yyyy-MM-dd”,所以其他日期格式需要转化为”yyyy-MM-dd”格式。
-
regexp_replace('2019/06/29',/,-);
用-替换/ -
date_format('2019-06-29','yyyy-MM-dd');
date_format(regexp_replace( '2019/06/29', '/' , '-' ) , 'yyyy-MM' )
- 参数类型可以是string/date/timestamp
unix_timestamp(visitdate,'yyyy/MM/dd')
from_unixtime(unix_timestamp(visitdate,'yyyy/MM/dd') , 'yyyy-MM-dd')
只要年月可以把dd删去
②日期
- current_date:获取当前日期
select current_date
- current_timestamp:获取当前系统时间(包括毫秒数)
select current_timestamp;
- to_date:日期时间转日期
select to_date('2017-09-15 11:12:00') from dual;
- date_add:时间跟天数相加
select date_add('2019-06-29', -5);
- date_sub:时间跟天数相减
select date_sub('2019-06-29',5);
- datediff:两个时间相减
select datediff('2019-06-29','2019-06-24');
select datediff('2019-06-24 12:12:12','2019-06-29');
- next_day:取当前天的下一个周一
select next_day('2019-02-12','MO');
说明:星期一到星期日的英文(Monday,Tuesday、Wednesday、Thursday、Friday、Saturday、Sunday) - last_day:当月最后一天日期
select last_day('2019-02-10');
mysql中的事件格式是 +%Y-%m-%d 或 +%F - 获取日期中的年/月/日/时/分/秒/周
with dtime as (select from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss') as dt) select year(dt), month(dt), day(dt), hour(dt), minute(dt), second(dt), weekofyear(dt) from dtime;
③时间戳
-
日期转时间戳:从1970-01-01 00:00:00 UTC到指定时间的秒数
select unix_timestamp();
–获得当前时区的UNIX时间戳select unix_timestamp('2017-09-15 14:23:00');
select unix_timestamp('2017-09-15 14:23:00','yyyy-MM-dd HH:mm:ss');
select unix_timestamp('20170915 14:23:00','yyyyMMdd HH:mm:ss');
-
时间戳转日期
select from_unixtime(1505456567);
select from_unixtime(1505456567,'yyyyMMdd');
select from_unixtime(1505456567,'yyyy-MM-dd HH:mm:ss');
select from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss');
–获取系统当前时间
④trunc函数
-
截取日期
select trunc('2020-06-10','MM');
— 2020-06-01 返回当月第一天.select trunc('2020-06-10','YY');
— 2020-01-01 返回当年第一天select trunc('2020-06-10','Q');
— 2020-04-01 -
截取数字
select trunc(123.458);
–123select trunc(123.458,0);
–123select trunc(123.458,1);
–123.4select trunc(123.458,-1);
–120select trunc(123.458,-4);
–0select trunc(123.458,4);
–123.458select trunc(123);
–123select trunc(123,1);
–123select trunc(123,-1);
–120
⑤月份函数
- 查询当前月份
select month(current_date);
- 查询当月第几天
select dayofmonth(current_date);
- 当月第1天
date_sub(current_date,dayofmonth(current_date)-1);
- 下个月第1天
add_months(date_sub(current_date,dayofmonth(current_date)-1),1);
CASE WHEN
select dept_id, sum(case sex when '男' then 1 else 0 end) male_count, sum(case sex when '女' then 1 else 0 end) female_count from emp_sex group by dept_id;
行转列

将如上数据转化为如下结构

select
t1.base,
concat_ws('|', collect_set(t1.name)) name
from
(select
name,
concat(constellation, ",", blood_type) base
from
person_info) t1
group by
t1.base;
concat_ws与concat区别
concat(“a”,”b”,”c”):将所有直接拼接起来得到abc,如果有一个元素为null,则结果为null;
concat_ws():第一个元素为分隔符,将元素通过分隔符拼接起来,如果有一个元素为null,则自动忽略;
collect_set与collect_list区别
collect_set去重,collect_list不去重;
STR_TO_MAP函数
MAP STR_TO_MAP(VARCHAR text,VARCHAR listDelimiter,VARCHAR keyValueDelimiter) 将字符换text通过listDelimiter分割为多个元素,再通过keyValueDelimiter将元素分割为key和value。
列转行
LATERAL VIEW udtf(expression) tableAlias AS columnAlias。
EXPLODE(col):将hive一列中复杂的array或者map结构拆分成多行,默认分隔符为逗号。
将如下数据

转化为如下结构

select
movie,
category_name
from
movie_info lateral view explode(category) table_tmp as category_name;
开窗函数
在其他查询执行完之后才会执行窗口函数,在查询结果表上添加一列用于记录处理结果。对查询得到的表作相应的over(..)处理:partition by对表进行分区;order by将分区中的数据进行排序;然后按行处理,rows between.. and..生成所指定的行的临时表(缺省为整表),将该临时表的数据通过聚合函数进行处理,得到当前所在行的处理结果,将结果填入到当前行的新增列中。
常见的开窗函数:
- 聚合开窗函数:count、sum、min、max、avg、first_value、last_value、lag、lead、cume_dist
- 排序开窗函数:rank、dense_rank、ntile、row_number、percent_rank
其中
- RANK() 排序相同时会重复,总数不会变;
- DENSE_RANK() 排序相同时会重复,总数会减少;
- ROW_NUMBER() 会根据顺序计算;
①相关函数说明
- OVER():指定分析函数工作的数据窗口大小,这个数据窗口大小可能会随着行的变而变化。
- CURRENT ROW:当前行
- n PRECEDING:往前n行数据
- n FOLLOWING:往后n行数据
- UNBOUNDED:起点,UNBOUNDED PRECEDING 表示从前面的起点, UNBOUNDED FOLLOWING表示到后面的终点。
- LAG(col,n, [default_val]):往前第 n 行数据,如果没写default_val默认为null;
- LEAD(col,n, [default_val]):往后第 n 行数据,如果没写default_val默认为null;
- NTILE(n):先根据某一属性排序,然后通过ntile(n)给其分n组,再添加一列,值是每组编号(从1开始)。 注意:n必须为int类型。如想取20%,可以NTILE(5)分成五组,取第一组。
注意:
- 窗口函数只在order by和limit之前执行
- 如果窗口函数排序,那么就默认有一个rows between 最前行到当前行。
sum(cost) over(partition by name order by orderdate)
和下句表达的意思是一样的sum(cost) over(partition by name order by orderdate rows between UNBOUNDED PRECEDING and current row )
即由起点到当前行的聚合。 - 如果窗口函数没有排序,那么默认从最前行到最后行.
sum(cost) over()
和下句表达的意思是一样的sum(cost) over(rows between unbounded preceding and current row)
- partition by .. order by ..和group by ..order by .. 区别:partition by和order by的执行顺序是连着的,分区后进入不同的reduce以关键字为key进行排序,输出时同一分区在一起且各个分区内的数据有序。而group by和order by的执行顺序不是连着的,order by是对分组后的表进行的整表排序,进入一个reduce中,所以order by一般和limit连用。
get_json_object函数
用于获取json字符串中的属性的值,参数是字段名和外部json名.json属性名。
insert overwrite table dwd_start_log
PARTITION (dt='2019-02-10')
select
get_json_object(line,'$.mid') mid_id,
get_json_object(line,'$.uid') user_id,
get_json_object(line,'$.vc') version_code
from ods_start_log
where dt='2019-02-10';
3.2 自定义函数
Hive 自带了一些函数,比如:max/min 等,但是数量有限,自己可以通过自定义 UDF来方便的扩展。
- UDF(User-Defined-Function):一进一出
- UDAF(User-Defined Aggregation Function):聚集函数,多进一出,类似于:count/max/min
- UDTF(User-Defined Table-Generating Functions):一进多出,如 lateral view explore()
依赖
org.apache.hivehive-exec2.3.0provided
3.2.1 UDF函数
如需要解析如下字符串0-115.581|1-83.281|33-2.448|36-5.677|38-1.358
获取key对应的value。
public class TimeCountUDF extends UDF {
public String evaluate(String str, String key) {
String[] kvs = str.split("\|");
for (String kv : kvs) {
String[] split = kv.split("-");
String k = split[0];
if (key.equals(k)) {
return split[1];
}
}
return "0";
}
}
步骤:
- 继承UDF类
- 实现evaluate方法
3.2.2 UDTF函数
public class ModelJsonUDTF extends GenericUDTF {
@Override
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
// 定义UDTF返回值类型和名称
ArrayList fieldName = new ArrayList();
fieldName.add("event_name");
fieldName.add("event_value");
ArrayList fieldType = new ArrayList();
fieldType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldName, fieldType);
}
@Override
public void process(Object[] objects) throws HiveException {
String input = objects[0].toString();
if (StringUtils.isEmpty(input)) {
return;
}
JSONObject jsonObject = JSONObject.parseObject(input);
Set keys = jsonObject.keySet();
for (String key : keys) {
String[] result = new String[2];
try {
JSONObject json = jsonObject.getJSONObject(key);
result[0] = key;
result[1] = json.getString("value") + "|" + json.getString("time");
} catch (JSONException e) {
continue;
}
forward(result);
}
}
@Override
public void close() throws HiveException {
}
}
步骤:
- 继承GenericUDTF类
- initialize方法控制输入参数的类型
- process方法对输入进行处理并返回输出的字段
3.2.3 导入hive
打包后将jar包放置到hdfs上
hadoop fs -mkdir /user/hive/jars
hadoop fs -put /opt/module/packages/hivefunction-1.0-SNAPSHOT-jar-with-dependencies.jar /user/hive/jars/
创建永久函数
create function device_udf as 'com.cj.hive.TimeCountUDF ' using jar 'hdfs://bigdata1:9000/user/hive/jars/hivefunction-1.0-SNAPSHOT-jar-with-dependencies.jar';
create function addr_udtf as 'com.cj.hive.ModelJsonUDTF ' using jar 'hdfs://bigdata1:9000/user/hive/jars/hivefunction-1.0-SNAPSHOT-jar-with-dependencies.jar';
show functions like "*udtf*"; # 模糊匹配搜索函数
删除函数drop [temporary] function [if exists] [dbname.]function_name;
四、hive调优
- 非mapreduce查询
hive.fetch.task.conversion默认是more,全局查找、字段查找、limit查找等都不走mapreduce。 - 本地模式
Hive可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。
用户可以通过设置hive.exec.mode.local.auto的值为true来使hive自动启动本地模式。 - 小表join大表
如果是join on ,会将左表先加入内存,然后通过on条件,筛选右表的符合条件项,建立一张临时表,避免了如果使用where,会先产生笛卡尔积表,再进行筛选的危险。
map join优化:
如果是map join ,那么就要大表join小表(25MB)。因为MR底层会调用本地任务将join后面的表缓存到内存中,所以将小表放在后面可以减轻缓存和计算压力。如果是join,优化器会自动交换位置;如果是left join不会交换位置,需要注意大表join小表。
①设置自动选择Mapjoin
set hive.auto.convert.join = true; 默认为true
②大表小表的阈值设置(默认25M一下认为是小表)
set hive.mapjoin.smalltable.filesize=25000000;
reduce join 优化:
如果关闭了map join功能(默认是打开的)
set hive.auto.convert.join = false;
或者没有小表(25MB以下),那么会进行reduce join。一般来说都是小表join大表,小表join大表,如果是内连接,hive会自动优化,将小表放在左边join大表,效率高。
但是如果是外连接,就需要将小表放在join左边,将大表放在join右边。
数据量小的表放在join的左边,这样可以有效减少内存溢出错误发生的几率。
set hive.auto.convert.join = true; 默认为true
set hive.mapjoin.smalltable.filesize=25000000;大表小表的阈值设置(默认25M一下认为是小表)
MapJoin把小表全部加载到内存在map端进行join,将join后的结果直接输出,避免reducer处理。
-
大表join大表
Join前先过滤表中的脏数据,如果某一个key对应的数据过多,可以进行加盐使其随机分不到不同reduce中。
例1:滤空
hive (default)> insert overwrite table jointable select n.* from (select * from nullidtable where id is not null ) n left join ori o on n.id = o.id;
例2:随机分布空null值
insert overwrite table jointable
select n.* from nullidtable n full join ori o on
case when n.id is null then concat(‘hive’, rand()) else n.id end = o.id; -
预聚合功能
group by会先通过key值分区,然后再通过key值再进行分组。如果某个分区有过多的数据,会进入一个reduce,则会造成某一个reduce数据量过大,即数据倾斜。可以开启预聚合功能:会先进行一个MR任务,该阶段随机进行分组,然后数据均匀的进入reduce处理,该步先将相同的key进行聚合,然后将得到的结果作为输入给到下一个MR任务,该任务将根据key进行分区,进入reduce输出最终的group by结果。
由于先进行了预分区,所以两次MR任务都不会出现严重的数据倾斜。
set hive.map.aggr = true 开启Map端聚合参数设置,默认为true
set hive.groupby.mapaggr.checkinterval = 100000 在Map端进行聚合操作的条目数目
set hive.groupby.skewindata = true 有数据倾斜的时候进行负载均衡,默认为false -
避免使用distinct
distinct去重和group by 去重是一回事。但是count(distinct ..)是全聚合操作,最终会进入一个reduce中,造成效率低下。可以先用group by 过滤,然后在过滤表的基础上再进行count.
select count(distinct id) from bigtable; 只进行一个MR任务,但是进入一个reduce中处理。
替换为
select count(id) from (select id from bigtable group by id) a; 两个MR任务处理,每个MR任务中多个reduce进行处理。虽然会多用一个Job来完成,但在数据量大的情况下,这个绝对是值得的
-
避免笛卡尔积
where是单个表用的,如果多个表用where,会先产生笛卡尔积表再进行筛选,效率低且消耗内存。多表用join on 连接,会先进行on的筛选,不会产生笛卡尔积表。
尽量避免笛卡尔积,join的时候不加on条件,或者无效的on条件,Hive只能使用1个reducer来完成笛卡尔积。因为没有on条件,所有的数据都会进入一个reduce中,reduce压力过大,所以禁止笛卡尔积表生成。有on条件,相同的字段进入一个reduce,多reduce并行处理。
8.行列过滤:
列过滤:少用select * ,要什么就选什么
行过滤:在进行join之前将表进行过滤
select o.id from bigtable b join ori o on o.id = b.id where o.id
替换为
select b.id from bigtable b join (select id from ori where id -
动态分区调整
关系型数据库中,对分区表Insert数据时候,数据库自动会根据分区字段的值,将数据插入到相应的分区中,Hive中也提供了类似的机制,即动态分区(Dynamic Partition),只不过,使用Hive的动态分区,需要进行相应的配置。
1.开启动态分区参数设置
(1)开启动态分区功能(默认true,开启)
hive.exec.dynamic.partition=true
(2)设置为非严格模式(动态分区的模式,默认strict,表示必须指定至少一个分区为静态分区,nonstrict模式表示允许所有的分区字段都可以使用动态分区。)
hive.exec.dynamic.partition.mode=nonstrict
(3)在所有执行MR的节点上,最大一共可以创建多少个动态分区。默认1000
hive.exec.max.dynamic.partitions=1000
(4)在每个执行MR的节点上,最大可以创建多少个动态分区。该参数需要根据实际的数据来设定。比如:源数据中包含了一年的数据,即day字段有365个值,那么该参数就需要设置成大于365,如果使用默认值100,则会报错。
hive.exec.max.dynamic.partitions.pernode=100
(5)整个MR Job中,最大可以创建多少个HDFS文件。默认100000
hive.exec.max.created.files=100000
(6)当有空分区生成时,是否抛出异常。一般不需要设置。默认false
hive.error.on.empty.partition=false
2.案例实操
需求:将dept表中的数据按照地区(loc字段),插入到目标表dept_partition的相应分区中。
(1)创建目标分区表
hive (default)> create table dept_partition(id int, name string) partitioned
by (location int) row format delimited fields terminated by ‘t’;
(2)设置动态分区
set hive.exec.dynamic.partition.mode = nonstrict;
hive (default)> insert into table dept_partition partition(location) select deptno, dname, loc from dept;
(3)查看目标分区表的分区情况
hive (default)> show partitions dept_partition;
思考:目标分区表是如何匹配到分区字段的? 三个字段数据按顺序匹配。 -
合理设置Map及Reduce数
概述:
map数量由切片数量决定,而reduce数量是人为设置的。如果将reduce数量设为-1(set mapreduce.job.reduces=-1),那么hive会将需要处理的数据大小除以256mb,预估需要的reduce数量。
在设置map数量时(切片数量),需要注意①map不能过多,也不能过少。过多会导致启动任务的时间大于执行时间,过少导致执行效率过低。②大多情况下,127mb左右的数据大小切片较合适,但是也要根据数据的特点进行切片:如果数据块中的数据计算复杂或每行的字段很少但是行数过多,那么应该减小切片的大小。
增加map的方法为:
根据computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M公式,调整maxSize最大值。让maxSize最大值低于blocksize就可以增加map的个数。
通过下述公式调整切片maxsize和minsize大小:
set mapreduce.input.fileinputformat.split.maxsize=100; 将切片大小设为100mb -
小文件进行合并:
在map执行前合并小文件:
CombineHiveInputFormat具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat没有对小文件合并功能。
set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
在Map-Reduce的任务结束时合并小文件的设置:
在map-only任务结束时合并小文件,默认true
SET hive.merge.mapfiles = true;
在map-reduce任务结束时合并小文件,默认false
SET hive.merge.mapredfiles = true;
合并文件的大小,默认256M
SET hive.merge.size.per.task = 268435456;
当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge
SET hive.merge.smallfiles.avgsize = 16777216; -
合理设置Reduce数
reduce数量如果设为-1,那么hive会根据需要处理的数据大小除以256mb来预估需要的reduce数量。
①调整reduce个数方法一
(1)每个Reduce处理的数据量默认是256MB
hive.exec.reducers.bytes.per.reducer=256000000
(2)每个任务最大的reduce数,默认为1009
hive.exec.reducers.max=1009
(3)计算reducer数的公式
N=min(参数2,总输入数据量/参数1)
2.调整reduce个数方法二
在hadoop的mapred-default.xml文件中修改
设置每个job的Reduce个数
set mapreduce.job.reduces = 15;
3.reduce个数并不是越多越好
1)过多的启动和初始化reduce也会消耗时间和资源;
2)另外,有多少个reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;
在设置reduce个数的时候也需要考虑这两个原则:
1.处理大数据量利用合适的reduce数;2.使单个reduce任务处理数据量大小要合适;
13.并行执行
多个map和reduce阶段可以并行执行。
set hive.exec.parallel=true; //打开任务并行执行
set hive.exec.parallel.thread.number=16; //同一个sql允许最大并行度,默认为8。
14.严格模式
在hive-site.xml中,通过设置属性hive.mapred.mode值为默认是非严格模式nonstrict 。开启严格模式需要修改hive.mapred.mode值为strict,开启严格模式可以禁止3种类型的查询。
set hive.mapred.mode = strict
①对于分区表,除非where语句中含有分区字段过滤条件来限制范围,否则不允许执行。即不允许查找所有的分区。
②对于使用了order by语句的查询,要求必须使用limit语句。这样每个map只需写出limit个数据,在一个reduce中排序,避免了对全部数据在一个reduce中进行排序。
③限制笛卡尔积的查询。多表查询必须用join on语句。关系型数据库在执行JOIN查询的时候不使用ON语句而是使用where语句,关系数据库的执行优化器就可以高效地将WHERE语句转化成ON语句。但是hive中不会这样转化。
15.JVM重用
hive的底层是MR实现的,所以jvm重用同样适用于hive。正常情况下,每个任务都会开启一个jvm在container中执行任务,任务执行完后关闭。对应小任务过多的情况,开启jvm的时间占比过大。开启jvm重用,jvm使用完不会关闭,而是给下一个任务使用,这样就没有开启jvm的时间浪费。缺点是会造成资源的闲置浪费。
16.推测执行(具体见hadoop调优)
如果并行任务中有任务因为bug、数据倾斜、阻塞等原因造成进度过慢,推测执行会开启另一个线程来执行相同的任务,并最终选择先执行完的任务的数据。
Hadoop的mapred-site.xml文件中进行配置,默认是true
mapreduce.map.speculative -> true
hive本身也提供了配置项来控制reduce-side的推测执行:默认是true
hive.mapred.reduce.tasks.speculative.execution -> true -
分区分桶
-
每对JOIN连接对象启动一个MapReduce任务,当对3个或者更多表进行join连接时,如果每个on子句都使用相同的连接键的话,那么只会产生一个MapReduce job。
-
Order By:全局排序,只有一个Reducer
配置参数 参数说明
mapreduce.map.memory.mb 一个MapTask可使用的资源上限(单位:MB),默认为1024。如果MapTask实际使用的资源量超过该值,则会被强制杀死。
mapreduce.reduce.memory.mb 一个ReduceTask可使用的资源上限(单位:MB),默认为1024。如果ReduceTask实际使用的资源量超过该值,则会被强制杀死。
mapreduce.map.cpu.vcores 每个MapTask可使用的最多cpu core数目,默认值: 1
mapreduce.reduce.cpu.vcores 每个ReduceTask可使用的最多cpu core数目,默认值: 1
mapreduce.reduce.shuffle.parallelcopies 每个Reduce去Map中取数据的并行数。默认值是5
mapreduce.reduce.shuffle.merge.percent Buffer中的数据达到多少比例开始写入磁盘。默认值0.66
mapreduce.reduce.shuffle.input.buffer.percent Buffer大小占Reduce可用内存的比例。默认值0.7
mapreduce.reduce.input.buffer.percent 指定多少比例的内存用来存放Buffer中的数据,默认值是0.0
(2)应该在YARN启动之前就配置在服务器的配置文件中才能生效(yarn-default.xml)
表6-2
配置参数 参数说明
yarn.scheduler.minimum-allocation-mb 给应用程序Container分配的最小内存,默认值:1024
yarn.scheduler.maximum-allocation-mb 给应用程序Container分配的最大内存,默认值:8192
yarn.scheduler.minimum-allocation-vcores 每个Container申请的最小CPU核数,默认值:1
yarn.scheduler.maximum-allocation-vcores 每个Container申请的最大CPU核数,默认值:32
yarn.nodemanager.resource.memory-mb 给Containers分配的最大物理内存,默认值:8192
(3)Shuffle性能优化的关键参数,应在YARN启动之前就配置好(mapred-default.xml)
表6-3
配置参数 参数说明
mapreduce.task.io.sort.mb Shuffle的环形缓冲区大小,默认100m
mapreduce.map.sort.spill.percent 环形缓冲区溢出的阈值,默认80%
2.容错相关参数(MapReduce性能优化)
表6-4
配置参数 参数说明
mapreduce.map.maxattempts 每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
mapreduce.reduce.maxattempts 每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
mapreduce.task.timeout Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个Task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该Task处于Block状态,可能是卡住了,也许永远会卡住,为了防止因为用户程序永远Block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是600000。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。
hive配置参数
1.配置配置文件
2.启动Hive时,可以在命令行添加-hiveconf param=value来设定参数
hive -hiveconf mapred.reduce.tasks=10;
3.hive (default)> set mapred.reduce.tasks=100;
不进入hive交互窗口执行hql语句
hive -e “select id from student;”
执行脚本中sql语句
hive -f /opt/module/datas/hivef.sql
hive -f /opt/module/datas/hivef.sql > /opt/module/datas/hive_result.txt
在hive交互窗口中查看hdfs文件系统
dfs -ls /;
在hive交互窗口中查看本地文件系统
! ls /opt/module/datas;
4.查看在hive中输入的所有历史命令
cat ~/.hivehistory
hive2.3配置
hive-env.sh
# Set HADOOP_HOME to point to a specific hadoop install directoryHADOOP_HOME=${HADOOP_HOME}
export HADOOP_HOME=$HADOOP_HOME
# Hive Configuration Directory can be controlled by:
export HIVE_CONF_DIR=$HIVE_HOME/conf
# Folder containing extra libraries required for hive compilation/execution can be controlled by:
# export HIVE_AUX_JARS_PATH=
export TEZ_HOME=/opt/module/tez-0.9.1
export TEZ_JARS=""
for jar in `ls $TEZ_HOME | grep jar`;do
export TEZ_JARS=$TEZ_JARS:$TEZ_HOME/$jar
done
for jar in `ls $TEZ_HOME/lib`;do
export TEZ_JARS=$TEZ_JARS:$TEZ_HOME/lib/$jar
done
export HIVE_AUX_JARS_PATH=$HADOOP_HOME/share/hadoop/common/hadoop-lzo-0.4.20.jar$TEZ_JARS
hive-site.xml
javax.jdo.option.ConnectionURLjdbc:mysql://bigdata3:3306/metastore?createDatabaseIfNotExist=trueJDBC connect string for a JDBC metastorejavax.jdo.option.ConnectionDriverNamecom.mysql.jdbc.DriverDriver class name for a JDBC metastorejavax.jdo.option.ConnectionUserNamerootusername to use against metastore databasejavax.jdo.option.ConnectionPasswordhxrpassword to use against metastore databasehive.metastore.warehouse.dir/user/hive/warehouselocation of default database for the warehousehive.cli.print.headertruehive.cli.print.current.dbtruehive.metastore.schema.verificationfalsedatanucleus.schema.autoCreateAlltruehive.metastore.uristhrift://bigdata1:9083hive.execution.enginetez
hive-log4j2.properties
#将原日志路径/tmp/hxr/hive.log改到hive-2.3.6文件下
property.hive.log.dir = /opt/module/hive-2.3.6/logs
需要将jdbc包放到hive的库中
cp mysql-connector-java-5.1.27-bin.jar
/opt/module/hive/lib/
hive表支持中文
//修改字段注释字符集
alter table COLUMNS_V2 modify column COMMENT varchar(256) character set utf8;
//修改表注释字符集
alter table TABLE_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
//修改分区注释字符集
alter table PARTITION_KEYS modify column PKEY_COMMENT varchar(4000) character set utf8;
五、参考
https://cwiki.apache.org/confluence/display/Hive/HivePlugins