CDH 上 进行hbase bulkload

在从HIVE导入全量数据到HBASE的时候,我们使用SQL的方式是insert into table hbase_a select * from A.但是这种方式在17个数据节点上速度很慢,单表5亿数据需要50分钟,即便我们做了预分区。一共有五张表大表需要导入,时间难以忍受。

我们的ROWKEY都是做了MD5的,所以预分区很好分,我们17个REGION SERVER,分了17个区(我记得哪里推荐一个REGION SERVER 10个分区)。最终导入完毕后,检查分区数,远高于这个值。

后来发现HBASE全量导入有BULKLOAD这样的方法,性能很快。以后流程主要参考两篇文章:
1、https://cwiki.apache.org/confluence/display/Hive/HBaseBulkLoad
2、https://community.cloudera.com/t5/Storage-Random-Access-HDFS/HBase-slow-bulk-loading-using-Hive/td-p/43649

首先要在HIVE上进行配置,在hive辅助JAR目录,配置这个路径。我这里设置的路径为hive用户的home目录。

这个配置是针对HIVESERVER2的,如果你有多个HIVESERVER2实例,每个实例的目录文件需要一致的。配置这个目录后,就需要拷贝多个JAR包到对应的目录下:

sudo -u hdfs hdfs dfs -put -f /opt/cloudera/parcels/CDH/lib/hive/lib/hbase-client.jar /user/hive/
sudo -u hdfs hdfs dfs -put -f /opt/cloudera/parcels/CDH/lib/hive/lib/hbase-server.jar /user/hive/
sudo -u hdfs hdfs dfs -put -f /opt/cloudera/parcels/CDH/lib/hive/lib/hbase-common.jar /user/hive/
sudo -u hdfs hdfs dfs -put -f /opt/cloudera/parcels/CDH/lib/hive/lib/hbase-protocol.jar /user/hive/
sudo -u hdfs hdfs dfs -put -f /opt/cloudera/parcels/CDH/lib/hive/lib/hive-hbase-handler.jar /user/hive/
sudo -u hdfs hdfs dfs -put -f /opt/cloudera/parcels/CDH/lib/hive/lib/hbase-contrib.jar /user/hive/

赋权操作:

sudo -u hdfs hdfs dfs -chmod 554 /user/hive/.jar
sudo -u hdfs hdfs dfs -chown hive:hive /user/hive/.jar

下面这一大段操作,实际参考了HIVE的原始文章,大致流程:
1、建立存储HBASE分区键的表
2、从原始数据表中抽样出ROWKEY拆分键,它对原始数据进行排序,然后从排序中中找出对应的切分键,作为HBASE的ROWKEY SPLIT KEY。如果要生成17个分区,就要生成16个KEY.最后插入到前面建立的分区键表。

hdfs dfs -rm -r /tmp/hb_range_keys
hdfs dfs -mkdir /tmp/hb_range_keys

beeline -u “jdbc:hive2://” -e “CREATE EXTERNAL TABLE IF NOT EXISTS testzx.hb_range_keys(transaction_id_range_start string) row format serde ‘org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe’ stored as inputformat ‘org.apache.hadoop.mapred.TextInputFormat’ outputformat ‘org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat’ location ‘/tmp/hb_range_keys’;”
beeline -u “jdbc:hive2://” -e ” create temporary function row_sequence as ‘org.apache.hadoop.hive.contrib.udf.UDFRowSequence’; INSERT OVERWRITE TABLE testzx.hb_range_keys SELECT a.id FROM ( SELECT row_sequence() as num, c.id FROM cust_app.app_cust_family c order by c.id) a WHERE ( a.num % ( round( ${total} / 12) ) ) = 0;”

将生成的分区键的数据文件,通过HDFS拷贝到hb_range_key_list目录。

