0
我有下面的代码片段:阿卡流+阿卡的Http传递参数
case class SomeClass(param1:String,param2:String,param3:String)
val someClassActorSource: Source[SomeClass, ActorRef] = Source
.actorPublisher[SomeClass](Props[SomeClassActorPublisher])
val someFlow: ActorRef = Flow[SomeClass]
.mapAsync(3)(f=> getDocumentById(f))
.map(f =>{
val request = HttpRequest(method = HttpMethods.POST, uri = "http://localhost:8000/test")
.withEntity(ContentTypes.`text/xml(UTF-8)`, ByteString(f.a)
)
(request,request)
}).via(connection)
//Parsing Response
.mapAsync(3){
case (Success(HttpResponse(status, _, entity, _)),request)=>
entity.dataBytes.runFold(ByteString(""))(_ ++ _)
}
.map(resp =>parse(resp.utf8String,?????????????))
.to(Sink.someSink{....})
.runWith(someClassActorSource)
def parse(resp:String,parseParam:String)=????
,并在某处,我发短信给流量代码:
someflow ! SomeClass("a","b","c")
someflow ! SomeClass("a1","b1","c1")
我的问题是该方法解析应该从原来如此类
所以对于第一条消息使用参数2应该是
parse(response,"b")
和第二条消息应该是
parse(response,"b1")
所以现在的问题是,我怎么能取从我提交给流方法的参数?