Snappy 是不可切分的,小心使用

之前一直都认为只要是大数据,只要选择Snappy作为压缩引擎就不会有问题。虽然压缩率不高,但是CPU消耗不高,解压速度快。所以特别适合用作热点数据表的压缩引擎。

但是今天在读 《Hadoop硬实战》的时候,发现了一个误解。原来Snappy是不可切分的。什么意思呢?简单来讲,就是千万别用来压缩Text文件。否则性能急剧下降。

———————————————————————————————

下面慢慢解释。。。。

1、首先得确认一点Snappy确实是不可切分的。

参考文章:https://stackoverflow.com/questions/32382352/is-snappy-splittable-or-not-splittable

但是为什么又有人说Snappy可以切分呢?其实我理解下来,一定要说明Snappy是不可切分的,之所以有人说可切分的,是因为把Snappy用在block级别,这样文件就可以切分了。

A、参考文章:http://blog.cloudera.com/blog/2011/09/snappy-and-hadoop/

This use alone justifies installing Snappy, but there are other places Snappy can be used within Hadoop applications. For example, Snappy can be used for block compression in all the commonly-used Hadoop file formats, including Sequence Files, Avro Data Files, and HBase tables.

One thing to note is that Snappy is intended to be used with a container format, like Sequence Files or Avro Data Files, rather than being used directly on plain text, for example, since the latter is not splittable and can’t be processed in parallel using MapReduce. This is different to LZO, where is is possible to index LZO compressed files to determine split points so that LZO files can be processed efficiently in subsequent processing.

B、参考文章:https://boristyukin.com/is-snappy-compressed-parquet-file-splittable/

For MapReduce, if you need your compressed data to be splittable, BZip2 and LZO formats can be split. Snappy and GZip blocks are not splittable, but files with Snappy blocks inside a container file format such as SequenceFile or Avro can be split. Snappy is intended to be used with a container format, like SequenceFiles or Avro data files, rather than being used directly on plain text, for example, since the latter is not splittable and cannot be processed in parallel using MapReduce. Splittability is not relevant to HBase data.

C、参考资料:https://stackoverflow.com/questions/32382352/is-snappy-splittable-or-not-splittable

This means that if a whole text file is compressed with Snappy then the file is NOT splittable. But if each record inside the file is compressed with Snappy then the file could be splittable, for example in Sequence files with block compression.

所以Snappy是不可切分,所以不要将Snappy用到大文件上。如果大文件的文件大小超过了HDFS的block size,即大文件由HDFS的多个block组成的时候。Map阶段就必须等到整个大文件全部解压完毕后才能执行。

2、parquet文件是可以切分的,所有snappy可以用在block级别的压缩。所以parquet+snappy是可压缩可切分的。

The consequence of storing the metadata in the footer is that reading a Parquet file requires an initial seek to the end of the file (minus 8 bytes) to read the footer metadata length, then a second seek backward by that length to read the footer metadata. Unlike sequence files and Avro datafiles, where the metadata is stored in the header and sync markers are used to separate blocks, Parquet files don’t need sync markers since the block boundaries are stored in the footer metadata. (This is possible because the metadata is written after all the blocks have been written, so the writer can retain the block boundary positions in memory until the file is closed.) Therefore, Parquet files are splittable, since the blocks can be located after reading the footer and can then be processed in parallel (by MapReduce, for example).

3、snappy+hbase也是没问题的。

4、snappy+textfile,就别考虑了。

HBase和Solr的性能对比

以前一直没有搞懂既然Solr可以做多维检索,为什么还要用HBase呢?HBase的Rowkey只能做一个行键查询,远不如Solr的多维检索,而且性能也不差。然后我们就做了一个测试,测试结果如下:

测试结论:如果查询可以做成基于rowkey查询的话,最好使用hBase,这个性能比Solr快太多。而且在1000个并发的时候,solr无法承受住压力,而hbase性能依旧良好。

 

CDH5.13上独立安装Solr7.3.1

CDH5.13默认带的Solr版本号比较低,只有4.10.3。这种低版本无法支持我们的应用需求,所以只能独立安装Solr7.3.1,唯一的缺点就是无法在CDH上面统一管理了。

1、下载Solr的安装包省略,下载后解压Solr安装包。下面的流程默认解压到当前用户根目录,即 ~

2、解压solr7

tar -zxvf solr-7.3.1.tgz

3、创建ZK的solr7 root

cd ~/solr-7.3.1/bin
./solr zk mkroot /solr7 -z 10.127.60.2,10.127.60.3,10.127.60.4:2181
4、查看端口占用情况,避免端口被CDH占用
netstat -nl | grep 9983
或者
sudo netstae -nltp | grep 9983,这样可以拿到正在使用端口的pid
5、执行安装指令。如下指令,在一个物理机上建立两个solr实例。建议将每个实例的数据文件目录放在不同硬盘下。

