cdh kerberosed hive jdbc demo

cdh启用kerberos后,通过jdbc连接hive需要额外的连接配置,具体代码如下:

System.setProperty("java.security.krb5.conf", "/Users/zhouxiang/develop/test/krb5.conf");
Class.forName("org.apache.hive.jdbc.HiveDriver").newInstance();
Configuration conf = new Configuration();
conf.set("hadoop.security.authentication", "kerberos");
UserGroupInformation.setConfiguration(conf);
String principal = System.getProperty("kerberosPrincipal", "hive@TEST.COM"); String keytabLocation = System.getProperty("kerberosKeytab", "/Users/zhouxiang/develop/test/hive.keytab"); UserGroupInformation.loginUserFromKeytab(principal, keytabLocation); String url = "jdbc:hive2://10.127.60.4:10000/default;auth=KERBEROS;principal=hive/bdp04@TEST.COM;"; Connection connection = DriverManager.getConnection(url, "", ""); Assert.assertNotNull(connection); Statement statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery("select count(*) from ods.test"); while(resultSet.next()) { System.out.println(resultSet.getInt(1)); }

hive multiple inserts 一次扫描多次插入

hive multiple inserts可以仅执行一次扫描,将这次扫描的数据分别根据不同的条件插入到多张表或者同一张表的不同分区中。但是不能插入同一张表!

Hive extension (multiple inserts):
FROM from_statement
INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]] select_statement1
[INSERT OVERWRITE TABLE tablename2 [PARTITION ... [IF NOT EXISTS]] select_statement2]
[INSERT INTO TABLE tablename2 [PARTITION ...] select_statement2] ...;

这个语法可以降低扫描次数,如果分开多次插入,就会需要扫描多次。如果写成multiple inserts就只会扫表一次。官网解释如下:

Multiple insert clauses (also known as Multi Table Insert) can be specified in the same query.

Multi Table Inserts minimize the number of data scans required. Hive can insert data into multiple tables by scanning the input data just once (and applying different query operators) to the input data.

以下是执行计划截图:


测试情况:在20亿的数据中,执行扫描插入到六张表,只花了六分钟。

dynamic partition搬表 out of memory

在想启用dynamic partition来偷懒搬表的情况时,如果表的数据过大(例如我在六个节点上搬迁2亿数据)时,就会经常OOM。

样例SQL:

set hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions= 1000;
set hive.exec.max.dynamic.partitions.pernode=1000;

CREATE TABLE B (
NAME STRING
)
partitioned BY (NAME2 string,NAME3 string,NAME4 string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘,’
STORED AS PARQUET;

INSERT INTO TABLE B
PARTITION(NAME2,NAME3,NAME4)
select NAME2,NAME3,NAME4
FROM A;

我想将A表的数据进行分区,并导入B表。最简单的方式就启用动态分区。可是执行上面的SQL时,报出OOM错误。一般碰到OOM,我第一反应就是提高heap space。

set spark.executor.memory=25G;

结果还是一样OOM。难道是内存还不够吗?24G–>30G–>48GG。。还是不行。

最后找到一篇文章,启用hive.optimize.sort.dynamic.partition即可;

SET hive.optimize.sort.dynamic.partition=true

针对该参数和对应问题的解释:
该问题出现的原因是太多的文件同时进入写入。当启用hive.optimize.sort.dynamic.partition后面,动态分区列就会进行全局排序,这样REDUCE端,每个动态分区列的值只会有一个文件写入,从而REDUCE阶段降低了内存使用。

hive.optimize.sort.dynamic.partition
  • Default Value: true in Hive 0.13.0 and 0.13.1; false in Hive 0.14.0 and later (HIVE-8151)
  • Added In: Hive 0.13.0 with HIVE-6455

When enabled, dynamic partitioning column will be globally sorted. This way we can keep only one record writer open for each partition value in the reducer thereby reducing the memory pressure on reducers.
个人感觉就是在内部使用distribute by。

参考文章:
1、https://cloud.tencent.com/developer/article/1079007
2、https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties
3、https://stackoverflow.com/questions/33147764/hive-setting-hive-optimize-sort-dynamic-partition

HIVE 几个 by的区别

参考资料:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SortBy

Order By:
1、在hive.mapred.mode=strict模式下,order by 后面必须加limit。因为不加的话,order by会对所有的数据进行排序。而要进行全局排序,只能使用一个Reducer进行排序,如果结果集数据量异常大的话,速度异常缓慢。

Sort by VS Order by

1、Sort by是将在一个Reducer中的数据进行局部排序,如果有多Reducer,那么结果集只能是局部排序的。而Order By是所有Reducer汇总到一个Reducer进行全局排序。

Cluster By VS Distribute By

1、Cluster By 只是 Distribute By 和Sort By 的简写。即:Cluster By 字段A ==Distribute By 字段A Sort By 字段A。那么既然这样,为什么还要Distribute By和Sort By呢?那就是当Distribute By和Sort By字段不一致的时候就需要了。例如Distribute By 字段A Sort By 字段B 字段C。
2、Cluster By 因为包含Sort by,所以在每一个Reducer中的数据是有序的。
3、Distribute By 是无序的。
4、当Distribute By和Sort By的字段不一致时,就无法使用Cluster By。

hive dynamic partition 性能 优化

今天正在做分区合并的时候,需要初始化一部分测试数据,数据量共计有34亿,每10个分片的数据量合计在4亿左右,所以想偷懒直接使用动态分区来实现:

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
) t;

