2017-02-12

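Apache Kafka: KafkaProducerActor throws an ask timeout exception

I am using the cake solutions Akka client for Scala and Kafka. I create a KafkaProducerActor, send it a message with the ask pattern, and expect a future back so that I can act on the result, but every time I get an ask timeout exception. Here is my code: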

import akka.actor.ActorSystem
import akka.pattern.ask
import akka.util.Timeout
import cakesolutions.kafka.KafkaProducer
import cakesolutions.kafka.akka.{KafkaProducerActor, ProducerRecords}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.kafka.common.serialization.StringSerializer

import scala.concurrent.duration._
import scala.util.{Failure, Success}

class SimpleAkkaProducer(config: Config, system: ActorSystem) {

    // Execution context for the future callbacks below
    import system.dispatcher

    private val producerConf = KafkaProducer.Conf(
        config,
        keySerializer = new StringSerializer,
        valueSerializer = new StringSerializer)

    val actorRef = system.actorOf(KafkaProducerActor.props(producerConf))

    // Fire-and-forget send
    def sendMessageWayOne(record: ProducerRecords[String, String]): Unit = {
        actorRef ! record
    }

    // Send with the ask pattern and print whatever comes back
    def sendMessageWayTwo(record: ProducerRecords[String, String]): Unit = {
        implicit val timeout: Timeout = Timeout(100.seconds)
        val future = (actorRef ? record).mapTo[String]
        future onComplete {
            case Success(data) => println(s" >>>>>>>>>>>> $data")
            case Failure(ex) => ex.printStackTrace()
        }
    }
}

object SimpleAkkaProducer {
    def main(args: Array[String]): Unit = {
        val system = ActorSystem("KafkaProducerActor")
        val config = ConfigFactory.defaultApplication()
        val simpleAkkaProducer = new SimpleAkkaProducer(config, system)

        val topic = config.getString("akka.topic")
        val messageOne = ProducerRecords.fromKeyValues[String, String](topic,
            Seq((Some("Topics"), "First Message")), None, None)

        simpleAkkaProducer.sendMessageWayOne(messageOne)
        simpleAkkaProducer.sendMessageWayTwo(messageOne)
    }
}

Here is the exception:

akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://KafkaProducerActor/user/$a#-1520717141]] after [100000 ms]. Sender[null] sent message of type "cakesolutions.kafka.akka.ProducerRecords". 
    at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:604) 
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) 
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:864) 
    at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109) 
    at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103) 
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:862) 
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329) 
    at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280) 
    at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284) 
    at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) 
    at java.lang.Thread.run(Thread.java:745) 

Please add the definition of 'KafkaProducerActor'.

'KafkaProducerActor' is provided by the API; its implementation is here: https://github.com/cakesolutions/scala-kafka-client/blob/1cbeccbb183ca06585f4b9fb1e366048e993ac51/akka/src/main/scala/cakesolutions/kafka/akka/KafkaProducerActor.scala

Hey @YuvalItzchakov, did you find a solution?

Answer

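The producer actor only replies to the sender if the successResponse or failureResponse value of the given ProducerRecords is something other than None. When the Kafka write succeeds, the successResponse value is sent back to the sender; when the Kafka write fails, the failureResponse value is sent back. The code in the question passes None for both, so no reply is ever sent and the ask times out.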

Example:

val record = ProducerRecords.fromKeyValues[String, String](
    topic = topic, 
    keyValues = Seq((Some("Topics"), "First Message")), 
    successResponse = Some("success"), 
    failureResponse = Some("failure") 
) 

val future = (actorRef ? record).mapTo[String] 
future onComplete { 
    case Success("success") => println("Send succeeded!") 
    case Success("failure") => println("Send failed!") 
    case Success(data) => println(s"Send result: $data") 
    case Failure(ex) => ex.printStackTrace() 
} 
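
To see why the original ask never completes, the reply rule described above can be modelled with the Scala standard library alone. This is only a toy sketch under stated assumptions, not the library's implementation: `Records`, `ask`, and `replied` are hypothetical names, and a `Promise` stands in for the reply that the actor would send.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object ReplyModel {
  // Hypothetical stand-in for ProducerRecords: only the two response fields matter here.
  final case class Records(successResponse: Option[Any], failureResponse: Option[Any])

  // Toy "producer": completes the reply only when the relevant response value is defined.
  // This mirrors the behaviour described in the answer; it is not the real actor code.
  def ask(records: Records, writeSucceeds: Boolean): Future[Any] = {
    val reply = Promise[Any]()
    val response = if (writeSucceeds) records.successResponse else records.failureResponse
    response.foreach(value => reply.success(value)) // None: the future is never completed
    reply.future
  }

  // Waits briefly and reports whether any reply arrived before the timeout.
  def replied(f: Future[Any], within: FiniteDuration = 200.millis): Boolean =
    Try(Await.result(f, within)).isSuccess
}
```

With `Records(None, None)` the future stays incomplete, which is what the caller observes as a timeout; with `Some(...)` values it completes as soon as the simulated write finishes.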

OK @Jaakko, that makes sense and it works, but how do I get the 'RecordMetadata' after sending a message?

Unfortunately, there is no way to get 'RecordMetadata' with the current version of the producer actor. Patches and suggestions are welcome on the project page. :)

OK, thanks for the help, @Jaakko.
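
For the 'RecordMetadata' question raised in the comments, one possible workaround outside the producer actor is to call the plain Kafka Java producer, whose `send(record, callback)` passes a `RecordMetadata` to the callback. The sketch below shows only the callback-to-`Future` adapter, using the Scala standard library; `CallbackAdapter` and `sendAsFuture` are hypothetical names, and the Kafka wiring in the trailing comment is an assumption that has not been compiled here.

```scala
import scala.concurrent.{Future, Promise}

object CallbackAdapter {
  // Hypothetical helper: turns a callback-style send into a Future.
  // `send` receives a (metadata, error) callback; `error` is null when the send succeeded,
  // following the convention of Kafka's Callback.onCompletion.
  def sendAsFuture[M](send: ((M, Exception) => Unit) => Unit): Future[M] = {
    val promise = Promise[M]()
    send { (metadata, error) =>
      // try* variants so that a duplicate callback invocation is ignored rather than thrown
      if (error != null) promise.tryFailure(error) else promise.trySuccess(metadata)
    }
    promise.future
  }
}

// Assumed wiring with kafka-clients (illustrative only):
//   CallbackAdapter.sendAsFuture[RecordMetadata] { done =>
//     producer.send(record, new Callback {
//       override def onCompletion(m: RecordMetadata, e: Exception): Unit = done(m, e)
//     })
//   }
```

The resulting future can then be handled with `onComplete` in the same way as the ask result in the answer.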