HIVE 几个 by的区别

参考资料:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SortBy

Order By:
1、在hive.mapred.mode=strict模式下,order by 后面必须加limit。因为不加的话,order by会对所有的数据进行排序。而要进行全局排序,只能使用一个Reducer进行排序,如果结果集数据量异常大的话,速度异常缓慢。

Sort by VS Order by

1、Sort by是将在一个Reducer中的数据进行局部排序,如果有多Reducer,那么结果集只能是局部排序的。而Order By是所有Reducer汇总到一个Reducer进行全局排序。

Cluster By VS Distribute By

1、Cluster By 只是 Distribute By 和Sort By 的简写。即:Cluster By 字段A ==Distribute By 字段A Sort By 字段A。那么既然这样,为什么还要Distribute By和Sort By呢?那就是当Distribute By和Sort By字段不一致的时候就需要了。例如Distribute By 字段A Sort By 字段B 字段C。
2、Cluster By 因为包含Sort by,所以在每一个Reducer中的数据是有序的。
3、Distribute By 是无序的。
4、当Distribute By和Sort By的字段不一致时,就无法使用Cluster By。

hive dynamic partition 性能 优化

今天正在做分区合并的时候,需要初始化一部分测试数据,数据量共计有34亿,每10个分片的数据量合计在4亿左右,所以想偷懒直接使用动态分区来实现:

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
) t;

在执行这段代码的时候,完全跑不动,直接就报错了。报错为JAVA HEAP SPACE OOM。
我也不明白什么原因,认为是数据过多,然后我改进SQL进行分批跑,但是结果还是一样,依然是JAVA HEAP SPACE OOM

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn <10
) t ;

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn >= 10 and RN_T.rn < 20
) t ;

看了下执行计划,额,确实也看不懂。只能面向搜索编程了。
经过一番搜索,都说要在动态分区后面加一个 distribute by。大致意思是,在前面的SQL中,执行动态分区的时候,每一个mapper内部都要进行排序,而用了distribute by后,就不会排序了。

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn <10
) t DISTRIBUTE BY part_inserttime;

追加了DISTRIBUTE BY后,不报错了。但是执行速度确是异常缓慢,处理10行就要将进半小时,780/10= 78条,这个时间难以接受。
在不断的搜索中,接近真相。。。参考文章:https://qubole.zendesk.com/hc/en-us/articles/214885263-HIVE-Dynamic-Partitioning-tips
文章里面提到:
1、使用DISTRIBUTE BY 分区字段,存在一个问题:使用DISTRIBUTE BY 分区字段后,每一个分区只有一个mapper,这样如果有的分区数据量过大,就会数据倾斜,执行缓慢。而我这个例子,明明集群资源足够,却没有充分利用。集群可以启动200个mapper,但是由于SQL只有10个分区,最终只有10个mapper。虽然每个分区最大2000万数据量,但是速度还是很慢,大致半小时左右。
2、DISTRIBUTE BY 分区字段,将每个分区的数据写入一个文件中。前面10个分区,就会有10个文件。

文中提出一个修改方案:

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn <10
) t DISTRIBUTE BY part_inserttime,LIST_ID;

DISTRIBUTE BY 两个字段,第一个是分区字段,第二个是高基数的维度,这里我选择的是该表唯一主键。
整个执行速度很快,大致两分钟左右。通过DISTRIBUTE BY 两个字段,而且第二个字段是唯一主键,所有数据均匀的分在了每一个mapper中。
但是这里需要注意一个问题:
因为所有的数据均匀打散到每一个Mapper中,所以每一个mapper中就有10个分区的数据,这样每一个mapper就要将写入10个文件中。假设有400个mapper,设为M,10个分区,设为P,那么就会有 M*P=4000个数据文件。这样就会有太多文件,而且文件比较小。要想控制这个结果,可以人为的设置mapper数量。

set mapred.reduce.tasks=200;

