
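I've been playing with the experimental Akka Streams API, and I have a use case I'd like to implement. I have a StreamTcp-based Flow, fed by binding the input stream of each connection to my server socket. The Flow carries ByteString data, and the incoming data contains a delimiter: everything before a delimiter is one message, and everything after it, up to the next delimiter, is the next message. To experiment with a simpler case that uses static text instead of sockets, I came up with this: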

import akka.actor.ActorSystem
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow
import scala.util.{ Failure, Success }
import akka.util.ByteString

object BasicTransformation {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("Sys")

    val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")

    Flow(data).
      splitWhen(c => c == '.').
      foreach{producer => 
        Flow(producer).
          filter(c => c != '.').
          fold(new StringBuilder)((sb, c) => sb.append(c.toChar)).
          map(_.toString).
          filter(!_.isEmpty).
          foreach(println(_)).
          consume(FlowMaterializer(MaterializerSettings()))
      }.
      onComplete(FlowMaterializer(MaterializerSettings())) {
        case any =>
          system.shutdown
      }
  }
}

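The main Flow operation I found for this is splitWhen, which produces a new substream at each '.' delimiter, one per message. I then run each substream through a second pipeline of steps and finally print the individual messages.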
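For comparison, when the whole input is already in memory, the splitWhen/filter/fold/filter pipeline above computes the same result as splitting the text on '.' and dropping empty pieces (for ASCII input such as this). The plain-Scala sketch below shows that; `InMemorySplit` is a hypothetical name and no Akka is involved. It only works because all bytes are available at once: on a socket a message can straddle two ByteString chunks, so a streaming solution has to carry unfinished bytes from one chunk to the next.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical helper, not part of Akka: valid only when the whole input is in memory.
object InMemorySplit {
  // Split on '.', then drop empty pieces, like the nested filter/fold/filter pipeline.
  def messages(data: Array[Byte]): List[String] =
    new String(data, UTF_8).split('.').filter(_.nonEmpty).toList

  def main(args: Array[String]): Unit = {
    val data = "Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry."
    messages(data.getBytes(UTF_8)).foreach(println)
  }
}
```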

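This all seems rather verbose for what I'd expect to be a simple and common use case. So my question is: is there a cleaner, more concise way to do this, or is this the correct and preferred way to split a stream by a delimiter?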


4 Answers


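It looks like the API was recently improved to include akka.stream.scaladsl.Framing. Its documentation also contains an example of how to use it. Applied to your specific question: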

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Framing, Source}
import akka.util.ByteString
import com.typesafe.config.ConfigFactory

object TcpDelimiterBasedMessaging extends App {
  object chunks {
    val first = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")
    val second = ByteString("More text.delimited by.a period.")
  }

  implicit val system = ActorSystem("delimiter-based-messaging", ConfigFactory.defaultReference())
  implicit val dispatcher = system.dispatcher
  implicit val materializer = ActorMaterializer()

  Source(chunks.first :: chunks.second :: Nil)
    .via(Framing.delimiter(ByteString("."), Int.MaxValue))
    .map(_.utf8String)
    .runForeach(println)
    .onComplete(_ => system.terminate())
}

This produces the following output:

Lorem Ipsum is simply
Dummy text of the printing
And typesetting industry
More text
delimited by
a period
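To see what a framing stage has to do on a TCP stream, here is a plain-Scala sketch (no Akka) of the idea behind Framing.delimiter: bytes of an unfinished frame are carried over from one chunk to the next, every complete frame is emitted, and an over-long frame is rejected. `DelimiterFramer`, `push` and `finish` are hypothetical names, and the failure behaviour only approximates what Framing.delimiter does with its maximumFrameLength and allowTruncation parameters. This is not Akka's implementation.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical sketch of delimiter framing, not Akka's Framing implementation.
final class DelimiterFramer(delimiter: Byte, maximumFrameLength: Int) {
  // Bytes of the frame that has not yet seen its delimiter.
  private var pending: Vector[Byte] = Vector.empty

  // Feed one chunk; returns every frame completed by this chunk.
  def push(chunk: Array[Byte]): List[String] = {
    val out = List.newBuilder[String]
    for (b <- chunk) {
      if (b == delimiter) {
        out += new String(pending.toArray, UTF_8)
        pending = Vector.empty
      } else {
        pending :+= b
        if (pending.length > maximumFrameLength)
          throw new IllegalStateException(s"frame longer than $maximumFrameLength bytes")
      }
    }
    out.result()
  }

  // Call at end of stream: emit or reject a final frame that lacks a delimiter.
  def finish(allowTruncation: Boolean): List[String] =
    if (pending.isEmpty) Nil
    else if (allowTruncation) {
      val last = new String(pending.toArray, UTF_8)
      pending = Vector.empty
      List(last)
    } else throw new IllegalStateException("stream ended inside an unterminated frame")
}

object FramerDemo {
  def main(args: Array[String]): Unit = {
    val framer = new DelimiterFramer('.'.toByte, maximumFrameLength = 1024)
    // The first message is deliberately split across the two chunks.
    val chunks = List("Lorem Ipsum is sim", "ply.Dummy text of the printing.")
    chunks.foreach(c => framer.push(c.getBytes(UTF_8)).foreach(println))
    framer.finish(allowTruncation = false)
  }
}
```

Passing Int.MaxValue as the maximum frame length, as in the answer above, effectively disables the length check, so a peer that never sends a '.' can make the buffer grow without bound. A finite limit is usually the safer choice on a socket.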

answered 2016-03-27T02:53:57.917

Similar example code has now been published in the Streams Cookbook of the akka-streams documentation, under "Parsing lines from a stream of ByteStrings".

answered 2015-02-08T15:25:35.190

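I think Andrey's use of Framing is the best solution to your problem, but I ran into a similar problem and found Framing too limited. I used statefulMapConcat instead, which lets you group the incoming ByteString by whatever rules you like. Here is the code for your problem, in case it helps anyone: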

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString

object BasicTransformation extends App {

  implicit val system = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()
  implicit val dispatcher = system.dispatcher
  val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")

  val grouping = Flow[Byte].statefulMapConcat { () =>
    var bytes = ByteString()
    byt =>
      if (byt == '.') {
        val string = bytes.utf8String
        bytes = ByteString()
        List(string)
      } else {
        bytes :+= byt
        Nil
      }
  }

  Source(data).via(grouping).runForeach(println).onComplete(_ => system.terminate())
}

This produces:

Lorem Ipsum is simply
Dummy text of the printing
And typesetting industry
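The reason statefulMapConcat takes a factory `() => (element => results)` rather than a plain function is that each materialization of the stream should get its own buffer. The plain-Scala sketch below mirrors the `grouping` stage above without Akka (`StatefulGrouping` and `run` are hypothetical names) and makes two properties visible: two functions produced by the factory do not share state, and bytes after the last '.' are never emitted, because the per-element function only returns a message when it sees a delimiter.

```scala
// Hypothetical plain-Scala mirror of the statefulMapConcat stage above; no Akka involved.
object StatefulGrouping {
  // Each call of the returned factory creates fresh mutable state,
  // then hands back the per-element function, as statefulMapConcat expects.
  def grouping(delimiter: Byte): () => Byte => List[String] = { () =>
    var bytes = Vector.empty[Byte]
    byt =>
      if (byt == delimiter) {
        val string = new String(bytes.toArray, "UTF-8")
        bytes = Vector.empty
        List(string)
      } else {
        bytes :+= byt
        Nil
      }
  }

  // Simulates one materialization: fresh state, elements applied in order.
  def run(input: String): List[String] = {
    val f = grouping('.'.toByte)()
    input.getBytes("UTF-8").toList.flatMap(f)
  }
}
```

If the input may end without a trailing '.', the leftover bytes need separate handling at end of stream, which this per-element function cannot express on its own.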

answered 2016-11-02T09:49:55.983

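After posting the same question to the Akka user group, I got some suggestions from Endre Varga and Viktor Klang (https://groups.google.com/forum/#!topic/akka-user/YsnwIAjQ3EE). I ended up going with Endre's suggestion of a Transformer, applied with transform on the Flow. Below is a slightly modified version of my earlier example: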

import akka.actor.ActorSystem
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow
import scala.util.{ Failure, Success }
import akka.util.ByteString
import akka.stream.Transformer
import akka.util.ByteStringBuilder

object BasicTransformation {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("Sys")                           
    implicit val mater = FlowMaterializer(MaterializerSettings())

    val data = List(
      ByteString("Lorem Ipsum is"), 
      ByteString(" simply.Dummy text of.The prin"), 
      ByteString("ting.And typesetting industry.")
    )
    Flow(data).transform(new PeriodDelimitedTransformer).foreach(println(_))
  }
}

PeriodDelimitedTransformer is defined as follows:

class PeriodDelimitedTransformer extends Transformer[ByteString, String] {
  // Bytes of the message that is still waiting for its terminating '.'
  val buffer = new ByteStringBuilder

  def onNext(msg: ByteString): List[String] = {
    var rest = msg
    var result = List.empty[String]
    var delimIndex = rest.indexOf('.'.toByte)
    // Emit one message per '.'; the first one is prefixed with any carried-over bytes.
    // Working on bytes avoids decoding a chunk that ends mid-way through a UTF-8 character.
    while (delimIndex != -1) {
      buffer.append(rest.take(delimIndex))
      result ::= buffer.result.utf8String
      buffer.clear()
      rest = rest.drop(delimIndex + 1)
      delimIndex = rest.indexOf('.'.toByte)
    }
    // Keep the unterminated tail for the next chunk
    buffer.append(rest)
    result.reverse
  }
}

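So some of the complexity of my previous solution is now wrapped up in this Transformer, but this seems to be the best approach. In my original solution the stream ended up split into multiple substreams, which isn't really what I wanted.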
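The same buffering can also be written without a mutable Transformer by threading the leftover bytes through a foldLeft. The plain-Scala sketch below (`PeriodSplitter`, `step` and `run` are hypothetical names; no Akka involved) splits on bytes rather than with String.split, which checks the cases that are easy to get wrong with split: a chunk that is a single message ending in '.', and a chunk consisting only of '.'. Java's split drops trailing empty strings, so "." splits into an empty array.

```scala
// Hypothetical sketch: the carried-over buffer is passed in and returned
// instead of living inside a Transformer.
object PeriodSplitter {
  // Returns (messages completed by this chunk, bytes still waiting for a '.').
  def step(buffer: Vector[Byte], chunk: Array[Byte]): (List[String], Vector[Byte]) = {
    var current = buffer
    val out = List.newBuilder[String]
    for (b <- chunk) {
      if (b == '.'.toByte) {
        out += new String(current.toArray, "UTF-8")
        current = Vector.empty
      } else current :+= b
    }
    (out.result(), current)
  }

  // Folds over all chunks, threading the leftover bytes from one chunk to the next.
  def run(chunks: List[String]): List[String] = {
    val (messages, _) = chunks.foldLeft((List.empty[String], Vector.empty[Byte])) {
      case ((acc, buf), chunk) =>
        val (msgs, rest) = step(buf, chunk.getBytes("UTF-8"))
        (acc ++ msgs, rest)
    }
    messages
  }
}
```

With the three chunks from the example above, `run` yields "Lorem Ipsum is simply", "Dummy text of", "The printing" and "And typesetting industry".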

answered 2014-09-03T16:30:34.010