我一直在玩Akka Streams API实验,我有一个用例,我想看看如何实现。对于我的用例,我有一个StreamTcp
基于Flow
,它是通过将连接的输入流绑定到我的服务器套接字来提供的。我拥有的Flow基于ByteString
数据。进入的数据将会有一个分隔符,这意味着我应该将分隔符之前的所有内容作为一条消息进行处理,并将所有分隔符之后的所有内容作为下一条消息进行处理。所以,玩弄一个简单的例子,不使用插座只是静态的文字,这是我想出了:如何使用Akka Streams在分隔符上分割入站流
import akka.actor.ActorSystem
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow
import scala.util.{ Failure, Success }
import akka.util.ByteString
object BasicTransformation {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("Sys")
val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")
Flow(data).
splitWhen(c => c == '.').
foreach{producer =>
Flow(producer).
filter(c => c != '.').
fold(new StringBuilder)((sb, c) => sb.append(c.toChar)).
map(_.toString).
filter(!_.isEmpty).
foreach(println(_)).
consume(FlowMaterializer(MaterializerSettings()))
}.
onComplete(FlowMaterializer(MaterializerSettings())) {
case any =>
system.shutdown
}
}
}
在Flow
主要功能,我发现来完成我的目标是splitWhen
,然后产生额外的子流程,每个符合.
定界符的每个消息。然后我用另一个步骤流程处理每个子流程,最后打印单个消息。
这一切似乎有点冗长,完成我认为是一个非常简单和常见的用例。所以我的问题是,是否有一个更清洁,更不详细的方式来做到这一点,或者这是一个正确和首选的方式来分隔一个流的分隔符?
完美!这应该是被接受的答案。 另请注意,它将跨越大块工作。与尝试: 'VAL第一=字节串( “Lorem存有是printing.And排版的simply.Dummy文本”)' 'VAL第二=字节串(” industry.More text.delimited by.a周期。 “)' – 2017-04-04 16:46:53