cdh kerberosed hive jdbc demo

cdh启用kerberos后,通过jdbc连接hive需要额外的连接配置,具体代码如下:

System.setProperty("java.security.krb5.conf", "/Users/zhouxiang/develop/test/krb5.conf");
Class.forName("org.apache.hive.jdbc.HiveDriver").newInstance();
Configuration conf = new Configuration();
conf.set("hadoop.security.authentication", "kerberos");
UserGroupInformation.setConfiguration(conf);
String principal = System.getProperty("kerberosPrincipal", "hive@TEST.COM"); String keytabLocation = System.getProperty("kerberosKeytab", "/Users/zhouxiang/develop/test/hive.keytab"); UserGroupInformation.loginUserFromKeytab(principal, keytabLocation); String url = "jdbc:hive2://10.127.60.4:10000/default;auth=KERBEROS;principal=hive/bdp04@TEST.COM;"; Connection connection = DriverManager.getConnection(url, "", ""); Assert.assertNotNull(connection); Statement statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery("select count(*) from ods.test"); while(resultSet.next()) { System.out.println(resultSet.getInt(1)); }

hive multiple inserts 一次扫描多次插入

hive multiple inserts可以仅执行一次扫描,将这次扫描的数据分别根据不同的条件插入到多张表或者同一张表的不同分区中。但是不能插入同一张表!

Hive extension (multiple inserts):
FROM from_statement
INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]] select_statement1
[INSERT OVERWRITE TABLE tablename2 [PARTITION ... [IF NOT EXISTS]] select_statement2]
[INSERT INTO TABLE tablename2 [PARTITION ...] select_statement2] ...;

这个语法可以降低扫描次数,如果分开多次插入,就会需要扫描多次。如果写成multiple inserts就只会扫表一次。官网解释如下:

Multiple insert clauses (also known as Multi Table Insert) can be specified in the same query.

Multi Table Inserts minimize the number of data scans required. Hive can insert data into multiple tables by scanning the input data just once (and applying different query operators) to the input data.

以下是执行计划截图:


测试情况:在20亿的数据中,执行扫描插入到六张表,只花了六分钟。

dynamic partition搬表 out of memory

在想启用dynamic partition来偷懒搬表的情况时,如果表的数据过大(例如我在六个节点上搬迁2亿数据)时,就会经常OOM。

样例SQL:

set hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions= 1000;
set hive.exec.max.dynamic.partitions.pernode=1000;