PS:
1、如果两张表的分区键是一致的,那么动态分区插入是很快。因为是直接进行了文件拷贝。
2、在CDH的hive执行SQ来完成后,觉得在执行自动合并小文件,原来应该生成了200*700多个文件,紧接着又执行了些任务,最终文件数为700多个。具体还需要抽空去研究下。

solr 分片与复制

在没有使用solrcloude的时候,可以使用如下的架构图进行分片与复制:

具体可以参考solr的官方指南:Combining Distribution and Replication  章节 。简单来说就是一主两从。从片的同步方式,我觉得应该是通过从主片同步快照的方式,来实现的。

Snapshot :A directory containing hard links to the data files of an index. Snapshots are distributed from the master nodes when the slaves pull them, “smart copying” any segments the slave node does not have in snapshot directory that contains the hard links to the most recent index data files.

在solr cloud里面,这种方式主要是用来进行备份使用。其实我个人觉得这种快照的复制方式应该是用来快速备份的,不应该是用来进行主从分片同步用的,更可能用的是PULL索引文件的形式。

Solr cloud提供了三种分片模式,分别为:

NRT: This is the default. A NRT replica (NRT = NearRealTime) maintains a transaction log and writes new documents to it’s indexes locally. Any replica of this type is eligible to become a leader. Traditionally, this was the only type supported by Solr.

TLOG: This type of replica maintains a transaction log but does not index document changes locally. This type helps speed up indexing since no commits need to occur in the replicas. When this type of replica needs to update its index, it does so by replicating the index from the leader. This type of replica is also eligible to become a shard leader; it would do so by first processing its transaction log. If it does become a leader, it will behave the same as if it was a NRT type of replica.

PULL: This type of replica does not maintain a transaction log nor index document changes locally. It only replicates the index from the shard leader. It is not eligible to become a shard leader and doesn’t participate in shard leader election at all.

这三种模式的主要分别是,NRT可以做主片,可以使用近实时索引(支持SOFT COMMIT),同步索引靠数据转发;TLOG也可以做主片,当为主片是和NRT一致,不能近实时索引,从片需要和主片同步的时候,只是从从片同步索引文件;PULL不能做主片,仅从主片同步索引文件。
创建分片的时候,副本默认使用的NRT模式。

Solr cloud可推荐使用的分片组合方式:

1、全部NRT:适用于小到中级的集群;更新吞吐量不太高的大型集群;

2、全部TLOG:不需要 实时索引;每一个分片的副本数较多;同时需要所有分片都能切换为主片;

3、TLOG+PULL:不需要 实时索引;每一个分片的副本数较多;提高查询能力,能够容忍短时的过期数据。

我们做了个测试:solr7.3,16个物理节点,每个节点3个实例,每个实例20G,需要索引的数据量为一亿,160个spark executors,一主两从(注:我们不需要NRT特性,我们是夜间批量)

1、全部NRT需要20分钟,大概每分钟600-800万。

2、全部TLOG需要10分钟,大概每分钟10000万。

3、TLOG+PULL:未测

PS:solr创建索引,可以先使用solr自带的zk脚本工具中uploadconfig方法上传配置文件,再使用solr的collection api里面的createcollection方法创建。

一份参考资料:

下面参考文章的原文地址:https://berlinbuzzwords.de/sites/berlinbuzzwords.de/files/media/documents/replicatypes-berlinbuzzwords.pdf

其中有启发的一张图:

CDH 上 进行hbase bulkload

在从HIVE导入全量数据到HBASE的时候,我们使用SQL的方式是insert into table hbase_a select * from A.但是这种方式在17个数据节点上速度很慢,单表5亿数据需要50分钟,即便我们做了预分区。一共有五张表大表需要导入,时间难以忍受。

我们的ROWKEY都是做了MD5的,所以预分区很好分,我们17个REGION SERVER,分了17个区(我记得哪里推荐一个REGION SERVER 10个分区)。最终导入完毕后,检查分区数,远高于这个值。

