hive multiple inserts 一次扫描多次插入

hive multiple inserts可以仅执行一次扫描,将这次扫描的数据分别根据不同的条件插入到多张表或者同一张表的不同分区中。但是不能插入同一张表!

Hive extension (multiple inserts):
FROM from_statement
INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]] select_statement1
[INSERT OVERWRITE TABLE tablename2 [PARTITION ... [IF NOT EXISTS]] select_statement2]
[INSERT INTO TABLE tablename2 [PARTITION ...] select_statement2] ...;

这个语法可以降低扫描次数,如果分开多次插入,就会需要扫描多次。如果写成multiple inserts就只会扫表一次。官网解释如下:

Multiple insert clauses (also known as Multi Table Insert) can be specified in the same query.

Multi Table Inserts minimize the number of data scans required. Hive can insert data into multiple tables by scanning the input data just once (and applying different query operators) to the input data.

以下是执行计划截图:


测试情况:在20亿的数据中,执行扫描插入到六张表,只花了六分钟。

SSL服务器的url重写,导致系统登录后跳转到http协议

首先描述下我们的部署架构: 在F5-#1上配置虚拟服务器VS-#1以及负载均衡池(包含10.1.1.1、10.1.1.2),并将VS-#1 443端口的https请求解密为http请求后转发至池内服务器80端口。

用户通过浏览器访问系统的时候,是使用https访问到ssl服务器,然后ssl服务器使用url重写,将请求报文修改为http协议,并负载均衡转发到后台服务器。这样就造成一个问题:后台tomcat容易并不知道自己需要使用https,只知道自己是使用的是http。也是就说:httpservletrequest.getSchema()得到的是http,而不是https,所以这样就导致spring security的重定向到系统首页的url地址是http协议,而不是https协议。

解决方案:

让tomcat知道自己位于proxy(h5负载均衡)后面。在tomcat的sever.xml里面的connector里面生命proxy的相关信息,包括proxyName,proxyPort,schema,secure等等。参考网址:http://www.thecodingforums.com/threads/load-balancing-an-https-java-web-application-in-tomcat.145712/

proxyName=”https://www.test.com” proxyPort=”443″ scheme=”https” secure=”true”

kafka cdh kerberos 安全认证后,使用java程序demo

折腾了两天,终于在启用了kerberos安全认证的cdh5.13上,完成了java版的生产者,消费者DEMO.一种功德无量的感觉油然而生。

生产者代码:

消费者代码:

配置文件:


代码看起来非常简单,但是从0开始会有好多问题。以下是问题记录:

1、Cannot locate default realm)

解决方法:
因为没有在krb5.conf文件里面没有加上default_realm。加上就好了。。

[libdefaults]
default_realm = test.COM

2、The group coordinator is not available
消费者程序,在启动后一直报上面的错误信息。后来检索后,hdp对问题的解释如下:

参考网站:https://community.hortonworks.com/content/supportkb/175137/error-the-group-coordinator-is-not-available-when.html

Cause:
When using bootstrap-server parameter, the connection is through the Brokers instead of Zookeeper. The Brokers use __consumer_offsets to store information about committed offsets for each topic:partition per group of consumers (groupID). In this case, __consumer_offsets was pointing to invalid Broker IDs. Hence, the above the exception was displayed. To check if the Broker IDs are correct for this topic, execute the following command:

所以我根据上面的提示,删除了对应的__consumer_offsets,没有任何改动,程序就成功了。
操作指令: 
rmr /kafka/brokers/topics/__consumer_offsets

3、在程序开发的时候,务必把log4j的日志级别设置为debug。因为很多异常信息,发现只有在log4j中打印出来,整个程序死活都不会抛出异常。

4、这个问题有点恶心,日志报错为:Caused by: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) – LOOKING_UP_SERVER)]

Caused by: KrbException: Identifier doesn’t match expected value (906)