CREATE TABLE B (
NAME STRING
)
partitioned BY (NAME2 string,NAME3 string,NAME4 string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘,’
STORED AS PARQUET;

INSERT INTO TABLE B
PARTITION(NAME2,NAME3,NAME4)
select NAME2,NAME3,NAME4
FROM A;

我想将A表的数据进行分区,并导入B表。最简单的方式就启用动态分区。可是执行上面的SQL时,报出OOM错误。一般碰到OOM,我第一反应就是提高heap space。

set spark.executor.memory=25G;

结果还是一样OOM。难道是内存还不够吗?24G–>30G–>48GG。。还是不行。

最后找到一篇文章,启用hive.optimize.sort.dynamic.partition即可;

SET hive.optimize.sort.dynamic.partition=true

针对该参数和对应问题的解释:
该问题出现的原因是太多的文件同时进入写入。当启用hive.optimize.sort.dynamic.partition后面,动态分区列就会进行全局排序,这样REDUCE端,每个动态分区列的值只会有一个文件写入,从而REDUCE阶段降低了内存使用。

hive.optimize.sort.dynamic.partition
  • Default Value: true in Hive 0.13.0 and 0.13.1; false in Hive 0.14.0 and later (HIVE-8151)
  • Added In: Hive 0.13.0 with HIVE-6455

When enabled, dynamic partitioning column will be globally sorted. This way we can keep only one record writer open for each partition value in the reducer thereby reducing the memory pressure on reducers.
个人感觉就是在内部使用distribute by。

参考文章:
1、https://cloud.tencent.com/developer/article/1079007
2、https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties
3、https://stackoverflow.com/questions/33147764/hive-setting-hive-optimize-sort-dynamic-partition

HIVE 几个 by的区别

参考资料:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SortBy

Order By:
1、在hive.mapred.mode=strict模式下,order by 后面必须加limit。因为不加的话,order by会对所有的数据进行排序。而要进行全局排序,只能使用一个Reducer进行排序,如果结果集数据量异常大的话,速度异常缓慢。

Sort by VS Order by

1、Sort by是将在一个Reducer中的数据进行局部排序,如果有多Reducer,那么结果集只能是局部排序的。而Order By是所有Reducer汇总到一个Reducer进行全局排序。

Cluster By VS Distribute By

1、Cluster By 只是 Distribute By 和Sort By 的简写。即:Cluster By 字段A ==Distribute By 字段A Sort By 字段A。那么既然这样,为什么还要Distribute By和Sort By呢?那就是当Distribute By和Sort By字段不一致的时候就需要了。例如Distribute By 字段A Sort By 字段B 字段C。
2、Cluster By 因为包含Sort by,所以在每一个Reducer中的数据是有序的。
3、Distribute By 是无序的。
4、当Distribute By和Sort By的字段不一致时,就无法使用Cluster By。

hive dynamic partition 性能 优化

今天正在做分区合并的时候,需要初始化一部分测试数据,数据量共计有34亿,每10个分片的数据量合计在4亿左右,所以想偷懒直接使用动态分区来实现:

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
) t;

在执行这段代码的时候,完全跑不动,直接就报错了。报错为JAVA HEAP SPACE OOM。
我也不明白什么原因,认为是数据过多,然后我改进SQL进行分批跑,但是结果还是一样,依然是JAVA HEAP SPACE OOM

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn <10
) t ;

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn >= 10 and RN_T.rn < 20
) t ;

看了下执行计划,额,确实也看不懂。只能面向搜索编程了。
经过一番搜索,都说要在动态分区后面加一个 distribute by。大致意思是,在前面的SQL中,执行动态分区的时候,每一个mapper内部都要进行排序,而用了distribute by后,就不会排序了。

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn <10
) t DISTRIBUTE BY part_inserttime;

追加了DISTRIBUTE BY后,不报错了。但是执行速度确是异常缓慢,处理10行就要将进半小时,780/10= 78条,这个时间难以接受。
在不断的搜索中,接近真相。。。参考文章:https://qubole.zendesk.com/hc/en-us/articles/214885263-HIVE-Dynamic-Partitioning-tips
文章里面提到:
1、使用DISTRIBUTE BY 分区字段,存在一个问题:使用DISTRIBUTE BY 分区字段后,每一个分区只有一个mapper,这样如果有的分区数据量过大,就会数据倾斜,执行缓慢。而我这个例子,明明集群资源足够,却没有充分利用。集群可以启动200个mapper,但是由于SQL只有10个分区,最终只有10个mapper。虽然每个分区最大2000万数据量,但是速度还是很慢,大致半小时左右。
2、DISTRIBUTE BY 分区字段,将每个分区的数据写入一个文件中。前面10个分区,就会有10个文件。

文中提出一个修改方案:

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn <10
) t DISTRIBUTE BY part_inserttime,LIST_ID;

DISTRIBUTE BY 两个字段,第一个是分区字段,第二个是高基数的维度,这里我选择的是该表唯一主键。
整个执行速度很快,大致两分钟左右。通过DISTRIBUTE BY 两个字段,而且第二个字段是唯一主键,所有数据均匀的分在了每一个mapper中。
但是这里需要注意一个问题:
因为所有的数据均匀打散到每一个Mapper中,所以每一个mapper中就有10个分区的数据,这样每一个mapper就要将写入10个文件中。假设有400个mapper,设为M,10个分区,设为P,那么就会有 M*P=4000个数据文件。这样就会有太多文件,而且文件比较小。要想控制这个结果,可以人为的设置mapper数量。