sudo ./install_solr_service.sh ../../solr-7.3.1.tgz -i /srv/BigData/hadoop/solr1 -d /srv/BigData/hadoop/solr1/solr_data -u solrup -s solr1 -p 9983 -n

sudo ./install_solr_service.sh ../../solr-7.3.1.tgz -i /srv/BigData/hadoop/solr2 -d /srv/BigData/hadoop/solr2/solr_data -u solrup -s solr2 -p 9984 -n

6、修改 /etc/default/solr1.in.sh,改变部分参数。
----HDFS版本
SOLR_JAVA_MEM="-Xms16g -Xmx16g -Dsolr.directoryFactory=HdfsDirectoryFactory -Dsolr.lock.type=hdfs -Dsolr.hdfs.home=hdfs://10.127.60.1:8020/solr7 -XX:MaxDirectMemorySize=20g -Dsolr.autoSoftCommit.maxTime=-1 -Dsolr.autoCommit.maxTime=-1 -XX:+UseLargePages -Dsolr.hdfs.blockcache.slab.count=100"
ZK_HOST="10.127.60.2,10.127.60.3,10.127.60.4/solr7"

---本地
SOLR_JAVA_MEM="-Xms16g -Xmx16g -Dsolr.autoSoftCommit.maxTime=-1 -Dsolr.autoCommit.maxTime=-1 -XX:+UseLargePages"

ZK_HOST="10.127.60.2,10.127.60.3,10.127.60.4/solr7"

SOLR_JAVA_MEM="-Xms32g -Xmx32g -Dsolr.autoSoftCommit.maxTime=-1 -Dsolr.autoCommit.maxTime=-1 -XX:+UseLargePages"

ZK_HOST="10.127.60.2,10.127.60.3,10.127.60.4/solr7"
7、创建collection

---在zookeeper上传配置文件
bin/solr zk upconfig -z 10.127.60.2,10.127.60.3,10.127.60.4:2181 -n mynewconfig -d /path/to/configset

–参考资料:https://lucene.apache.org/solr/guide/7_3/solr-control-script-reference.html#solr-control-script-reference

./solr create -c picc_bigdata -d /zp_test/solor7/conf -n picc_bigdata -s 6

8、如果将数据温江放在HDFS上,solrup用户需要有对应的权限。在HDFS上创建目录 /solr7,同时赋予solrup用户所有权。 hdfs dfs -chown solrup /solr7

Solr索引建立优化

测试环境的硬件情况:四台pc,20颗2核,内存256G,11盘1.4T。带宽为千兆网络,最快传输速度为140MB/S。

测试情况结论:

1、Solr在使用HDFS索引的建立速度远不如本地索引。但是我们在其它公司使用的都是HDFS索引,不至于这么慢。最有可能的情况是网络传输速度不够,因为在其它公司都是使用的双万兆网卡。

2、增加Solr的实例数量,可以增加Solr的分片数量,提高写入速度。

3、每行的数据量越大,也减慢写入速度,毕竟一行的数据字段数越多,一行数据的数据量越大。

4、由于我们只使用批量导入,不是会设计到近实时索引。所以我们禁用了soft commit。而且只在批量导入完成的时候,执行一次hard commit。这样做可以极大的降低solr的压力。因为每一次hard commit或者达到soft commit的阀值的时候,就会触发solr的索引handler更替,以及一系列动作。

禁用软提交的方法:

修改 /etc/default/solr1.in.sh,设置参数-Dsolr.autoSoftCommit.maxTime=-1 -Dsolr.autoCommit.maxTime=-1;

----HDFS版本
SOLR_JAVA_MEM="-Xms16g -Xmx16g -Dsolr.directoryFactory=HdfsDirectoryFactory -Dsolr.lock.type=hdfs -Dsolr.hdfs.home=hdfs://10.127.60.1:8020/solr7 -XX:MaxDirectMemorySize=20g -Dsolr.autoSoftCommit.maxTime=-1 -Dsolr.autoCommit.maxTime=-1 -XX:+UseLargePages -Dsolr.hdfs.blockcache.slab.count=100"

---本地
SOLR_JAVA_MEM="-Xms16g -Xmx16g -Dsolr.autoSoftCommit.maxTime=-1 -Dsolr.autoCommit.maxTime=-1 -XX:+UseLargePages"

SOLR_JAVA_MEM="-Xms32g -Xmx32g -Dsolr.autoSoftCommit.maxTime=-1 -Dsolr.autoCommit.maxTime=-1 -XX:+UseLargePages"

