Solr查询调优一: query VS filterquery 区别

Solr有两个查询参数,分别是query(q)和filterquery(fq)。官方文档没有写清楚两者之间具体有什么区别。

fq的官方文档这样写着:https://lucene.apache.org/solr/guide/7_3/common-query-parameters.html#fq-filter-query-parameter

The fq parameter defines a query that can be used to restrict the superset of documents that can be returned, without influencing score. It can be very useful for speeding up complex queries, since the queries specified with fq are cached independently of the main query. When a later query uses the same filter, there’s a cache hit, and filter results are returned quickly from the cache.
When using the fq parameter, keep in mind the following:

  • The fq parameter can be specified multiple times in a query. Documents will only be included in the result if they are in the intersection of the document sets resulting from each instance of the parameter. In the example below, only documents which have a popularity greater then 10 and have a section of 0 will match.fq=popularity:[10 TO *]&fq=section:0
  • Filter queries can involve complicated Boolean queries. The above example could also be written as a single fq with two mandatory clauses like so:fq=+popularity:[10 TO *] +section:0
  • The document sets from each filter query are cached independently. Thus, concerning the previous examples: use a single fq containing two mandatory clauses if those clauses appear together often, and use two separate fq parameters if they are relatively independent. (To learn about tuning cache sizes and making sure a filter cache actually exists, see The Well-Configured Solr Instance.)
  • It is also possible to use filter(condition) syntax inside the fq to cache clauses individually and – among other things – to achieve union of cached filter queries.
  • As with all parameters: special characters in an URL need to be properly escaped and encoded as hex values. Online tools are available to help you with URL-encoding. For example: http://meyerweb.com/eric/tools/dencoder/.

fq和q虽然不太好区分,但是能明确区分出两者的差别,对性能提升很高。两者的主要区别如下:
1、q又叫main query,fq全程filter query;
2、相关性评分
fq只有一个用途:就是查询出满足条件的文档。q有两个用途:1、查询出满足条件的文档;2、对返回的文档针对搜索关键字进行相关性评分。因此可以这样使用两者:将q看成一个特殊的filter,仅会多一步相关性评分。所以可以将用户搜索的关键词放入q中,这样可以根据用户的搜索给出相关性最高的文档,例如keyword=apache solr,同时将用户下拉选择的枚举字段放入fq参数中,例如category=techonology。
3、缓存和执行速度
将filter query 从main query中分离出来,有两个目的:
1、filter query 可以使用 filter query cache。
2、filter query 不进行开销巨大的相关性评分,加快执行速度。
4、可以指定多fq,但是只能有一个q
5、执行顺序
到底是fq先执行,还是q执行,看了很多文档,各执一词。但是solr in action的答案比较靠谱,执行顺序还是要看具体情况。

1 、每一个fq参数都会首先到filter cache中查询文档是否存在。
2、如果fq参数没有在 filter cache 找到,就会检索索引文件,并将检索到docset放入缓存中。
3、所有filter的docset进行取交集,最终生成一个唯一的docset。
4 、The q parameter is passed in (along with the filter DocSet) to be executed as a
Lucene query. When executing the query, Lucene plays leapfrog between the
query and combined filters, advancing both the query and filter results objects
to their next present internal ID (an integer). When both the query result and
filter result objects contain the same ID, that ID is collected, a process that
includes generating the relevancy score for the document
这段我翻译的不太清楚。意思大概是将q查出来的结果和前面filter的结果进行交集,最后为交集的每一个结果计算相关性评分。
5、执行post filter

参考资料:
1、solr in action

Solr优化一:部署调整

我们有一张大宽表,数据量大概在20亿左右,100多个字段,HDFS中不算复制因子,原始数据文件差不多1.8TB。这算是一个较大的宽表了。
大数据集群节点大概20台.CDH默认只能在一个节点上安装一个SOLR实例。由于其版本过低,不满足我们的功能要求。改为了独立部署当时最新的SOLR版本V7.3.1。

