13亿表的自关联优化

今天在做一个大表的自关联优化。这张表中共计有13亿的数据量,设表为A,具体字段只有三个

CUST_ID_A 客户A的ID
CUST_ID_B 客户B的ID
RELATION 关系

A.CUST_ID_A 其中一个值的最高重复条数有3万条,这样光这一个CUST_ID_A,如果自关联会预计会有3万*3万=9亿条。

原SQL如下:

select *
from A as left
inner join A as right on left.cust_id_b = right.cust_id_a

这个SQL会执行60分钟左右,无法满足我们的要求。

第一次优化:
我将A表复制成两张表,分别在CUST_ID_A和CUST_ID_B进行分桶,进而想通过分桶JOIN提供更高的JOIN速度。

create table bucket_a(
CUST_ID_A String,
CUST_ID_B String,
RELATION String
)
CLUSTERED BY (CUST_ID_A) SORTED BY(CUST_ID_A) INTO 4096 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

create table bucket_b(
CUST_ID_A String,
CUST_ID_B String,
RELATION String
)
CLUSTERED BY (CUST_ID_B) SORTED BY(CUST_ID_B) INTO 4096 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

select *
from bucket_b as left
inner join bucket_a as right on left.cust_id_b = right.cust_id_a

可惜效果依然很差,时间和之前一样,没有效果。通过这个,我觉得是因为我把握错关键点了。现在其实是数据倾斜,分桶解决不了数据倾斜,因为同一个ID还是会打入同一个通,依然是串行的。
而我的目标两个方向:
1、优化SQL计算逻辑,不再计算全量,剔除A表中倾斜数据。这个虽然逻辑上想通了可以做剔除出来单独计算,但是短期做不了。
2、目前明确倾斜的数据在执行的时候,是串行的。我们只需要将这个串行的改为并行的就行。

解决方法一:
主要逻辑是在A表增加一个字段part(int),然后将这13亿的数据打散成20份。然后使用union all 聚合。这个过程每个一个union里面的逻辑,都是并行的!整个过程只花了8分钟。

create table parted_a(
CUST_ID_A String,
CUST_ID_B String,
part int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

INSERT INTO TABLE parted_a
SELECT *,FLOOR(RAND() * 20) --给每一条数据赋予一个20范围内的随机值,这就类似分区了。
FROM A;

--执行手动UNION ALL 20个分片。。。。。。。。。。。。SQL好长好长,但是可以执行
--注意右表还是原来的全量表,这样才能不会造成数据丢失
INSERT INTO TABLE RESULT
SELECT *
FROM PARTED_A AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A AND left.PART = 1

UNION ALL

SELECT *
FROM PARTED_A AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A AND left.PART = 2

UNION ALL

SELECT *
FROM PARTED_A AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A AND left.PART = 3
。。。。。。。


整个过程花了8分钟,最终生成了141亿的数据。。。时间优化成功了,但是数据量还是太大了,最终还是要去优化SQL逻辑

解决方法二:
进行动态分区。可惜没有效果,很奇怪。。哎,和我预计玩不一样。整个执行时间和原来一样。

set hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions= 1000;
set hive.exec.max.dynamic.partitions.pernode=1000;
create table dynamic_a(
CUST_ID_A String,
CUST_ID_B String
)
partitioned by (part int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

INSERT INTO TABLE dynamic_a PARTITION(part)
SELECT *,FLOOR(RAND() * 500) --分区要在最后一个字段。
FROM A;

INSERT INTO TABLE RESULT
SELECT *
FROM dynamic_a AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A

原以为能完美的解决,可惜还是最后几个task变得异常慢。看来还是数据倾斜了,得抽空好好研究下HIVE具体怎么执行的了。

发表评论

电子邮件地址不会被公开。 必填项已用*标注