这个问题报错得相当恶心,因为集群内都是可以运行的,集群外就是不行。从日志上看,kerberos服务是成功登录的,但是到了和kafka通讯的时候,就报错了。
后来这个问题既不是没有安装kerberos,也不是因为没有安装JCE。而是没有在hosts文件没有配置集群内服务器的hosts。。。。不知道为什么报错要这么不明显。。哎。反正加了集群内所有服务器的hosts就搞定了

5、Network is unreachable (connect failed) 认证失败

null credentials from Ticket Cache
[Krb5LoginModule] authentication failed
Network is unreachable (connect failed)
10:17:32,300 INFO KafkaProducer:341 – [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 0 ms.
10:17:32,319 DEBUG KafkaProducer:177 – [Producer clientId=producer-1] Kafka producer has been closed
Exception in thread “main” java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.eclipse.jdt.internal.jarinjarloader.JarRsrcLoader.main(JarRsrcLoader.java:61)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:441)
at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:297)
at cn.com.bmsoft.kafka.client.SimpleProducer.main(SimpleProducer.java:27)
… 5 more
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Network is unreachable (connect failed)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:112)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:114)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:61)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:86)
at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:398)
… 7 more
Caused by: javax.security.auth.login.LoginException: Network is unreachable (connect failed)
at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:808)
at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:52)
at org.apache.kafka.common.security.kerberos.KerberosLogin.login(KerberosLogin.java:98)
at org.apache.kafka.common.security.authenticator.LoginManager.(LoginManager.java:53)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:82)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:103)
… 11 more
Caused by: java.net.SocketException: Network is unreachable (connect failed)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at sun.security.krb5.internal.TCPClient.(NetClient.java:63)
at sun.security.krb5.internal.NetClient.getInstance(NetClient.java:43)
at sun.security.krb5.KdcComm$KdcCommunication.run(KdcComm.java:393)
at sun.security.krb5.KdcComm$KdcCommunication.run(KdcComm.java:364)
at java.security.AccessController.doPrivileged(Native Method)
at sun.security.krb5.KdcComm.send(KdcComm.java:348)
at sun.security.krb5.KdcComm.sendIfPossible(KdcComm.java:253)
at sun.security.krb5.KdcComm.send(KdcComm.java:229)
at sun.security.krb5.KdcComm.send(KdcComm.java:200)
at sun.security.krb5.KrbAsReqBuilder.send(KrbAsReqBuilder.java:316)
at sun.security.krb5.KrbAsReqBuilder.action(KrbAsReqBuilder.java:361)
at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:776)
… 28 more

首先我检查了到krb服务器的88端口,以及kafka所在的9092端口,全部都是通的,所以不存在网络错误。。最后问题还是处在了dns上。
解决:
修改krb5.conf的kdc和admin_server的值为/etc/hosts文件中对应的域名。

TEST.COM = {
kdc = 10.1.1.1
admin_server = 10.1.1.1
}
修改为:
TEST.COM = {
kdc = KERB_SERVER
admin_server = KERB_SERVER
}

kafka kerberos安全认证后消息测试

在CDH 5.13.1 启用了安全认证后,KAFKA需要经过一些安全参数配置才可以使用。而CDH各个地方的资料都有或多或少的问题,导致各种错误。我这里将今天的踩坑过程记录下来。

1、准备工作
首先需要通过kerberos的安全,需要准备好四个个文件:kaytab文件(krb认证文件)、jaas.conf文件、producer.properties、consumer.properties

jaas.conf文件内容:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab=”/Users/develop/test/testkafka/kafka.keytab”
storeKey=true
useTicketCache=true
debug=true
principal=”kafka@PICC.COM”;
};
注意里面keytab文件路径,要指向你存放keytab文件路径。

producer.properties文件内容:
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka

consumer.properties文件内容:
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
sasl.mechanism=GSSAPI

2、导出jaas.conf文件路径到JVM环境变量
export KAFKA_OPTS=”-Djava.security.auth.login.config=/root/kafka_test/jaas.conf”

至此完成安全认证准备,可以进行kafka的相关操作了

