我正在使用Akka Streams Kafka将卡夫卡消息传递给远程服务。我希望保证该服务每次只收到一条消息(至少一次,最多一次发送)。处理消息后提交Kafka消费者补偿的好模式是什么?
这是我想出了代码:
private def startFlow[T](implicit system: ActorSystem, config: Config, subscriber: ActorRef,
topicPattern: String,
mapCommittableMessageToSinkMessage: Function[CommittableMessage[String, String], T]) {
val groupId = config.getString("group-id")
implicit val materializer = ActorMaterializer()
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withGroupId(groupId)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
implicit val timeout = Timeout(5 seconds) // timeout for reply message on ask call below
import system.dispatcher // the ExecutionContext that will be used in ask call below
Consumer.committableSource(consumerSettings, Subscriptions
.topicPattern(topicPattern))
.map(message => (message, mapCommittableMessageToSinkMessage(message)))
.mapAsync(1)(tuple => ask(subscriber, tuple._2).map(_ => tuple._1))
.mapAsync(1)(message => message.committableOffset.commitScaladsl())
.runWith(Sink.ignore)
}
正如代码所示,它映射原始消息的元组,以及传递给用户(发送到远程服务的男主角转化消息)。该元组的用途是在用户完成处理后提交偏移量。
它的一些东西看起来像一个反模式,但我不确定更好的方法来做到这一点。任何建议在更好的方式?
谢谢!
谢谢Stefano!你的方法奏效了。这是比我的方法更多的代码。我是Akka的新手,所以我可以看到GraphDSL对于复杂流是一种更可扩展的方法。我将在单独的答案中使用样板文字发布代码。 – jacob