akka-stream

    0热度

    1回答

    更新:我把我的问题在test project来解释我的意思详细 ============== ================================================== ===== 我有一个Akka源contiune从数据库表中读取,groupby一些关键然后减少它。但是,在我应用reduce函数后,似乎数据永远不会发送到sink,因为上游始终有数据到来,所以它会保持减少

    0热度

    1回答

    有两个表TableA和TableB。 我需要一些记录复制从TableA到TableB。我用slick-3.0,并使用以下方法: import akka.stream._ import akka.stream.scaladsl._ ... //{{ READ DATA FROM TABLE A val q = TableA.filter(somePredicate).result val

    0热度

    1回答

    我想有条件地在一个值(非物化)Source上加上一个值。我应该怎么做? val src: Source[_,NotUsed] = ??? Source.combine(Source.single(???), src) 的Source.combine文档提到 使用给定的策略,例如合并或CONCAT ,但不提供关于选择的concat策略的例子。

    0热度

    1回答

    我第一次使用Akka Streams Testkit,并没有找到一个很好的模式来测试一个流而不是在时间窗口内产生一个值。 这工作: intercept[AssertionError] { // '.expectNext' throws this src.request(1) .expectNext(100 millis) // expect no entries in ...

    0热度

    2回答

    我试图用阿卡流流的文件和正在运行到一个小问题,提取流的结果为未来[字符串]: def streamMigrationFile(source: Source[ByteString, _]): Future[String] = { var fileString = "" val sink = Sink.foreach[ByteString](byteString => fileSt

    2热度

    1回答

    我用阿卡流“ActorPublisher演员作为流每个连接的数据Source发送到传入的WebSocket或HTTP连接。 ActorPublisher的contract是定期通过提供需求请求数据 - 下游可接受的元素数量。如果需求为0,我不应该发送更多元素。我观察到,如果我缓冲元素,当消费者速度缓慢时,缓冲区大小在1到60之间波动,但大多数在40-50之间。 要流我使用阿卡-HTTP“s到的We

    1热度

    4回答

    有一个整数的一些流: val source = Source(List(1,2,3,4,5)) 是否有可能获得从源头上(count, sum)结果?对于上面的例子,它将是(5, 15)。 我想我应该用流量,并结合他们: val countFlow = Flow[Int].fold(0)((c, _) => c + 1) val sumFlow = Flow[Int].fold(0)((s, e)

    2热度

    1回答

    我已阅读Akka streams materialization concept,并理解流物化是: ,以运行服用流描述(图),并分配它需要的所有必要资源的过程。 我跟着一个例子,使用mapMaterializedValue构建我的akka​​流,将消息发送到队列。代码的目的是推动信息流蓝图后,排队已经建立和代码工作,但我真的不明白是什么mapMaterrializaedValue代码做: Prom

    0热度

    1回答

    我有一个工作akka-http应用程序。现在我尝试通过slf4j和logback添加日志记录以及我的应用程序崩溃。 build.sbt libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-http" % "10.0.7", "ch.qos.logback" % "logback-classic" % "1.2.3",

    1热度

    1回答

    我有一个通过Http().cachedHostConnectionPoolHttps与第三方服务集成的akka​​-http应用中的路由。我想以正确的方式测试它。但是不知道应该如何:( 下面是这条路线的样子: val routes: Route = pathPrefix("access-tokens") { pathPrefix(Segment) { userId => par