HIVE 几个 by的区别

参考资料:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SortBy

Order By:
1、在hive.mapred.mode=strict模式下,order by 后面必须加limit。因为不加的话,order by会对所有的数据进行排序。而要进行全局排序,只能使用一个Reducer进行排序,如果结果集数据量异常大的话,速度异常缓慢。

Sort by VS Order by

1、Sort by是将在一个Reducer中的数据进行局部排序,如果有多Reducer,那么结果集只能是局部排序的。而Order By是所有Reducer汇总到一个Reducer进行全局排序。

Cluster By VS Distribute By

1、Cluster By 只是 Distribute By 和Sort By 的简写。即:Cluster By 字段A ==Distribute By 字段A Sort By 字段A。那么既然这样,为什么还要Distribute By和Sort By呢?那就是当Distribute By和Sort By字段不一致的时候就需要了。例如Distribute By 字段A Sort By 字段B 字段C。
2、Cluster By 因为包含Sort by,所以在每一个Reducer中的数据是有序的。
3、Distribute By 是无序的。
4、当Distribute By和Sort By的字段不一致时,就无法使用Cluster By。

hive dynamic partition 性能 优化

今天正在做分区合并的时候,需要初始化一部分测试数据,数据量共计有34亿,每10个分片的数据量合计在4亿左右,所以想偷懒直接使用动态分区来实现:

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
) t;

在执行这段代码的时候,完全跑不动,直接就报错了。报错为JAVA HEAP SPACE OOM。
我也不明白什么原因,认为是数据过多,然后我改进SQL进行分批跑,但是结果还是一样,依然是JAVA HEAP SPACE OOM

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn <10
) t ;

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn >= 10 and RN_T.rn < 20
) t ;

看了下执行计划,额,确实也看不懂。只能面向搜索编程了。
经过一番搜索,都说要在动态分区后面加一个 distribute by。大致意思是,在前面的SQL中,执行动态分区的时候,每一个mapper内部都要进行排序,而用了distribute by后,就不会排序了。

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn <10
) t DISTRIBUTE BY part_inserttime;

追加了DISTRIBUTE BY后,不报错了。但是执行速度确是异常缓慢,处理10行就要将进半小时,780/10= 78条,这个时间难以接受。
在不断的搜索中,接近真相。。。参考文章:https://qubole.zendesk.com/hc/en-us/articles/214885263-HIVE-Dynamic-Partitioning-tips
文章里面提到:
1、使用DISTRIBUTE BY 分区字段,存在一个问题:使用DISTRIBUTE BY 分区字段后,每一个分区只有一个mapper,这样如果有的分区数据量过大,就会数据倾斜,执行缓慢。而我这个例子,明明集群资源足够,却没有充分利用。集群可以启动200个mapper,但是由于SQL只有10个分区,最终只有10个mapper。虽然每个分区最大2000万数据量,但是速度还是很慢,大致半小时左右。
2、DISTRIBUTE BY 分区字段,将每个分区的数据写入一个文件中。前面10个分区,就会有10个文件。

文中提出一个修改方案:

INSERT INTO TABLE TEST_PRODUCT_FEE PARTITION( part_inserttime )
SELECT *
FROM (
select T1.*
from TEST_PRODUCT_FEE_FULL_tmp_2 T1
INNER JOIN part_inserttime_RN RN_T ON (T1.part_inserttime = RN_T.part_inserttime)
WHERE RN_T.rn <10
) t DISTRIBUTE BY part_inserttime,LIST_ID;

DISTRIBUTE BY 两个字段,第一个是分区字段,第二个是高基数的维度,这里我选择的是该表唯一主键。
整个执行速度很快,大致两分钟左右。通过DISTRIBUTE BY 两个字段,而且第二个字段是唯一主键,所有数据均匀的分在了每一个mapper中。
但是这里需要注意一个问题:
因为所有的数据均匀打散到每一个Mapper中,所以每一个mapper中就有10个分区的数据,这样每一个mapper就要将写入10个文件中。假设有400个mapper,设为M,10个分区,设为P,那么就会有 M*P=4000个数据文件。这样就会有太多文件,而且文件比较小。要想控制这个结果,可以人为的设置mapper数量。

set mapred.reduce.tasks=200;

PS:
1、如果两张表的分区键是一致的,那么动态分区插入是很快。因为是直接进行了文件拷贝。
2、在CDH的hive执行SQ来完成后,觉得在执行自动合并小文件,原来应该生成了200*700多个文件,紧接着又执行了些任务,最终文件数为700多个。具体还需要抽空去研究下。

solr 分片与复制

在没有使用solrcloude的时候,可以使用如下的架构图进行分片与复制:

具体可以参考solr的官方指南:Combining Distribution and Replication  章节 。简单来说就是一主两从。从片的同步方式,我觉得应该是通过从主片同步快照的方式,来实现的。

Snapshot :A directory containing hard links to the data files of an index. Snapshots are distributed from the master nodes when the slaves pull them, “smart copying” any segments the slave node does not have in snapshot directory that contains the hard links to the most recent index data files.

在solr cloud里面,这种方式主要是用来进行备份使用。其实我个人觉得这种快照的复制方式应该是用来快速备份的,不应该是用来进行主从分片同步用的,更可能用的是PULL索引文件的形式。

Solr cloud提供了三种分片模式,分别为:

NRT: This is the default. A NRT replica (NRT = NearRealTime) maintains a transaction log and writes new documents to it’s indexes locally. Any replica of this type is eligible to become a leader. Traditionally, this was the only type supported by Solr.

TLOG: This type of replica maintains a transaction log but does not index document changes locally. This type helps speed up indexing since no commits need to occur in the replicas. When this type of replica needs to update its index, it does so by replicating the index from the leader. This type of replica is also eligible to become a shard leader; it would do so by first processing its transaction log. If it does become a leader, it will behave the same as if it was a NRT type of replica.

PULL: This type of replica does not maintain a transaction log nor index document changes locally. It only replicates the index from the shard leader. It is not eligible to become a shard leader and doesn’t participate in shard leader election at all.

这三种模式的主要分别是,NRT可以做主片,可以使用近实时索引(支持SOFT COMMIT),同步索引靠数据转发;TLOG也可以做主片,当为主片是和NRT一致,不能近实时索引,从片需要和主片同步的时候,只是从从片同步索引文件;PULL不能做主片,仅从主片同步索引文件。
创建分片的时候,副本默认使用的NRT模式。

Solr cloud可推荐使用的分片组合方式:

1、全部NRT:适用于小到中级的集群;更新吞吐量不太高的大型集群;

2、全部TLOG:不需要 实时索引;每一个分片的副本数较多;同时需要所有分片都能切换为主片;

3、TLOG+PULL:不需要 实时索引;每一个分片的副本数较多;提高查询能力,能够容忍短时的过期数据。

我们做了个测试:solr7.3,16个物理节点,每个节点3个实例,每个实例20G,需要索引的数据量为一亿,160个spark executors,一主两从(注:我们不需要NRT特性,我们是夜间批量)

1、全部NRT需要20分钟,大概每分钟600-800万。

2、全部TLOG需要10分钟,大概每分钟10000万。

3、TLOG+PULL:未测

PS:solr创建索引,可以先使用solr自带的zk脚本工具中uploadconfig方法上传配置文件,再使用solr的collection api里面的createcollection方法创建。

一份参考资料:

下面参考文章的原文地址:https://berlinbuzzwords.de/sites/berlinbuzzwords.de/files/media/documents/replicatypes-berlinbuzzwords.pdf

其中有启发的一张图: