HDFS Overview

What exactly is HDFS?
Personally, I think of HDFS as nothing more than an extra layer on top of ordinary single-machine file systems, a layer that unifies the file systems of many physical machines into one distributed file system. That extra layer is the NameNode: it stitches the scattered file blocks back into large files and exposes a command interface consistent with the operating system's own file system, which is exactly the set of dfs commands we use every day.
Users who are already comfortable with a regular file system can therefore move to HDFS seamlessly, without worrying about the distributed details underneath. Which brings us back to that famous saying in computing: there is no problem that cannot be solved by adding another layer of indirection, and if one layer is not enough, add two.
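To make the "same interface" point concrete, here is a minimal sketch (the NameNode address is a placeholder) that lists a directory through Hadoop's FileSystem API, the same abstraction the dfs commands are built on:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListHdfsRoot {
    public static void main(String[] args) throws Exception {
        // "hdfs://nn-host:8020" stands in for your NameNode address
        FileSystem fs = FileSystem.get(URI.create("hdfs://nn-host:8020"), new Configuration());
        for (FileStatus status : fs.listStatus(new Path("/"))) {
            // The same kind of information "hdfs dfs -ls /" would print
            System.out.println(status.getPath() + "\t" + status.getLen());
        }
        fs.close();
    }
}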

HDFS Router-based Federation


What exactly is a block?
A block is really just an ordinary file on the operating system, with the so-called block size being 128 MB: every large file is split into a number of 128 MB pieces, which are then spread across different servers.

The figure above shows the contents of the dfs data directory on one node.
Look first at the path /subdir5/subdir235: the directory tree is split into two levels, and each level contains many subdirectories. The point of this layout is to keep the number of files in any single directory from exceeding the operating system's limit.
Looking at the individual files, you can see that each block file is exactly 128 MB.
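To see for yourself that a file is nothing more than a sequence of such block files spread over DataNodes, a rough sketch like the one below (NameNode address and file path are placeholders) asks the NameNode where each block of a file lives:

import java.net.URI;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ShowBlockLocations {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(URI.create("hdfs://nn-host:8020"), new Configuration());
        FileStatus status = fs.getFileStatus(new Path("/data/big.file")); // placeholder path
        // One BlockLocation per block, each replicated on several hosts
        for (BlockLocation block : fs.getFileBlockLocations(status, 0, status.getLen())) {
            System.out.println("offset=" + block.getOffset()
                    + " length=" + block.getLength()
                    + " hosts=" + Arrays.toString(block.getHosts()));
        }
        fs.close();
    }
}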

A common misconception about the Secondary NameNode
The Secondary NameNode is not a backup of the NameNode, and it is not a hot standby either: it only checkpoints the namespace by periodically merging the edit log into the fsimage (at best, that checkpoint can help with a manual, cold recovery).

Data Replication

HDFS is designed to reliably store very large files across machines in a large cluster. It stores each file as a sequence of blocks. The blocks of a file are replicated for fault tolerance. The block size and replication factor are configurable per file.
Every file in HDFS is split into a sequence of blocks that are placed on different physical machines. The block size and replication factor can be set individually for each file.
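For instance, a minimal sketch (paths and values are made up) of setting both per file through the FileSystem API:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PerFileSettings {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(URI.create("hdfs://nn-host:8020"), new Configuration());
        Path p = new Path("/tmp/custom.file");
        // Create a file with a 64 MB block size and 2 replicas instead of the cluster defaults
        try (FSDataOutputStream out = fs.create(p, true, 4096, (short) 2, 64L * 1024 * 1024)) {
            out.writeUTF("hello hdfs");
        }
        // The replication factor can also be changed after the file exists
        fs.setReplication(p, (short) 3);
        fs.close();
    }
}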

All blocks in a file except the last block are the same size, while users can start a new block without filling out the last block to the configured block size after the support for variable length block was added to append and hsync.
All blocks of a file are the same size, except for the last one.

An application can specify the number of replicas of a file. The replication factor can be specified at file creation time and can be changed later. Files in HDFS are write-once (except for appends and truncates) and have strictly one writer at any time.
The NameNode makes all decisions regarding replication of blocks. It periodically receives a Heartbeat and a Blockreport from each of the DataNodes in the cluster. Receipt of a Heartbeat implies that the DataNode is functioning properly. A Blockreport contains a list of all blocks on a DataNode.

Each file in HDFS is write-once (except for appends and truncates) and has strictly one writer at any time.

Safemode
On startup, the NameNode enters a special state called Safemode. Replication of data blocks does not occur when the NameNode is in the Safemode state. The NameNode receives Heartbeat and Blockreport messages from the DataNodes. A Blockreport contains the list of data blocks that a DataNode is hosting. Each block has a specified minimum number of replicas. A block is considered safely replicated when the minimum number of replicas of that data block has checked in with the NameNode. After a configurable percentage of safely replicated data blocks checks in with the NameNode (plus an additional 30 seconds), the NameNode exits the Safemode state. It then determines the list of data blocks (if any) that still have fewer than the specified number of replicas. The NameNode then replicates these blocks to other DataNodes.
On startup the NameNode enters Safemode. While in Safemode no block replication takes place. Once it leaves Safemode, any data block whose replica count is below the configured factor is re-replicated by the NameNode until it reaches the specified number of replicas.
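The exit threshold and the extra wait mentioned above correspond to the dfs.namenode.safemode.threshold-pct and dfs.namenode.safemode.extension settings. The state can be checked with "hdfs dfsadmin -safemode get", or programmatically along the lines of this sketch (API names as in the Hadoop 2.x client; the NameNode address is a placeholder):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;

public class CheckSafemode {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(URI.create("hdfs://nn-host:8020"), new Configuration());
        DistributedFileSystem dfs = (DistributedFileSystem) fs;
        // SAFEMODE_GET only queries the current state, it does not change it
        boolean inSafemode = dfs.setSafeMode(SafeModeAction.SAFEMODE_GET);
        System.out.println("NameNode in safemode: " + inSafemode);
        fs.close();
    }
}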

Replication Pipelining
When a client is writing data to an HDFS file with a replication factor of three, the NameNode retrieves a list of DataNodes using a replication target choosing algorithm. This list contains the DataNodes that will host a replica of that block. The client then writes to the first DataNode. The first DataNode starts receiving the data in portions, writes each portion to its local repository and transfers that portion to the second DataNode in the list. The second DataNode, in turn starts receiving each portion of the data block, writes that portion to its repository and then flushes that portion to the third DataNode. Finally, the third DataNode writes the data to its local repository. Thus, a DataNode can be receiving data from the previous one in the pipeline and at the same time forwarding data to the next one in the pipeline. Thus, the data is pipelined from one DataNode to the next.
In short: when a client writes a file with a replication factor of three, the NameNode returns a list of DataNodes. The client writes the data to the first DataNode, the first DataNode forwards it to the second, and the second forwards it on to the third.

(Figures taken from Hadoop Operations.)

Replica Placement: The First Baby Steps

The placement of replicas is critical to HDFS reliability and performance. Optimizing replica placement distinguishes HDFS from most other distributed file systems. This is a feature that needs lots of tuning and experience. The purpose of a rack-aware replica placement policy is to improve data reliability, availability, and network bandwidth utilization. The current implementation for the replica placement policy is a first effort in this direction. The short-term goals of implementing this policy are to validate it on production systems, learn more about its behavior, and build a foundation to test and research more sophisticated policies.
Large HDFS instances run on a cluster of computers that commonly spread across many racks. Communication between two nodes in different racks has to go through switches. In most cases, network bandwidth between machines in the same rack is greater than network bandwidth between machines in different racks.
Communication between two nodes on different racks has to pass through switches, and in most cases the bandwidth between machines within the same rack is much higher than between machines on different racks.
The NameNode determines the rack id each DataNode belongs to via the process outlined in Hadoop Rack Awareness. A simple but non-optimal policy is to place replicas on unique racks. This prevents losing data when an entire rack fails and allows use of bandwidth from multiple racks when reading data. This policy evenly distributes replicas in the cluster which makes it easy to balance load on component failure. However, this policy increases the cost of writes because a write needs to transfer blocks to multiple racks.
For the common case, when the replication factor is three, HDFS’s placement policy is to put one replica on the local machine if the writer is on a datanode, otherwise on a random datanode, another replica on a node in a different (remote) rack, and the last on a different node in the same remote rack. This policy cuts the inter-rack write traffic which generally improves write performance. The chance of rack failure is far less than that of node failure; this policy does not impact data reliability and availability guarantees. However, it does reduce the aggregate network bandwidth used when reading data since a block is placed in only two unique racks rather than three. With this policy, the replicas of a file do not evenly distribute across the racks. One third of replicas are on one node, two thirds of replicas are on one rack, and the other third are evenly distributed across the remaining racks. This policy improves write performance without compromising data reliability or read performance.
In the common case of a replication factor of three: if the writer runs on a DataNode (i.e. inside the cluster), HDFS puts the first replica on that same node; otherwise it picks a random DataNode. The second replica goes to a node on a different (remote) rack, and the third replica to a different node on that same remote rack. In other words, one replica sits on one node and the other two sit on different nodes of another rack. This cuts inter-rack write traffic, but it also reduces the aggregate network bandwidth available for reads, because the block lives on only two racks rather than three, and the replicas of a file are not evenly distributed across racks.
If the replication factor is greater than 3, the placement of the 4th and following replicas are determined randomly while keeping the number of replicas per rack below the upper limit (which is basically (replicas – 1) / racks + 2).
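(For example, with a replication factor of 10 spread over 3 racks, the per-rack ceiling works out to (10 - 1) / 3 + 2 = 5 replicas, using integer division.)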
Because the NameNode does not allow DataNodes to have multiple replicas of the same block, maximum number of replicas created is the total number of DataNodes at that time.
After the support for Storage Types and Storage Policies was added to HDFS, the NameNode takes the policy into account for replica placement in addition to the rack awareness described above. The NameNode chooses nodes based on rack awareness at first, then checks that the candidate node have storage required by the policy associated with the file. If the candidate node does not have the storage type, the NameNode looks for another node. If enough nodes to place replicas can not be found in the first path, the NameNode looks for nodes having fallback storage types in the second path.
The current, default replica placement policy described here is a work in progress.

References:
1. http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html
2. Hadoop Operations

Hive multiple inserts: scan once, insert many times

Hive multiple inserts let you scan the source data once and, using different conditions, insert the results into several tables or into different partitions of the same table. They cannot, however, insert into the same table twice!

Hive extension (multiple inserts):
FROM from_statement
INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]] select_statement1
[INSERT OVERWRITE TABLE tablename2 [PARTITION ... [IF NOT EXISTS]] select_statement2]
[INSERT INTO TABLE tablename2 [PARTITION ...] select_statement2] ...;

This syntax cuts down the number of scans: issued as separate INSERT statements the source would be scanned once per insert, whereas a multiple-insert statement scans it only once. The official documentation explains it as follows:

Multiple insert clauses (also known as Multi Table Insert) can be specified in the same query.

Multi Table Inserts minimize the number of data scans required. Hive can insert data into multiple tables by scanning the input data just once (and applying different query operators) to the input data.
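As an illustration, here is a sketch of such a statement (table, partition and column names are made up): a single scan of src_events feeds two different targets.

FROM src_events e
INSERT OVERWRITE TABLE events_error PARTITION (dt = '2018-06-01')
  SELECT e.id, e.msg WHERE e.level = 'ERROR'
INSERT INTO TABLE events_warn PARTITION (dt = '2018-06-01')
  SELECT e.id, e.msg WHERE e.level = 'WARN';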

Below is a screenshot of the execution plan:


Test result: scanning 2 billion rows and inserting into six tables took only six minutes.

URL rewriting on the SSL server makes the system redirect to http after login

First, our deployment architecture: on F5 #1, virtual server VS #1 is configured with a load-balancing pool (containing 10.1.1.1 and 10.1.1.2); https requests arriving at VS #1 on port 443 are decrypted and forwarded as plain http to port 80 of the servers in the pool.

When users access the system from a browser, they reach the SSL server over https; the SSL server rewrites the URL, turns the request into plain http, and load-balances it to the backend servers. This causes a problem: the backend Tomcat container has no idea it is supposed to be serving https; as far as it knows, everything is plain http. In other words, HttpServletRequest.getScheme() returns http instead of https, so the URL that Spring Security redirects to after login (the system home page) ends up on the http scheme rather than https.

Solution:

Make Tomcat aware that it sits behind a proxy (the F5 load balancer). Declare the proxy information on the Connector in Tomcat's server.xml, including proxyName, proxyPort, scheme, secure, and so on. Reference: http://www.thecodingforums.com/threads/load-balancing-an-https-java-web-application-in-tomcat.145712/

proxyName="www.test.com" proxyPort="443" scheme="https" secure="true"
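Roughly speaking, the HTTP Connector in server.xml then ends up looking like this (port numbers and host name are placeholders; note that proxyName takes just the host name):

<Connector port="8080" protocol="HTTP/1.1"
           connectionTimeout="20000"
           proxyName="www.test.com" proxyPort="443"
           scheme="https" secure="true" />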

A Java demo for Kafka after enabling Kerberos on CDH

After two days of fiddling I finally got a Java producer and consumer demo running against Kerberos-enabled CDH 5.13, and a deep sense of accomplishment washed over me.

Producer code:

public static void main(String[] args) throws InterruptedException {
    // Point the JVM at the JAAS and krb5 configuration used for the Kerberos login
    System.setProperty("java.security.auth.login.config",
            "/Users/zhouxiang/develop/test/testkafka/jaas-client.conf");
    System.setProperty("java.security.krb5.conf", "/Users/zhouxiang/develop/test/testkafka/krb5.conf");

    Properties props = new Properties();
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "brokeriplist");
    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.setProperty(ProducerConfig.ACKS_CONFIG, "1");
    // Kerberos (GSSAPI) over an unencrypted channel
    props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    props.setProperty("sasl.kerberos.service.name", "kafka");

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 100; i++) {
        ProducerRecord<String, String> data = new ProducerRecord<>("BPC", "hello kafka " + i);
        producer.send(data);
        TimeUnit.SECONDS.sleep(2);
        System.out.println("test_nopar " + " hello kafka " + i);
    }
    producer.flush();
    producer.close();
}

Consumer code:

public static void main(String[] args) {
    // Point the JVM at the JAAS and krb5 configuration used for the Kerberos login
    System.setProperty("java.security.auth.login.config", "/home/bigdata/kafka_test/jaas-client.conf");
    System.setProperty("java.security.krb5.conf", "/home/bigdata/kafka_test/krb5.conf");

    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "brokeriplist");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // Kerberos (GSSAPI) over an unencrypted channel
    props.setProperty("security.protocol", "SASL_PLAINTEXT");
    props.setProperty("sasl.mechanism", "GSSAPI");
    props.setProperty("sasl.kerberos.service.name", "kafka");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        // List of topics to subscribe to
        consumer.subscribe(Arrays.asList("BPC"));
        while (true) {
            System.out.println("consumer while...");
            try {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.err.println(record.offset() + "," + record.key() + "," + record.value());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

Configuration files:
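The two files referenced from the code look roughly like this (keytab path, principal, realm and KDC host are placeholders for your own environment).

jaas-client.conf:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/user.keytab"
  principal="user@TEST.COM";
};

krb5.conf:

[libdefaults]
  default_realm = TEST.COM

[realms]
  TEST.COM = {
    kdc = KERB_SERVER
    admin_server = KERB_SERVER
  }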


The code itself looks very simple, but starting from scratch you run into plenty of problems. Here is a record of the issues:

1. Cannot locate default realm

Solution:
The krb5.conf file was missing the default_realm setting; adding it fixed the problem:

[libdefaults]
default_realm = TEST.COM

2. The group coordinator is not available
The consumer kept printing this error after startup. After some searching I found the Hortonworks explanation of the problem:

Reference: https://community.hortonworks.com/content/supportkb/175137/error-the-group-coordinator-is-not-available-when.html

Cause:
When using bootstrap-server parameter, the connection is through the Brokers instead of Zookeeper. The Brokers use __consumer_offsets to store information about committed offsets for each topic:partition per group of consumers (groupID). In this case, __consumer_offsets was pointing to invalid Broker IDs. Hence, the above the exception was displayed. To check if the Broker IDs are correct for this topic, execute the following command:

Following that hint I deleted the corresponding __consumer_offsets znode in ZooKeeper; with no other change, the program started working.
Command (run in the ZooKeeper CLI):
rmr /kafka/brokers/topics/__consumer_offsets

3. During development, make sure the log4j level is set to DEBUG. Many of the failures only show up in the log4j output; the program itself never throws an exception no matter what.

4. This one was nasty. The log reported: Caused by: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]

Caused by: KrbException: Identifier doesn't match expected value (906)

The error was especially annoying because everything worked inside the cluster and only failed outside it. According to the logs the Kerberos login itself succeeded, and the error only appeared once the client started talking to Kafka.
In the end the cause was neither a missing Kerberos client nor missing JCE policy files: the client machine's hosts file simply had no entries for the cluster nodes. Why the error message has to be this obscure I do not know, but adding all of the cluster hosts fixed it.
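For reference, the missing entries were just ordinary /etc/hosts lines for every node in the cluster, along the lines of (IPs and host names are placeholders):

10.1.1.11   broker01.test.com   broker01
10.1.1.12   broker02.test.com   broker02
10.1.1.13   kdc01.test.com      kdc01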

5. Network is unreachable (connect failed): authentication failure

null credentials from Ticket Cache
[Krb5LoginModule] authentication failed
Network is unreachable (connect failed)
10:17:32,300 INFO KafkaProducer:341 - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 0 ms.
10:17:32,319 DEBUG KafkaProducer:177 - [Producer clientId=producer-1] Kafka producer has been closed
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.eclipse.jdt.internal.jarinjarloader.JarRsrcLoader.main(JarRsrcLoader.java:61)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:441)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:297)
at cn.com.bmsoft.kafka.client.SimpleProducer.main(SimpleProducer.java:27)
… 5 more
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Network is unreachable (connect failed)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:112)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:114)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:61)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:86)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:398)
… 7 more
Caused by: javax.security.auth.login.LoginException: Network is unreachable (connect failed)
at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:808)
at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:52)
at org.apache.kafka.common.security.kerberos.KerberosLogin.login(KerberosLogin.java:98)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:53)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:82)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:103)
… 11 more
Caused by: java.net.SocketException: Network is unreachable (connect failed)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at sun.security.krb5.internal.TCPClient.<init>(NetClient.java:63)
at sun.security.krb5.internal.NetClient.getInstance(NetClient.java:43)
at sun.security.krb5.KdcComm$KdcCommunication.run(KdcComm.java:393)
at sun.security.krb5.KdcComm$KdcCommunication.run(KdcComm.java:364)
at java.security.AccessController.doPrivileged(Native Method)
at sun.security.krb5.KdcComm.send(KdcComm.java:348)
at sun.security.krb5.KdcComm.sendIfPossible(KdcComm.java:253)
at sun.security.krb5.KdcComm.send(KdcComm.java:229)
at sun.security.krb5.KdcComm.send(KdcComm.java:200)
at sun.security.krb5.KrbAsReqBuilder.send(KrbAsReqBuilder.java:316)
at sun.security.krb5.KrbAsReqBuilder.action(KrbAsReqBuilder.java:361)
at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:776)
… 28 more

First I checked connectivity to port 88 on the Kerberos server and to port 9092 on the Kafka brokers; both were reachable, so it was not really a network problem. In the end the issue once again came down to DNS.
Solution:
Change the kdc and admin_server values in krb5.conf to the host names defined in the /etc/hosts file.

TEST.COM = {
kdc = 10.1.1.1
admin_server = 10.1.1.1
}
Change it to:
TEST.COM = {
kdc = KERB_SERVER
admin_server = KERB_SERVER
}