从HIVE导入数据到HBase

从hive到hbase导入数据,最快的方式是使用bulkload。这种方式正好已经被hive实现了,可以从这个链接中找到原文解释。https://cwiki.apache.org/confluence/display/Hive/HBaseBulkLoad

整个代码非常简单,只有如下几步:

1、创建映射表。
CREATE TABLE new_hbase_table(rowkey string, x int, y int)
STORED BY ‘org.apache.hadoop.hive.hbase.HBaseStorageHandler’
WITH SERDEPROPERTIES (“hbase.columns.mapping” = “:key,cf:x,cf:y”);
SET hive.hbase.bulk=true;
INSERT OVERWRITE TABLE new_hbase_table
SELECT rowkey_expression, x, y FROM …any_hive_query…;
上面的代码只是先建表,然后再将数据插入到HBASE中。官方文档对上面的代码所做的背后处理,进行了简单的解释。
1、Decide how you want the data to look once it has been loaded into HBase.
决定hbase中的数据格式和模式。
2、Decide on the number of reducers you’re planning to use for parallelizing the sorting and HFile creation. This depends on the size of your data as well as cluster resources available.
根据数据量大小和集群可用资源,决定reucer数量。
3、Run Hive sampling commands which will create a file containing “splitter” keys which will be used for range-partitioning the data during sort.
计算预分区键。
4、Prepare a staging location in HDFS where the HFiles will be generated.
准备HFile的存放路径
5、Run Hive commands which will execute the sort and generate the HFiles.
(Optional: if HBase and Hive are running in different clusters, distcp the generated files from the Hive cluster to the HBase cluster.)
执行HIVE命令,生成HFile。
6、Run HBase script loadtable.rb to move the files into a new HBase table.
(Optional: register the HBase table as an external table in Hive so you can access it from there.)
将HFile装载入HBase表。
大致为以上步骤。建议看下官方文档,查阅每步的具体藐视。
但是经过性能测试后,实际的导入性能,只做到了每分钟100万的导入量。后来我尝试将原来的HIVE表做成parquet+snappy压缩,使spark的task数从513降低到了70,但是性能还是每分钟100万。说明这个性能是卡在HBASE。
—————————————————————————-
后来发现在导入的时候实际上,最好要提前在HBASE做预分区。虽然HIVE的官方指南中说明了它确实做了预分区,但是我发现CDH5.13应该是没有做。下图是在执行插入时,通过检测hbase的表的分区信息截图:
scan ‘hbase:meta’,{FILTER=>”PrefixFilter(‘testzx.h_cust_cont_zx’)”}
为什么要做预分区呢?我记得官方解释说是,做了预分区后,导入一开始的时候压力负载就会被打散。不会造成压力会集中。
除了预分区,另外一种有可能的时候数据倾斜。数据倾斜也会导致部分region server压力过大。
了解以上后,改造代码。我将rowkey做了md5,然后按照md5进行了预分区。参考资料:https://stackoverflow.com/questions/28165833/how-can-i-pre-split-in-hbase
1、在HBase中建立预分区表
MD5开头只有0,1,2,3,4,5,6,7,8,9,a,b,c,d,e,f,我有6个region server,分区信息如下:
create ‘h_cust_cont_zx’, ‘cf’,SPLITS => [‘3′,’6′,’9′,’c’,’e’]
2、然后再在HIVE中用外表映射到已建立好的HBase表中:
–这两个指令不知道有没有用
SET hive.hbase.bulk=true;
set hive.hbase.wal.enabled=false;
CREATE EXTERNAL TABLE h_cust_cont_zx(rk string,
ecif_id string,
customer_id string)
STORED BY ‘org.apache.hadoop.hive.hbase.HBaseStorageHandler’
WITH SERDEPROPERTIES (“hbase.columns.mapping” = “:key,cf:ecif_id
,cf:customer_id”)
TBLPROPERTIES(“hbase.table.name” = “h_cust_cont_zx”,”hbase.mapred.output.outputtable” = “h_cust_cont_zx”);
INSERT OVERWRITE TABLE h_cust_cont_zx
select md5(concat(ecif_id,customer_id)),* from t_r_cust_cont0720_snappy;
最后的导入性能是1.5小时3亿,每分钟333万。
—————————————————————————-
接着我追加了一个region server,将分区提升到了7个后。再进行导入,执行速度为每分钟接近700万。
使用了预分区,只是是负载在一开始的时候,使分区较为均衡。但是最终还是会进行自动分区。
上面图可以看见,虽然预分区7个。但是最终还是重分区了。

hive 分桶

