CDH 上 进行hbase bulkload

在从HIVE导入全量数据到HBASE的时候,我们使用SQL的方式是insert into table hbase_a select * from A.但是这种方式在17个数据节点上速度很慢,单表5亿数据需要50分钟,即便我们做了预分区。一共有五张表大表需要导入,时间难以忍受。

我们的ROWKEY都是做了MD5的,所以预分区很好分,我们17个REGION SERVER,分了17个区(我记得哪里推荐一个REGION SERVER 10个分区)。最终导入完毕后,检查分区数,远高于这个值。

后来发现HBASE全量导入有BULKLOAD这样的方法,性能很快。以后流程主要参考两篇文章:
1、https://cwiki.apache.org/confluence/display/Hive/HBaseBulkLoad
2、https://community.cloudera.com/t5/Storage-Random-Access-HDFS/HBase-slow-bulk-loading-using-Hive/td-p/43649

首先要在HIVE上进行配置,在hive辅助JAR目录,配置这个路径。我这里设置的路径为hive用户的home目录。

这个配置是针对HIVESERVER2的,如果你有多个HIVESERVER2实例,每个实例的目录文件需要一致的。配置这个目录后,就需要拷贝多个JAR包到对应的目录下:

sudo -u hdfs hdfs dfs -put -f /opt/cloudera/parcels/CDH/lib/hive/lib/hbase-client.jar /user/hive/
sudo -u hdfs hdfs dfs -put -f /opt/cloudera/parcels/CDH/lib/hive/lib/hbase-server.jar /user/hive/
sudo -u hdfs hdfs dfs -put -f /opt/cloudera/parcels/CDH/lib/hive/lib/hbase-common.jar /user/hive/
sudo -u hdfs hdfs dfs -put -f /opt/cloudera/parcels/CDH/lib/hive/lib/hbase-protocol.jar /user/hive/
sudo -u hdfs hdfs dfs -put -f /opt/cloudera/parcels/CDH/lib/hive/lib/hive-hbase-handler.jar /user/hive/
sudo -u hdfs hdfs dfs -put -f /opt/cloudera/parcels/CDH/lib/hive/lib/hbase-contrib.jar /user/hive/

赋权操作:

sudo -u hdfs hdfs dfs -chmod 554 /user/hive/.jar
sudo -u hdfs hdfs dfs -chown hive:hive /user/hive/.jar

下面这一大段操作,实际参考了HIVE的原始文章,大致流程:
1、建立存储HBASE分区键的表
2、从原始数据表中抽样出ROWKEY拆分键,它对原始数据进行排序,然后从排序中中找出对应的切分键,作为HBASE的ROWKEY SPLIT KEY。如果要生成17个分区,就要生成16个KEY.最后插入到前面建立的分区键表。

hdfs dfs -rm -r /tmp/hb_range_keys
hdfs dfs -mkdir /tmp/hb_range_keys

beeline -u “jdbc:hive2://” -e “CREATE EXTERNAL TABLE IF NOT EXISTS testzx.hb_range_keys(transaction_id_range_start string) row format serde ‘org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe’ stored as inputformat ‘org.apache.hadoop.mapred.TextInputFormat’ outputformat ‘org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat’ location ‘/tmp/hb_range_keys’;”
beeline -u “jdbc:hive2://” -e ” create temporary function row_sequence as ‘org.apache.hadoop.hive.contrib.udf.UDFRowSequence’; INSERT OVERWRITE TABLE testzx.hb_range_keys SELECT a.id FROM ( SELECT row_sequence() as num, c.id FROM cust_app.app_cust_family c order by c.id) a WHERE ( a.num % ( round( ${total} / 12) ) ) = 0;”

将生成的分区键的数据文件,通过HDFS拷贝到hb_range_key_list目录。

