reactive-kafka

    2热度

    1回答

    我收到来自Kafka反应流消费者的Bytestring文件;我想用这个Bytestring构造一个akka-http请求作为实体HttpEntity.Default。 HttpEntity.Default需要Source [Bytestring,Any]作为其参数之一。 什么是连接两者的最佳方式?

    0热度

    1回答

    我想并行化写入kafka,即有多个生产者将数据发送到kafka,尽管它来自akka流。在其他我的流中有几个从源头开始的阶段,然后在发送数据时,我希望有大约16位工作人员同时发送数据。 我想知道是否需要在阿卡流图DSL中嵌入Akka Streams Kafka并为此使用平衡器,或者如果有更简单的解决方案。另外,简单地说,如果有人做了这样的事情总的来说会很棒。

    0热度

    1回答

    使用Kafka发送大型文件时,是否可以将其分配到分区中,然后使用Akka-Stream重新组装? http://www.slideshare.net/JiangjieQin/handle-large-messages-in-apache-kafka-58692297

    0热度

    1回答

    我试图实现简单的服务,将来自kafka的消息包装在一些数据中并将其发送到外部服务。 处理消息时处理外部服务不可用的常见模式是什么? 到目前为止,我只在外部服务请求成功时手动提交消息。如果没有提交,我希望kafka在一段时间后重新发送消息,以便处理外部服务失败对消费者而言是透明的。尽管如此,我找不到一种方法。 但我很好奇,如果我没有做一些反模式,并有更好的解决方案。

    0热度

    1回答

    我正在使用Scala 2.11和Akka Streams Kafka 0.17。 我有一个流其中: 甲Source使用Source.actorRef创建。在这里,演员计划以固定间隔运行并持续生成消息,并将消息发送到流中。 我已附加Producer作为Flow。制片人推动ProducerMessage.Message成为卡夫卡话题。 一些数据库操作。 我有一个问题,同时构建ProducerMessa

    2热度

    1回答

    我正在运行一个Akka Streams Reactive Kafka应用程序,它应该在重负载下正常工作。运行该应用程序大约10分钟后,该应用程序将以OutOfMemoryError停机。我试图调试堆转储,发现akka.dispatch.Dispatcher正在占用〜5GB的内存。以下是我的配置文件。 阿卡版本:2.4.18 反应卡夫卡版本:2.4.18 1. application.conf: c

    0热度

    1回答

    我看着下列文件:https://github.com/akka/reactive-kafka,我看到了下面的代码片段: implicit val actorSystem = ActorSystem("ReactiveKafka") implicit val materializer = ActorMaterializer() val kafka = new ReactiveKafka()

    0热度

    1回答

    我是新的使用aka流kafka(和akka流一般)。我正在尝试构建一个图表,以便将消息发布到不同的主题。 如何将生产者作为流连接以提交处理后的消息?我试着用Producer.flow但由于您使用的是GraphDSL我不能得到commitScaladsl object TestFoo { import akka.kafka.ProducerMessage.Message impl

    0热度

    1回答

    我开发反应卡夫卡在我们的游戏斯卡拉项目,在我们创建的5个主题,由消费者团体订阅和工作好,现在的问题是我创建了一个新的话题,如何我可以将此主题添加到现有的消费群(是可能) 我的代码是: val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer) .wi