Kafka Java Producer与kerberos

在kerberosed环境中向kafka主题发送消息时收到错误。 我们在hdp 2.3上有集群

我跟着这个http://henning.kropponline.de/2016/02/21/secure-kafka-java-producer-with-kerberos/

但是对于发送消息,我必须首先明确地执行kinit,然后才能将消息发送到kafka主题。 我试图通过java类编织,但这也行不通。 PFB代码:

package com.ct.test.kafka; import java.util.Date; import java.util.Properties; import java.util.Random; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class TestProducer { public static void main(String[] args) { String principalName = "ctadmin"; String keyTabPath = "/etc/security/keytabs/ctadmin.keytab"; boolean authStatus = CTSecurityUtil.loginUserFromKeytab(principalName, keyTabPath); if (!authStatus) { System.out.println("Authntication fails, try something else " + authStatus); } else { System.out.println("Authntication successfull " + authStatus); } System.setProperty("java.security.krb5.conf", "/etc/krb5.conf"); System.setProperty("java.security.auth.login.config", "/etc/kafka/2.3.4.0-3485/0/kafka_jaas.conf"); System.setProperty("javax.security.auth.useSubjectCredsOnly", "false"); System.setProperty("sun.security.krb5.debug", "true"); try { long events = Long.parseLong("3"); Random rnd = new Random(); Properties props = new Properties(); System.out.println("After broker list- " + args[0]); props.put("metadata.broker.list", args[0]); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("request.required.acks", "1"); props.put("security.protocol", "PLAINTEXTSASL"); //props.put("partitioner.class", "com.ct.test.kafka.SimplePartitioner"); System.out.println("After config prop -1"); ProducerConfig config = new ProducerConfig(props); System.out.println("After config prop -2 config" + config); Producer producer = new Producer(config); System.out.println("After config prop -3"); for (long nEvents = 0L; nEvents < events; nEvents += 1L) { Date runtime = new Date(); String ip = "192.168.2" + rnd.nextInt(255); String msg = runtime + " www.example.com, " + ip; KeyedMessage data = new KeyedMessage("test_march4", ip, msg); System.out.println("After config prop -1 data" + data); producer.send(data); } producer.close(); } catch (Throwable th) { th.printStackTrace(); } } } 

Pom.xml:从hortonworks repo下载的所有依赖项。

    org.apache.kafka kafka_2.10 0.9.0.2.3.4.0-3485   org.apache.kafka kafka-clients 0.9.0.2.3.4.0-3485   org.jasypt jasypt-spring31 1.9.2 compile   org.apache.hadoop hadoop-common 2.7.1.2.3.4.0-3485   

错误:Case1:当我指定myuser kafka_jass.conf时

 log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. After config prop -2 configkafka.producer.ProducerConfig@643293ae java.lang.SecurityException: Configuration Error: Line 6: expected [controlFlag] at com.sun.security.auth.login.ConfigFile.(ConfigFile.java:110) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at java.lang.Class.newInstance(Class.java:379) at javax.security.auth.login.Configuration$2.run(Configuration.java:258) at javax.security.auth.login.Configuration$2.run(Configuration.java:250) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:249) at org.apache.kafka.common.security.kerberos.Login.login(Login.java:291) at org.apache.kafka.common.security.kerberos.Login.(Login.java:104) at kafka.common.security.LoginManager$.init(LoginManager.scala:36) at kafka.producer.Producer.(Producer.scala:50) at kafka.producer.Producer.(Producer.scala:73) at kafka.javaapi.producer.Producer.(Producer.scala:26) at com.ct.test.kafka.TestProducer.main(TestProducer.java:51) Caused by: java.io.IOException: Configuration Error: Line 6: expected [controlFlag] at com.sun.security.auth.login.ConfigFile.match(ConfigFile.java:563) at com.sun.security.auth.login.ConfigFile.parseLoginEntry(ConfigFile.java:413) at com.sun.security.auth.login.ConfigFile.readConfig(ConfigFile.java:383) at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:283) at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:219) at com.sun.security.auth.login.ConfigFile.(ConfigFile.java:108) 