hdfs dfs -rm -r /tmp/hb_range_key_list;
hdfs dfs -cp /tmp/hb_range_keys/* /tmp/hb_range_key_list;

注意:其实我在做这步骤的时候,没有成功。。。后来突然明白,我们的ROWKEY是MD5过后的,其实就是分区键本来就应该是既定的了。完全不需要去抽样出来!!根据这个思路,我直接将我做好的分区数据导入到前面的hb_range_keys表中,就完成了。关于HBASE如何针对字符串型的ROWKEY设计预分区键,可以参考下面:
1、https://hortonworks.com/blog/apache-hbase-region-splitting-and-merging/
2、https://grokbase.com/t/hbase/user/128xr5amhs/md5-hash-key-and-splits
3、http://hbase.apache.org/book.html#important_configurations
4、https://groups.google.com/forum/#!topic/nosql-databases/wQkYBAYRFF0

接下来建立导出HFILE表

hdfs dfs -rm -r /tmp/hbsort;
hdfs dfs -mkdir /tmp/hbsort;
beeline -u “jdbc:hive2://” -e “drop table testzx.hbsort; CREATE TABLE testzx.hbsort (id string,agent_code string,cust_ecif_id string,real_name string,gender string,birthday string,age string,certi_type string,certi_code string,job_id string,job_zh string,relatives_ecif_id string,relatives_real_name string,relatives_gender string,relatives_birthday string,relatives_age string,relatives_certi_type string,relatives_certi_code string,relatives_job_id string,relatives_job_zh string,relation string,policy_num string) STORED AS INPUTFORMAT ‘org.apache.hadoop.mapred.TextInputFormat’ OUTPUTFORMAT ‘org.apache.hadoop.hive.hbase.HiveHFileOutputFormat’ TBLPROPERTIES (‘hfile.family.path’ = ‘/tmp/hbsort/cf’);”

导入数据到导出HFILE表。主要设置的reduce task 数量要和前面设计的分区数多1.前面16个区,后面就应该17.


beeline -u “jdbc:hive2://” -e “set mapred.reduce.tasks=17;set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner; set mapreduce.totalorderpartitioner.path=/tmp/hb_range_key_list; set hfile.compression=gz; INSERT OVERWRITE TABLE testzx.hbsort select t.* from testzx.test_bulk_load t cluster by t.id;”

执行数据导入到HBASE

sudo -u hdfs hdfs dfs -chgrp -R hbase /tmp/hbsort
sudo -u hdfs hdfs dfs -chmod -R 775 /tmp/hbsort
#下面的环境需要导出,但是应该是SSH SESSION级别的,因为我设置这个变量后。
#CDH的好多变量和这个冲突了
export HADOOP_CLASSPATH=hbase classpath

#制定导入
hadoop jar /opt/cloudera/parcels/CDH/lib/hive/lib/hbase-server.jar completebulkload /tmp/hbsort test:test_bulk_load cf

至此完成。

hbase常用命令

1、hbase 创建namespace

create_namespace ‘test’

2、给一个用户赋权namespace完全管理权限:

grant ‘hive’,’RWXCA’,’@test’

PS:在cdh启用了kerberos后,hive在映射表到hbase的时候。会报权限问题:

我当前kinit的用户hive用户,但是报错的时候说的是hive/bdp04@PICC.COM这个用户没有权限。然后我执行了:

grant ‘hive/bdp04′,’RWXCA’,’@default’
grant ‘hive/bdp04@PICC.COM’,’RWXCA’,’@default’

都还是报错。接着我在hbae shell输入whoami,结果显示当前的用户是:

hive@PICC.COM

这个和kinit是一直。然后我赋权:

grant ‘hive@PICC.COM’,’RWXCA’,’@default’

也是无效的。。。哎。。最后一通乱赋权成功了。下面语句有效:

grant ‘hive’,’RWXCA’,’@default’

3、在hive映射一张表到hbae,指定的namespace

CREATE EXTERNAL TABLE hbase_image_20181124(rk string,
IMAGE_ID STRING)
STORED BY ‘org.apache.hadoop.hive.hbase.HBaseStorageHandler’
WITH SERDEPROPERTIES (“hbase.columns.mapping” = “:key,
image:IMAGE_ID STRING”)
TBLPROPERTIES(“hbase.table.name” = “test:hbase_image_20181124″,”hbase.mapred.output.outputtable” = “hbase_image_20181124”);

4、hbase删除表:

disable ‘test:hbase_image_20181124’
drop ‘test:hbase_image_20181124’

5、hbase创建到namespace为test的表:
create ‘test:hbase_image_20181124′,’image’,SPLITS => [‘0000000000000000000fffffffffffffffffffff’,’1000000000000000000000000000000000000000′,’2000000000000000000000000000000000000000′,’3000000000000000000000000000000000000000′,’4000000000000000000000000000000000000000′,’5000000000000000000000000000000000000000′,’6000000000000000000000000000000000000000′,’7000000000000000000000000000000000000000′,’8000000000000000000000000000000000000000′,’9000000000000000000000000000000000000000′,’a000000000000000000000000000000000000000′,’b000000000000000000000000000000000000000′,’c000000000000000000000000000000000000000′,’d000000000000000000000000000000000000000′,’e000000000000000000000000000000000000000′,’f000000000000000000000000000000000000000′]

PS:SPLITS是一种预分区的方法,防止在第一次建表的时候由于rowkey不均衡,在region server上造成热点数据,同时也可以加快数据加载速度。hive到hbase有一种blukload的方法。

6、将一个字符串传给hbase shell执行:
echo “create ‘hbase_test3′,’family_base’,SPLITS => [‘2′,’4′,’6′,’8′,’a’,’d’]” | hbase shell

7、hbase shell执行一个代码文件:
hbase shell ./sample_commands.txt
参考地址

HBase和Solr的性能对比

以前一直没有搞懂既然Solr可以做多维检索,为什么还要用HBase呢?HBase的Rowkey只能做一个行键查询,远不如Solr的多维检索,而且性能也不差。然后我们就做了一个测试,测试结果如下:

测试结论:如果查询可以做成基于rowkey查询的话,最好使用hBase,这个性能比Solr快太多。而且在1000个并发的时候,solr无法承受住压力,而hbase性能依旧良好。

 

从HIVE导入数据到HBase

从hive到hbase导入数据,最快的方式是使用bulkload。这种方式正好已经被hive实现了,可以从这个链接中找到原文解释。https://cwiki.apache.org/confluence/display/Hive/HBaseBulkLoad

整个代码非常简单,只有如下几步:

1、创建映射表。
CREATE TABLE new_hbase_table(rowkey string, x int, y int)
STORED BY ‘org.apache.hadoop.hive.hbase.HBaseStorageHandler’
WITH SERDEPROPERTIES (“hbase.columns.mapping” = “:key,cf:x,cf:y”);
SET hive.hbase.bulk=true;
INSERT OVERWRITE TABLE new_hbase_table
SELECT rowkey_expression, x, y FROM …any_hive_query…;
上面的代码只是先建表,然后再将数据插入到HBASE中。官方文档对上面的代码所做的背后处理,进行了简单的解释。
1、Decide how you want the data to look once it has been loaded into HBase.
决定hbase中的数据格式和模式。
2、Decide on the number of reducers you’re planning to use for parallelizing the sorting and HFile creation. This depends on the size of your data as well as cluster resources available.
根据数据量大小和集群可用资源,决定reucer数量。
3、Run Hive sampling commands which will create a file containing “splitter” keys which will be used for range-partitioning the data during sort.
计算预分区键。
4、Prepare a staging location in HDFS where the HFiles will be generated.
准备HFile的存放路径
5、Run Hive commands which will execute the sort and generate the HFiles.
(Optional: if HBase and Hive are running in different clusters, distcp the generated files from the Hive cluster to the HBase cluster.)
执行HIVE命令,生成HFile。
6、Run HBase script loadtable.rb to move the files into a new HBase table.
(Optional: register the HBase table as an external table in Hive so you can access it from there.)
将HFile装载入HBase表。
大致为以上步骤。建议看下官方文档,查阅每步的具体藐视。
但是经过性能测试后,实际的导入性能,只做到了每分钟100万的导入量。后来我尝试将原来的HIVE表做成parquet+snappy压缩,使spark的task数从513降低到了70,但是性能还是每分钟100万。说明这个性能是卡在HBASE。
—————————————————————————-
后来发现在导入的时候实际上,最好要提前在HBASE做预分区。虽然HIVE的官方指南中说明了它确实做了预分区,但是我发现CDH5.13应该是没有做。下图是在执行插入时,通过检测hbase的表的分区信息截图:
scan ‘hbase:meta’,{FILTER=>”PrefixFilter(‘testzx.h_cust_cont_zx’)”}
为什么要做预分区呢?我记得官方解释说是,做了预分区后,导入一开始的时候压力负载就会被打散。不会造成压力会集中。
除了预分区,另外一种有可能的时候数据倾斜。数据倾斜也会导致部分region server压力过大。
了解以上后,改造代码。我将rowkey做了md5,然后按照md5进行了预分区。参考资料:https://stackoverflow.com/questions/28165833/how-can-i-pre-split-in-hbase
1、在HBase中建立预分区表
MD5开头只有0,1,2,3,4,5,6,7,8,9,a,b,c,d,e,f,我有6个region server,分区信息如下:
create ‘h_cust_cont_zx’, ‘cf’,SPLITS => [‘3′,’6′,’9′,’c’,’e’]
2、然后再在HIVE中用外表映射到已建立好的HBase表中:
–这两个指令不知道有没有用
SET hive.hbase.bulk=true;
set hive.hbase.wal.enabled=false;
CREATE EXTERNAL TABLE h_cust_cont_zx(rk string,
ecif_id string,
customer_id string)
STORED BY ‘org.apache.hadoop.hive.hbase.HBaseStorageHandler’
WITH SERDEPROPERTIES (“hbase.columns.mapping” = “:key,cf:ecif_id
,cf:customer_id”)
TBLPROPERTIES(“hbase.table.name” = “h_cust_cont_zx”,”hbase.mapred.output.outputtable” = “h_cust_cont_zx”);
INSERT OVERWRITE TABLE h_cust_cont_zx
select md5(concat(ecif_id,customer_id)),* from t_r_cust_cont0720_snappy;
最后的导入性能是1.5小时3亿,每分钟333万。
—————————————————————————-
接着我追加了一个region server,将分区提升到了7个后。再进行导入,执行速度为每分钟接近700万。
使用了预分区,只是是负载在一开始的时候,使分区较为均衡。但是最终还是会进行自动分区。
上面图可以看见,虽然预分区7个。但是最终还是重分区了。