akka-stream

    0热度

    2回答

    在使用Akka Streams时,似乎我永远无法获得错误处理权限。 所以这是我的代码 var db = Database.forConfig("oracle") var mysqlDb = Database.forConfig("mysql_read") var mysqlDbWrite = Database.forConfig("mysql_write") implicit val ac

    4热度

    1回答

    TLDR:是更好兑现每个请求一个流(即使用短寿命流)或使用跨请求单个流物化,当我有传出http请求作为一部分流的? 详细信息:我有一个典型的服务,需要一个HTTP请求时,它驱散几个第三方服务的下游(不是由我控制),然后送回前汇总的结果。我正在使用akka-http进行客户端实施并喷洒服务器(遗留下来,随着时间的推移将会转移到akka-http)。示意性地: request -> map -1-*-

    2热度

    1回答

    我正在运行一个Akka Streams Reactive Kafka应用程序,它应该在重负载下正常工作。运行该应用程序大约10分钟后,该应用程序将以OutOfMemoryError停机。我试图调试堆转储,发现akka.dispatch.Dispatcher正在占用〜5GB的内存。以下是我的配置文件。 阿卡版本:2.4.18 反应卡夫卡版本:2.4.18 1. application.conf: c

    1热度

    1回答

    我有一个来源,分组元素和一个水槽,使批量请求, 我使用KillSwitch能够关闭图形在某些任意时间点。 ,最新一批不完整的记录中源输出得到时​​被称为 val source = Source.tick(10.millis, 10.millis, "tick").grouped(500) val (switch, _) = source.viaMat(KillSwitches.single)(

    2热度

    1回答

    我有以下方法,它返回一个Future[Source[List[String]]](前两个CSV文件的行): def get(url: String, charset: String, delimiter: Char, quote: Char, escape: Char) = { val scanner = CsvParsing.lineScanner( delimiter.to

    1热度

    1回答

    我是Akka Streams的新手,我有一个问题。 所以我可以从服务器操作和处理数据的一些客户(从下面的官方文档的源代码)。 private static final ActorSystem system = ActorSystem.create("Client"); private static final Materializer materializer = ActorMaterializ

    0热度

    1回答

    我运行一个阿卡流卡夫卡应用程序,我想结合的流消费者的监督策略,例如,如果经纪人下降,流消费者停止超时后死亡,主管可以重新启动消费者。 这里是我的完整代码: UserEventStream: import akka.actor.{Actor, PoisonPill, Props} import akka.kafka.{ConsumerSettings, Subscriptions} import

    0热度

    1回答

    我在8080端口上它完美,并显示以下消息在本地运行一个简单的阿卡HTTP服务器: Started server at 127.0.0.1:8080, press enter to kill server 我使用sbt-assembly创建的.jar文件。它生活在target/scala-2.12/my-app-assembly-0.1.jar 然后,我创建了一个简单的Dockerfile,如:

    0热度

    1回答

    我是新来的阿卡流。我从github运行下面的例子。但是,向“Helloer”actor发送的消息不会在输出控制台中接收和显示。 StreamingApp.scala import _root_.akka.actor.{ Actor, Props } import org.apache.spark._ import org.apache.spark.streaming._ import org

    3热度

    2回答

    我有一个包含20万个用户的500,000个元素和一个队列。信息以不同的速度处理(1,15,30,60秒,3,50分钟,3,16小时或更长时间,24小时为超时)。我需要消费者的回应,以便对数据进行一些处理。我将为此和基于事件的onComplete使用Scala Future。 为了不淹没队列,我想发送前30条消息到队列:消费者将选择20条消息,并且10条队列将在队列中等待。当其中一个Future s