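The Akka Streams Kafka samples create a simple stream that reads one message and uses a sink that produces it to Kafka and commits the offset of the consumed message. If you need to read one or more messages and produce the many words contained in the consumed set, you have to do more work with the Akka Streams Graph API.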
This example uses the Graph API to build a Source from Kafka, reads a batch of messages with groupedWithin, and extracts the words they contain.
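Two simple flows are created: one obtains the last offset and the other obtains the words. Then a Source stage is built that broadcasts the messages from Kafka to both flows and zips the results into a tuple (CommittableOffset, Array[String]). The messages are produced inside the runForeach function. Note that the messages are not produced in order: all publish futures are started at once and only combined afterwards with Future.sequence.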
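To make the two branches of the graph concrete, here is a stdlib-only sketch of what they compute for one batch. It assumes a hypothetical `Msg` case class in place of `CommittableMessage`: only the offset and the value are modeled, and the offset is a plain `Long` rather than a `CommittableOffset`. `lastOffset` mirrors `flowCommit`, `words` mirrors `_flowWords`, and `zipped` is the pair that `Zip` emits.

```scala
// Hypothetical stand-in for a consumed Kafka record: only the offset and the value matter here.
final case class Msg(offset: Long, value: String)

object BatchSketch {
  // Mirrors flowCommit: the offset to commit is the one of the last message in the batch.
  // groupedWithin never emits an empty batch, so .last is safe in the stream.
  def lastOffset(batch: Seq[Msg]): Long = batch.last.offset

  // Mirrors _flowWords: join every value with a space, then split into words.
  def words(batch: Seq[Msg]): Array[String] =
    batch.map(_.value).mkString(" ").split(" ")

  // What Zip emits downstream: one (offset, words) pair per batch.
  def zipped(batch: Seq[Msg]): (Long, Array[String]) = (lastOffset(batch), words(batch))

  def main(args: Array[String]): Unit = {
    val batch = Seq(Msg(41L, "hello kafka"), Msg(42L, "akka streams"))
    val (offset, ws) = zipped(batch)
    // prints: commit offset 42 after publishing [hello, kafka, akka, streams]
    println(s"commit offset $offset after publishing ${ws.mkString("[", ", ", "]")}")
  }
}
```

Committing only the last offset of the batch is enough because Kafka offsets are cumulative per partition: committing offset 42 marks everything before it as consumed too.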
The complete sample may look long, but it compiles and runs correctly with "com.typesafe.akka" %% "akka-stream-kafka" % "0.14".
import java.util.Properties

import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.{CommittableMessage, CommittableOffset}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.{ActorMaterializer, SourceShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Source, Zip}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.{
  ByteArrayDeserializer,
  ByteArraySerializer,
  StringDeserializer,
  StringSerializer
}

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object SplitSource extends App {
  implicit val actorSystem = ActorSystem("test-actor-system")
  implicit val streamMaterializer = ActorMaterializer()
  implicit val executionContext = actorSystem.dispatcher
  val log = actorSystem.log

  // PRODUCER config (the stream below publishes through kafkaProducer instead)
  val producerSettings = ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")
    .withProperty("auto.create.topics.enable", "true")

  // CONSUMER config
  val consumerSettings =
    ConsumerSettings(system = actorSystem,
                     keyDeserializer = new ByteArrayDeserializer,
                     valueDeserializer = new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("kafka-sample4")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  // Plain Kafka producer used by publishToKafka
  val producerConfig = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("key.serializer", classOf[StringSerializer].getName)
    props.setProperty("value.serializer", classOf[StringSerializer].getName)
    props
  }
  lazy val kafkaProducer = new KafkaProducer[String, String](producerConfig)

  // Wrap the blocking Java Future returned by send() in a Scala Future
  private def publishToKafka(key: String, data: String): Future[Unit] =
    Future {
      kafkaProducer.send(new ProducerRecord("outTopic", key, data)).get()
      ()
    }

  def getKafkaSource =
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("inTopic"))
      // Emit a batch as soon as 10 messages arrive or 30 seconds pass, whichever comes first
      .groupedWithin(10, 30.seconds)

  val getStreamSource = GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val in = getKafkaSource

    // Broadcast each batch to two flows: one obtains the last offset to commit,
    // the other returns the words to publish
    val br = b.add(Broadcast[Seq[CommittableMessage[Array[Byte], String]]](2))
    val zipResult = b.add(Zip[CommittableOffset, Array[String]]())

    val flowCommit =
      Flow[Seq[CommittableMessage[Array[Byte], String]]].map(_.last.committableOffset)

    // Flow that builds the list of all words in all consumed messages of the batch
    val _flowWords =
      Flow[Seq[CommittableMessage[Array[Byte], String]]].map { input =>
        input.map(_.record.value()).mkString(" ").split(" ")
      }

    // Wire the stage
    in ~> br ~> flowCommit ~> zipResult.in0
          br ~> _flowWords ~> zipResult.in1
    SourceShape(zipResult.out)
  }

  Source.fromGraph(getStreamSource).runForeach { case (offset, words) =>
    // Publish all words; once every future completes, commit the last Kafka offset.
    // The publishes run in parallel, so the words may reach outTopic out of order;
    // chain them with flatMap to keep the order.
    val futures = words.map(publishToKafka("outTopic", _)).toList
    Future.sequence(futures).onComplete {
      case Success(_) =>
        // All publishes succeeded: commit the offset of the last consumed message
        offset.commitScaladsl()
      case Failure(e) =>
        log.error(e, "Publishing failed; offset not committed")
    }
  }
}
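The comment in runForeach suggests chaining the publishes with flatMap when order matters. A minimal stdlib-only sketch of that idea follows; `publish` is a hypothetical stand-in for `publishToKafka` that only records each word, so no Kafka broker is involved.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object OrderedPublishSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical publisher standing in for publishToKafka: it only records the word.
  val published = ListBuffer.empty[String]
  def publish(word: String): Future[Unit] = Future {
    published.synchronized { published += word }
    ()
  }

  // Chain the futures with flatMap so each publish starts only after the previous one completed.
  def publishInOrder(words: Seq[String]): Future[Unit] =
    words.foldLeft(Future.successful(())) { (acc, w) =>
      acc.flatMap(_ => publish(w))
    }

  def main(args: Array[String]): Unit = {
    Await.result(publishInOrder(Seq("a", "b", "c")), 5.seconds)
    println(published.mkString(",")) // prints: a,b,c
  }
}
```

In the sample, the offset would then be committed when the future returned by `publishInOrder` succeeds, just as it is now committed after `Future.sequence`. The trade-off is throughput: the publishes of one batch no longer overlap.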
The Akka Streams API lets you build excellent processing pipelines.
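Quick reply: yes, you can also send objects as JSON. – MaximeF
My main goal is to send each individual element of that array. I mean: the consumer gets "xxxx xxx xx x" from topic1, and I want the Producer to send 4 messages: "xxxx", "xxx", "xx", "x". Can you help me? – METUAN
Maybe I don't understand, but you can split your message and then call send (produce) 4 times. – MaximeF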
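The split-and-send suggestion is roughly what the sample's `_flowWords` plus `publishToKafka` already do. As a stdlib-only sketch: a hypothetical `Record` case class stands in for Kafka's `ProducerRecord`, and the topic name `topic2` is made up for illustration.

```scala
object SplitAndSendSketch {
  // Hypothetical record type standing in for Kafka's ProducerRecord(topic, value).
  final case class Record(topic: String, value: String)

  // Split one consumed value on whitespace and build one record per word.
  def toRecords(topic: String, consumedValue: String): Seq[Record] =
    consumedValue.trim.split("\\s+").toSeq.filter(_.nonEmpty).map(Record(topic, _))

  def main(args: Array[String]): Unit = {
    val records = toRecords("topic2", "xxxx xxx xx x")
    // One line per record: xxxx, xxx, xx, x
    records.foreach(r => println(s"${r.topic} <- ${r.value}"))
  }
}
```

Each record would then go to one producer send call, so this input yields four sends. Splitting on `\\s+` instead of a single space avoids empty words when the message contains repeated spaces.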