HIVE 几个 by的区别

参考资料:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SortBy

Order By:
1、在hive.mapred.mode=strict模式下,order by 后面必须加limit。因为不加的话,order by会对所有的数据进行排序。而要进行全局排序,只能使用一个Reducer进行排序,如果结果集数据量异常大的话,速度异常缓慢。

Sort by VS Order by

1、Sort by是将在一个Reducer中的数据进行局部排序,如果有多Reducer,那么结果集只能是局部排序的。而Order By是所有Reducer汇总到一个Reducer进行全局排序。

Cluster By VS Distribute By

1、Cluster By 只是 Distribute By 和Sort By 的简写。即:Cluster By 字段A ==Distribute By 字段A Sort By 字段A。那么既然这样,为什么还要Distribute By和Sort By呢?那就是当Distribute By和Sort By字段不一致的时候就需要了。例如Distribute By 字段A Sort By 字段B 字段C。
2、Cluster By 因为包含Sort by,所以在每一个Reducer中的数据是有序的。
3、Distribute By 是无序的。
4、当Distribute By和Sort By的字段不一致时,就无法使用Cluster By。

hive dynamic partition 性能 优化

今天正在做分区合并的时候,需要初始化一部分测试数据,数据量共计有34亿,每10个分片的数据量合计在4亿左右,所以想偷懒直接使用动态分区来实现:

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
) t;

在执行这段代码的时候,完全跑不动,直接就报错了。报错为JAVA HEAP SPACE OOM。
我也不明白什么原因,认为是数据过多,然后我改进SQL进行分批跑,但是结果还是一样,依然是JAVA HEAP SPACE OOM

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn <10
) t ;

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn >= 10 and RN_T.rn < 20
) t ;

看了下执行计划,额,确实也看不懂。只能面向搜索编程了。
经过一番搜索,都说要在动态分区后面加一个 distribute by。大致意思是,在前面的SQL中,执行动态分区的时候,每一个mapper内部都要进行排序,而用了distribute by后,就不会排序了。

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn <10
) t DISTRIBUTE BY part_inserttime;

追加了DISTRIBUTE BY后,不报错了。但是执行速度确是异常缓慢,处理10行就要将进半小时,780/10= 78条,这个时间难以接受。
在不断的搜索中,接近真相。。。参考文章:https://qubole.zendesk.com/hc/en-us/articles/214885263-HIVE-Dynamic-Partitioning-tips
文章里面提到:
1、使用DISTRIBUTE BY 分区字段,存在一个问题:使用DISTRIBUTE BY 分区字段后,每一个分区只有一个mapper,这样如果有的分区数据量过大,就会数据倾斜,执行缓慢。而我这个例子,明明集群资源足够,却没有充分利用。集群可以启动200个mapper,但是由于SQL只有10个分区,最终只有10个mapper。虽然每个分区最大2000万数据量,但是速度还是很慢,大致半小时左右。
2、DISTRIBUTE BY 分区字段,将每个分区的数据写入一个文件中。前面10个分区,就会有10个文件。

文中提出一个修改方案:

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn <10
) t DISTRIBUTE BY part_inserttime,LIST_ID;

DISTRIBUTE BY 两个字段,第一个是分区字段,第二个是高基数的维度,这里我选择的是该表唯一主键。
整个执行速度很快,大致两分钟左右。通过DISTRIBUTE BY 两个字段,而且第二个字段是唯一主键,所有数据均匀的分在了每一个mapper中。
但是这里需要注意一个问题:
因为所有的数据均匀打散到每一个Mapper中,所以每一个mapper中就有10个分区的数据,这样每一个mapper就要将写入10个文件中。假设有400个mapper,设为M,10个分区,设为P,那么就会有 M*P=4000个数据文件。这样就会有太多文件,而且文件比较小。要想控制这个结果,可以人为的设置mapper数量。

set mapred.reduce.tasks=200;

PS:
1、如果两张表的分区键是一致的,那么动态分区插入是很快。因为是直接进行了文件拷贝。
2、在CDH的hive执行SQ来完成后,觉得在执行自动合并小文件,原来应该生成了200*700多个文件,紧接着又执行了些任务,最终文件数为700多个。具体还需要抽空去研究下。

hive CDH5.13 增量更新数据的方法

我们使用的平台是CDH5.13

HIVE使用ORC格式,并启用事务,是可以做到更新的,但是CDH不建议使用。

在CDH中要么使用impala+kudu,要么你就得想一个办法进行增量更新。而我们采用的就是后者。

我们首先使用的方式是:

1、对每一张表创建一个按天分区的历史拉链表。该表每天对应的增量数据都直接LOAD到该表里面。但是这样就会有一个问题,update的数据就会重复,也没法供后续脚本使用