hdfs dfs -rm -r /tmp/hb_range_key_list;
hdfs dfs -cp /tmp/hb_range_keys/* /tmp/hb_range_key_list;

注意:其实我在做这步骤的时候,没有成功。。。后来突然明白,我们的ROWKEY是MD5过后的,其实就是分区键本来就应该是既定的了。完全不需要去抽样出来!!根据这个思路,我直接将我做好的分区数据导入到前面的hb_range_keys表中,就完成了。关于HBASE如何针对字符串型的ROWKEY设计预分区键,可以参考下面:
1、https://hortonworks.com/blog/apache-hbase-region-splitting-and-merging/
2、https://grokbase.com/t/hbase/user/128xr5amhs/md5-hash-key-and-splits
3、http://hbase.apache.org/book.html#important_configurations
4、https://groups.google.com/forum/#!topic/nosql-databases/wQkYBAYRFF0

接下来建立导出HFILE表

hdfs dfs -rm -r /tmp/hbsort;
hdfs dfs -mkdir /tmp/hbsort;
beeline -u “jdbc:hive2://” -e “drop table testzx.hbsort; CREATE TABLE testzx.hbsort (id string,agent_code string,cust_ecif_id string,real_name string,gender string,birthday string,age string,certi_type string,certi_code string,job_id string,job_zh string,relatives_ecif_id string,relatives_real_name string,relatives_gender string,relatives_birthday string,relatives_age string,relatives_certi_type string,relatives_certi_code string,relatives_job_id string,relatives_job_zh string,relation string,policy_num string) STORED AS INPUTFORMAT ‘org.apache.hadoop.mapred.TextInputFormat’ OUTPUTFORMAT ‘org.apache.hadoop.hive.hbase.HiveHFileOutputFormat’ TBLPROPERTIES (‘hfile.family.path’ = ‘/tmp/hbsort/cf’);”

导入数据到导出HFILE表。主要设置的reduce task 数量要和前面设计的分区数多1.前面16个区,后面就应该17.


beeline -u “jdbc:hive2://” -e “set mapred.reduce.tasks=17;set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner; set mapreduce.totalorderpartitioner.path=/tmp/hb_range_key_list; set hfile.compression=gz; INSERT OVERWRITE TABLE testzx.hbsort select t.* from testzx.test_bulk_load t cluster by t.id;”

执行数据导入到HBASE

sudo -u hdfs hdfs dfs -chgrp -R hbase /tmp/hbsort
sudo -u hdfs hdfs dfs -chmod -R 775 /tmp/hbsort
#下面的环境需要导出,但是应该是SSH SESSION级别的,因为我设置这个变量后。
#CDH的好多变量和这个冲突了
export HADOOP_CLASSPATH=hbase classpath

#制定导入
hadoop jar /opt/cloudera/parcels/CDH/lib/hive/lib/hbase-server.jar completebulkload /tmp/hbsort test:test_bulk_load cf

至此完成。

hive CDH5.13 增量更新数据的方法

我们使用的平台是CDH5.13

HIVE使用ORC格式,并启用事务,是可以做到更新的,但是CDH不建议使用。

在CDH中要么使用impala+kudu,要么你就得想一个办法进行增量更新。而我们采用的就是后者。

我们首先使用的方式是:

1、对每一张表创建一个按天分区的历史拉链表。该表每天对应的增量数据都直接LOAD到该表里面。但是这样就会有一个问题,update的数据就会重复,也没法供后续脚本使用

2、所以第二步,就需要形成一个当日全量最新表。这个步骤比较简单,只是对his表进行distribute by 一个主键 并按 日期排序,只选出同主键的排序第一条数据即可。

过程很简单,但是当我们碰到五亿量级以上的单表数据的时候,一张表一次生成全量最新表的过程就会超过10分钟以上,而且使用的全集群资源,没有并行的可能性。而我们一共有十几张这样的大表,整个执行时间长的令人羞愧。让客户看着觉得,这大数据怎么这么差?!

优化,优化,绞尽脑汁的优化。可惜我能力有限,只能检索。还检索一篇文章,是informatica的一篇文章http://blog.tec13.com/wp-content/uploads/2018/12/1097-StrategiesForIncrementalUpdatesOnHIve-H2L-1.pdf。原来还有两种方式:

1、分区合并:之前使用的distribute by 是对整个表重新生成,实际上是没有必要的。因为很多业务数据会按照一定的规则聚集。我分析了一张的增量数据中存在一个insert_time,其中每天的增量,仅会大致分布在十五内,不管这是哪十五天。这样的话,如果将最新全量表按照insert_time的每天进行分区,那我们每天只需要更新这个十五个分区的数据,而不是整表数据。这个性能飞跃是毋庸置疑的。

其次,我们可以做成一个策略,小于五亿的表还是用distribute by,而大于五亿的部分表再使用前面说的按某一个日期分区。

2、当日全量表建立在hbase:很明显了,hbase是肯定是立即更新的,而且很快。但是有个顾虑,如果将hbase的表用于后续sql脚本进行大量join的话,其性能显然是不足的。除非用的次数不多那还能用这种策略。我也尝试想更新hbase表,再反抽形成一张hive表。可惜性能也是不好,因为这就和distribute by一样,要搬动全表。

后续再补充这篇文章,并给些样例代码。

solr 一主多从模式灌数据总是报错

我们有一张大宽表,大概200个字段,共有20亿左右的数据。在使用spark并行从hive导入Solr的时候,总是会报错,不是 no register leader 就是we are leader这类的错误。

后来搜索后发现这类问题有两类原因:

1、solr 内存过小,导致GC时间过长,从而让zk认为solr已经挂掉了。但是我们给了16G的内存还算不错,同时也检查了solr的gc日志,没有发现有GC时间过长的情况。

2、solr 与Zookeeper的timeout 时间过短。这正是我们的问题所在,我们在安装solr的时候没有指定solr的timeout时间。http://lucene.472066.n3.nabble.com/SOLR-zookeeper-connection-timeout-during-startup-is-hardcoded-to-10000ms-td4403341.html#a4403671

解决方法:

找到/ETC/solr.in.sh,添加配置:

ZK_CLIENT_TIMEOUT=”120000″

同时调整zookeeper的最大会话超时时间为160秒。

13亿表的自关联优化

今天在做一个大表的自关联优化。这张表中共计有13亿的数据量,设表为A,具体字段只有三个

CUST_ID_A 客户A的ID
CUST_ID_B 客户B的ID
RELATION 关系

A.CUST_ID_A 其中一个值的最高重复条数有3万条,这样光这一个CUST_ID_A,如果自关联会预计会有3万*3万=9亿条。

原SQL如下:

select *
from A as left
inner join A as right on left.cust_id_b = right.cust_id_a

这个SQL会执行60分钟左右,无法满足我们的要求。

第一次优化:
我将A表复制成两张表,分别在CUST_ID_A和CUST_ID_B进行分桶,进而想通过分桶JOIN提供更高的JOIN速度。

create table bucket_a(
CUST_ID_A String,
CUST_ID_B String,
RELATION String
)
CLUSTERED BY (CUST_ID_A) SORTED BY(CUST_ID_A) INTO 4096 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

create table bucket_b(
CUST_ID_A String,
CUST_ID_B String,
RELATION String
)
CLUSTERED BY (CUST_ID_B) SORTED BY(CUST_ID_B) INTO 4096 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

select *
from bucket_b as left
inner join bucket_a as right on left.cust_id_b = right.cust_id_a

可惜效果依然很差,时间和之前一样,没有效果。通过这个,我觉得是因为我把握错关键点了。现在其实是数据倾斜,分桶解决不了数据倾斜,因为同一个ID还是会打入同一个通,依然是串行的。
而我的目标两个方向:
1、优化SQL计算逻辑,不再计算全量,剔除A表中倾斜数据。这个虽然逻辑上想通了可以做剔除出来单独计算,但是短期做不了。
2、目前明确倾斜的数据在执行的时候,是串行的。我们只需要将这个串行的改为并行的就行。

解决方法一:
主要逻辑是在A表增加一个字段part(int),然后将这13亿的数据打散成20份。然后使用union all 聚合。这个过程每个一个union里面的逻辑,都是并行的!整个过程只花了8分钟。

create table parted_a(
CUST_ID_A String,
CUST_ID_B String,
part int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

INSERT INTO TABLE parted_a
SELECT *,FLOOR(RAND() * 20) --给每一条数据赋予一个20范围内的随机值,这就类似分区了。
FROM A;

--执行手动UNION ALL 20个分片。。。。。。。。。。。。SQL好长好长,但是可以执行
--注意右表还是原来的全量表,这样才能不会造成数据丢失
INSERT INTO TABLE RESULT
SELECT *
FROM PARTED_A AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A AND left.PART = 1

UNION ALL

SELECT *
FROM PARTED_A AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A AND left.PART = 2

UNION ALL

SELECT *
FROM PARTED_A AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A AND left.PART = 3
。。。。。。。


整个过程花了8分钟,最终生成了141亿的数据。。。时间优化成功了,但是数据量还是太大了,最终还是要去优化SQL逻辑

解决方法二:
进行动态分区。可惜没有效果,很奇怪。。哎,和我预计玩不一样。整个执行时间和原来一样。

set hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions= 1000;
set hive.exec.max.dynamic.partitions.pernode=1000;
create table dynamic_a(
CUST_ID_A String,
CUST_ID_B String
)
partitioned by (part int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

INSERT INTO TABLE dynamic_a PARTITION(part)
SELECT *,FLOOR(RAND() * 500) --分区要在最后一个字段。
FROM A;

INSERT INTO TABLE RESULT
SELECT *
FROM dynamic_a AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A

原以为能完美的解决,可惜还是最后几个task变得异常慢。看来还是数据倾斜了,得抽空好好研究下HIVE具体怎么执行的了。

CDH常用调优参数

–规划—减少yarn内存
–yarn占据146G,solr两个实例,各16GB;hbase40GB;

–每个节点上5个container

–yarn
yarn.nodemanager.resource.memory-mb=146GB
yarn.nodemanager.resource.cpu-vcores=32
yarn.scheduler.maximum-allocation-mb=20GB

yarn.nodemanager.resource.memory-mb=108GB
yarn.nodemanager.resource.cpu-vcores=24
yarn.scheduler.maximum-allocation-mb=20GB

–mapreduce
yarn.app.mapreduce.am.resource.mb=10GB
mapreduce.map.memory.mb=10GB
mapreduce.reduce.memory.mb=10GB

–hive
spark.executor.cores=4
spark.executor.memory=16 G
spark.executor.memoryOverhead=2 G
spark.driver.memory=10.5GB
spark.yarn.driver.memoryOverhead=1.5gb

–hive server2
hive metate server heap= 16gb
hiveserver2 heap =16gb

–zookeeper
maxClientCnxns=300
SERVER JAVA heapszie=8GB

–spark和spark2
spark.authentication = spark

–hdfs
dfs.namenode.handler.count=10 –等于数据盘的数量
dfs.datanode.sync.behind.writes=true
dfs.datanode.max.transfer.threads=8192
namenode heap size=16GB
–hbase
hbase.hstore.compactionThreshold=5
HBase RegionServer java heap = 32G