第一种方案:
由于每个物理机上留给SOLR的内存只有60GB,所以一开始,我们在每个节点上部署了3个实例,每个SOLR实例 60GB/3 =20GB。这样部署有以下几个问题:
1、SOLR普通检索过慢,经常需要10S到几十秒不等。
2、SOLR的distinct分析语句经常导致 SOLR实例OOM。
3、SOLR实例挂掉后,无法自动重启。
4、索引建立还行,每分钟接近1000万。

第二种方案:
从大数据集群中独立出6台物理机,专门用作SOLR集群。扣掉HBASE占用的50GB内存,操作系统50GB,剩下的150GB留给SOLR使用。每个节点上部署5个实例,摊到每个实例上150GB/5=30GB。部署后结果:
1、SOLR普通检索速度提升,时间变为1秒到3秒之内。
2、SOLR的distinct分析语句偶尔导致 SOLR实例OOM 。但是SOLR能够自动重启。
3、 SOLR的distinct分析语句执行时间较长,大致在3.3分钟左右。
4、索引建立极慢,20分钟才能建完400万数据。目前未探明是卡在网络还是磁盘?磁盘应该不会,因为每个实例都在一块独立的磁盘。

个人认为OOM的原因是因为内存不足,没有给SOLR足够的缓存空间吧。因为在SOLR实例中看到系统的物理内存始终为250.6G,差一点256GB,濒临崩溃的边缘。但是CDH的监控,只有146GB,和SOLR的监控不一致。SOLR的HEAP-SPACE,32GB只用到了8GB.

dynamic partition搬表 out of memory

在想启用dynamic partition来偷懒搬表的情况时,如果表的数据过大(例如我在六个节点上搬迁2亿数据)时,就会经常OOM。

样例SQL:

set hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions= 1000;
set hive.exec.max.dynamic.partitions.pernode=1000;

CREATE TABLE B (
NAME STRING
)
partitioned BY (NAME2 string,NAME3 string,NAME4 string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘,’
STORED AS PARQUET;

INSERT INTO TABLE B
PARTITION(NAME2,NAME3,NAME4)
select NAME2,NAME3,NAME4
FROM A;

我想将A表的数据进行分区,并导入B表。最简单的方式就启用动态分区。可是执行上面的SQL时,报出OOM错误。一般碰到OOM,我第一反应就是提高heap space。

set spark.executor.memory=25G;

结果还是一样OOM。难道是内存还不够吗?24G–>30G–>48GG。。还是不行。

最后找到一篇文章,启用hive.optimize.sort.dynamic.partition即可;

SET hive.optimize.sort.dynamic.partition=true

针对该参数和对应问题的解释:
该问题出现的原因是太多的文件同时进入写入。当启用hive.optimize.sort.dynamic.partition后面,动态分区列就会进行全局排序,这样REDUCE端,每个动态分区列的值只会有一个文件写入,从而REDUCE阶段降低了内存使用。

hive.optimize.sort.dynamic.partition
  • Default Value: true in Hive 0.13.0 and 0.13.1; false in Hive 0.14.0 and later (HIVE-8151)
  • Added In: Hive 0.13.0 with HIVE-6455

When enabled, dynamic partitioning column will be globally sorted. This way we can keep only one record writer open for each partition value in the reducer thereby reducing the memory pressure on reducers.
个人感觉就是在内部使用distribute by。

参考文章:
1、https://cloud.tencent.com/developer/article/1079007
2、https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties
3、https://stackoverflow.com/questions/33147764/hive-setting-hive-optimize-sort-dynamic-partition

HIVE 几个 by的区别

参考资料:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SortBy

Order By:
1、在hive.mapred.mode=strict模式下,order by 后面必须加limit。因为不加的话,order by会对所有的数据进行排序。而要进行全局排序,只能使用一个Reducer进行排序,如果结果集数据量异常大的话,速度异常缓慢。

Sort by VS Order by

1、Sort by是将在一个Reducer中的数据进行局部排序,如果有多Reducer,那么结果集只能是局部排序的。而Order By是所有Reducer汇总到一个Reducer进行全局排序。

Cluster By VS Distribute By

1、Cluster By 只是 Distribute By 和Sort By 的简写。即:Cluster By 字段A ==Distribute By 字段A Sort By 字段A。那么既然这样,为什么还要Distribute By和Sort By呢?那就是当Distribute By和Sort By字段不一致的时候就需要了。例如Distribute By 字段A Sort By 字段B 字段C。
2、Cluster By 因为包含Sort by,所以在每一个Reducer中的数据是有序的。
3、Distribute By 是无序的。
4、当Distribute By和Sort By的字段不一致时,就无法使用Cluster By。

hive dynamic partition 性能 优化

今天正在做分区合并的时候,需要初始化一部分测试数据,数据量共计有34亿,每10个分片的数据量合计在4亿左右,所以想偷懒直接使用动态分区来实现:

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
) t;