后来发现HBASE全量导入有BULKLOAD这样的方法,性能很快。以后流程主要参考两篇文章:
1、https://cwiki.apache.org/confluence/display/Hive/HBaseBulkLoad
2、https://community.cloudera.com/t5/Storage-Random-Access-HDFS/HBase-slow-bulk-loading-using-Hive/td-p/43649

首先要在HIVE上进行配置,在hive辅助JAR目录,配置这个路径。我这里设置的路径为hive用户的home目录。

这个配置是针对HIVESERVER2的,如果你有多个HIVESERVER2实例,每个实例的目录文件需要一致的。配置这个目录后,就需要拷贝多个JAR包到对应的目录下:

sudo -u hdfs hdfs dfs -put -f /opt/cloudera/parcels/CDH/lib/hive/lib/hbase-client.jar /user/hive/
sudo -u hdfs hdfs dfs -put -f /opt/cloudera/parcels/CDH/lib/hive/lib/hbase-server.jar /user/hive/
sudo -u hdfs hdfs dfs -put -f /opt/cloudera/parcels/CDH/lib/hive/lib/hbase-common.jar /user/hive/
sudo -u hdfs hdfs dfs -put -f /opt/cloudera/parcels/CDH/lib/hive/lib/hbase-protocol.jar /user/hive/
sudo -u hdfs hdfs dfs -put -f /opt/cloudera/parcels/CDH/lib/hive/lib/hive-hbase-handler.jar /user/hive/
sudo -u hdfs hdfs dfs -put -f /opt/cloudera/parcels/CDH/lib/hive/lib/hbase-contrib.jar /user/hive/

赋权操作:

sudo -u hdfs hdfs dfs -chmod 554 /user/hive/.jar
sudo -u hdfs hdfs dfs -chown hive:hive /user/hive/.jar

下面这一大段操作,实际参考了HIVE的原始文章,大致流程:
1、建立存储HBASE分区键的表
2、从原始数据表中抽样出ROWKEY拆分键,它对原始数据进行排序,然后从排序中中找出对应的切分键,作为HBASE的ROWKEY SPLIT KEY。如果要生成17个分区,就要生成16个KEY.最后插入到前面建立的分区键表。

hdfs dfs -rm -r /tmp/hb_range_keys
hdfs dfs -mkdir /tmp/hb_range_keys

beeline -u “jdbc:hive2://” -e “CREATE EXTERNAL TABLE IF NOT EXISTS testzx.hb_range_keys(transaction_id_range_start string) row format serde ‘org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe’ stored as inputformat ‘org.apache.hadoop.mapred.TextInputFormat’ outputformat ‘org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat’ location ‘/tmp/hb_range_keys’;”
beeline -u “jdbc:hive2://” -e ” create temporary function row_sequence as ‘org.apache.hadoop.hive.contrib.udf.UDFRowSequence’; INSERT OVERWRITE TABLE testzx.hb_range_keys SELECT a.id FROM ( SELECT row_sequence() as num, c.id FROM cust_app.app_cust_family c order by c.id) a WHERE ( a.num % ( round( ${total} / 12) ) ) = 0;”

将生成的分区键的数据文件,通过HDFS拷贝到hb_range_key_list目录。

