2017-02-20 90 views
0

我有下面的代码片段:阿卡流+阿卡的Http传递参数

case class SomeClass(param1:String,param2:String,param3:String) 

    val someClassActorSource: Source[SomeClass, ActorRef] = Source 
     .actorPublisher[SomeClass](Props[SomeClassActorPublisher]) 

    val someFlow: ActorRef = Flow[SomeClass] 

     .mapAsync(3)(f=> getDocumentById(f)) 

     .map(f =>{ 
      val request = HttpRequest(method = HttpMethods.POST, uri = "http://localhost:8000/test") 
      .withEntity(ContentTypes.`text/xml(UTF-8)`, ByteString(f.a) 
      ) 
      (request,request) 

     }).via(connection) 

     //Parsing Response 
     .mapAsync(3){ 
      case (Success(HttpResponse(status, _, entity, _)),request)=> 
      entity.dataBytes.runFold(ByteString(""))(_ ++ _) 
     } 
     .map(resp =>parse(resp.utf8String,?????????????)) 
     .to(Sink.someSink{....}) 
     .runWith(someClassActorSource) 

    def parse(resp:String,parseParam:String)=???? 

,并在某处,我发短信给流量代码:

someflow ! SomeClass("a","b","c") 
someflow ! SomeClass("a1","b1","c1") 

我的问题是该方法解析应该从原来如此类

所以对于第一条消息使用参数2应该是

parse(response,"b") 

和第二条消息应该是

parse(response,"b1") 

所以现在的问题是,我怎么能取从我提交给流方法的参数?

回答

1

假设您的connection值正在通过

val connection = Http().cachedHostConnectionPool(...) 

实例可以使用该连接发生在一个元组的事实,而不是简单地传递request两次元组可以在输入SomeClass通过。此SomeClass实例将不得不经过您的每个Flow值才能进入解析阶段。

修改你的代码位:

val getDocumentFlow = 
    Flow[SomeClass].mapAsync(3)(f => getSomDocumentById(f).map(d => d -> f)) 

你的问题没有说明从getDocumentById返回类型,所以我只是用Document

val documentToRequest = 
    Flow[(Document, SomeClass)] map { case (document, someClass) => 
    val request = ... 

    (request, someClass) 
    } 

val parseResponse = 
    Flow[(Try[HttpResponse], SomeClass)].mapAsync(3){ 
    case (Success(HttpResponse(status, _, entity, _)), someClass) => 
     entity 
     .dataBytes 
     .runFold(ByteString(""))(_ ++ _) 
     .map(e => e -> someClass) 
    } 

val parseEntity = Flow[(ByteString, SomeClass)] map { 
    case (entity, someClass) => parse(entity.utf8String, someClass) 
} 

这些流可以被用来作为在问题中描述:

val someFlow = 
    someClassActorSource 
    .via(getDocumentFlow) 
    .via(documentToRequest) 
    .via(connection) 
    .via(parseResponse) 
    .via(parseEntity) 
    .to(Sink.someSink{...}) 
    .run()