2

在学习如何使用 akka I/OI 时,我试图在 akka i/o 之上实现一个简单的协议,并遵循此处的文档。

但是在我的 gradle 文件中,我使用 2.3.9 版本,如下所示

dependencies {
    compile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.7'
    compile group: 'com.typesafe.akka', name: 'akka-actor_2.11', version: '2.3.9'
    compile group: 'com.typesafe.akka', name: 'akka-contrib_2.11', version: '2.3.9'
    compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.5'
    testCompile group: 'junit', name: 'junit', version: '4.11'
}

导入一些特定于管道的东西,比如

import akka.io.SymmetricPipelineStage;
import akka.io.PipelineContext;
import akka.io.SymmetricPipePair;

generate 无法解决符号错误。

因此我的问题。

  1. 是这些被删除还是我需要添加到我的 gradle 文件中的一些依赖项。
  2. 如果它们被删除,编码/解码阶段将如何处理?
4

2 回答 2

6

管道是实验性的,实际上在 Akka 2.3 中被删除。迁移指南 2.2.x 到 2.3.x中记录了删除。

这里还提到了能够用 Akka 2.3 打包“旧”管道实现,尽管它似乎不是简单地添加依赖项。

我敢打赌,Akka Streams 旨在成为 Akka 2.4 中更好的管道替代品,但现在可以作为实验模块使用。可以通过将 Akka Streams 与 Akka I/O 结合使用来处理编码/解码阶段或协议层。

于 2015-03-28T17:24:05.287 回答
0

是的,管道在没有任何替代方案的情况下被移除。我来自 Netty 世界,并没有发现管道“不直观”——它们积累缓冲区并为儿童演员提供随时可用的消息。

看看我们的解决方案,它需要"org.scalaz" %% "scalaz-core" % 7.2.14作为依赖项。

Codec 类是一个Statemonad,它被 actor 调用并产生输出。在我们正在使用的项目中Varint32 protobuf encoding,因此每条消息都带有varint32长度字段:

import com.google.protobuf.CodedInputStream
import com.trueaccord.scalapb.{GeneratedMessage, GeneratedMessageCompanion, Message}
import com.zeptolab.tlc.front.codecs.Varint32ProtoCodec.ProtoMessage

import scalaz.{-\/, State, \/, \/-}

trait Accumulator
trait Codec[IN, OUT] {

  type Stream = State[Accumulator, Seq[IN]]

  def decode(buffer: Array[Byte]): Throwable \/ IN

  def encode(message: OUT): Array[Byte]

  def emptyAcc: Accumulator

  def decodeStream(data: Array[Byte]): Stream

}

object Varint32ProtoCodec {

  type ProtoMessage[T] = GeneratedMessage with Message[T]

  def apply[IN <: ProtoMessage[IN], OUT <: ProtoMessage[OUT]](protoType: GeneratedMessageCompanion[IN]) = new Varint32ProtoCodec[IN, OUT](protoType)

}

class Varint32ProtoCodec[IN <: ProtoMessage[IN], OUT <: ProtoMessage[OUT]](protoType: GeneratedMessageCompanion[IN]) extends Codec[IN, OUT] {

  import com.google.protobuf.CodedOutputStream

  private case class AccumulatorImpl(expected: Int = -1, buffer: Array[Byte] = Array.empty) extends Accumulator

  override def emptyAcc: Accumulator = AccumulatorImpl()

  override def decode(buffer: Array[Byte]): Throwable \/ IN = {
    \/.fromTryCatchNonFatal {
      val dataLength = CodedInputStream.newInstance(buffer).readRawVarint32()
      val bufferLength = buffer.length
      val dataBuffer = buffer.drop(bufferLength - dataLength)
      protoType.parseFrom(dataBuffer)
    }
  }

  override def encode(message: OUT): Array[Byte] = {
    val messageBuf = message.toByteArray
    val messageBufLength = messageBuf.length
    val prependLength = CodedOutputStream.computeUInt32SizeNoTag(messageBufLength)
    val prependLengthBuffer = new Array[Byte](prependLength)
    CodedOutputStream.newInstance(prependLengthBuffer).writeUInt32NoTag(messageBufLength)
    prependLengthBuffer ++ messageBuf
  }

  override def decodeStream(data: Array[Byte]): Stream = State {
    case acc: AccumulatorImpl =>
      if (data.isEmpty) {
        (acc, Seq.empty)
      } else {
        val accBuffer = acc.buffer ++ data
        val accExpected = readExpectedLength(accBuffer, acc)
        if (accBuffer.length >= accExpected) {
          val (frameBuffer, restBuffer) = accBuffer.splitAt(accExpected)
          val output = decode(frameBuffer) match {
            case \/-(proto) => Seq(proto)
            case -\/(_) => Seq.empty
          }
          val (newAcc, recOutput) = decodeStream(restBuffer).run(emptyAcc)
          (newAcc, output ++ recOutput)
        } else (AccumulatorImpl(accExpected, accBuffer), Seq.empty)
      }
    case _ => (emptyAcc, Seq.empty)
  }

  private def readExpectedLength(data: Array[Byte], acc: AccumulatorImpl) = {
    if (acc.expected == -1 && data.length >= 1) {
      \/.fromTryCatchNonFatal {
        val is = CodedInputStream.newInstance(data)
        val dataLength = is.readRawVarint32()
        val tagLength = is.getTotalBytesRead
        dataLength + tagLength
      }.getOrElse(acc.expected)
    } else acc.expected
  }

}

演员是:

import akka.actor.{Actor, ActorRef, Props}
import akka.event.Logging
import akka.util.ByteString
import com.zeptolab.tlc.front.codecs.{Accumulator, Varint32ProtoCodec}
import com.zeptolab.tlc.proto.protocol.{Downstream, Upstream}

object FrameCodec {
  def props() = Props[FrameCodec]
}

class FrameCodec extends Actor {

  import akka.io.Tcp._

  private val logger       = Logging(context.system, this)
  private val codec        = Varint32ProtoCodec[Upstream, Downstream](Upstream)
  private val sessionActor = context.actorOf(Session.props())

  def receive = {
    case r: Received =>
      context become stream(sender(), codec.emptyAcc)
      self ! r
    case PeerClosed => peerClosed()
  }

  private def stream(ioActor: ActorRef, acc: Accumulator): Receive = {
    case Received(data) =>
      val (next, output) = codec.decodeStream(data.toArray).run(acc)
      output.foreach { up =>
        sessionActor ! up
      }
      context become stream(ioActor, next)
    case d: Downstream =>
      val buffer = codec.encode(d)
      ioActor ! Write(ByteString(buffer))
    case PeerClosed => peerClosed()
  }

  private def peerClosed() = {
    logger.info("Connection closed")
    context stop self
  }

}
于 2017-08-02T09:55:11.770 回答