在执行这段代码的时候,完全跑不动,直接就报错了。报错为JAVA HEAP SPACE OOM。
我也不明白什么原因,认为是数据过多,然后我改进SQL进行分批跑,但是结果还是一样,依然是JAVA HEAP SPACE OOM

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn <10
) t ;

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn >= 10 and RN_T.rn < 20
) t ;

看了下执行计划,额,确实也看不懂。只能面向搜索编程了。
经过一番搜索,都说要在动态分区后面加一个 distribute by。大致意思是,在前面的SQL中,执行动态分区的时候,每一个mapper内部都要进行排序,而用了distribute by后,就不会排序了。

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn <10
) t DISTRIBUTE BY part_inserttime;

追加了DISTRIBUTE BY后,不报错了。但是执行速度确是异常缓慢,处理10行就要将进半小时,780/10= 78条,这个时间难以接受。
在不断的搜索中,接近真相。。。参考文章:https://qubole.zendesk.com/hc/en-us/articles/214885263-HIVE-Dynamic-Partitioning-tips
文章里面提到:
1、使用DISTRIBUTE BY 分区字段,存在一个问题:使用DISTRIBUTE BY 分区字段后,每一个分区只有一个mapper,这样如果有的分区数据量过大,就会数据倾斜,执行缓慢。而我这个例子,明明集群资源足够,却没有充分利用。集群可以启动200个mapper,但是由于SQL只有10个分区,最终只有10个mapper。虽然每个分区最大2000万数据量,但是速度还是很慢,大致半小时左右。
2、DISTRIBUTE BY 分区字段,将每个分区的数据写入一个文件中。前面10个分区,就会有10个文件。

文中提出一个修改方案:

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn <10
) t DISTRIBUTE BY part_inserttime,LIST_ID;

DISTRIBUTE BY 两个字段,第一个是分区字段,第二个是高基数的维度,这里我选择的是该表唯一主键。
整个执行速度很快,大致两分钟左右。通过DISTRIBUTE BY 两个字段,而且第二个字段是唯一主键,所有数据均匀的分在了每一个mapper中。
但是这里需要注意一个问题:
因为所有的数据均匀打散到每一个Mapper中,所以每一个mapper中就有10个分区的数据,这样每一个mapper就要将写入10个文件中。假设有400个mapper,设为M,10个分区,设为P,那么就会有 M*P=4000个数据文件。这样就会有太多文件,而且文件比较小。要想控制这个结果,可以人为的设置mapper数量。

set mapred.reduce.tasks=200;

PS:
1、如果两张表的分区键是一致的,那么动态分区插入是很快。因为是直接进行了文件拷贝。
2、在CDH的hive执行SQ来完成后,觉得在执行自动合并小文件,原来应该生成了200*700多个文件,紧接着又执行了些任务,最终文件数为700多个。具体还需要抽空去研究下。