1、hive分桶可以加速JOIN速度。
2、hive可以独立分桶,也可以先分区,再进行分桶。

参考资料:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL+BucketedTables

3、可以执行抽样查询:

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Sampling

4、其他优点可以参考:

https://www.qubole.com/blog/5-tips-for-efficient-hive-queries/

 

首先创建表,这里创建的是先分区再分桶。其实我们是可以不先分区,直接分桶,具体根据你的需求:

CREATE TABLE user_info_bucketed(user_id BIGINT, firstname STRING, lastname STRING)
COMMENT 'A bucketed copy of user_info'
PARTITIONED BY(ds STRING)
CLUSTERED BY(user_id) INTO 256 BUCKETS;

这时候可以在spark的任务栏里面看见,多了一个stage,这个stage的任务数正好等于分桶数256.这时我们再检查HDFS中该表目录下的数据文件数量正好也等于256个。

然后进行表插入。

set hive.enforce.bucketing = true; -- (Note: Not needed in Hive 2.x onward)
FROM user_id
INSERT OVERWRITE TABLE user_info_bucketed
PARTITION (ds='2009-02-25')
SELECT userid, firstname, lastname WHERE ds='2009-02-25';

分桶后,可以再HDFS中看到该表目录下存在多个子文件目录:

——————————————————————–

—-不启用bucket join

select *
from ods.p_t_contract_master m
left outer join ods.p_t_customer c on m.applicant_id = c.customer_id
limit 100;

2018-08-28 17:34:06,160 Stage-4_0: 192(+174)/735 Stage-5_0: 0/1099
Status: Finished successfully in 660.92 seconds

select *
from ods.p_t_contract_master m
left outer join t_customer_bucket c on m.applicant_id = c.customer_id
limit 100;

2018-08-28 17:49:00,042 Stage-6_0: 472/472 Finished Stage-7_0: 0(+180)/1099
Time taken: 728.227 seconds, Fetched: 100 row(s)

select *
from t_contract_master_bucket m
left outer join ods.p_t_customer c on m.applicant_id = c.customer_id
limit 100;
2018-08-28 17:55:06,944 Stage-8_0: 0(+192)/808 Stage-9_0: 0/1099
Time taken: 845.256 seconds, Fetched: 100 row(s)

select *
from t_contract_master_bucket m
left outer join t_customer_bucket c on m.applicant_id = c.customer_id
limit 100;

2018-08-28 18:09:32,400 Stage-10_0: 0(+192)/545 Stage-11_0: 0/1099
Time taken: 707.371 seconds, Fetched: 100 row(s)

——-启用bucket join
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;

select *
from ods.p_t_contract_master m
left outer join t_customer_bucket c on m.applicant_id = c.customer_id
limit 100;

select *
from t_contract_master_bucket m
left outer join ods.p_t_customer c on m.applicant_id = c.customer_id
limit 100;

select *
from t_contract_master_bucket m
left outer join t_customer_bucket c on m.applicant_id = c.customer_id
limit 100;

2018-08-28 18:21:32,583 Stage-12_0: 0(+8)/472 Stage-13_0: 0/1099
Status: Finished successfully in 661.10 seconds

hive parquet表启用压缩

1、首先表必须是要确保是parquet表,之后的语句才会有效。因为是对hive中的parquet表进行压缩。

2、
SET parquet.compression=SNAPPY;
SET hive.exec.compress.output=true;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET mapred.output.compression.type=BLOCK;

该设置其实只对插入数据有效,对查询是默认有效的。

CREATE TABLE t_r_cust_cont0720_snappy(
ecif_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET
TBLPROPERTIES ("parquet.compression"="SNAPPY"); ---表级的压缩

3、检查压缩是否有效
desc formatted t_r_cust_cont0720_snappy;

压缩后的表:

4、经过计算压缩比为:1:6.压缩后的数据为原来的六分之一。

————————————————————————————
虽然从数据文件上看来是进行了压缩,但是不知道为什么里面的compressd依然为NO。


这里面我之前有个错误。没有分清totaldatasize和rawdatasize。简单理解就是rawdatasize是原始文件大小,即压缩前;totaldatasize是实际存储文件大小,即压缩后。

下面对一张表进行压缩前的情况

下面是压缩后的情况

比较明显的就是使用snappy压缩后文件数量降低了,同时totaldatasize降低了一倍,但是rawdatasize一致。同时compresed始终为NO,只能通过parquet.compression为Snappy辨别表是否为压缩表。

接着我使用了压缩表进行脚本测试。性能和压缩前几乎一致,同时还减少了存储空间。

blocksize为128m 不压缩 23mins

blocksize为128m 只压缩新表 26mins

blocksize为128m 压缩 24mins