2、所以第二步,就需要形成一个当日全量最新表。这个步骤比较简单,只是对his表进行distribute by 一个主键 并按 日期排序,只选出同主键的排序第一条数据即可。

过程很简单,但是当我们碰到五亿量级以上的单表数据的时候,一张表一次生成全量最新表的过程就会超过10分钟以上,而且使用的全集群资源,没有并行的可能性。而我们一共有十几张这样的大表,整个执行时间长的令人羞愧。让客户看着觉得,这大数据怎么这么差?!

优化,优化,绞尽脑汁的优化。可惜我能力有限,只能检索。还检索一篇文章,是informatica的一篇文章http://blog.tec13.com/wp-content/uploads/2018/12/1097-StrategiesForIncrementalUpdatesOnHIve-H2L-1.pdf。原来还有两种方式:

1、分区合并:之前使用的distribute by 是对整个表重新生成,实际上是没有必要的。因为很多业务数据会按照一定的规则聚集。我分析了一张的增量数据中存在一个insert_time,其中每天的增量,仅会大致分布在十五内,不管这是哪十五天。这样的话,如果将最新全量表按照insert_time的每天进行分区,那我们每天只需要更新这个十五个分区的数据,而不是整表数据。这个性能飞跃是毋庸置疑的。

其次,我们可以做成一个策略,小于五亿的表还是用distribute by,而大于五亿的部分表再使用前面说的按某一个日期分区。

2、当日全量表建立在hbase:很明显了,hbase是肯定是立即更新的,而且很快。但是有个顾虑,如果将hbase的表用于后续sql脚本进行大量join的话,其性能显然是不足的。除非用的次数不多那还能用这种策略。我也尝试想更新hbase表,再反抽形成一张hive表。可惜性能也是不好,因为这就和distribute by一样,要搬动全表。

后续再补充这篇文章,并给些样例代码。

13亿表的自关联优化

今天在做一个大表的自关联优化。这张表中共计有13亿的数据量,设表为A,具体字段只有三个

CUST_ID_A 客户A的ID
CUST_ID_B 客户B的ID
RELATION 关系

A.CUST_ID_A 其中一个值的最高重复条数有3万条,这样光这一个CUST_ID_A,如果自关联会预计会有3万*3万=9亿条。

原SQL如下:

select *
from A as left
inner join A as right on left.cust_id_b = right.cust_id_a

这个SQL会执行60分钟左右,无法满足我们的要求。

第一次优化:
我将A表复制成两张表,分别在CUST_ID_A和CUST_ID_B进行分桶,进而想通过分桶JOIN提供更高的JOIN速度。

create table bucket_a(
CUST_ID_A String,
CUST_ID_B String,
RELATION String
)
CLUSTERED BY (CUST_ID_A) SORTED BY(CUST_ID_A) INTO 4096 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

create table bucket_b(
CUST_ID_A String,
CUST_ID_B String,
RELATION String
)
CLUSTERED BY (CUST_ID_B) SORTED BY(CUST_ID_B) INTO 4096 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

select *
from bucket_b as left
inner join bucket_a as right on left.cust_id_b = right.cust_id_a

可惜效果依然很差,时间和之前一样,没有效果。通过这个,我觉得是因为我把握错关键点了。现在其实是数据倾斜,分桶解决不了数据倾斜,因为同一个ID还是会打入同一个通,依然是串行的。
而我的目标两个方向:
1、优化SQL计算逻辑,不再计算全量,剔除A表中倾斜数据。这个虽然逻辑上想通了可以做剔除出来单独计算,但是短期做不了。
2、目前明确倾斜的数据在执行的时候,是串行的。我们只需要将这个串行的改为并行的就行。

解决方法一:
主要逻辑是在A表增加一个字段part(int),然后将这13亿的数据打散成20份。然后使用union all 聚合。这个过程每个一个union里面的逻辑,都是并行的!整个过程只花了8分钟。

