13亿表的自关联优化

今天在做一个大表的自关联优化。这张表中共计有13亿的数据量,设表为A,具体字段只有三个

CUST_ID_A 客户A的ID
CUST_ID_B 客户B的ID
RELATION 关系

A.CUST_ID_A 其中一个值的最高重复条数有3万条,这样光这一个CUST_ID_A,如果自关联会预计会有3万*3万=9亿条。

原SQL如下:

select *
from A as left
inner join A as right on left.cust_id_b = right.cust_id_a

这个SQL会执行60分钟左右,无法满足我们的要求。

第一次优化:
我将A表复制成两张表,分别在CUST_ID_A和CUST_ID_B进行分桶,进而想通过分桶JOIN提供更高的JOIN速度。

create table bucket_a(
CUST_ID_A String,
CUST_ID_B String,
RELATION String
)
CLUSTERED BY (CUST_ID_A) SORTED BY(CUST_ID_A) INTO 4096 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

create table bucket_b(
CUST_ID_A String,
CUST_ID_B String,
RELATION String
)
CLUSTERED BY (CUST_ID_B) SORTED BY(CUST_ID_B) INTO 4096 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

select *
from bucket_b as left
inner join bucket_a as right on left.cust_id_b = right.cust_id_a

可惜效果依然很差,时间和之前一样,没有效果。通过这个,我觉得是因为我把握错关键点了。现在其实是数据倾斜,分桶解决不了数据倾斜,因为同一个ID还是会打入同一个通,依然是串行的。
而我的目标两个方向:
1、优化SQL计算逻辑,不再计算全量,剔除A表中倾斜数据。这个虽然逻辑上想通了可以做剔除出来单独计算,但是短期做不了。
2、目前明确倾斜的数据在执行的时候,是串行的。我们只需要将这个串行的改为并行的就行。

解决方法一:
主要逻辑是在A表增加一个字段part(int),然后将这13亿的数据打散成20份。然后使用union all 聚合。这个过程每个一个union里面的逻辑,都是并行的!整个过程只花了8分钟。