在执行这段代码的时候,完全跑不动,直接就报错了。报错为JAVA HEAP SPACE OOM。
我也不明白什么原因,认为是数据过多,然后我改进SQL进行分批跑,但是结果还是一样,依然是JAVA HEAP SPACE OOM

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn <10
) t ;

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn >= 10 and RN_T.rn < 20
) t ;

看了下执行计划,额,确实也看不懂。只能面向搜索编程了。
经过一番搜索,都说要在动态分区后面加一个 distribute by。大致意思是,在前面的SQL中,执行动态分区的时候,每一个mapper内部都要进行排序,而用了distribute by后,就不会排序了。

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn <10
) t DISTRIBUTE BY part_inserttime;

追加了DISTRIBUTE BY后,不报错了。但是执行速度确是异常缓慢,处理10行就要将进半小时,780/10= 78条,这个时间难以接受。
在不断的搜索中,接近真相。。。参考文章:https://qubole.zendesk.com/hc/en-us/articles/214885263-HIVE-Dynamic-Partitioning-tips
文章里面提到:
1、使用DISTRIBUTE BY 分区字段,存在一个问题:使用DISTRIBUTE BY 分区字段后,每一个分区只有一个mapper,这样如果有的分区数据量过大,就会数据倾斜,执行缓慢。而我这个例子,明明集群资源足够,却没有充分利用。集群可以启动200个mapper,但是由于SQL只有10个分区,最终只有10个mapper。虽然每个分区最大2000万数据量,但是速度还是很慢,大致半小时左右。
2、DISTRIBUTE BY 分区字段,将每个分区的数据写入一个文件中。前面10个分区,就会有10个文件。

文中提出一个修改方案:

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn <10
) t DISTRIBUTE BY part_inserttime,LIST_ID;

DISTRIBUTE BY 两个字段,第一个是分区字段,第二个是高基数的维度,这里我选择的是该表唯一主键。
整个执行速度很快,大致两分钟左右。通过DISTRIBUTE BY 两个字段,而且第二个字段是唯一主键,所有数据均匀的分在了每一个mapper中。
但是这里需要注意一个问题:
因为所有的数据均匀打散到每一个Mapper中,所以每一个mapper中就有10个分区的数据,这样每一个mapper就要将写入10个文件中。假设有400个mapper,设为M,10个分区,设为P,那么就会有 M*P=4000个数据文件。这样就会有太多文件,而且文件比较小。要想控制这个结果,可以人为的设置mapper数量。

set mapred.reduce.tasks=200;

PS:
1、如果两张表的分区键是一致的,那么动态分区插入是很快。因为是直接进行了文件拷贝。
2、在CDH的hive执行SQ来完成后,觉得在执行自动合并小文件,原来应该生成了200*700多个文件,紧接着又执行了些任务,最终文件数为700多个。具体还需要抽空去研究下。

solr 分片与复制

在没有使用solrcloude的时候,可以使用如下的架构图进行分片与复制:

具体可以参考solr的官方指南:Combining Distribution and Replication  章节 。简单来说就是一主两从。从片的同步方式,我觉得应该是通过从主片同步快照的方式,来实现的。

Snapshot :A directory containing hard links to the data files of an index. Snapshots are distributed from the master nodes when the slaves pull them, “smart copying” any segments the slave node does not have in snapshot directory that contains the hard links to the most recent index data files.