3、创建topic
sh /opt/cloudera/parcels/KAFKA/bin/kafka-topics –create –zookeeper server1:2181/kafka –replication-factor 1 –partitions 1 –topic test
注意:-zookeeper server1:2181/kafka 最后的路径/kafka是cdh上kafka的默认root路径。如果不加这个路径就会报如下错误。

4、审查topic是否创建成功

sh /opt/cloudera/parcels/KAFKA/bin/kafka-topics –list –zookeeper server1:2181/kafka

5、创建消费者
sh /opt/cloudera/parcels/KAFKA/bin/kafka-console-consumer –new-consumer –topic test –from-beginning –bootstrap-server server1:9092 –consumer.config consumer.properties –partition 0
注意:虽然我们创建topic的时候设置–partitions 1,但是这里如若不指定–partition 0 ,消费者死活都不会拿到消息的!!具体原因没有仔细探查,估计是什么默认参数的问题,目前这样设置是可以拿到消息的。

6、创建生产者
sh /opt/cloudera/parcels/KAFKA/bin/kafka-console-producer –broker-list server1:9092 –topic test –producer.config producer.properties

至此可以在安全认证的kafka上,通过生产者和消费者测试。但是如何使用java程序登入安全认证的kafka还在努力中。。。感觉免费版的cdh真难用。。

Solr查询调优三:facet.threads

This param will cause loading the underlying fields used in faceting to be executed in parallel with the number of threads specified. Specify as facet.threads=N where N is the maximum number of threads used. Omitting this parameter or specifying the thread count as 0 will not spawn any threads, and only the main request thread will be used. Specifying a negative number of threads will create up to Integer.MAX_VALUE threads.

facet.threads指定执行facet查询时,并行执行的线程数量。
facet.threads默认为1或者指定为0时,在执行facet的时候只有一个单线程处理。当设置 facet.threads为N时,就会指定最大N个线程执行。当设置为-1时,等于设置为Interger.MAX_VALUE.

Solr查询调优二:fq 禁用缓存和post filter

关闭filter query的缓存
实际应用中,有很多的filter cache是没有必要的,而且filter cahce的上限数量是固定,所以应该禁用一些不常用的filter cache。
例子:
1、fq={!cache=false}id:123&fq={!frange l=90 u=100 cache=false}
2、scale(query({!v=”content:(solr OR lucene)”}),0,100)

改变filter执行顺序
将过滤最多数据的filter置于最前面,这样后面如果有需要进行高开销的filter,计算的数据量就大大减少。
例子:
fq={!cost=1}category:technology&
fq={!cost=2}date:[NOW/DAY-1YEAR TO *]&
fq={!geofilt pt=37.773,-122.419 sfield=location d=50 cost=3}&
fq={!frange l=90 u=100 cache=false cost=100}
scale(query({!v=”content:(solr OR lucene)”}),0,100)

COST的数值越高,filter越后执行。将特别耗资源的filter设置成100,同时将cache变成false,因为它的结果是随机值,没有保存的意义。类似于POST FILER。

POST FILTER
Post Filter是一个特殊filter。它在所有的main query和filter执行完毕后才开始执行,即在mainquery和filter产生的最后交集文档后执行post filter。
post filter类似于前面提到的将filter设置为cost=100.
post filter 一般用于高开销的检索和匹配。自己可以实现postfiler interface实现自己的post filter。

Solr查询调优一: query VS filterquery 区别

Solr有两个查询参数,分别是query(q)和filterquery(fq)。官方文档没有写清楚两者之间具体有什么区别。

fq的官方文档这样写着:https://lucene.apache.org/solr/guide/7_3/common-query-parameters.html#fq-filter-query-parameter

