2017-10-21 95 views
1

基本上这里是我使用的代码。akka stream阅读时无尽http流反压

当我用curl进行连接时,我发现curl命令中的所有实体都非常快。当我尝试模仿与akka相同的行为时,打印出我得到的元素之间会有很大的停顿。

流动波纹管在某种程度上受到了压力,并且在前4条消息之后,1条消息的其余部分会在明显的打印线之后到达。

前4条消息大约是2k JSON,最后一条没有。 5是80k JSON。

最后一个实体(编号5)也是最大的块,我得到它在流完成之前打印的印象。而且我非常积极,只有2-3秒的运行时间。

知道为什么这流中读取前4个元素

val awesomeHttpReq = Http().singleRequest(
    HttpRequest(
    method = GET, 
    uri = Uri("http://some-service-providing-endless-http.stream") 
) 
) 

val a = Source.fromFuture(awesomeHttpReq).flatMapConcat { 
    case HttpResponse(status, _, entity, _) => 
    // I saw some comments the back pressure might kick in 
    // because I might not be consuming the bytes here properly 
    // but this is totally in line with all the examples etc. 

    entity.withoutSizeLimit.getDataBytes.via(Framing delimiter (ByteString("\n"), Int.MaxValue)) 
} map { bytes => 
    parse(bytes decodeString StandardCharsets.UTF_8).fold(pf => throw new IllegalStateException(s"unable to parse: $pf"), identity[Json]) 
} mapConcat { items => 
    // every line that comes in from previous stage contains 
    // key elements - this I'm interested in, it's an array 
    items.asObject flatMap (_.toMap get "events") flatMap (_ asArray) getOrElse Nil 
} 

val b: Future[Vector[Json]] = a 
    .takeWithin(50 second) 
    .runWith(Sink.fold(Vector.empty[Json])((a, b) => { 

    // I'm using this to see what's going on in the stream 
    // there are significant pauses between the entities 
    // in reality the elements are available in the stream (all 5) 
    // within 2-3 seconds 
    // and this printing just has very big pause after first 4 elements 

    println(s"adding\n\n\n ${b.noSpaces}") 
    a :+ b 
    })) 

Await.result(b, 1 minute) 

后只是挂我看了一下这个问题似乎真的接近我有https://github.com/akka/akka-http/issues/57,但不知何故未能找到我的情况下,一些有帮助的。

我也试过改变一下akka http的块大小,并没有真正的帮助。

这里有传入消息的时机: 从流初始化:

1. 881 ms 
2. 889 ms 
3. 894 ms 
4. 898 ms 
// I don't understand why this wait time of 30 seconds in betweeen 
5. 30871 ms 

的最后一条消息显然某处挂起30秒

任何想法真的可以理解。

更新:

因为它是真正奇怪的是,前4种元素在4摆脱一贯暨第五届一个正在等待了30秒,我决定从默认的4增加initial-input-buffer-size = 4到16,现在它按预期工作。我只是无法理解上面代码中背压的起因。

更新2:

缓冲区大小有助于我的简单示例。但在我的真正的问题我有一些很奇怪的事情:

entity.withoutSizeLimit.dataBytes 
    .alsoTo(Sink.foreach(a => println("stage 1 " + a.decodeString(StandardCharsets.UTF_8)))) 
    .via(Framing delimiter (ByteString("\n"), Int.MaxValue)) 
    .buffer(1000, OverflowStrategy.backpressure) 
    .alsoTo(Sink.foreach(a => println("stage 2 " + a.decodeString(StandardCharsets.UTF_8)))) 

我可以看到我的取景之前所需要的信息(阶段1),但之后在日志(第2阶段)。我确信有足够的空间可以通过放置缓冲区来实现。

现在我已经找到了新的行字符不会真正走进盈阶段(第一阶段),这是何等的每一行通常结束:

"7da".sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toChar).mkString 
res12: String = 
"} 
" 

在我的最后一个项目我m错过了最后一个字节a,基本上新行并没有进入框架。所以整个事情不会发射。

+0

有趣的是,我想知道你是否可以在不使用akka-http的情况下重现这种情况,即将一些源JSON转储到文件中,并使用'Source.fromFile'代替http请求。 –

+0

当我刚从卷曲转储它的作品。此外,我现在尝试'initial-input-buffer-size = 16',它按预期工作......这真的很奇怪,看起来背压在某处。但无法弄清楚在哪里。 –

+0