在solr cloud里面,这种方式主要是用来进行备份使用。其实我个人觉得这种快照的复制方式应该是用来快速备份的,不应该是用来进行主从分片同步用的,更可能用的是PULL索引文件的形式。

Solr cloud提供了三种分片模式,分别为:

NRT: This is the default. A NRT replica (NRT = NearRealTime) maintains a transaction log and writes new documents to it’s indexes locally. Any replica of this type is eligible to become a leader. Traditionally, this was the only type supported by Solr.

TLOG: This type of replica maintains a transaction log but does not index document changes locally. This type helps speed up indexing since no commits need to occur in the replicas. When this type of replica needs to update its index, it does so by replicating the index from the leader. This type of replica is also eligible to become a shard leader; it would do so by first processing its transaction log. If it does become a leader, it will behave the same as if it was a NRT type of replica.

PULL: This type of replica does not maintain a transaction log nor index document changes locally. It only replicates the index from the shard leader. It is not eligible to become a shard leader and doesn’t participate in shard leader election at all.

这三种模式的主要分别是,NRT可以做主片,可以使用近实时索引(支持SOFT COMMIT),同步索引靠数据转发;TLOG也可以做主片,当为主片是和NRT一致,不能近实时索引,从片需要和主片同步的时候,只是从从片同步索引文件;PULL不能做主片,仅从主片同步索引文件。
创建分片的时候,副本默认使用的NRT模式。

Solr cloud可推荐使用的分片组合方式:

1、全部NRT:适用于小到中级的集群;更新吞吐量不太高的大型集群;

2、全部TLOG:不需要 实时索引;每一个分片的副本数较多;同时需要所有分片都能切换为主片;

3、TLOG+PULL:不需要 实时索引;每一个分片的副本数较多;提高查询能力,能够容忍短时的过期数据。

我们做了个测试:solr7.3,16个物理节点,每个节点3个实例,每个实例20G,需要索引的数据量为一亿,160个spark executors,一主两从(注:我们不需要NRT特性,我们是夜间批量)

1、全部NRT需要20分钟,大概每分钟600-800万。

2、全部TLOG需要10分钟,大概每分钟10000万。

3、TLOG+PULL:未测

PS:solr创建索引,可以先使用solr自带的zk脚本工具中uploadconfig方法上传配置文件,再使用solr的collection api里面的createcollection方法创建。

一份参考资料:

下面参考文章的原文地址:https://berlinbuzzwords.de/sites/berlinbuzzwords.de/files/media/documents/replicatypes-berlinbuzzwords.pdf

其中有启发的一张图:

CDH 上 进行hbase bulkload

在从HIVE导入全量数据到HBASE的时候,我们使用SQL的方式是insert into table hbase_a select * from A.但是这种方式在17个数据节点上速度很慢,单表5亿数据需要50分钟,即便我们做了预分区。一共有五张表大表需要导入,时间难以忍受。

我们的ROWKEY都是做了MD5的,所以预分区很好分,我们17个REGION SERVER,分了17个区(我记得哪里推荐一个REGION SERVER 10个分区)。最终导入完毕后,检查分区数,远高于这个值。

后来发现HBASE全量导入有BULKLOAD这样的方法,性能很快。以后流程主要参考两篇文章:
1、https://cwiki.apache.org/confluence/display/Hive/HBaseBulkLoad
2、https://community.cloudera.com/t5/Storage-Random-Access-HDFS/HBase-slow-bulk-loading-using-Hive/td-p/43649

首先要在HIVE上进行配置,在hive辅助JAR目录,配置这个路径。我这里设置的路径为hive用户的home目录。

这个配置是针对HIVESERVER2的,如果你有多个HIVESERVER2实例,每个实例的目录文件需要一致的。配置这个目录后,就需要拷贝多个JAR包到对应的目录下:

sudo -u hdfs hdfs dfs -put -f /opt/cloudera/parcels/CDH/lib/hive/lib/hbase-client.jar /user/hive/
sudo -u hdfs hdfs dfs -put -f /opt/cloudera/parcels/CDH/lib/hive/lib/hbase-server.jar /user/hive/
sudo -u hdfs hdfs dfs -put -f /opt/cloudera/parcels/CDH/lib/hive/lib/hbase-common.jar /user/hive/
sudo -u hdfs hdfs dfs -put -f /opt/cloudera/parcels/CDH/lib/hive/lib/hbase-protocol.jar /user/hive/
sudo -u hdfs hdfs dfs -put -f /opt/cloudera/parcels/CDH/lib/hive/lib/hive-hbase-handler.jar /user/hive/
sudo -u hdfs hdfs dfs -put -f /opt/cloudera/parcels/CDH/lib/hive/lib/hbase-contrib.jar /user/hive/

赋权操作:

sudo -u hdfs hdfs dfs -chmod 554 /user/hive/.jar
sudo -u hdfs hdfs dfs -chown hive:hive /user/hive/.jar

下面这一大段操作,实际参考了HIVE的原始文章,大致流程:
1、建立存储HBASE分区键的表
2、从原始数据表中抽样出ROWKEY拆分键,它对原始数据进行排序,然后从排序中中找出对应的切分键,作为HBASE的ROWKEY SPLIT KEY。如果要生成17个分区,就要生成16个KEY.最后插入到前面建立的分区键表。

hdfs dfs -rm -r /tmp/hb_range_keys
hdfs dfs -mkdir /tmp/hb_range_keys

beeline -u “jdbc:hive2://” -e “CREATE EXTERNAL TABLE IF NOT EXISTS testzx.hb_range_keys(transaction_id_range_start string) row format serde ‘org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe’ stored as inputformat ‘org.apache.hadoop.mapred.TextInputFormat’ outputformat ‘org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat’ location ‘/tmp/hb_range_keys’;”
beeline -u “jdbc:hive2://” -e ” create temporary function row_sequence as ‘org.apache.hadoop.hive.contrib.udf.UDFRowSequence’; INSERT OVERWRITE TABLE testzx.hb_range_keys SELECT a.id FROM ( SELECT row_sequence() as num, c.id FROM cust_app.app_cust_family c order by c.id) a WHERE ( a.num % ( round( ${total} / 12) ) ) = 0;”

将生成的分区键的数据文件,通过HDFS拷贝到hb_range_key_list目录。

