
Kafka - Scala - processing multiple messages

Is it possible to send a string array through a Kafka Producer object? I want to receive some messages from 'topic1' (lines of text), split them into single words, and send them to another topic.

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.stream.ActorMaterializer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}

object KafkaConsumer extends App {

  implicit val actorSystem = ActorSystem("test-actor-system")
  implicit val streamMaterializer = ActorMaterializer()
  implicit val executionContext = actorSystem.dispatcher
  val log = actorSystem.log

  // PRODUCER config
  val producerSettings =
    ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")
      .withProperty("auto.create.topics.enable", "true")

  // CONSUMER config
  val consumerSettings =
    ConsumerSettings(system = actorSystem,
                     keyDeserializer = new ByteArrayDeserializer,
                     valueDeserializer = new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("kafka-sample")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  // ROUTE OF THE APP
  Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .map { msg =>
      println(s"topic1 -> topic2: $msg")
      ProducerMessage.Message(
        new ProducerRecord[Array[Byte], String]("topic2", msg.record.value),
        msg.committableOffset)
    }
    .runWith(Producer.committableSink(producerSettings))
}
Quick answer: yes, and you can also send objects as JSON. – MaximeF
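For illustration, a minimal sketch of that suggestion (hypothetical, not from the thread): the whole array goes out as a single JSON-encoded message, with the JSON hand-rolled here so no particular JSON library is assumed, and `producer` standing in for a KafkaProducer[String, String].

    // Hypothetical sketch: send an Array[String] as one JSON message.
    // Naive quoting; real code should use a proper JSON library.
    val words = Array("xxxx", "xxx", "xx", "x")
    val json = words.map(w => "\"" + w + "\"").mkString("[", ", ", "]") // ["xxxx", "xxx", "xx", "x"]
    producer.send(new ProducerRecord[String, String]("topic2", json))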

My main goal is to send every single element of that array. I mean: the consumer gets "xxxx xxx xx x" from topic1, and I want the Producer to send 4 messages: "xxxx", "xxx", "xx", "x". Can you help me? – METUAN

Maybe I'm not understanding, but you can split your message and then call send (produce) four times. – MaximeF
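A minimal sketch of that idea (hypothetical names; `producer` stands in for a KafkaProducer[String, String]):

    // Split the consumed line and produce one record per word.
    val line = "xxxx xxx xx x"
    line.split(" ").foreach { word =>
      producer.send(new ProducerRecord[String, String]("topic2", word))
    }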

Answers

Your Akka Streams sample creates a simple stream that reads one message, produces to Kafka, and uses a sink that commits the offset of the consumed message. If you need to read one or more messages and produce the many words contained in each consumed batch, you should do a bit more with the Akka Streams Graph API.

This example uses the Graph API to build a source from Kafka, using groupedWithin to read a batch of messages and extract the words they contain.

It creates two simple flows: one to obtain the last offset to commit, and another to get the words. Then a Source stage is created that broadcasts the messages consumed from Kafka to both flows and zips the results into a tuple of (CommittableOffset, Array[String]). The messages are produced in the runForeach function. Note that the messages are not produced in order, because of Future.sequence.

Although the sample may look long, it compiles and runs correctly using "com.typesafe.akka" %% "akka-stream-kafka" % "0.14".

import java.util.Properties

import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.{CommittableMessage, CommittableOffset}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.{ActorMaterializer, SourceShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Source, Zip}

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.{
  ByteArrayDeserializer,
  ByteArraySerializer,
  StringDeserializer,
  StringSerializer
}

import scala.concurrent.Future
import scala.util.{Failure, Success}
import scala.concurrent.duration._

object SplitSource extends App {

  implicit val actorSystem = ActorSystem("test-actor-system")
  implicit val streamMaterializer = ActorMaterializer()
  implicit val executionContext = actorSystem.dispatcher
  val log = actorSystem.log

  // PRODUCER config
  val producerSettings =
    ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")
      .withProperty("auto.create.topics.enable", "true")

  // CONSUMER config
  val consumerSettings =
    ConsumerSettings(system = actorSystem,
                     keyDeserializer = new ByteArrayDeserializer,
                     valueDeserializer = new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("kafka-sample4")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  implicit val producerConfig = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("key.serializer", classOf[StringSerializer].getName)
    props.setProperty("value.serializer", classOf[StringSerializer].getName)
    props
  }

  lazy val kafkaProducer = new KafkaProducer[String, String](producerConfig)

  // Wrap the Java producer's future in a Scala Future
  private def publishToKafka(id: String, data: String) =
    Future {
      kafkaProducer
        .send(new ProducerRecord("outTopic", id, data))
        .get()
    }

  def getKafkaSource =
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("inTopic"))
      // It consumes 10 messages or waits 30 seconds to push downstream
      .groupedWithin(10, 30 seconds)

  val getStreamSource = GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val in = getKafkaSource

    // Broadcast to two flows: one to obtain the last offset to commit
    // and another to return the Seq with the words to publish
    val br = b.add(Broadcast[Seq[CommittableMessage[Array[Byte], String]]](2))
    val zipResult = b.add(Zip[CommittableOffset, Array[String]]())
    val flowCommit =
      Flow[Seq[CommittableMessage[Array[Byte], String]]].map(_.last.committableOffset)

    // Flow that creates the list of all words in all consumed messages
    val flowWords =
      Flow[Seq[CommittableMessage[Array[Byte], String]]].map { input =>
        input.map(_.record.value()).mkString(" ").split(" ")
      }

    // build the stage
    in ~> br ~> flowCommit ~> zipResult.in0
          br ~> flowWords  ~> zipResult.in1

    SourceShape(zipResult.out)
  }

  Source.fromGraph(getStreamSource).runForeach { msgs =>
    // Publish all words and, when all futures complete, commit the last Kafka offset
    val futures = msgs._2.map(publishToKafka("outTopic", _)).toList

    // Produces in parallel! Use flatMap to produce in order instead
    Future.sequence(futures).onComplete {
      case Success(_) =>
        // Once all futures are done, commit the offset of the last consumed message
        msgs._1.commitScaladsl()
      case Failure(e) =>
        log.error(e, "Failed to publish words; offset not committed")
    }
  }

}

The Akka Streams API lets you build excellent processing pipelines.

It works perfectly! Thank you! – METUAN


You should use mapConcat before the map, because it

transforms each input element into an Iterable of output elements that is then flattened into the output stream.

The full addition will look like this:

Subscriptions.topics("topic1")) 
    .mapConcat { msg => msg.record.value().split(" ").toList } 
    .map { ... 
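Put together, a hedged sketch of the full pipeline (not from the original answer): it uses Producer.plainSink, since after mapConcat the elements are plain words rather than committable messages, and reuses the consumerSettings and producerSettings defined in the question's code.

    // A minimal sketch, assuming akka-stream-kafka 0.14 and the settings
    // from the question.
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("topic1"))
      .mapConcat { msg => msg.record.value().split(" ").toList }
      .map { word => new ProducerRecord[Array[Byte], String]("topic2", word) }
      .runWith(Producer.plainSink(producerSettings))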
This is a simple and nice solution, but note that if you map all messages to lists of strings, you cannot get the committable offsets from them. – METUAN
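One possible way around that caveat (a sketch, not from the original thread): carry the committable offset with only the last word of each message, produce the words one at a time, and commit once a tagged word has been produced. It reuses kafkaProducer from the accepted answer and assumes akka-stream-kafka 0.14, plus imports for akka.Done, akka.stream.scaladsl.Sink, and scala.concurrent.Future.

    // Hypothetical sketch: tag only the last word of each message with its
    // committable offset, so the offset is committed after all words are out.
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("topic1"))
      .mapConcat { msg =>
        val words = msg.record.value().split(" ").toList
        words.init.map(w => (w, Option.empty[CommittableOffset])) :+
          ((words.last, Some(msg.committableOffset)))
      }
      .mapAsync(1) { case (word, maybeOffset) =>
        Future {
          kafkaProducer.send(new ProducerRecord("topic2", word, word)).get()
        }.map(_ => maybeOffset)
      }
      .mapAsync(1) {
        case Some(offset) => offset.commitScaladsl() // Future[Done]
        case None         => Future.successful(akka.Done)
      }
      .runWith(Sink.ignore)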