用文件作为流尝试,使用与此处相同的代码。我不会遇到这个问题:(现在让我有点疯狂:D –

回答

1

经过相当多的调查后,我决定解决这个问题,因为它看起来像存在多种因素的组合。整个问题的输入源实际上是我公司使用的背景中带有kafka的专有企业服务总线:https://github.com/zalando/nakadi

通过上述症状,我想也许是系统没有根据文档,他们可能无法通过追加来发送\n,但他们也坦然以每行玩,但是这仅仅是不是因为我查的情况下在代码:https://github.com/zalando/nakadi/blob/0859645b032d19f7baa919877f72cb076f1da867/src/main/java/org/zalando/nakadi/service/EventStreamWriterString.java#L36

看到这个之后,我尝试用这个例子来模拟整个事情:

build.sbt

name := "test-framing" 

version := "0.1" 

scalaVersion := "2.12.4"  

lazy val akkaVersion = "2.5.6" 
lazy val akkaHttpVersion = "10.0.10" 

libraryDependencies ++= Seq(
    "com.typesafe.akka" %% "akka-stream" % akkaVersion, 
    "com.typesafe.akka" %% "akka-http" % akkaHttpVersion, 
    "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion 
) 

scalacOptions in Compile ++= (scalacOptions in Compile).value :+ "-Yrangepos" 

* TestApp.scala - 在那里我有这个问题在我的代码*

import java.nio.charset.StandardCharsets 

import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model._ 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Framing, Sink, Source} 
import akka.util.ByteString 

import scala.concurrent.duration._ 
import scala.concurrent.{Await, Future} 

object TestApp extends App { 

    implicit val system = ActorSystem("MyAkkaSystem") 
    implicit val materializer = ActorMaterializer() 

    val awesomeHttpReq = Http().singleRequest(
    HttpRequest(
     method = HttpMethods.GET, 
     uri = Uri("http://localhost:9000/streaming-json") 
    ) 
) 

    val a = Source.fromFuture(awesomeHttpReq).flatMapConcat { 
    case HttpResponse(status, _, entity, _) => 
     entity.withoutSizeLimit.getDataBytes 
     .via(Framing delimiter (ByteString("\n"), Int.MaxValue)) 
    } map { bytes => 
    bytes decodeString StandardCharsets.UTF_8 
    } 

    val b: Future[Vector[String]] = a 
    .takeWithin(50 second) 
    .runWith(Sink.fold(Vector.empty[String])((a, b) => { 
     println(s"adding $b") 
     a :+ b 
    })) 

    Await.result(b, 1 minute) 

} 

*模拟终点*

import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.common.EntityStreamingSupport 
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport 
import akka.http.scaladsl.server.Directives 
import akka.stream.scaladsl.{Flow, Source} 
import akka.stream.{ActorMaterializer, ThrottleMode} 
import akka.util.ByteString 
import spray.json._ 

import scala.concurrent.duration._ 
import scala.io.StdIn 

object TestApp2 extends App { 

    implicit val system = ActorSystem("MyAkkaSystem") 
    implicit val materializer = ActorMaterializer() 

    implicit val executionContext = system.dispatcher 

    case class SomeData(name: String) 

    trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol { 
    implicit val someFormat = jsonFormat1(SomeData) 
    } 

    val start = ByteString.empty 
    val sep = ByteString("\n") 
    val end = ByteString.empty 

    implicit val jsonStreamingSupport = EntityStreamingSupport 
    .json() 
    .withFramingRenderer(Flow[ByteString].intersperse(sep)) 

    object MyJsonService extends Directives with JsonSupport { 

    def streamingJsonRoute = 
     path("streaming-json") { 
     get { 
      val sourceOfNumbers = Source(1 to 1000000) 

      val sourceOfDetailedMessages = 
      sourceOfNumbers 
       .map(num => SomeData(s"Hello $num")) 
       .throttle(elements = 5, 
         per = 30 second, 
         maximumBurst = 6, 
         mode = ThrottleMode.Shaping) 

      complete(sourceOfDetailedMessages) 
     } 
     } 
    } 

    val bindingFuture = 
    Http().bindAndHandle(MyJsonService.streamingJsonRoute, "localhost", 9000) 

    println(s"Server online at http://localhost:9000/\nPress RETURN to stop...") 
    StdIn.readLine() // let it run until user presses return 
    bindingFuture 
    .flatMap(_.unbind()) // trigger unbinding from the port 
    .onComplete(_ => system.terminate()) // and shutdown when done 

} 

在模拟端点我的行为如预期,所以没有任何关于阿卡的错误。

当多个图书馆+ nakadi汇集在一起​​,但这只是鹅狩猎仍然可能会有一些问题。最后,如果我将batch_flush_timeout降低到某个较低值,服务器实际上会将下一个事件发送到管道中,因此管道中最后一个消息将被推送到我的应用程序层,以便我可以处理它。

基本上所有这些文字都是因为一个单独的字节在某种程度上不会成帧,但是在过去的几天我又学到了很多关于akka流的知识。