hdfs dfs -rm -r /tmp/hb_range_key_list;
hdfs dfs -cp /tmp/hb_range_keys/* /tmp/hb_range_key_list;

注意:其实我在做这步骤的时候,没有成功。。。后来突然明白,我们的ROWKEY是MD5过后的,其实就是分区键本来就应该是既定的了。完全不需要去抽样出来!!根据这个思路,我直接将我做好的分区数据导入到前面的hb_range_keys表中,就完成了。关于HBASE如何针对字符串型的ROWKEY设计预分区键,可以参考下面:
1、https://hortonworks.com/blog/apache-hbase-region-splitting-and-merging/
2、https://grokbase.com/t/hbase/user/128xr5amhs/md5-hash-key-and-splits
3、http://hbase.apache.org/book.html#important_configurations
4、https://groups.google.com/forum/#!topic/nosql-databases/wQkYBAYRFF0

接下来建立导出HFILE表

hdfs dfs -rm -r /tmp/hbsort;
hdfs dfs -mkdir /tmp/hbsort;
beeline -u “jdbc:hive2://” -e “drop table testzx.hbsort; CREATE TABLE testzx.hbsort (id string,agent_code string,cust_ecif_id string,real_name string,gender string,birthday string,age string,certi_type string,certi_code string,job_id string,job_zh string,relatives_ecif_id string,relatives_real_name string,relatives_gender string,relatives_birthday string,relatives_age string,relatives_certi_type string,relatives_certi_code string,relatives_job_id string,relatives_job_zh string,relation string,policy_num string) STORED AS INPUTFORMAT ‘org.apache.hadoop.mapred.TextInputFormat’ OUTPUTFORMAT ‘org.apache.hadoop.hive.hbase.HiveHFileOutputFormat’ TBLPROPERTIES (‘hfile.family.path’ = ‘/tmp/hbsort/cf’);”

导入数据到导出HFILE表。主要设置的reduce task 数量要和前面设计的分区数多1.前面16个区,后面就应该17.


beeline -u “jdbc:hive2://” -e “set mapred.reduce.tasks=17;set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner; set mapreduce.totalorderpartitioner.path=/tmp/hb_range_key_list; set hfile.compression=gz; INSERT OVERWRITE TABLE testzx.hbsort select t.* from testzx.test_bulk_load t cluster by t.id;”

执行数据导入到HBASE

sudo -u hdfs hdfs dfs -chgrp -R hbase /tmp/hbsort
sudo -u hdfs hdfs dfs -chmod -R 775 /tmp/hbsort
#下面的环境需要导出,但是应该是SSH SESSION级别的,因为我设置这个变量后。
#CDH的好多变量和这个冲突了
export HADOOP_CLASSPATH=hbase classpath

#制定导入
hadoop jar /opt/cloudera/parcels/CDH/lib/hive/lib/hbase-server.jar completebulkload /tmp/hbsort test:test_bulk_load cf

至此完成。

hive CDH5.13 增量更新数据的方法

我们使用的平台是CDH5.13

HIVE使用ORC格式,并启用事务,是可以做到更新的,但是CDH不建议使用。

在CDH中要么使用impala+kudu,要么你就得想一个办法进行增量更新。而我们采用的就是后者。

我们首先使用的方式是:

1、对每一张表创建一个按天分区的历史拉链表。该表每天对应的增量数据都直接LOAD到该表里面。但是这样就会有一个问题,update的数据就会重复,也没法供后续脚本使用

2、所以第二步,就需要形成一个当日全量最新表。这个步骤比较简单,只是对his表进行distribute by 一个主键 并按 日期排序,只选出同主键的排序第一条数据即可。

过程很简单,但是当我们碰到五亿量级以上的单表数据的时候,一张表一次生成全量最新表的过程就会超过10分钟以上,而且使用的全集群资源,没有并行的可能性。而我们一共有十几张这样的大表,整个执行时间长的令人羞愧。让客户看着觉得,这大数据怎么这么差?!

优化,优化,绞尽脑汁的优化。可惜我能力有限,只能检索。还检索一篇文章,是informatica的一篇文章http://blog.tec13.com/wp-content/uploads/2018/12/1097-StrategiesForIncrementalUpdatesOnHIve-H2L-1.pdf。原来还有两种方式:

1、分区合并:之前使用的distribute by 是对整个表重新生成,实际上是没有必要的。因为很多业务数据会按照一定的规则聚集。我分析了一张的增量数据中存在一个insert_time,其中每天的增量,仅会大致分布在十五内,不管这是哪十五天。这样的话,如果将最新全量表按照insert_time的每天进行分区,那我们每天只需要更新这个十五个分区的数据,而不是整表数据。这个性能飞跃是毋庸置疑的。

其次,我们可以做成一个策略,小于五亿的表还是用distribute by,而大于五亿的部分表再使用前面说的按某一个日期分区。

2、当日全量表建立在hbase:很明显了,hbase是肯定是立即更新的,而且很快。但是有个顾虑,如果将hbase的表用于后续sql脚本进行大量join的话,其性能显然是不足的。除非用的次数不多那还能用这种策略。我也尝试想更新hbase表,再反抽形成一张hive表。可惜性能也是不好,因为这就和distribute by一样,要搬动全表。

后续再补充这篇文章,并给些样例代码。

solr 一主多从模式灌数据总是报错

我们有一张大宽表,大概200个字段,共有20亿左右的数据。在使用spark并行从hive导入Solr的时候,总是会报错,不是 no register leader 就是we are leader这类的错误。

后来搜索后发现这类问题有两类原因:

1、solr 内存过小,导致GC时间过长,从而让zk认为solr已经挂掉了。但是我们给了16G的内存还算不错,同时也检查了solr的gc日志,没有发现有GC时间过长的情况。

2、solr 与Zookeeper的timeout 时间过短。这正是我们的问题所在,我们在安装solr的时候没有指定solr的timeout时间。http://lucene.472066.n3.nabble.com/SOLR-zookeeper-connection-timeout-during-startup-is-hardcoded-to-10000ms-td4403341.html#a4403671

解决方法:

找到/ETC/solr.in.sh,添加配置:

ZK_CLIENT_TIMEOUT=”120000″

同时调整zookeeper的最大会话超时时间为160秒。

13亿表的自关联优化

今天在做一个大表的自关联优化。这张表中共计有13亿的数据量,设表为A,具体字段只有三个

CUST_ID_A 客户A的ID
CUST_ID_B 客户B的ID
RELATION 关系

A.CUST_ID_A 其中一个值的最高重复条数有3万条,这样光这一个CUST_ID_A,如果自关联会预计会有3万*3万=9亿条。

原SQL如下:

select *
from A as left
inner join A as right on left.cust_id_b = right.cust_id_a

这个SQL会执行60分钟左右,无法满足我们的要求。

第一次优化:
我将A表复制成两张表,分别在CUST_ID_A和CUST_ID_B进行分桶,进而想通过分桶JOIN提供更高的JOIN速度。

create table bucket_a(
CUST_ID_A String,
CUST_ID_B String,
RELATION String
)
CLUSTERED BY (CUST_ID_A) SORTED BY(CUST_ID_A) INTO 4096 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

create table bucket_b(
CUST_ID_A String,
CUST_ID_B String,
RELATION String
)
CLUSTERED BY (CUST_ID_B) SORTED BY(CUST_ID_B) INTO 4096 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

select *
from bucket_b as left
inner join bucket_a as right on left.cust_id_b = right.cust_id_a

可惜效果依然很差,时间和之前一样,没有效果。通过这个,我觉得是因为我把握错关键点了。现在其实是数据倾斜,分桶解决不了数据倾斜,因为同一个ID还是会打入同一个通,依然是串行的。
而我的目标两个方向:
1、优化SQL计算逻辑,不再计算全量,剔除A表中倾斜数据。这个虽然逻辑上想通了可以做剔除出来单独计算,但是短期做不了。
2、目前明确倾斜的数据在执行的时候,是串行的。我们只需要将这个串行的改为并行的就行。

解决方法一:
主要逻辑是在A表增加一个字段part(int),然后将这13亿的数据打散成20份。然后使用union all 聚合。这个过程每个一个union里面的逻辑,都是并行的!整个过程只花了8分钟。

create table parted_a(
CUST_ID_A String,
CUST_ID_B String,
part int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

INSERT INTO TABLE parted_a
SELECT *,FLOOR(RAND() * 20) --给每一条数据赋予一个20范围内的随机值,这就类似分区了。
FROM A;

--执行手动UNION ALL 20个分片。。。。。。。。。。。。SQL好长好长,但是可以执行
--注意右表还是原来的全量表,这样才能不会造成数据丢失
INSERT INTO TABLE RESULT
SELECT *
FROM PARTED_A AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A AND left.PART = 1

UNION ALL

SELECT *
FROM PARTED_A AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A AND left.PART = 2

UNION ALL

SELECT *
FROM PARTED_A AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A AND left.PART = 3
。。。。。。。


整个过程花了8分钟,最终生成了141亿的数据。。。时间优化成功了,但是数据量还是太大了,最终还是要去优化SQL逻辑

解决方法二:
进行动态分区。可惜没有效果,很奇怪。。哎,和我预计玩不一样。整个执行时间和原来一样。

set hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions= 1000;
set hive.exec.max.dynamic.partitions.pernode=1000;
create table dynamic_a(
CUST_ID_A String,
CUST_ID_B String
)
partitioned by (part int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET;

INSERT INTO TABLE dynamic_a PARTITION(part)
SELECT *,FLOOR(RAND() * 500) --分区要在最后一个字段。
FROM A;

INSERT INTO TABLE RESULT
SELECT *
FROM dynamic_a AS left
INNER JOIN A AS right ON left.CUSTOMER_ID_B = right.CUSTOMER_ID_A

原以为能完美的解决,可惜还是最后几个task变得异常慢。看来还是数据倾斜了,得抽空好好研究下HIVE具体怎么执行的了。