hive dynamic partition 性能 优化

今天正在做分区合并的时候,需要初始化一部分测试数据,数据量共计有34亿,每10个分片的数据量合计在4亿左右,所以想偷懒直接使用动态分区来实现:

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
) t;

在执行这段代码的时候,完全跑不动,直接就报错了。报错为JAVA HEAP SPACE OOM。
我也不明白什么原因,认为是数据过多,然后我改进SQL进行分批跑,但是结果还是一样,依然是JAVA HEAP SPACE OOM

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn <10
) t ;

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn >= 10 and RN_T.rn < 20
) t ;

看了下执行计划,额,确实也看不懂。只能面向搜索编程了。
经过一番搜索,都说要在动态分区后面加一个 distribute by。大致意思是,在前面的SQL中,执行动态分区的时候,每一个mapper内部都要进行排序,而用了distribute by后,就不会排序了。

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn <10
) t DISTRIBUTE BY part_inserttime;

追加了DISTRIBUTE BY后,不报错了。但是执行速度确是异常缓慢,处理10行就要将进半小时,780/10= 78条,这个时间难以接受。
在不断的搜索中,接近真相。。。参考文章:https://qubole.zendesk.com/hc/en-us/articles/214885263-HIVE-Dynamic-Partitioning-tips
文章里面提到:
1、使用DISTRIBUTE BY 分区字段,存在一个问题:使用DISTRIBUTE BY 分区字段后,每一个分区只有一个mapper,这样如果有的分区数据量过大,就会数据倾斜,执行缓慢。而我这个例子,明明集群资源足够,却没有充分利用。集群可以启动200个mapper,但是由于SQL只有10个分区,最终只有10个mapper。虽然每个分区最大2000万数据量,但是速度还是很慢,大致半小时左右。
2、DISTRIBUTE BY 分区字段,将每个分区的数据写入一个文件中。前面10个分区,就会有10个文件。

文中提出一个修改方案:

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn <10
) t DISTRIBUTE BY part_inserttime,LIST_ID;

DISTRIBUTE BY 两个字段,第一个是分区字段,第二个是高基数的维度,这里我选择的是该表唯一主键。
整个执行速度很快,大致两分钟左右。通过DISTRIBUTE BY 两个字段,而且第二个字段是唯一主键,所有数据均匀的分在了每一个mapper中。
但是这里需要注意一个问题:
因为所有的数据均匀打散到每一个Mapper中,所以每一个mapper中就有10个分区的数据,这样每一个mapper就要将写入10个文件中。假设有400个mapper,设为M,10个分区,设为P,那么就会有 M*P=4000个数据文件。这样就会有太多文件,而且文件比较小。要想控制这个结果,可以人为的设置mapper数量。

set mapred.reduce.tasks=200;

PS:
1、如果两张表的分区键是一致的,那么动态分区插入是很快。因为是直接进行了文件拷贝。
2、在CDH的hive执行SQ来完成后,觉得在执行自动合并小文件,原来应该生成了200*700多个文件,紧接着又执行了些任务,最终文件数为700多个。具体还需要抽空去研究下。

发表评论

电子邮件地址不会被公开。 必填项已用*标注