The fq parameter defines a query that can be used to restrict the superset of documents that can be returned, without influencing score. It can be very useful for speeding up complex queries, since the queries specified with fq are cached independently of the main query. When a later query uses the same filter, there’s a cache hit, and filter results are returned quickly from the cache.
When using the fq parameter, keep in mind the following:

  • The fq parameter can be specified multiple times in a query. Documents will only be included in the result if they are in the intersection of the document sets resulting from each instance of the parameter. In the example below, only documents which have a popularity greater then 10 and have a section of 0 will match.fq=popularity:[10 TO *]&fq=section:0
  • Filter queries can involve complicated Boolean queries. The above example could also be written as a single fq with two mandatory clauses like so:fq=+popularity:[10 TO *] +section:0
  • The document sets from each filter query are cached independently. Thus, concerning the previous examples: use a single fq containing two mandatory clauses if those clauses appear together often, and use two separate fq parameters if they are relatively independent. (To learn about tuning cache sizes and making sure a filter cache actually exists, see The Well-Configured Solr Instance.)
  • It is also possible to use filter(condition) syntax inside the fq to cache clauses individually and – among other things – to achieve union of cached filter queries.
  • As with all parameters: special characters in an URL need to be properly escaped and encoded as hex values. Online tools are available to help you with URL-encoding. For example: http://meyerweb.com/eric/tools/dencoder/.

fq和q虽然不太好区分,但是能明确区分出两者的差别,对性能提升很高。两者的主要区别如下:
1、q又叫main query,fq全程filter query;
2、相关性评分
fq只有一个用途:就是查询出满足条件的文档。q有两个用途:1、查询出满足条件的文档;2、对返回的文档针对搜索关键字进行相关性评分。因此可以这样使用两者:将q看成一个特殊的filter,仅会多一步相关性评分。所以可以将用户搜索的关键词放入q中,这样可以根据用户的搜索给出相关性最高的文档,例如keyword=apache solr,同时将用户下拉选择的枚举字段放入fq参数中,例如category=techonology。
3、缓存和执行速度
将filter query 从main query中分离出来,有两个目的:
1、filter query 可以使用 filter query cache。
2、filter query 不进行开销巨大的相关性评分,加快执行速度。
4、可以指定多fq,但是只能有一个q
5、执行顺序
到底是fq先执行,还是q执行,看了很多文档,各执一词。但是solr in action的答案比较靠谱,执行顺序还是要看具体情况。

1 、每一个fq参数都会首先到filter cache中查询文档是否存在。
2、如果fq参数没有在 filter cache 找到,就会检索索引文件,并将检索到docset放入缓存中。
3、所有filter的docset进行取交集,最终生成一个唯一的docset。
4 、The q parameter is passed in (along with the filter DocSet) to be executed as a
Lucene query. When executing the query, Lucene plays leapfrog between the
query and combined filters, advancing both the query and filter results objects
to their next present internal ID (an integer). When both the query result and
filter result objects contain the same ID, that ID is collected, a process that
includes generating the relevancy score for the document
这段我翻译的不太清楚。意思大概是将q查出来的结果和前面filter的结果进行交集,最后为交集的每一个结果计算相关性评分。
5、执行post filter

参考资料:
1、solr in action

Solr优化一:部署调整

我们有一张大宽表,数据量大概在20亿左右,100多个字段,HDFS中不算复制因子,原始数据文件差不多1.8TB。这算是一个较大的宽表了。
大数据集群节点大概20台.CDH默认只能在一个节点上安装一个SOLR实例。由于其版本过低,不满足我们的功能要求。改为了独立部署当时最新的SOLR版本V7.3.1。

第一种方案:
由于每个物理机上留给SOLR的内存只有60GB,所以一开始,我们在每个节点上部署了3个实例,每个SOLR实例 60GB/3 =20GB。这样部署有以下几个问题:
1、SOLR普通检索过慢,经常需要10S到几十秒不等。
2、SOLR的distinct分析语句经常导致 SOLR实例OOM。
3、SOLR实例挂掉后,无法自动重启。
4、索引建立还行,每分钟接近1000万。

