2015-05-19 84 views
2

我正在想出一个解决方案,将我收到的串流分成多个Strings。我一直在研究,看起来在Akka-Streams的早期版本中,有一个Transformer类,你可以扩展来做这种类型的转换。在Akka-Streams中分流内流

在我使用的版本(RC2)中有Stage s,但我不确定如何实现分割模式。

Source.actorPublisher[String](MyActor.props). 
.XXXXX(_.split("\n")) 
.map(...) 
.to(Sink(...)) 

我要找的XXXXX组件,让我输入一个String并返回String序列,将发出每一个到流的其余部分。

+2

您可以如用'mapConcat'结果元素总是仅依赖于单个输入元素。如果依赖关系更复杂,则可以使用(有状态)阶段。 – jrudolph

+2

除此之外,通常'mapConcat'可以用'flatMap'来考虑。名字不同,因为一些单子法不会成立。 – almendar

回答

3

我同意@jrudolph认为mapConcat可能是你要找的。一个简单的例子示出了在操作此方法:

val strings = List(
    """hello 
    world 
    test 
    this""", 
    """foo 
    bar 
    baz 
    """ 

) 

    implicit val system = ActorSystem("test") 
    implicit val mater = ActorFlowMaterializer() 
    Source(strings). 
    mapConcat(_.split("\n").map(_.trim).toList). 
    runForeach(println) 

如果运行该代码你将看到以下打印出来:

hello 
world 
test 
this 
foo  
bar 
baz 
+0

这就是我一直在寻找的!谢谢! – hveiga

1

阿卡提供Framing辅助函数用于这种类型的问题。

假设你的字符集是UTF-8,你可以写一个函数,在分隔String值的最大尺寸,并返回一个Flow可以进行拆分:

import akka.stream.scaladsl.Framing 
import akka.util.ByteString 

val newLineSplitter : (Int) => Flow[String, String, NotUsed] = 
    (maxLineSize) => 
    Flow[String] 
     .map(ByteString.apply) 
     .via(Framing delimiter (ByteString("\n"), maxLineSize)) 
     .via(Flow[ByteString] map (_.utf8String))