set mapred.reduce.tasks=200;

PS:
1、如果两张表的分区键是一致的,那么动态分区插入是很快。因为是直接进行了文件拷贝。
2、在CDH的hive执行SQ来完成后,觉得在执行自动合并小文件,原来应该生成了200*700多个文件,紧接着又执行了些任务,最终文件数为700多个。具体还需要抽空去研究下。

hive CDH5.13 增量更新数据的方法

我们使用的平台是CDH5.13

HIVE使用ORC格式,并启用事务,是可以做到更新的,但是CDH不建议使用。

在CDH中要么使用impala+kudu,要么你就得想一个办法进行增量更新。而我们采用的就是后者。

我们首先使用的方式是:

1、对每一张表创建一个按天分区的历史拉链表。该表每天对应的增量数据都直接LOAD到该表里面。但是这样就会有一个问题,update的数据就会重复,也没法供后续脚本使用

2、所以第二步,就需要形成一个当日全量最新表。这个步骤比较简单,只是对his表进行distribute by 一个主键 并按 日期排序,只选出同主键的排序第一条数据即可。

过程很简单,但是当我们碰到五亿量级以上的单表数据的时候,一张表一次生成全量最新表的过程就会超过10分钟以上,而且使用的全集群资源,没有并行的可能性。而我们一共有十几张这样的大表,整个执行时间长的令人羞愧。让客户看着觉得,这大数据怎么这么差?!

优化,优化,绞尽脑汁的优化。可惜我能力有限,只能检索。还检索一篇文章,是informatica的一篇文章http://blog.tec13.com/wp-content/uploads/2018/12/1097-StrategiesForIncrementalUpdatesOnHIve-H2L-1.pdf。原来还有两种方式:

1、分区合并:之前使用的distribute by 是对整个表重新生成,实际上是没有必要的。因为很多业务数据会按照一定的规则聚集。我分析了一张的增量数据中存在一个insert_time,其中每天的增量,仅会大致分布在十五内,不管这是哪十五天。这样的话,如果将最新全量表按照insert_time的每天进行分区,那我们每天只需要更新这个十五个分区的数据,而不是整表数据。这个性能飞跃是毋庸置疑的。

其次,我们可以做成一个策略,小于五亿的表还是用distribute by,而大于五亿的部分表再使用前面说的按某一个日期分区。

2、当日全量表建立在hbase:很明显了,hbase是肯定是立即更新的,而且很快。但是有个顾虑,如果将hbase的表用于后续sql脚本进行大量join的话,其性能显然是不足的。除非用的次数不多那还能用这种策略。我也尝试想更新hbase表,再反抽形成一张hive表。可惜性能也是不好,因为这就和distribute by一样,要搬动全表。

后续再补充这篇文章,并给些样例代码。

13亿表的自关联优化

今天在做一个大表的自关联优化。这张表中共计有13亿的数据量,设表为A,具体字段只有三个

CUST_ID_A 客户A的ID
CUST_ID_B 客户B的ID
RELATION 关系

A.CUST_ID_A 其中一个值的最高重复条数有3万条,这样光这一个CUST_ID_A,如果自关联会预计会有3万*3万=9亿条。

原SQL如下:

select *
from A as left
inner join A as right on left.cust_id_b = right.cust_id_a

这个SQL会执行60分钟左右,无法满足我们的要求。

第一次优化:
我将A表复制成两张表,分别在CUST_ID_A和CUST_ID_B进行分桶,进而想通过分桶JOIN提供更高的JOIN速度。

create table bucket_a(
CUST_ID_A String,
CUST_ID_B String,
RELATION String
)
CLUSTERED BY (CUST_ID_A) SORTED BY(CUST_ID_A) INTO 4096 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