第二种方案:
从大数据集群中独立出6台物理机,专门用作SOLR集群。扣掉HBASE占用的50GB内存,操作系统50GB,剩下的150GB留给SOLR使用。每个节点上部署5个实例,摊到每个实例上150GB/5=30GB。部署后结果:
1、SOLR普通检索速度提升,时间变为1秒到3秒之内。
2、SOLR的distinct分析语句偶尔导致 SOLR实例OOM 。但是SOLR能够自动重启。
3、 SOLR的distinct分析语句执行时间较长,大致在3.3分钟左右。
4、索引建立极慢,20分钟才能建完400万数据。目前未探明是卡在网络还是磁盘?磁盘应该不会,因为每个实例都在一块独立的磁盘。

个人认为OOM的原因是因为内存不足,没有给SOLR足够的缓存空间吧。因为在SOLR实例中看到系统的物理内存始终为250.6G,差一点256GB,濒临崩溃的边缘。但是CDH的监控,只有146GB,和SOLR的监控不一致。SOLR的HEAP-SPACE,32GB只用到了8GB.

dynamic partition搬表 out of memory

在想启用dynamic partition来偷懒搬表的情况时,如果表的数据过大(例如我在六个节点上搬迁2亿数据)时,就会经常OOM。

样例SQL:

set hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions= 1000;
set hive.exec.max.dynamic.partitions.pernode=1000;

CREATE TABLE B (
NAME STRING
)
partitioned BY (NAME2 string,NAME3 string,NAME4 string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘,’
STORED AS PARQUET;

INSERT INTO TABLE B
PARTITION(NAME2,NAME3,NAME4)
select NAME2,NAME3,NAME4
FROM A;

我想将A表的数据进行分区,并导入B表。最简单的方式就启用动态分区。可是执行上面的SQL时,报出OOM错误。一般碰到OOM,我第一反应就是提高heap space。

set spark.executor.memory=25G;

结果还是一样OOM。难道是内存还不够吗?24G–>30G–>48GG。。还是不行。

最后找到一篇文章,启用hive.optimize.sort.dynamic.partition即可;

SET hive.optimize.sort.dynamic.partition=true

针对该参数和对应问题的解释:
该问题出现的原因是太多的文件同时进入写入。当启用hive.optimize.sort.dynamic.partition后面,动态分区列就会进行全局排序,这样REDUCE端,每个动态分区列的值只会有一个文件写入,从而REDUCE阶段降低了内存使用。

hive.optimize.sort.dynamic.partition
  • Default Value: true in Hive 0.13.0 and 0.13.1; false in Hive 0.14.0 and later (HIVE-8151)
  • Added In: Hive 0.13.0 with HIVE-6455

When enabled, dynamic partitioning column will be globally sorted. This way we can keep only one record writer open for each partition value in the reducer thereby reducing the memory pressure on reducers.
个人感觉就是在内部使用distribute by。

参考文章:
1、https://cloud.tencent.com/developer/article/1079007
2、https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties
3、https://stackoverflow.com/questions/33147764/hive-setting-hive-optimize-sort-dynamic-partition

HIVE 几个 by的区别

参考资料:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SortBy

Order By:
1、在hive.mapred.mode=strict模式下,order by 后面必须加limit。因为不加的话,order by会对所有的数据进行排序。而要进行全局排序,只能使用一个Reducer进行排序,如果结果集数据量异常大的话,速度异常缓慢。

Sort by VS Order by

1、Sort by是将在一个Reducer中的数据进行局部排序,如果有多Reducer,那么结果集只能是局部排序的。而Order By是所有Reducer汇总到一个Reducer进行全局排序。

Cluster By VS Distribute By

1、Cluster By 只是 Distribute By 和Sort By 的简写。即:Cluster By 字段A ==Distribute By 字段A Sort By 字段A。那么既然这样,为什么还要Distribute By和Sort By呢?那就是当Distribute By和Sort By字段不一致的时候就需要了。例如Distribute By 字段A Sort By 字段B 字段C。
2、Cluster By 因为包含Sort by,所以在每一个Reducer中的数据是有序的。
3、Distribute By 是无序的。
4、当Distribute By和Sort By的字段不一致时,就无法使用Cluster By。