HIVE JOIN Optimization: Enabling Bucketed Joins

Today I ran an optimization test: an A/B test on the same script's tables, with and without bucketed joins. After creating bucketed tables and enabling the bucketed join, the whole script ran a full two times faster.

1. For large tables, use the most frequently joined column as the bucket key. It need not be the primary key; what matters is that it is the key you actually join on. The table should also be sorted by it.

2. Both tables must use the same bucket key, and their bucket counts need to match (I have not verified this myself; I have only seen it claimed). A quick way to check a table's bucket definition is shown right below.
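To confirm how an existing table is bucketed and sorted, DESCRIBE FORMATTED prints the bucket count and the bucket/sort columns. For example, for the table A created further below (output abbreviated to the relevant fields):

DESCRIBE FORMATTED A;
-- Num Buckets:        256
-- Bucket Columns:     [policy_id]
-- Sort Columns:       [Order(col:policy_id, order:1)]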

/* Always set this parameter before inserting; otherwise you must specify the number of reducers by hand. */
set hive.enforce.bucketing = true;

/* Always set this parameter before inserting as well; otherwise the inserted data will not be sorted,
even if you specify SORT BY, and a sort-merge join on that table will then return wrong results.
I once skipped this parameter, and a join of two large tables that should have produced some
twenty million rows returned only around 900 rows!!
Reference: join-optimization-in-apache-hive */
set hive.enforce.sorting=true;


CREATE TABLE A(
item_id string,
master_id string,
policy_id string
)
CLUSTERED BY(POLICY_ID) SORTED BY (POLICY_ID) INTO 256 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS parquet
TBLPROPERTIES ("parquet.compression"="SNAPPY");

CREATE TABLE B (
POLICY_ID string,
POLICY_CODE string
)
CLUSTERED BY(POLICY_ID) SORTED BY (POLICY_ID) INTO 256 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS parquet
TBLPROPERTIES ("parquet.compression"="SNAPPY");

Both tables above are large tables, bucketed and sorted by policy_id, with 256 buckets each.
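A minimal sketch of loading table A, assuming a hypothetical unbucketed staging table src_contract_product with the same columns; the two enforce parameters above must be set in the same session:

set hive.enforce.bucketing = true;
set hive.enforce.sorting = true;

-- src_contract_product is a hypothetical staging table holding the raw rows;
-- with both flags set, Hive uses 256 reducers and writes each bucket sorted by policy_id
INSERT OVERWRITE TABLE A
SELECT item_id, master_id, policy_id
FROM src_contract_product;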

3. Enable the bucketed join when querying:

set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;

SELECT *
FROM ODS_SNAPPY_BUCKET.T_CONTRACT_PRODUCT P
INNER JOIN ODS_SNAPPY_BUCKET.T_CONTRACT_MASTER M ON M.POLICY_ID = P.POLICY_ID
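The plans in the comparison below were produced by prefixing this query with EXPLAIN. The sign that the optimization kicked in is a Sorted Merge Bucket Map Join Operator inside the map stage, instead of a Join Operator behind a reduce stage:

EXPLAIN
SELECT *
FROM ODS_SNAPPY_BUCKET.T_CONTRACT_PRODUCT P
INNER JOIN ODS_SNAPPY_BUCKET.T_CONTRACT_MASTER M ON M.POLICY_ID = P.POLICY_ID;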

4. Bucketed join vs. non-bucketed join:
With bucketed join enabled:

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Spark
      DagName: root_20180906100101_0347c866-b18a-4712-a246-67df14874ca3:2
      Vertices:
        Map 1
            Map Operator Tree:
                TableScan
                  alias: m
                  Statistics: Num rows: 240809092 Data size: 44308872928 Basic stats: COMPLETE Column stats: NONE
                  Filter Operator
                    predicate: policy_id is not null (type: boolean)
                    Statistics: Num rows: 120404546 Data size: 22154436464 Basic stats: COMPLETE Column stats: NONE
                    Sorted Merge Bucket Map Join Operator
                      condition map:
                           Inner Join 0 to 1
                      keys:
                        0 policy_id (type: string)
                        1 policy_id (type: string)
                      outputColumnNames: _col2, _col204
                      Statistics: Num rows: 207420605 Data size: 41691541764 Basic stats: COMPLETE Column stats: NONE
                      Select Operator
                        expressions: _col2 (type: string), _col204 (type: string)
                        outputColumnNames: _col0, _col1
                        Statistics: Num rows: 207420605 Data size: 41691541764 Basic stats: COMPLETE Column stats: NONE
                        File Output Operator
                          compressed: false
                          Statistics: Num rows: 207420605 Data size: 41691541764 Basic stats: COMPLETE Column stats: NONE
                          table:
                              input format: org.apache.hadoop.mapred.TextInputFormat
                              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

Time taken: 0.149 seconds, Fetched: 43 row(s)

Without bucketed join:

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Spark
      Edges:
        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 897), Map 3 (PARTITION-LEVEL SORT, 897)
      DagName: root_20180906100101_1801dd0e-2e5c-463c-ac58-f054f223a6fc:3
      Vertices:
        Map 1
            Map Operator Tree:
                TableScan
                  alias: p
                  Statistics: Num rows: 377128372 Data size: 75802802772 Basic stats: COMPLETE Column stats: NONE
                  Filter Operator
                    predicate: policy_id is not null (type: boolean)
                    Statistics: Num rows: 188564186 Data size: 37901401386 Basic stats: COMPLETE Column stats: NONE
                    Reduce Output Operator
                      key expressions: policy_id (type: string)
                      sort order: +
                      Map-reduce partition columns: policy_id (type: string)
                      Statistics: Num rows: 188564186 Data size: 37901401386 Basic stats: COMPLETE Column stats: NONE
        Map 3
            Map Operator Tree:
                TableScan
                  alias: m
                  Statistics: Num rows: 242227352 Data size: 44569832768 Basic stats: COMPLETE Column stats: NONE
                  Filter Operator
                    predicate: policy_id is not null (type: boolean)
                    Statistics: Num rows: 121113676 Data size: 22284916384 Basic stats: COMPLETE Column stats: NONE
                    Reduce Output Operator
                      key expressions: policy_id (type: string)
                      sort order: +
                      Map-reduce partition columns: policy_id (type: string)
                      Statistics: Num rows: 121113676 Data size: 22284916384 Basic stats: COMPLETE Column stats: NONE
        Reducer 2
            Reduce Operator Tree:
              Join Operator
                condition map:
                     Inner Join 0 to 1
                keys:
                  0 policy_id (type: string)
                  1 policy_id (type: string)
                outputColumnNames: _col2, _col204
                Statistics: Num rows: 207420609 Data size: 41691542428 Basic stats: COMPLETE Column stats: NONE
                Select Operator
                  expressions: _col2 (type: string), _col204 (type: string)
                  outputColumnNames: _col0, _col1
                  Statistics: Num rows: 207420609 Data size: 41691542428 Basic stats: COMPLETE Column stats: NONE
                  File Output Operator
                    compressed: false
                    Statistics: Num rows: 207420609 Data size: 41691542428 Basic stats: COMPLETE Column stats: NONE
                    table:
                        input format: org.apache.hadoop.mapred.TextInputFormat
                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

Time taken: 0.092 seconds, Fetched: 65 row(s)

You can see from the execution plans that with the bucketed join enabled there is no reduce stage at all: the join is done map-side by the Sorted Merge Bucket Map Join Operator, so the shuffle (Reducer 2 with 897 partitions in the second plan) disappears entirely. That is a qualitative leap.

5. Another execution-plan comparison with bucketed join enabled

Parameters enabled:

set hive.enforce.bucketing = true;
set hive.enforce.sorting=true;
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;

Bucketized tables do not support INSERT INTO

Today I hit an error:

FAILED: SemanticException [Error 10122]: Bucketized tables do not support INSERT INTO: Table: ecif_customer_relation_mapping

Going by the exception message, my first reaction was that INSERT INTO simply cannot be used on a bucketed table, yet another of my scripts does exactly that without any problem.

It turned out the cause was that my SQL had the form INSERT INTO TABLE ** SELECT * FROM …

A bucketed table cannot be inserted into directly from a subquery. You have to materialize the subquery into a table first, and then insert from that table into the bucketed table, as in the sketch below.
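A minimal sketch of the workaround, with hypothetical source table and column names (only ecif_customer_relation_mapping comes from the actual error above):

-- Fails with Error 10122 when the SELECT wraps a derived subquery:
-- INSERT INTO TABLE ecif_customer_relation_mapping
-- SELECT * FROM (SELECT a.cust_id, b.relation_type FROM src_a a JOIN src_b b ON a.cust_id = b.cust_id) t;

-- Workaround: materialize the subquery into its own table first...
CREATE TABLE tmp_relation_mapping AS
SELECT a.cust_id, b.relation_type
FROM src_a a JOIN src_b b ON a.cust_id = b.cust_id;

-- ...then insert into the bucketed table from that table
INSERT INTO TABLE ecif_customer_relation_mapping
SELECT * FROM tmp_relation_mapping;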

Checking the NIC

On CentOS 7 you can check the NIC (for example, whether it is a 10-gigabit or gigabit card) and the transfer speed between servers as follows.

1. Check the NIC's link speed

First run ifconfig to find the interface name.

Then run /sbin/ethtool <interface name> (e.g. /sbin/ethtool eth0) to view the link speed.

The Speed field shows the link speed; here it shows a 10-gigabit NIC. Duplex should be Full, meaning full-duplex mode is enabled.

2. Check the transfer speed between servers

Pick two servers, one to act as the server and one as the client.

Start the server side (iperf in server mode): iperf -s

Start the client, which kicks off the bandwidth test:

The command is iperf -c <server IP> -f m -d, where -f m reports results in Mbits/sec and -d runs the test in both directions simultaneously.

In the output, stream [4] is the server-to-client throughput and stream [5] is the client-to-server throughput.