2017-08-14 78 views
0

我是新的使用aka流kafka(和akka流一般)。我正在尝试构建一个图表,以便将消息发布到不同的主题。 如何将生产者作为流连接以提交处理后的消息?我试着用Producer.flow但由于您使用的是GraphDSL我不能得到commitScaladsl连接生产者流程图

object TestFoo { 
    import akka.kafka.ProducerMessage.Message 
    implicit val system = ActorSystem("test-kafka") 
    implicit val materializer = ActorMaterializer() 
    val evenNumbersTopic = "even_numbers" 
    val allNumbersTopic = "all_numbers" 
    lazy val consumerSettings = ConsumerSettings(system, new StringDeserializer(), new JsonDeserializer[Int]) 
    .withBootstrapServers("localhost:9092") 
    .withGroupId("group1") 
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") 
    lazy val source = Consumer.committableSource(consumerSettings, Subscriptions.topics(Set(evenNumbersTopic, allNumbersTopic))) 
    val producerSettings = ProducerSettings(system, new StringSerializer(), new StringSerializer()) 
    .withBootstrapServers("localhost:9092") 
    val flow: RunnableGraph[NotUsed] = RunnableGraph.fromGraph(GraphDSL.create() { implicit b => 
    import akka.stream.scaladsl.GraphDSL.Implicits._ 
    type TypedMessage = Message[String, Int,CommittableOffset] 
    val bcast = b.add(Broadcast[TypedMessage](2)) 
    val merge = b.add(Merge[TypedMessage](2)) 

    val evenFilter = Flow[TypedMessage].filter ( c => c.record.value() % 2 == 0) 
    val justEven = Flow[TypedMessage].map{ 
     case Message(pr, offset) => 
     val r = new ProducerRecord[String, Int]("general", pr.value()) 
     Message(r, offset) 
    } 
    val allNumbers = Flow[TypedMessage].map{ 
     case Message(pr, offset) => 
     val r = new ProducerRecord[String, Int](allNumbersTopic, pr.value()) 
     Message(r, offset) 
    } 

    val toMsg = Flow[ConsumerMessage.CommittableMessage[String, Int]].map{ msg => 
     val r = new ProducerRecord[String, Int]("general", msg.record.value()) 
     Message(r, msg.committableOffset) 
    } 
    source ~> toMsg ~> bcast 

    bcast ~> evenFilter ~> justEven ~> merge 
    bcast ~> allNumbers ~> merge 
    merge ~> Producer.flow(producerSettings).mapAsync(producerSettings.parallelism) { result => 
     result.message.passThrough.commitScaladsl() //this doesn't compile, cannot get the .commitScaladsl() 
    } 
    ClosedShape 
    })} 
+0

此刻这个例子是操纵着许多其他编译错误。你能修改它,使你的编译错误很容易重现吗? –

+0

@StefanoBonetti是的,我更新了代码,但编译错误较少,谢谢 – igx

回答

0

,编译器不能推断出从前一阶段PassThrough类型。 尝试并明确地将类型参数传递给Producer.flow函数,例如

merge ~> Producer.flow[K, V, CommittableOffset](producerSettings).mapAsync(producerSettings.parallelism) { result => 
    result.message.passThrough.commitScaladsl() 
} 

我已经离开KV非绑定PARAM,请配合有任何键/值类型的生产者就必然产生。如果您希望上面的代码正确连线,则需要将producerSettings类型与合并阶段的代码匹配。你需要这样的东西:

val producerSettings = ProducerSettings(system, new StringSerializer(), new JsonSerializer[Int]) 
    .withBootstrapServers("localhost:9092") 
+0

谢谢,但它实际上产生了相同的结果: val sink = Producer.flow [String,String,CommittableOffset](producerSettings).mapAsync producerSettings.parallelism){result => result.message.passThrough.commitScaladsl()//仍然无法编译,无法获得.commitScaladsl()}'' – igx

+0

对我来说,它看起来像'producerSettings'类型是'[String,String]',但你实际上给它提供了一个'[String,Int]'类型的记录。我已经修改了答案,提出了一些更改 –