2014-09-02 54 views
10

我一直在玩Akka Streams API实验,我有一个用例,我想看看如何实现。对于我的用例,我有一个StreamTcp基于Flow,它是通过将连接的输入流绑定到我的服务器套接字来提供的。我拥有的Flow基于ByteString数据。进入的数据将会有一个分隔符,这意味着我应该将分隔符之前的所有内容作为一条消息进行处理,并将所有分隔符之后的所有内容作为下一条消息进行处理。所以,玩弄一个简单的例子,不使用插座只是静态的文字,这是我想出了:如何使用Akka Streams在分隔符上分割入站流

import akka.actor.ActorSystem 
import akka.stream.{ FlowMaterializer, MaterializerSettings } 
import akka.stream.scaladsl.Flow 
import scala.util.{ Failure, Success } 
import akka.util.ByteString 

object BasicTransformation { 

    def main(args: Array[String]): Unit = { 
    implicit val system = ActorSystem("Sys") 

    val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.") 

    Flow(data). 
     splitWhen(c => c == '.'). 
     foreach{producer => 
     Flow(producer). 
      filter(c => c != '.'). 
      fold(new StringBuilder)((sb, c) => sb.append(c.toChar)). 
      map(_.toString). 
      filter(!_.isEmpty). 
      foreach(println(_)). 
      consume(FlowMaterializer(MaterializerSettings())) 
     }. 
     onComplete(FlowMaterializer(MaterializerSettings())) { 
     case any => 
      system.shutdown 
     } 
    } 
} 

Flow主要功能,我发现来完成我的目标是splitWhen,然后产生额外的子流程,每个符合.定界符的每个消息。然后我用另一个步骤流程处理每个子流程,最后打印单个消息。

这一切似乎有点冗长,完成我认为是一个非常简单和常见的用例。所以我的问题是,是否有一个更清洁,更不详细的方式来做到这一点,或者这是一个正确和首选的方式来分隔一个流的分隔符?

回答

1

在Akka用户组发布相同的问题后,我从Endre Varga和Viktor Klang(https://groups.google.com/forum/#!topic/akka-user/YsnwIAjQ3EE)得到了一些建议。我结束了与Endre的Transformer的建议,然后在Flow上使用transform方法。我前面的例子稍加修改的版本载于下文:

import akka.actor.ActorSystem 
import akka.stream.{ FlowMaterializer, MaterializerSettings } 
import akka.stream.scaladsl.Flow 
import scala.util.{ Failure, Success } 
import akka.util.ByteString 
import akka.stream.Transformer 
import akka.util.ByteStringBuilder 

object BasicTransformation { 

    def main(args: Array[String]): Unit = { 
    implicit val system = ActorSystem("Sys")       
    implicit val mater = FlowMaterializer(MaterializerSettings()) 

    val data = List(
     ByteString("Lorem Ipsum is"), 
     ByteString(" simply.Dummy text of.The prin"), 
     ByteString("ting.And typesetting industry.") 
    ) 
    Flow(data).transform(new PeriodDelimitedTransformer).foreach(println(_)) 
    } 
} 

随着PeriodDelimitedTransformer定义为以下几点:

class PeriodDelimitedTransformer extends Transformer[ByteString,String]{ 
    val buffer = new ByteStringBuilder 

    def onNext(msg:ByteString) = {  
    val msgString = msg.utf8String 
    val delimIndex = msgString.indexOf('.') 
    if (delimIndex == -1){ 
     buffer.append(msg) 
     List.empty 
    } 
    else{ 
     val parts = msgString.split("\\.") 
     val endsWithDelim = msgString.endsWith(".") 

     buffer.putBytes(parts.head.getBytes()) 
     val currentPiece = buffer.result.utf8String    
     val otherPieces = parts.tail.dropRight(1).toList 

     buffer.clear 
     val lastPart = 
     if (endsWithDelim){ 
      List(parts.last) 
     } 
     else{ 
      buffer.putBytes(parts.last.getBytes()) 
      List.empty 
     }   


     val result = currentPiece :: otherPieces ::: lastPart 
     result 
    } 

    } 
} 

所以一些我以前的解决方案的复杂性被卷成这个Transformer ,但这似乎是最好的方法。在我最初的解决方案中,流最终分裂成多个子流,这不是我想要的。

1

有一些示例代码在的akka​​流文档中现在发布在Streams Cookbook中。

10

它看起来像API最近改进,包括akka.stream.scaladsl.Framing。该文档还包含如何使用它的example。关于你的具体问题:

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Framing, Source} 
import akka.util.ByteString 
import com.typesafe.config.ConfigFactory 

object TcpDelimiterBasedMessaging extends App { 
    object chunks { 
    val first = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.") 
    val second = ByteString("More text.delimited by.a period.") 
    } 

    implicit val system = ActorSystem("delimiter-based-messaging", ConfigFactory.defaultReference()) 
    implicit val dispatcher = system.dispatcher 
    implicit val materializer = ActorMaterializer() 

    Source(chunks.first :: chunks.second :: Nil) 
    .via(Framing.delimiter(ByteString("."), Int.MaxValue)) 
    .map(_.utf8String) 
    .runForeach(println) 
    .onComplete(_ => system.terminate()) 
} 

产生以下输出: Lorem Ipsum is simply Dummy text of the printing And typesetting industry More text delimited by a period

+0

完美!这应该是被接受的答案。 另请注意,它将跨越大块工作。与尝试: 'VAL第一=字节串( “Lorem存有是printing.And排版的simply.Dummy文本”)' 'VAL第二=字节串(” industry.More text.delimited by.a周期。 “)' – 2017-04-04 16:46:53

0

我觉得安德烈的使用Framing是你的问题的最佳解决方案,但我也有类似的问题,发现Framing是太有限了。我使用了statefulMapConcat,它允许您使用您喜欢的任何规则将输入的ByteString分组。下面的代码对你的问题的情况下,它可以帮助任何人:

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Flow, Source} 
import akka.util.ByteString 

object BasicTransformation extends App { 

    implicit val system = ActorSystem("Sys") 
    implicit val materializer = ActorMaterializer() 
    implicit val dispatcher = system.dispatcher 
    val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.") 

    val grouping = Flow[Byte].statefulMapConcat {() => 
    var bytes = ByteString() 
    byt => 
     if (byt == '.') { 
     val string = bytes.utf8String 
     bytes = ByteString() 
     List(string) 
     } else { 
     bytes :+= byt 
     Nil 
     } 
    } 

    Source(data).via(grouping).runForeach(println).onComplete(_ => system.terminate()) 
} 

主要生产: Lorem Ipsum is simply Dummy text of the printing And typesetting industry