create table parted_a(
CUST_ID_A String,
CUST_ID_B String,
part int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

INSERT INTO TABLE parted_a
SELECT *,FLOOR(RAND() * 20) --给每一条数据赋予一个20范围内的随机值,这就类似分区了。
FROM A;

--执行手动UNION ALL 20个分片。。。。。。。。。。。。SQL好长好长,但是可以执行
--注意右表还是原来的全量表,这样才能不会造成数据丢失
INSERT INTO TABLE RESULT
SELECT *
FROM PARTED_A AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A AND left.PART = 1

UNION ALL

SELECT *
FROM PARTED_A AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A AND left.PART = 2

UNION ALL

SELECT *
FROM PARTED_A AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A AND left.PART = 3
。。。。。。。


整个过程花了8分钟,最终生成了141亿的数据。。。时间优化成功了,但是数据量还是太大了,最终还是要去优化SQL逻辑

解决方法二:
进行动态分区。可惜没有效果,很奇怪。。哎,和我预计玩不一样。整个执行时间和原来一样。

set hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions= 1000;
set hive.exec.max.dynamic.partitions.pernode=1000;
create table dynamic_a(
CUST_ID_A String,
CUST_ID_B String
)
partitioned by (part int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

INSERT INTO TABLE dynamic_a PARTITION(part)
SELECT *,FLOOR(RAND() * 500) --分区要在最后一个字段。
FROM A;

INSERT INTO TABLE RESULT
SELECT *
FROM dynamic_a AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A

原以为能完美的解决,可惜还是最后几个task变得异常慢。看来还是数据倾斜了,得抽空好好研究下HIVE具体怎么执行的了。

hive beeline 中文报错

今天在跑一个脚本的时候,死活报错:

我开始就认为就是SQL文件顶部的中文注释造成的问题,然后我删除了所有的中文注释依然报上面的错误。

原因:
SQL文件格式有问题,同事在编写SQL的时候是在WINDOWS电脑上编写的,所以文件是默认格式是WINDOWS。编码为UTF-8-BOM格式。

解决方法:
1、使用工具notepad++,将文件转换为unix文件

2、再将语言改为utf-8

beeline gc overhead limit exceeded

由于集群启用了kerberos后,hive指令无法直接使用。只能改用beeline命令。

beeline连接kerberos后,需要更改执行连接指令:

beeline -u “jdbc:hive2://10.127.60.2:10000/default;principal=hive/bdp02@*.COM;”

连接后,执行指令的逻辑和原来不一样。以前hive在执行脚本的时候,如果返回大量数据的时候,hive指令依然能够返回,但是使用beeline后就会报错。

java.lang.OutOfMemoryError: GC overhead limit exceeded

 

这个beeline的gc问题可以进行优化,优化的方法可以参考:

https://www.cloudera.com/documentation/enterprise/5-13-x/topics/admin_hive_tuning.html

hive调优除了看执行计划,还要desc 表审阅表情况

今天碰到一个性能问题。hive的在执行一段增量sql的时候,跑到中间有一段join,其并行度只有3。按照hive指导,小表应该放在最前面。但是这个sql已经放在前面了,应该不会有问题。但是这个并行度只有3,我猜测是因为这个表太小了导致无法放大资源并行度。

接着我想到用mapjoin,让小表缓存在内存,从而让大表进入stream状态。而且看来执行计划确实只有majoin,驱动表为小表。可惜效果依然和之前一样。

然后我觉得是不是换成大表在前,换了之后果然并行度大大提升到100并行。而且我还在小表上用了hint的/* +mapjoin(a) */。看到执行计划为大表驱动。只有mapjoin,没有reduce阶段。

这个过程和我以前看到资料完全不对,不都是要求小表在前吗?!难以置信,我一直反复思考是不是以前错了。可惜事实就是如此。

后面在无限的测试中,突然想去desc formatted一下表。结果一看吓一跳,原来那张1.4亿的大表是一张外表,而且查看numfiles时候,值为1。。。这令我无语。为什么没人告诉查半天是个外表,而且使用sqoop抽过来的。

发现问题,立即转表。将外表转为内表parquet,在内部desc一下,发现bumfiles变为100。这下再测试问题解决,原始SQL不用改变,依然小表在前,并行450。

总结:
1、在并发度不高的时候,除了看执行计划外,一定要desc 表,看表的数据文件情况。
2、不要怀疑,执行优化,HIVE中一定是小表在最前。

HIVE JOIN 优化:启用分桶JOIN

今天做了一次优化测试,对同一个脚本的表,针对是否启用分桶JOIN做了A/B测试。发现建立分桶表后,启用分桶JOIN。整个脚本的执行速度快了整整一倍。

1、针对大数据量表中的最常用JOIN字段作为分桶键。不一定是主键,而是要去JOIN的键作为分桶键。而且要进行排序。

2、两表拥有同样的分桶键,分桶数量需要一致(没验证过,只是看到有这个说法)

/* 插入的时候一定要设置改参数,否则你必须手动指定reducer的数量*/
set hive.enforce.bucketing = true;

/* 插入的时候一定要设置改参数,否则你的插入数据将不会排序,即便你设置了sort by.而且你使用该表进行
sort merge的时候,数据将会是错误的。我就因为没有设置该参数,导致我两张大表JOIN本来有两千万结果的,
最后只有900多条结果!!
参考资料:join-optimization-in-apache-hive*/
set hive.enforce.sorting=true;

CREATE TABLE A(
item_id string,
master_id string,
policy_id string
)
CLUSTERED BY(POLICY_ID) SORTED BY (POLICY_ID) INTO 256 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS parquet
TBLPROPERTIES ("parquet.compression"="SNAPPY");

CREATE TABLE B (
POLICY_ID string,
POLICY_CODE string
)
CLUSTERED BY(POLICY_ID) SORTED BY (POLICY_ID) INTO 256 BUCKETS
row format delimited fields terminated by ‘,’
stored as parquet
TBLPROPERTIES (“parquet.compression”=”SNAPPY”);

上面两表都是大表,都按照policy_id分桶并排序,分桶数均为256.

3、Join的时候启用分桶JOIN:

set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;

SELECT *
FROM ODS_SNAPPY_BUCKET.T_CONTRACT_PRODUCT P
INNER JOIN ODS_SNAPPY_BUCKET.T_CONTRACT_MASTER M ON M.POLICY_ID = P.POLICY_ID

4、分桶JOIN和不分桶JOIN对比:
启用分桶JOIN

STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1

STAGE PLANS:
Stage: Stage-1
Spark
DagName: root_20180906100101_0347c866-b18a-4712-a246-67df14874ca3:2
Vertices:
Map 1
Map Operator Tree:
TableScan
alias: m
Statistics: Num rows: 240809092 Data size: 44308872928 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: policy_id is not null (type: boolean)
Statistics: Num rows: 120404546 Data size: 22154436464 Basic stats: COMPLETE Column stats: NONE
Sorted Merge Bucket Map Join Operator
condition map:
Inner Join 0 to 1
keys:
0 policy_id (type: string)
1 policy_id (type: string)
outputColumnNames: _col2, _col204
Statistics: Num rows: 207420605 Data size: 41691541764 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: _col2 (type: string), _col204 (type: string)
outputColumnNames: _col0, _col1
Statistics: Num rows: 207420605 Data size: 41691541764 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 207420605 Data size: 41691541764 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink

Time taken: 0.149 seconds, Fetched: 43 row(s)

不启用分桶JOIN

STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1

STAGE PLANS:
Stage: Stage-1
Spark
Edges:
Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 897), Map 3 (PARTITION-LEVEL SORT, 897)
DagName: root_20180906100101_1801dd0e-2e5c-463c-ac58-f054f223a6fc:3
Vertices:
Map 1
Map Operator Tree:
TableScan
alias: p
Statistics: Num rows: 377128372 Data size: 75802802772 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: policy_id is not null (type: boolean)
Statistics: Num rows: 188564186 Data size: 37901401386 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: policy_id (type: string)
sort order: +
Map-reduce partition columns: policy_id (type: string)
Statistics: Num rows: 188564186 Data size: 37901401386 Basic stats: COMPLETE Column stats: NONE
Map 3
Map Operator Tree:
TableScan
alias: m
Statistics: Num rows: 242227352 Data size: 44569832768 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: policy_id is not null (type: boolean)
Statistics: Num rows: 121113676 Data size: 22284916384 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: policy_id (type: string)
sort order: +
Map-reduce partition columns: policy_id (type: string)
Statistics: Num rows: 121113676 Data size: 22284916384 Basic stats: COMPLETE Column stats: NONE
Reducer 2
Reduce Operator Tree:
Join Operator
condition map:
Inner Join 0 to 1
keys:
0 policy_id (type: string)
1 policy_id (type: string)
outputColumnNames: _col2, _col204
Statistics: Num rows: 207420609 Data size: 41691542428 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: _col2 (type: string), _col204 (type: string)
outputColumnNames: _col0, _col1
Statistics: Num rows: 207420609 Data size: 41691542428 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 207420609 Data size: 41691542428 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink

Time taken: 0.092 seconds, Fetched: 65 row(s)

可以从执行计划中看到。启用分桶JOIN后,没有reduce阶段。这是一个质的飞跃。

Bucketized tables do not support INSERT INTO

今天碰到了一个错误:

FAILED: SemanticException [Error 10122]: Bucketized tables do not support INSERT INTO: Table: ecif_customer_relation_mapping

根据异常信息,我第一反应是不能使用INSERT INTO语句到一个分桶表中,但是我另外一个脚本又是没问题的。。

结果是因为我的SQL是INSERT INTO TABLE ** SELECT * FROM ….

分通表不能直接从子查询中插入。只能先将子查询固化成表后,再从改表中插入到分桶表中。

hive parquet表启用压缩

1、首先表必须是要确保是parquet表,之后的语句才会有效。因为是对hive中的parquet表进行压缩。

2、
SET parquet.compression=SNAPPY;
SET hive.exec.compress.output=true;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET mapred.output.compression.type=BLOCK;

该设置其实只对插入数据有效,对查询是默认有效的。

CREATE TABLE t_r_cust_cont0720_snappy(
ecif_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET
TBLPROPERTIES ("parquet.compression"="SNAPPY"); ---表级的压缩

3、检查压缩是否有效
desc formatted t_r_cust_cont0720_snappy;

压缩后的表:

4、经过计算压缩比为:1:6.压缩后的数据为原来的六分之一。

————————————————————————————
虽然从数据文件上看来是进行了压缩,但是不知道为什么里面的compressd依然为NO。


这里面我之前有个错误。没有分清totaldatasize和rawdatasize。简单理解就是rawdatasize是原始文件大小,即压缩前;totaldatasize是实际存储文件大小,即压缩后。

下面对一张表进行压缩前的情况

下面是压缩后的情况

比较明显的就是使用snappy压缩后文件数量降低了,同时totaldatasize降低了一倍,但是rawdatasize一致。同时compresed始终为NO,只能通过parquet.compression为Snappy辨别表是否为压缩表。

接着我使用了压缩表进行脚本测试。性能和压缩前几乎一致,同时还减少了存储空间。

blocksize为128m 不压缩 23mins

blocksize为128m 只压缩新表 26mins

blocksize为128m 压缩 24mins