create table parted_a(
CUST_ID_A String,
CUST_ID_B String,
part int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

INSERT INTO TABLE parted_a
SELECT *,FLOOR(RAND() * 20) --给每一条数据赋予一个20范围内的随机值,这就类似分区了。
FROM A;

--执行手动UNION ALL 20个分片。。。。。。。。。。。。SQL好长好长,但是可以执行
--注意右表还是原来的全量表,这样才能不会造成数据丢失
INSERT INTO TABLE RESULT
SELECT *
FROM PARTED_A AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A AND left.PART = 1

UNION ALL

SELECT *
FROM PARTED_A AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A AND left.PART = 2

UNION ALL

SELECT *
FROM PARTED_A AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A AND left.PART = 3
。。。。。。。


整个过程花了8分钟,最终生成了141亿的数据。。。时间优化成功了,但是数据量还是太大了,最终还是要去优化SQL逻辑

解决方法二:
进行动态分区。可惜没有效果,很奇怪。。哎,和我预计玩不一样。整个执行时间和原来一样。

set hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions= 1000;
set hive.exec.max.dynamic.partitions.pernode=1000;
create table dynamic_a(
CUST_ID_A String,
CUST_ID_B String
)
partitioned by (part int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

INSERT INTO TABLE dynamic_a PARTITION(part)
SELECT *,FLOOR(RAND() * 500) --分区要在最后一个字段。
FROM A;

INSERT INTO TABLE RESULT
SELECT *
FROM dynamic_a AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A

原以为能完美的解决,可惜还是最后几个task变得异常慢。看来还是数据倾斜了,得抽空好好研究下HIVE具体怎么执行的了。

hive beeline 中文报错

今天在跑一个脚本的时候,死活报错:

我开始就认为就是SQL文件顶部的中文注释造成的问题,然后我删除了所有的中文注释依然报上面的错误。

原因:
SQL文件格式有问题,同事在编写SQL的时候是在WINDOWS电脑上编写的,所以文件是默认格式是WINDOWS。编码为UTF-8-BOM格式。

解决方法:
1、使用工具notepad++,将文件转换为unix文件

2、再将语言改为utf-8

beeline gc overhead limit exceeded

由于集群启用了kerberos后,hive指令无法直接使用。只能改用beeline命令。

beeline连接kerberos后,需要更改执行连接指令:

beeline -u “jdbc:hive2://10.127.60.2:10000/default;principal=hive/bdp02@*.COM;”

连接后,执行指令的逻辑和原来不一样。以前hive在执行脚本的时候,如果返回大量数据的时候,hive指令依然能够返回,但是使用beeline后就会报错。

java.lang.OutOfMemoryError: GC overhead limit exceeded

 

这个beeline的gc问题可以进行优化,优化的方法可以参考:

https://www.cloudera.com/documentation/enterprise/5-13-x/topics/admin_hive_tuning.html

hive调优除了看执行计划,还要desc 表审阅表情况

今天碰到一个性能问题。hive的在执行一段增量sql的时候,跑到中间有一段join,其并行度只有3。按照hive指导,小表应该放在最前面。但是这个sql已经放在前面了,应该不会有问题。但是这个并行度只有3,我猜测是因为这个表太小了导致无法放大资源并行度。

接着我想到用mapjoin,让小表缓存在内存,从而让大表进入stream状态。而且看来执行计划确实只有majoin,驱动表为小表。可惜效果依然和之前一样。

然后我觉得是不是换成大表在前,换了之后果然并行度大大提升到100并行。而且我还在小表上用了hint的/* +mapjoin(a) */。看到执行计划为大表驱动。只有mapjoin,没有reduce阶段。

这个过程和我以前看到资料完全不对,不都是要求小表在前吗?!难以置信,我一直反复思考是不是以前错了。可惜事实就是如此。

后面在无限的测试中,突然想去desc formatted一下表。结果一看吓一跳,原来那张1.4亿的大表是一张外表,而且查看numfiles时候,值为1。。。这令我无语。为什么没人告诉查半天是个外表,而且使用sqoop抽过来的。

发现问题,立即转表。将外表转为内表parquet,在内部desc一下,发现bumfiles变为100。这下再测试问题解决,原始SQL不用改变,依然小表在前,并行450。

总结:
1、在并发度不高的时候,除了看执行计划外,一定要desc 表,看表的数据文件情况。
2、不要怀疑,执行优化,HIVE中一定是小表在最前。

HIVE JOIN 优化:启用分桶JOIN

今天做了一次优化测试,对同一个脚本的表,针对是否启用分桶JOIN做了A/B测试。发现建立分桶表后,启用分桶JOIN。整个脚本的执行速度快了整整一倍。

1、针对大数据量表中的最常用JOIN字段作为分桶键。不一定是主键,而是要去JOIN的键作为分桶键。而且要进行排序。

2、两表拥有同样的分桶键,分桶数量需要一致(没验证过,只是看到有这个说法)

/* 插入的时候一定要设置改参数,否则你必须手动指定reducer的数量*/
set hive.enforce.bucketing = true;

/* 插入的时候一定要设置改参数,否则你的插入数据将不会排序,即便你设置了sort by.而且你使用该表进行
sort merge的时候,数据将会是错误的。我就因为没有设置该参数,导致我两张大表JOIN本来有两千万结果的,
最后只有900多条结果!!
参考资料:join-optimization-in-apache-hive*/
set hive.enforce.sorting=true;

CREATE TABLE A(
item_id string,
master_id string,
policy_id string
)
CLUSTERED BY(POLICY_ID) SORTED BY (POLICY_ID) INTO 256 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS parquet
TBLPROPERTIES ("parquet.compression"="SNAPPY");

CREATE TABLE B (
POLICY_ID string,
POLICY_CODE string
)
CLUSTERED BY(POLICY_ID) SORTED BY (POLICY_ID) INTO 256 BUCKETS
row format delimited fields terminated by ‘,’
stored as parquet
TBLPROPERTIES (“parquet.compression”=”SNAPPY”);

上面两表都是大表,都按照policy_id分桶并排序,分桶数均为256.

3、Join的时候启用分桶JOIN:

set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;

SELECT *
FROM ODS_SNAPPY_BUCKET.T_CONTRACT_PRODUCT P
INNER JOIN ODS_SNAPPY_BUCKET.T_CONTRACT_MASTER M ON M.POLICY_ID = P.POLICY_ID

4、分桶JOIN和不分桶JOIN对比:
启用分桶JOIN

STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1

STAGE PLANS:
Stage: Stage-1
Spark
DagName: root_20180906100101_0347c866-b18a-4712-a246-67df14874ca3:2
Vertices:
Map 1
Map Operator Tree:
TableScan
alias: m
Statistics: Num rows: 240809092 Data size: 44308872928 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: policy_id is not null (type: boolean)
Statistics: Num rows: 120404546 Data size: 22154436464 Basic stats: COMPLETE Column stats: NONE
Sorted Merge Bucket Map Join Operator
condition map:
Inner Join 0 to 1
keys:
0 policy_id (type: string)
1 policy_id (type: string)
outputColumnNames: _col2, _col204
Statistics: Num rows: 207420605 Data size: 41691541764 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: _col2 (type: string), _col204 (type: string)
outputColumnNames: _col0, _col1
Statistics: Num rows: 207420605 Data size: 41691541764 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 207420605 Data size: 41691541764 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink

Time taken: 0.149 seconds, Fetched: 43 row(s)

不启用分桶JOIN

STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1

STAGE PLANS:
Stage: Stage-1
Spark
Edges:
Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 897), Map 3 (PARTITION-LEVEL SORT, 897)
DagName: root_20180906100101_1801dd0e-2e5c-463c-ac58-f054f223a6fc:3
Vertices:
Map 1
Map Operator Tree:
TableScan
alias: p
Statistics: Num rows: 377128372 Data size: 75802802772 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: policy_id is not null (type: boolean)
Statistics: Num rows: 188564186 Data size: 37901401386 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: policy_id (type: string)
sort order: +
Map-reduce partition columns: policy_id (type: string)
Statistics: Num rows: 188564186 Data size: 37901401386 Basic stats: COMPLETE Column stats: NONE
Map 3
Map Operator Tree:
TableScan
alias: m
Statistics: Num rows: 242227352 Data size: 44569832768 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: policy_id is not null (type: boolean)
Statistics: Num rows: 121113676 Data size: 22284916384 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: policy_id (type: string)
sort order: +
Map-reduce partition columns: policy_id (type: string)
Statistics: Num rows: 121113676 Data size: 22284916384 Basic stats: COMPLETE Column stats: NONE
Reducer 2
Reduce Operator Tree:
Join Operator
condition map:
Inner Join 0 to 1
keys:
0 policy_id (type: string)
1 policy_id (type: string)
outputColumnNames: _col2, _col204
Statistics: Num rows: 207420609 Data size: 41691542428 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: _col2 (type: string), _col204 (type: string)
outputColumnNames: _col0, _col1
Statistics: Num rows: 207420609 Data size: 41691542428 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 207420609 Data size: 41691542428 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink

Time taken: 0.092 seconds, Fetched: 65 row(s)

可以从执行计划中看到。启用分桶JOIN后,没有reduce阶段。这是一个质的飞跃。

Bucketized tables do not support INSERT INTO

今天碰到了一个错误:

FAILED: SemanticException [Error 10122]: Bucketized tables do not support INSERT INTO: Table: ecif_customer_relation_mapping

根据异常信息,我第一反应是不能使用INSERT INTO语句到一个分桶表中,但是我另外一个脚本又是没问题的。。

结果是因为我的SQL是INSERT INTO TABLE ** SELECT * FROM ….

分通表不能直接从子查询中插入。只能先将子查询固化成表后,再从改表中插入到分桶表中。