create table bucket_b(
CUST_ID_A String,
CUST_ID_B String,
RELATION String
)
CLUSTERED BY (CUST_ID_B) SORTED BY(CUST_ID_B) INTO 4096 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

select *
from bucket_b as left
inner join bucket_a as right on left.cust_id_b = right.cust_id_a

可惜效果依然很差,时间和之前一样,没有效果。通过这个,我觉得是因为我把握错关键点了。现在其实是数据倾斜,分桶解决不了数据倾斜,因为同一个ID还是会打入同一个通,依然是串行的。
而我的目标两个方向:
1、优化SQL计算逻辑,不再计算全量,剔除A表中倾斜数据。这个虽然逻辑上想通了可以做剔除出来单独计算,但是短期做不了。
2、目前明确倾斜的数据在执行的时候,是串行的。我们只需要将这个串行的改为并行的就行。

解决方法一:
主要逻辑是在A表增加一个字段part(int),然后将这13亿的数据打散成20份。然后使用union all 聚合。这个过程每个一个union里面的逻辑,都是并行的!整个过程只花了8分钟。

create table parted_a(
CUST_ID_A String,
CUST_ID_B String,
part int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

INSERT INTO TABLE parted_a
SELECT *,FLOOR(RAND() * 20) --给每一条数据赋予一个20范围内的随机值,这就类似分区了。
FROM A;

--执行手动UNION ALL 20个分片。。。。。。。。。。。。SQL好长好长,但是可以执行
--注意右表还是原来的全量表,这样才能不会造成数据丢失
INSERT INTO TABLE RESULT
SELECT *
FROM PARTED_A AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A AND left.PART = 1

UNION ALL

SELECT *
FROM PARTED_A AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A AND left.PART = 2

UNION ALL

SELECT *
FROM PARTED_A AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A AND left.PART = 3
。。。。。。。


整个过程花了8分钟,最终生成了141亿的数据。。。时间优化成功了,但是数据量还是太大了,最终还是要去优化SQL逻辑

解决方法二:
进行动态分区。可惜没有效果,很奇怪。。哎,和我预计玩不一样。整个执行时间和原来一样。

set hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions= 1000;
set hive.exec.max.dynamic.partitions.pernode=1000;
create table dynamic_a(
CUST_ID_A String,
CUST_ID_B String
)
partitioned by (part int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

INSERT INTO TABLE dynamic_a PARTITION(part)
SELECT *,FLOOR(RAND() * 500) --分区要在最后一个字段。
FROM A;

INSERT INTO TABLE RESULT
SELECT *
FROM dynamic_a AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A

原以为能完美的解决,可惜还是最后几个task变得异常慢。看来还是数据倾斜了,得抽空好好研究下HIVE具体怎么执行的了。

hive beeline 中文报错

今天在跑一个脚本的时候,死活报错:

我开始就认为就是SQL文件顶部的中文注释造成的问题,然后我删除了所有的中文注释依然报上面的错误。

原因:
SQL文件格式有问题,同事在编写SQL的时候是在WINDOWS电脑上编写的,所以文件是默认格式是WINDOWS。编码为UTF-8-BOM格式。

解决方法:
1、使用工具notepad++,将文件转换为unix文件

2、再将语言改为utf-8

beeline gc overhead limit exceeded

由于集群启用了kerberos后,hive指令无法直接使用。只能改用beeline命令。

beeline连接kerberos后,需要更改执行连接指令:

beeline -u “jdbc:hive2://10.127.60.2:10000/default;principal=hive/bdp02@*.COM;”

连接后,执行指令的逻辑和原来不一样。以前hive在执行脚本的时候,如果返回大量数据的时候,hive指令依然能够返回,但是使用beeline后就会报错。

java.lang.OutOfMemoryError: GC overhead limit exceeded

 

这个beeline的gc问题可以进行优化,优化的方法可以参考:

https://www.cloudera.com/documentation/enterprise/5-13-x/topics/admin_hive_tuning.html