2017-06-12 312 views

I am writing a Scala Kafka producer and want to send messages from the Scala Kafka client to a Kafka broker. The problem is that the broker does not receive the messages, which I verified by starting a Kafka consumer from the command line. The command-line Kafka producer and consumer work fine. Kafka has Kerberos and SASL_PLAINTEXT security enabled. In short: sending data to a Kerberos-enabled Kafka cluster from a Scala client.

Please find my conf file, client code, and application log below. I think there must be some problem when connecting to Kerberos from the code.

Scala client:

package com.ABC.adds.producer 

import akka.actor.ActorSystem 
import akka.kafka.ProducerSettings 
import akka.kafka.scaladsl.Producer 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.Source 
import com.ABC.adds.models.Models.PPOfaq 
import com.ABC.adds.producer.serializer.ModelSerializer 
import com.thoughtworks.xstream.XStream 
import com.thoughtworks.xstream.io.xml.DomDriver 
import com.typesafe.config.ConfigFactory 
import com.typesafe.scalalogging.LazyLogging 
import org.apache.kafka.clients.CommonClientConfigs 
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata} 
import org.apache.kafka.common.serialization.ByteArraySerializer 

import scala.concurrent.ExecutionContext.Implicits.global 
import scala.util.{Failure, Success} 

object faqProducer extends App with LazyLogging {

  val config = ConfigFactory.load()
  implicit val system = ActorSystem.create("adds-faq-producer", config)
  implicit val mat = ActorMaterializer()

  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new ModelSerializer[PPOfaq](classOf[PPOfaq]))
    .withBootstrapServers("jbt12324.systems.pfk.ABC:3382")
    .withProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
    .withProperty("zookeeper.connect", "jbt234234.systems.pfk.ABC:2341,jbt234.systems.pfk.ABC:234,jbt1234324.systems.pfk.ABC:2234")
    .withProperty(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, "1")

  val xstream = new XStream(new DomDriver)
  val personString: String = scala.io.Source.fromInputStream(getClass.getClassLoader.getResourceAsStream("PPOfaq.xml")).mkString
  xstream.alias("faq", classOf[PPOfaq])
  val ppofaq: PPOfaq = xstream.fromXML(personString).asInstanceOf[PPOfaq]

  logger.info("Producer Configuration is : {}", producerSettings.toString)
  logger.info("Sending message : {}", ppofaq)

  logger.info("KafkaProducer Send first fetching Partitions for topics")
  val kafkaProducer = producerSettings.createKafkaProducer()
  kafkaProducer.partitionsFor("asp.adds.ppo.pems")
  val done1 = kafkaProducer.send(new ProducerRecord[Array[Byte], PPOfaq]("asp.adds.ppo.pems", ppofaq))
  val recordMetaData: RecordMetadata = done1.get()

  logger.info("Topic is : " + recordMetaData.topic() + " partition is : " + recordMetaData.partition() + " offset is : " + recordMetaData.offset())

  logger.info("KafkaProducer Send first fetching Partitions for topics end")

  val done = Source.single(ppofaq)
    .map { _ =>
      new ProducerRecord[Array[Byte], PPOfaq]("asp.adds.ppo.pems", ppofaq)
    }
    .runWith(Producer.plainSink(producerSettings))

  done onComplete {
    case Success(_) =>
      logger.info("The producer has sent a message to the topic: asp.adds.ppo.pems!!")
    case Failure(e) =>
      logger.error("Error occurred while producing to topic", e)
  }
}
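
As a side note, a Kafka producer does not use `zookeeper.connect` (the application log later warns that it is not a known config), and a 1 ms request timeout is almost guaranteed to expire before any broker can answer. Below is a minimal sketch of the producer properties such a SASL client typically needs; the broker address is the anonymised one from the question, and the service name `"kafka"` is taken from the JAAS file's `serviceName` entry:

```scala
import java.util.Properties

object SaslProducerProps {
  // Sketch of producer config for a SASL_PLAINTEXT + Kerberos cluster.
  // The broker address is the anonymised one from the question; adjust as needed.
  def build(): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", "jbt12324.systems.pfk.ABC:3382")
    props.put("security.protocol", "SASL_PLAINTEXT")
    // Must match the broker's Kerberos principal; the JAAS file uses serviceName="kafka".
    props.put("sasl.kerberos.service.name", "kafka")
    // Leave request.timeout.ms at its default (30000 ms) rather than 1 ms,
    // and note there is no zookeeper.connect: producers talk only to brokers.
    props
  }

  def main(args: Array[String]): Unit =
    println(build().getProperty("security.protocol"))
}
```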

Here is the Kafka client JAAS conf file I use for Kerberos authentication:

KafkaClient { 
com.sun.security.auth.module.Krb5LoginModule required 
doNotPrompt=true 
useTicketCache=false 
useKeyTab=true 
serviceName="kafka" 
principal="[email protected]" 
keyTab="/home/pqr/.pqr.headless.keytab" 
debug=true 
client=true; 
}; 
Client { 
    com.sun.security.auth.module.Krb5LoginModule required 
    doNotPrompt=true 
    useKeyTab=true 
    useTicketCache=false 
    serviceName="zookeeper" 
    principal="[email protected]" 
    keyTab="/home/pqr/.pqr.headless.keytab" 
    debug=true; 
}; 

Here is the application log from when I ran my jar on the cluster:

[[email protected] ResourceBundle]$ java -Djava.security.auth.login.config=kafka_client_jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=true -Djava.security.debug=logincontext,gssloginconfig,configfile,configparser, -jar adds-producer.jar 
      [Policy Parser]: creating policy entry for expanded java.ext.dirs path: 
        file:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.51-0.b16.el6_6.x86_64/jre/lib/ext/* 
      [Policy Parser]: creating policy entry for expanded java.ext.dirs path: 
        file:/usr/java/packages/lib/ext/* 
    14:44:56.520 [main] INFO c.h.adds.producer.addsProducer$ - Producer Configuration is : [email protected] 
    14:44:56.523 [main] INFO c.h.adds.producer.addsProducer$ - Sending message : PPOadds(PIC_EVENT,01/06/2016,26/10/2016,ASSIGNED,asd_asdasd_ERRORED,asd,asdMSR,High,CASE-3,CASE-4,CASE-1,CASE-2,,CustomAttributes(GCAL,PTS Only,,DTF_INT_WORKFLOWS_COMPLETE,16065763,2495921,12,CreditDefaultSwap,CreditDefaultSwap,VERIFIED_asdasd,ABCUSA,asdCDS)) 
    14:44:56.524 [main] INFO c.h.adds.producer.addsProducer$ - KafkaProducer Send first fetching Partitions for topics 
    configfile: reading file:/home/asdasd-asdasdds-adds-asda/adasd-erer/hfgh/kafka_client_jaas.conf 
    configparser: Reading next config entry: KafkaClient 
    configparser:   com.sun.security.auth.module.Krb5LoginModule, REQUIRED 
    configparser:     [email protected] 
    configparser:     debug=true 
    configparser:     doNotPrompt=true 
    configparser:     keyTab=/home/asdasd-asdad-adds-rrewr/.sdfsf-sdfsd-adds-sdfsf.headless.keytab 
    configparser:     client=true 
    configparser:     useKeyTab=true 
    configparser:     useTicketCache=false 
    configparser:     serviceName=kafka 
    configparser: Reading next config entry: Client 
    configparser:   com.sun.security.auth.module.Krb5LoginModule, REQUIRED 
    configparser:     [email protected] 
    configparser:     debug=true 
    configparser:     doNotPrompt=true 
    configparser:     keyTab=/home/sdfdsf-sfds-adds-sdf/.sdff.sdsfs-adds-usdfs.headless.keytab 
    configparser:     useKeyTab=true 
    configparser:     useTicketCache=false 
    configparser:     serviceName=zookeeper 
    Debug is true storeKey false useTicketCache false useKeyTab true doNotPrompt true ticketCache is null isInitiator true KeyTab is /home/dasda-sasd-adds-asdad/.asdad-asd-adds-adsasd.headless.keytab refreshKrb5Config is false principal is [email protected] tryFirstPass is false useFirstPass is false storePass is false clearPass is false 
    principal is [email protected] 
    Will use keytab 
      [LoginContext]: login success 
    Commit Succeeded 

      [LoginContext]: commit success 
    14:44:56.748 [main] WARN o.a.k.c.producer.ProducerConfig - The configuration 'zookeeper.connect' was supplied but isn't a known config. 
    Exception in thread "main" org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 

Please let me know if I am doing something wrong. Thanks, Mahendra Tonape

Answer


We could not consume the messages on the consumer side in our cluster, but we could consume them on our local machine. This was because we wrote our application against the Kafka 0.10 API while our cluster was running Kafka 0.9. If you compare these two Kafka versions, you will find significant differences between their APIs.
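
One way to avoid such a mismatch is to pin the client libraries in the build to the broker's version. The sbt fragment below is only a sketch: the exact version numbers are illustrative assumptions, so verify them against the compatibility notes of the connector you use before adopting.

```scala
// build.sbt (sketch) -- pin the Kafka client to the broker's major version.
// Version numbers here are illustrative assumptions; check the
// akka-stream-kafka compatibility notes for the release that matches
// a Kafka 0.9 broker.
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream-kafka" % "0.11-M2",
  // Force a broker-compatible client, in case a transitive dependency
  // pulls in a 0.10.x kafka-clients jar:
  "org.apache.kafka" % "kafka-clients" % "0.9.0.1"
)
```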

Also, please enable Kerberos debug logging to check whether the user actually authenticates against the Kerberos-enabled cluster.
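
The standard JVM flags for Kerberos/GSSAPI debugging can simply be added to the launch command. This is a sketch based on the command already shown in the question, with the debug flags added; the file names are the anonymised ones from the question:

```shell
# Sketch: the question's launch command with Kerberos/JAAS debug flags added.
java \
  -Djava.security.auth.login.config=kafka_client_jaas.conf \
  -Djava.security.krb5.conf=/etc/krb5.conf \
  -Dsun.security.krb5.debug=true \
  -Dsun.security.jgss.debug=true \
  -Djava.security.debug=logincontext,gssloginconfig,configfile,configparser \
  -jar adds-producer.jar
```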