hdfs dfs -rm -r /tmp/hb_range_key_list;
hdfs dfs -cp /tmp/hb_range_keys/* /tmp/hb_range_key_list;

注意:其实我在做这步骤的时候,没有成功。。。后来突然明白,我们的ROWKEY是MD5过后的,其实就是分区键本来就应该是既定的了。完全不需要去抽样出来!!根据这个思路,我直接将我做好的分区数据导入到前面的hb_range_keys表中,就完成了。关于HBASE如何针对字符串型的ROWKEY设计预分区键,可以参考下面:
1、https://hortonworks.com/blog/apache-hbase-region-splitting-and-merging/
2、https://grokbase.com/t/hbase/user/128xr5amhs/md5-hash-key-and-splits
3、http://hbase.apache.org/book.html#important_configurations
4、https://groups.google.com/forum/#!topic/nosql-databases/wQkYBAYRFF0

接下来建立导出HFILE表

hdfs dfs -rm -r /tmp/hbsort;
hdfs dfs -mkdir /tmp/hbsort;
beeline -u “jdbc:hive2://” -e “drop table testzx.hbsort; CREATE TABLE testzx.hbsort (id string,agent_code string,cust_ecif_id string,real_name string,gender string,birthday string,age string,certi_type string,certi_code string,job_id string,job_zh string,relatives_ecif_id string,relatives_real_name string,relatives_gender string,relatives_birthday string,relatives_age string,relatives_certi_type string,relatives_certi_code string,relatives_job_id string,relatives_job_zh string,relation string,policy_num string) STORED AS INPUTFORMAT ‘org.apache.hadoop.mapred.TextInputFormat’ OUTPUTFORMAT ‘org.apache.hadoop.hive.hbase.HiveHFileOutputFormat’ TBLPROPERTIES (‘hfile.family.path’ = ‘/tmp/hbsort/cf’);”

导入数据到导出HFILE表。主要设置的reduce task 数量要和前面设计的分区数多1.前面16个区,后面就应该17.


beeline -u “jdbc:hive2://” -e “set mapred.reduce.tasks=17;set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner; set mapreduce.totalorderpartitioner.path=/tmp/hb_range_key_list; set hfile.compression=gz; INSERT OVERWRITE TABLE testzx.hbsort select t.* from testzx.test_bulk_load t cluster by t.id;”

执行数据导入到HBASE

sudo -u hdfs hdfs dfs -chgrp -R hbase /tmp/hbsort
sudo -u hdfs hdfs dfs -chmod -R 775 /tmp/hbsort
#下面的环境需要导出,但是应该是SSH SESSION级别的,因为我设置这个变量后。
#CDH的好多变量和这个冲突了
export HADOOP_CLASSPATH=hbase classpath

#制定导入
hadoop jar /opt/cloudera/parcels/CDH/lib/hive/lib/hbase-server.jar completebulkload /tmp/hbsort test:test_bulk_load cf

至此完成。

hive CDH5.13 增量更新数据的方法

我们使用的平台是CDH5.13

HIVE使用ORC格式,并启用事务,是可以做到更新的,但是CDH不建议使用。

在CDH中要么使用impala+kudu,要么你就得想一个办法进行增量更新。而我们采用的就是后者。

我们首先使用的方式是:

1、对每一张表创建一个按天分区的历史拉链表。该表每天对应的增量数据都直接LOAD到该表里面。但是这样就会有一个问题,update的数据就会重复,也没法供后续脚本使用

2、所以第二步,就需要形成一个当日全量最新表。这个步骤比较简单,只是对his表进行distribute by 一个主键 并按 日期排序,只选出同主键的排序第一条数据即可。

过程很简单,但是当我们碰到五亿量级以上的单表数据的时候,一张表一次生成全量最新表的过程就会超过10分钟以上,而且使用的全集群资源,没有并行的可能性。而我们一共有十几张这样的大表,整个执行时间长的令人羞愧。让客户看着觉得,这大数据怎么这么差?!

优化,优化,绞尽脑汁的优化。可惜我能力有限,只能检索。还检索一篇文章,是informatica的一篇文章http://blog.tec13.com/wp-content/uploads/2018/12/1097-StrategiesForIncrementalUpdatesOnHIve-H2L-1.pdf。原来还有两种方式:

1、分区合并:之前使用的distribute by 是对整个表重新生成,实际上是没有必要的。因为很多业务数据会按照一定的规则聚集。我分析了一张的增量数据中存在一个insert_time,其中每天的增量,仅会大致分布在十五内,不管这是哪十五天。这样的话,如果将最新全量表按照insert_time的每天进行分区,那我们每天只需要更新这个十五个分区的数据,而不是整表数据。这个性能飞跃是毋庸置疑的。

其次,我们可以做成一个策略,小于五亿的表还是用distribute by,而大于五亿的部分表再使用前面说的按某一个日期分区。

2、当日全量表建立在hbase:很明显了,hbase是肯定是立即更新的,而且很快。但是有个顾虑,如果将hbase的表用于后续sql脚本进行大量join的话,其性能显然是不足的。除非用的次数不多那还能用这种策略。我也尝试想更新hbase表,再反抽形成一张hive表。可惜性能也是不好,因为这就和distribute by一样,要搬动全表。

后续再补充这篇文章,并给些样例代码。

solr 一主多从模式灌数据总是报错

我们有一张大宽表,大概200个字段,共有20亿左右的数据。在使用spark并行从hive导入Solr的时候,总是会报错,不是 no register leader 就是we are leader这类的错误。

后来搜索后发现这类问题有两类原因:

1、solr 内存过小,导致GC时间过长,从而让zk认为solr已经挂掉了。但是我们给了16G的内存还算不错,同时也检查了solr的gc日志,没有发现有GC时间过长的情况。

2、solr 与Zookeeper的timeout 时间过短。这正是我们的问题所在,我们在安装solr的时候没有指定solr的timeout时间。http://lucene.472066.n3.nabble.com/SOLR-zookeeper-connection-timeout-during-startup-is-hardcoded-to-10000ms-td4403341.html#a4403671

解决方法:

找到/ETC/solr.in.sh,添加配置:

ZK_CLIENT_TIMEOUT=”120000″

同时调整zookeeper的最大会话超时时间为160秒。

13亿表的自关联优化

今天在做一个大表的自关联优化。这张表中共计有13亿的数据量,设表为A,具体字段只有三个

CUST_ID_A 客户A的ID
CUST_ID_B 客户B的ID
RELATION 关系

A.CUST_ID_A 其中一个值的最高重复条数有3万条,这样光这一个CUST_ID_A,如果自关联会预计会有3万*3万=9亿条。

原SQL如下:

select *
from A as left
inner join A as right on left.cust_id_b = right.cust_id_a

这个SQL会执行60分钟左右,无法满足我们的要求。

第一次优化:
我将A表复制成两张表,分别在CUST_ID_A和CUST_ID_B进行分桶,进而想通过分桶JOIN提供更高的JOIN速度。

create table bucket_a(
CUST_ID_A String,
CUST_ID_B String,
RELATION String
)
CLUSTERED BY (CUST_ID_A) SORTED BY(CUST_ID_A) INTO 4096 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

create table bucket_b(
CUST_ID_A String,
CUST_ID_B String,
RELATION String
)
CLUSTERED BY (CUST_ID_B) SORTED BY(CUST_ID_B) INTO 4096 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

select *
from bucket_b as left
inner join bucket_a as right on left.cust_id_b = right.cust_id_a

可惜效果依然很差,时间和之前一样,没有效果。通过这个,我觉得是因为我把握错关键点了。现在其实是数据倾斜,分桶解决不了数据倾斜,因为同一个ID还是会打入同一个通,依然是串行的。
而我的目标两个方向:
1、优化SQL计算逻辑,不再计算全量,剔除A表中倾斜数据。这个虽然逻辑上想通了可以做剔除出来单独计算,但是短期做不了。
2、目前明确倾斜的数据在执行的时候,是串行的。我们只需要将这个串行的改为并行的就行。

解决方法一:
主要逻辑是在A表增加一个字段part(int),然后将这13亿的数据打散成20份。然后使用union all 聚合。这个过程每个一个union里面的逻辑,都是并行的!整个过程只花了8分钟。

create table parted_a(
CUST_ID_A String,
CUST_ID_B String,
part int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

INSERT INTO TABLE parted_a
SELECT *,FLOOR(RAND() * 20) --给每一条数据赋予一个20范围内的随机值,这就类似分区了。
FROM A;

--执行手动UNION ALL 20个分片。。。。。。。。。。。。SQL好长好长,但是可以执行
--注意右表还是原来的全量表,这样才能不会造成数据丢失
INSERT INTO TABLE RESULT
SELECT *
FROM PARTED_A AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A AND left.PART = 1

UNION ALL

SELECT *
FROM PARTED_A AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A AND left.PART = 2

UNION ALL

SELECT *
FROM PARTED_A AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A AND left.PART = 3
。。。。。。。


整个过程花了8分钟,最终生成了141亿的数据。。。时间优化成功了,但是数据量还是太大了,最终还是要去优化SQL逻辑

解决方法二:
进行动态分区。可惜没有效果,很奇怪。。哎,和我预计玩不一样。整个执行时间和原来一样。

set hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions= 1000;
set hive.exec.max.dynamic.partitions.pernode=1000;
create table dynamic_a(
CUST_ID_A String,
CUST_ID_B String
)
partitioned by (part int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

INSERT INTO TABLE dynamic_a PARTITION(part)
SELECT *,FLOOR(RAND() * 500) --分区要在最后一个字段。
FROM A;

INSERT INTO TABLE RESULT
SELECT *
FROM dynamic_a AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A

原以为能完美的解决,可惜还是最后几个task变得异常慢。看来还是数据倾斜了,得抽空好好研究下HIVE具体怎么执行的了。

CDH常用调优参数

–规划—减少yarn内存
–yarn占据146G,solr两个实例,各16GB;hbase40GB;

–每个节点上5个container

–yarn
yarn.nodemanager.resource.memory-mb=146GB
yarn.nodemanager.resource.cpu-vcores=32
yarn.scheduler.maximum-allocation-mb=20GB

yarn.nodemanager.resource.memory-mb=108GB
yarn.nodemanager.resource.cpu-vcores=24
yarn.scheduler.maximum-allocation-mb=20GB

–mapreduce
yarn.app.mapreduce.am.resource.mb=10GB
mapreduce.map.memory.mb=10GB
mapreduce.reduce.memory.mb=10GB

–hive
spark.executor.cores=4
spark.executor.memory=16 G
spark.executor.memoryOverhead=2 G
spark.driver.memory=10.5GB
spark.yarn.driver.memoryOverhead=1.5gb

–hive server2
hive metate server heap= 16gb
hiveserver2 heap =16gb

–zookeeper
maxClientCnxns=300
SERVER JAVA heapszie=8GB

–spark和spark2
spark.authentication = spark

–hdfs
dfs.namenode.handler.count=10 –等于数据盘的数量
dfs.datanode.sync.behind.writes=true
dfs.datanode.max.transfer.threads=8192
namenode heap size=16GB
–hbase
hbase.hstore.compactionThreshold=5
HBase RegionServer java heap = 32G

hive beeline 中文报错

今天在跑一个脚本的时候,死活报错:

我开始就认为就是SQL文件顶部的中文注释造成的问题,然后我删除了所有的中文注释依然报上面的错误。

原因:
SQL文件格式有问题,同事在编写SQL的时候是在WINDOWS电脑上编写的,所以文件是默认格式是WINDOWS。编码为UTF-8-BOM格式。

解决方法:
1、使用工具notepad++,将文件转换为unix文件

2、再将语言改为utf-8

hive 分桶优化实例

项目进行到快要上线了,一直有一个问题存在。就是整个项目做的客户画像指标过于复杂,导致短时间内无法对指标分析,编出增量的SQL代码,所以导致每天都要全量跑所有的客户画像指标,简化下就是每日跑全量,整个全量跑批在17个节点上大概三个小时,还能接受。。。然后我们又需要将每日最终的三张全量画像导入hbase和solr,hbase大概每分钟一百万,solr大概每分钟一千万。这个就难以接受了。因为hbase每分钟才一百万(没有做bulkload,使用hive自带的bulkload没有生效,以后再研究),整个数据量是在10亿左右,这个时间算下来难以想象。。。

1、最原始的流程

整个计算都是基于全量,导入HBASE也都是全量数据,大概是要每天导入10亿左右。
同事在使用华为平台的bulkload,大概能做到30分钟导入4个亿数据,集群大概四十台。但是我们使用CDH,没法使用华为的工具,最后采用hive原生的bulkload,集群只有二十台,原以为会很快,结果导入速度只达到了每分钟100万。

参考连接:hive原生bulkload到hbase

使用bulkload方案,肯定是最快,但是应该是cdh使用的hive版本低,达不到那个连接的效果,以后再试试其他方式
2、变更hbase导入为增量导入

由于我们使用的bulkload没法生效,时间又紧张。所以反复思考,突然灵光闪现。。。今天的增量数据其实不一定是前面的计算逻辑必须改成增量计算,只需比较两天的全量数据差异,即可拿到今日的增量。虽然前面的计算逻辑依然慢,但导入hbase的速度提高N倍,毕竟一天增量变化最多100万以内。
先举例说明如何对比今日全量表A和昨日全量B,得出数据差异。假设字段只有两个f1和f2。,对比sql非常简单:

insert into table t_increment
select A.*
from A
where not exists (
select 1
from B
where CONCAT(A.f1,A.f2) = CONCAT(B.f1,B.f2)
);

执行测试后,这里A,B两表都各有14亿左右的数据,发现对比速度异常缓慢。预估得要几个小时吧。
通过查看执行计划,发现hive在map阶段会对两表对比字段进行排序,然后进行reduce。

原以为这条路走不通,后来又想到not exists其实是做的outer join等实现的,所以利用分桶优化join的想法自然就冒出来。
首先在A表和B表加一个字段叫COMPARE_DIFF_KEY,然后在插入两表的时候就先把两个字段contact插入到COMPARE_DIFF_KEY中。然后在使用这一个字段关联,14亿的数据8-15分钟就能比对出结果。只是分桶建表会多花十分钟左右。

512桶执行计划

1024桶执行计划

a、建表语句

--这两个环境变量必须设置,不然分桶排序不会生效。
set hive.enforce.bucketing = true;
set hive.enforce.sorting=true;
create table A(
COMPARE_DIFF_KEY string,
f1 string,
f2 string
)
CLUSTERED BY(COMPARE_DIFF_KEY) SORTED BY (COMPARE_DIFF_KEY) INTO 1024 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

create table B(
COMPARE_DIFF_KEY string,
f1 string,
f2 string
)
CLUSTERED BY(COMPARE_DIFF_KEY) SORTED BY (COMPARE_DIFF_KEY) INTO 1024 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

b、插入数据的时候提前concat,做好比较键.拼出来的字段会很长,你也可以在concat后使用md5

insert into table A
select concat(x.f1,x.f2),x.* from x;

insert into table B
select concat(x.f1,x.f2),x.* from x;

c、执行比对:

insert into table t_increment
select A.*
from A
where not exists (
select 1
from B
where A.COMPARE_DIFF_KEY = B.COMPARE_DIFF_KEY
)

不幸的是,在执行的过程又会报错:

查看表情况,确实是num buchket为2048,但是numfiles为4096。这个报错只是说明不能使用map join,只能采用reducejoin。所以只能禁用掉map join,虽然我觉得性能会下降,但是相比原来还是会快很快。

set hive.auto.convert.sortmerge.join=false;
set hive.optimize.bucketmapjoin = false;
set hive.optimize.bucketmapjoin.sortedmerge = false;

3、以后的两个目标