MyUser_Kafka_jass.conf

 KafkaClient { com.sun.security.auth.module.Krb5LoginModule required doNotPrompt=true useTicketCache=true renewTicket=true principal="ctadmin/prod-dev1-dn1@PROD.COM"; useKeyTab=true serviceName="kafka" keyTab="/etc/security/keytabs/ctadmin.keytab" client=true; }; Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/etc/security/keytabs/ctadmin.keytab" storeKey=true useTicketCache=true serviceName="zookeeper" principal="ctadmin/prod-dev1-dn1@PROD.COM"; }; 

case2:当我指定Kafkas自己的jaas文件时

 Java config name: /etc/krb5.conf Loaded from Java config javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. Make sure -Djava.security.auth.login.config property passed to JVM and the client is configured to use a ticket cache (using the JAAS configuration setting 'useTicketCache=true)'. Make sure you are using FQDN of the Kafka broker you are trying to connect to. not available to garner authentication information from the user at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:899) at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:719) at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:584) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at javax.security.auth.login.LoginContext.invoke(LoginContext.java:762) at javax.security.auth.login.LoginContext.access$000(LoginContext.java:203) at javax.security.auth.login.LoginContext$4.run(LoginContext.java:690) at javax.security.auth.login.LoginContext$4.run(LoginContext.java:688) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:687) at javax.security.auth.login.LoginContext.login(LoginContext.java:595) at org.apache.kafka.common.security.kerberos.Login.login(Login.java:298) at org.apache.kafka.common.security.kerberos.Login.(Login.java:104) at kafka.common.security.LoginManager$.init(LoginManager.scala:36) at kafka.producer.Producer.(Producer.scala:50) at kafka.producer.Producer.(Producer.scala:73) at kafka.javaapi.producer.Producer.(Producer.scala:26) at com.ct.test.kafka.TestProducer.main(TestProducer.java:51) 

这很好,如果我在运行这个应用程序之前做了kinit,否则它将通过上面的错误。 我不能在我的生产环境中这样做,如果我们的应用程序本身有任何方法可以做到这一点,那么请帮助我。 如果您需要更多详细信息,请与我们联系。

谢谢:)

我不知道第一次犯了什么错误,在我再次做的事情之下,它运作正常。

首先给出所有主题的访问权限:

 bin/kafka-acls.sh --add --allow-principals user:ctadmin --operation ALL --topic marchTesting --authorizer-properties zookeeper.connect={hostname}:2181 

创建jass文件:kafka-jaas.conf

 KafkaClient { com.sun.security.auth.module.Krb5LoginModule required doNotPrompt=true useTicketCache=true principal="ctadmin@HSCALE.COM" useKeyTab=true serviceName="kafka" keyTab="/etc/security/keytabs/ctadmin.keytab" client=true; }; 

Java程序:

 package com.ct.test.kafka; import java.util.Date; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class KafkaProducer { public static void main(String[] args) { String topic = args[0]; Properties props = new Properties(); props.put("metadata.broker.list", "{Hostname}:6667"); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("request.required.acks", "1"); props.put("security.protocol", "PLAINTEXTSASL"); ProducerConfig config = new ProducerConfig(props); Producer producer = new Producer(config); for (int i = 0; i < 10; i++){ producer.send(new KeyedMessage(topic, "Test Date: " + new Date())); } } } 

运行申请:

java -Djava.security.auth.login.config = / home / ctadmin / kafka-jaas.conf -Djava.security.krb5.conf = / etc / krb5.conf -Djavax.security.auth.useSubjectCredsOnly = true -cp kafka -testing-0.0.1-jar-with-dependencies.jar com.ct.test.kafka.KafkaProducer

你可以在这段输出中看到错误是你的jaas文件中的分号:

 Line 6: expected [controlFlag] 

这行不能有分号:

 principal="ctadmin/prod-dev1-dn1@PROD.COM"; 

它只能存在于最后一行: