Java demo for Kafka on Kerberos-secured CDH

After two days of struggling, I finally got a Java producer and consumer demo working against Kafka on CDH 5.13 with Kerberos authentication enabled. It felt like a genuine public service.

Producer code:

public static void main(String[] args) throws InterruptedException {
    System.setProperty("java.security.auth.login.config",
            "/Users/zhouxiang/develop/test/testkafka/jaas-client.conf");
    System.setProperty("java.security.krb5.conf", "/Users/zhouxiang/develop/test/testkafka/krb5.conf");
    Properties props = new Properties();
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "brokeriplist");
    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.setProperty(ProducerConfig.ACKS_CONFIG, "1");
    props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    props.setProperty("sasl.kerberos.service.name", "kafka");
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 100; i++) {
        ProducerRecord<String, String> data = new ProducerRecord<>("BPC", "hello kafka " + i);
        producer.send(data);
        TimeUnit.SECONDS.sleep(2);
        System.out.println("test_nopar " + "hello kafka " + i);
    }
    producer.flush();
    producer.close();
}
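The producer and consumer below share the same SASL/Kerberos settings. A small helper like the following can keep them in one place; the class and method names are my own, not from the original demo, and since it only builds a Properties object it uses plain string keys and needs no Kafka dependency:

```java
import java.util.Properties;

public class SecureKafkaProps {
    // Hypothetical helper: bundles the SASL_PLAINTEXT/GSSAPI settings
    // that both the producer and the consumer demos need.
    public static Properties secureClientProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "GSSAPI");
        props.setProperty("sasl.kerberos.service.name", "kafka");
        return props;
    }

    public static void main(String[] args) {
        Properties p = secureClientProps("broker1:9092,broker2:9092");
        // Producer- or consumer-specific keys (serializers, group.id, ...)
        // are then layered on top of this base.
        System.out.println(p.getProperty("security.protocol")); // prints SASL_PLAINTEXT
    }
}
```

Both clients would then start from secureClientProps(...) and add their serializer/deserializer and group settings on top.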

Consumer code:

public static void main(String[] args) {
    System.setProperty("java.security.auth.login.config", "/home/bigdata/kafka_test/jaas-client.conf");
    System.setProperty("java.security.krb5.conf", "/home/bigdata/kafka_test/krb5.conf");
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "brokeriplist");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty("security.protocol", "SASL_PLAINTEXT");
    props.setProperty("sasl.mechanism", "GSSAPI");
    props.setProperty("sasl.kerberos.service.name", "kafka");
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        // List of topics to subscribe to
        consumer.subscribe(Arrays.asList("BPC"));
        while (true) {
            System.out.println("consumer while...");
            try {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.err.println(record.offset() + "," + record.key() + "," + record.value());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

Configuration files: jaas-client.conf and krb5.conf (examples of both appear in the testing notes further below).


The code looks very simple, but starting from zero you run into plenty of problems. Here is my issue log:

1. Cannot locate default realm

Fix:
The krb5.conf file was missing the default_realm setting. Adding it solved the problem:

[libdefaults]
default_realm = test.COM
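For reference, a minimal client-side krb5.conf looks roughly like the sketch below. The realm name, hostname, and DNS settings are illustrative placeholders; adjust them to your cluster:

```ini
[libdefaults]
default_realm = TEST.COM
dns_lookup_kdc = false
dns_lookup_realm = false

[realms]
TEST.COM = {
    kdc = kerb-server.test.com
    admin_server = kerb-server.test.com
}
```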

2. The group coordinator is not available

The consumer program kept logging this error after startup. Searching around, a Hortonworks KB article explains it as follows:

Reference: https://community.hortonworks.com/content/supportkb/175137/error-the-group-coordinator-is-not-available-when.html

Cause:
When using the bootstrap-server parameter, the connection is through the brokers instead of ZooKeeper. The brokers use __consumer_offsets to store information about committed offsets for each topic:partition per group of consumers (groupID). In this case, __consumer_offsets was pointing to invalid broker IDs, hence the above exception was displayed.

Following that hint, I deleted the corresponding __consumer_offsets znode; with no other change, the program worked.
Command (in zookeeper-client):
rmr /kafka/brokers/topics/__consumer_offsets

3. During development, be sure to set the log4j log level to DEBUG. Many errors only ever show up in the log4j output; the program itself never throws an exception.
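With the log4j 1.x setup that the CDH-era Kafka clients use, a log4j.properties along these lines (placed on the classpath) turns the client logging up to DEBUG:

```ini
log4j.rootLogger=DEBUG, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %c{1}:%L - %m%n
```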

4. This one was nasty. The log reported:

Caused by: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]

Caused by: KrbException: Identifier doesn't match expected value (906)

The error was especially confusing because everything ran fine inside the cluster and only failed outside it. The logs showed that the Kerberos login itself succeeded, but the subsequent communication with Kafka then failed.
In the end the cause was neither a missing Kerberos client installation nor missing JCE policy files: the client machine's hosts file simply had no entries for the cluster servers. Why the error is so indirect, I don't know, but adding all the cluster hosts to /etc/hosts fixed it.
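As a sketch, the /etc/hosts entries on the client machine need to cover the KDC and every broker; the hostnames and IPs below are placeholders:

```
10.1.1.1    kerb-server.test.com    kerb-server
10.1.1.2    broker1.test.com        broker1
10.1.1.3    broker2.test.com        broker2
```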

5. Network is unreachable (connect failed): authentication failure

null credentials from Ticket Cache
[Krb5LoginModule] authentication failed
Network is unreachable (connect failed)
10:17:32,300 INFO KafkaProducer:341 - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 0 ms.
10:17:32,319 DEBUG KafkaProducer:177 - [Producer clientId=producer-1] Kafka producer has been closed
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.eclipse.jdt.internal.jarinjarloader.JarRsrcLoader.main(JarRsrcLoader.java:61)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:441)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:297)
at cn.com.bmsoft.kafka.client.SimpleProducer.main(SimpleProducer.java:27)
… 5 more
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Network is unreachable (connect failed)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:112)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:114)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:61)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:86)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:398)
… 7 more
Caused by: javax.security.auth.login.LoginException: Network is unreachable (connect failed)
at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:808)
at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:52)
at org.apache.kafka.common.security.kerberos.KerberosLogin.login(KerberosLogin.java:98)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:53)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:82)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:103)
… 11 more
Caused by: java.net.SocketException: Network is unreachable (connect failed)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at sun.security.krb5.internal.TCPClient.<init>(NetClient.java:63)
at sun.security.krb5.internal.NetClient.getInstance(NetClient.java:43)
at sun.security.krb5.KdcComm$KdcCommunication.run(KdcComm.java:393)
at sun.security.krb5.KdcComm$KdcCommunication.run(KdcComm.java:364)
at java.security.AccessController.doPrivileged(Native Method)
at sun.security.krb5.KdcComm.send(KdcComm.java:348)
at sun.security.krb5.KdcComm.sendIfPossible(KdcComm.java:253)
at sun.security.krb5.KdcComm.send(KdcComm.java:229)
at sun.security.krb5.KdcComm.send(KdcComm.java:200)
at sun.security.krb5.KrbAsReqBuilder.send(KrbAsReqBuilder.java:316)
at sun.security.krb5.KrbAsReqBuilder.action(KrbAsReqBuilder.java:361)
at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:776)
… 28 more

First I checked port 88 on the Kerberos server and port 9092 on the Kafka broker; both were reachable, so this was not an actual network failure. The problem again came down to hostname/DNS resolution.
Fix:
Change the kdc and admin_server values in krb5.conf to the hostnames mapped in the /etc/hosts file.

[realms]
TEST.COM = {
    kdc = 10.1.1.1
    admin_server = 10.1.1.1
}

Change to:

[realms]
TEST.COM = {
    kdc = KERB_SERVER
    admin_server = KERB_SERVER
}

Message testing on Kafka after enabling Kerberos authentication

After enabling security on CDH 5.13.1, Kafka requires some security configuration before it can be used at all, and the CDH documentation scattered around has enough inconsistencies to cause all kinds of errors. Here is a record of today's pitfalls.

1. Preparation
To get through Kerberos authentication, four files need to be prepared: the keytab file (Kerberos credential file), jaas.conf, producer.properties, and consumer.properties.

jaas.conf contents:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/Users/develop/test/testkafka/kafka.keytab"
storeKey=true
useTicketCache=true
debug=true
principal="kafka@TEST.COM";
};
Note: the keyTab path must point to wherever you store the keytab file.

producer.properties contents:
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka

consumer.properties contents:
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
sasl.mechanism=GSSAPI

2. Export the jaas.conf path as a JVM option
export KAFKA_OPTS="-Djava.security.auth.login.config=/root/kafka_test/jaas.conf"

This completes the authentication setup; normal Kafka operations can now proceed.

3. Create a topic
sh /opt/cloudera/parcels/KAFKA/bin/kafka-topics --create --zookeeper server1:2181/kafka --replication-factor 1 --partitions 1 --topic test
Note: the trailing /kafka in --zookeeper server1:2181/kafka is the default Kafka root znode on CDH. Omitting this path makes the command fail with an error.

4. Check that the topic was created

sh /opt/cloudera/parcels/KAFKA/bin/kafka-topics --list --zookeeper server1:2181/kafka

5. Start a consumer
sh /opt/cloudera/parcels/KAFKA/bin/kafka-console-consumer --new-consumer --topic test --from-beginning --bootstrap-server server1:9092 --consumer.config consumer.properties --partition 0
Note: even though the topic was created with --partitions 1, the consumer will never receive messages here unless --partition 0 is also specified. I did not dig into the exact cause (presumably some default setting), but with this flag the messages do come through.

6. Start a producer
sh /opt/cloudera/parcels/KAFKA/bin/kafka-console-producer --broker-list server1:9092 --topic test --producer.config producer.properties

At this point, producer and consumer testing works on the Kerberos-secured Kafka. Getting a Java program to log in to the secured cluster is still a work in progress... the free edition of CDH really is hard to use.