是的,管道(pipeline)在没有提供任何替代方案的情况下就被移除了。我来自 Netty 世界,并不觉得管道“不直观”——它们会累积缓冲区,并向子 actor 提供可以直接使用的完整消息。
来看看我们的解决方案,它需要 "org.scalaz" %% "scalaz-core" % "7.2.14" 作为依赖项。
Codec 类是一个 State monad,由 actor 调用并产生输出。我们的项目使用 Varint32 protobuf 编码(Varint32 protobuf encoding),因此每条消息前面都带有一个 varint32 长度字段:
import com.google.protobuf.CodedInputStream
import com.trueaccord.scalapb.{GeneratedMessage, GeneratedMessageCompanion, Message}
import com.zeptolab.tlc.front.codecs.Varint32ProtoCodec.ProtoMessage
import scalaz.{-\/, State, \/, \/-}
/**
 * Marker type for the decoding state that a [[Codec]] threads between successive
 * `decodeStream` calls. It declares no members; the concrete representation is left
 * to each codec implementation.
 */
trait Accumulator
/**
 * Contract for a framing codec that turns raw bytes into `IN` messages and `OUT`
 * messages into raw bytes.
 *
 * @tparam IN  type of the messages produced by decoding
 * @tparam OUT type of the messages accepted by encoding
 */
trait Codec[IN, OUT] {
/** State action: given the current [[Accumulator]], yields the next one plus the decoded messages. */
type Stream = State[Accumulator, Seq[IN]]
/**
 * Decodes one buffer into a single message.
 *
 * @return the message on the right, or the failure as a `Throwable` on the left
 */
def decode(buffer: Array[Byte]): Throwable \/ IN
/** Serializes `message` into the bytes to put on the wire. */
def encode(message: OUT): Array[Byte]
/** The accumulator to start a fresh stream with. */
def emptyAcc: Accumulator
/**
 * Builds the state action that feeds `data` into the stream; run it against the
 * accumulator returned by the previous call (or `emptyAcc` for the first one).
 */
def decodeStream(data: Array[Byte]): Stream
}
/** Companion of the [[Varint32ProtoCodec]] class: message type alias plus a factory. */
object Varint32ProtoCodec {

  /** A generated protobuf message of type `T` that also mixes in `Message[T]`. */
  type ProtoMessage[T] = GeneratedMessage with Message[T]

  /**
   * Creates a codec that decodes `IN` messages and encodes `OUT` messages.
   *
   * @param protoType companion object of the inbound message type, used for parsing
   */
  def apply[IN <: ProtoMessage[IN], OUT <: ProtoMessage[OUT]](
      protoType: GeneratedMessageCompanion[IN]
  ): Varint32ProtoCodec[IN, OUT] =
    new Varint32ProtoCodec[IN, OUT](protoType)
}
/**
 * Codec for protobuf messages framed with a varint32 length prefix.
 *
 * Every frame on the wire is `varint32(payloadLength) ++ payload`. `decodeStream`
 * reassembles frames from arbitrarily chunked input; bytes that do not yet form a
 * complete frame are carried over in the accumulator until more data arrives.
 *
 * @param protoType companion object of the inbound message type, used for parsing
 * @tparam IN  inbound (decoded) message type
 * @tparam OUT outbound (encoded) message type
 */
class Varint32ProtoCodec[IN <: ProtoMessage[IN], OUT <: ProtoMessage[OUT]](protoType: GeneratedMessageCompanion[IN]) extends Codec[IN, OUT] {

  import com.google.protobuf.CodedOutputStream

  /** Sentinel stored in the accumulator while the frame length is not known yet. */
  private final val UnknownLength = -1

  /** Upper bound on the encoded size of a protobuf varint, in bytes. */
  private final val MaxVarintBytes = 10

  /**
   * @param expected total length (prefix + payload) of the frame being assembled,
   *                 or -1 while the length prefix has not been read
   * @param buffer   bytes received so far that are not part of a completed frame
   */
  private case class AccumulatorImpl(expected: Int = -1, buffer: Array[Byte] = Array.empty) extends Accumulator

  override def emptyAcc: Accumulator = AccumulatorImpl()

  /**
   * Decodes a single frame.
   *
   * The payload is taken as the last `payloadLength` bytes of `buffer`, so `buffer`
   * is expected to hold exactly one frame (length prefix followed by its payload).
   *
   * @return the parsed message, or the non-fatal failure raised while reading/parsing
   */
  override def decode(buffer: Array[Byte]): Throwable \/ IN = {
    \/.fromTryCatchNonFatal {
      val dataLength = CodedInputStream.newInstance(buffer).readRawVarint32()
      val bufferLength = buffer.length
      val dataBuffer = buffer.drop(bufferLength - dataLength)
      protoType.parseFrom(dataBuffer)
    }
  }

  /** Serializes `message` and prepends its size as a varint32. */
  override def encode(message: OUT): Array[Byte] = {
    val messageBuf = message.toByteArray
    val messageBufLength = messageBuf.length
    val prependLength = CodedOutputStream.computeUInt32SizeNoTag(messageBufLength)
    val prependLengthBuffer = new Array[Byte](prependLength)
    CodedOutputStream.newInstance(prependLengthBuffer).writeUInt32NoTag(messageBufLength)
    prependLengthBuffer ++ messageBuf
  }

  /**
   * Feeds `data` into the stream and returns every message completed by it.
   *
   * Empty input leaves the accumulator untouched. An accumulator that was not
   * produced by this codec instance is replaced by `emptyAcc` and `data` is ignored.
   * Frames that fail to parse are skipped.
   */
  override def decodeStream(data: Array[Byte]): Stream = State {
    case acc: AccumulatorImpl =>
      if (data.isEmpty) (acc, Seq.empty)
      else drain(acc.buffer ++ data, acc.expected, Vector.empty)
    case _ => (emptyAcc, Seq.empty)
  }

  /**
   * Extracts as many complete frames as `pending` contains.
   *
   * Implemented as a loop so that a chunk carrying many frames cannot exhaust the stack.
   *
   * @param pending     non-empty buffer of bytes not yet consumed
   * @param knownLength total length of the frame at the head of `pending`, or
   *                    `UnknownLength` if its prefix still has to be read
   * @param decoded     messages decoded so far, in wire order
   */
  @scala.annotation.tailrec
  private def drain(pending: Array[Byte], knownLength: Int, decoded: Vector[IN]): (Accumulator, Seq[IN]) = {
    val frameLength = if (knownLength == UnknownLength) readFrameLength(pending) else Some(knownLength)
    frameLength match {
      case None if pending.length >= MaxVarintBytes =>
        // A varint never spans more bytes than this, so the prefix is garbage: the
        // stream cannot be re-synchronised, drop what was buffered and start over.
        (emptyAcc, decoded)
      case None =>
        // The length prefix itself is split across chunks: keep the bytes and wait.
        // (Previously this case was mistaken for a complete frame of length -1 and
        // recursed forever on the same buffer.)
        (AccumulatorImpl(UnknownLength, pending), decoded)
      case Some(length) if length <= 0 =>
        // Negative or overflowing length prefix: corrupt stream, reset.
        (emptyAcc, decoded)
      case Some(length) if pending.length < length =>
        (AccumulatorImpl(length, pending), decoded)
      case Some(length) =>
        val (frame, rest) = pending.splitAt(length)
        val nextDecoded = decode(frame) match {
          case \/-(message) => decoded :+ message
          case -\/(_) => decoded
        }
        if (rest.isEmpty) (emptyAcc, nextDecoded)
        else drain(rest, UnknownLength, nextDecoded)
    }
  }

  /**
   * Reads the varint32 prefix at the start of `data`.
   *
   * @return prefix size + payload size, or `None` if the prefix cannot be read
   *         (for example because not all of its bytes have arrived yet)
   */
  private def readFrameLength(data: Array[Byte]): Option[Int] =
    \/.fromTryCatchNonFatal {
      val is = CodedInputStream.newInstance(data)
      val dataLength = is.readRawVarint32()
      val tagLength = is.getTotalBytesRead
      dataLength + tagLength
    }.toOption
}
对应的 actor 如下:
import akka.actor.{Actor, ActorRef, Props}
import akka.event.Logging
import akka.util.ByteString
import com.zeptolab.tlc.front.codecs.{Accumulator, Varint32ProtoCodec}
import com.zeptolab.tlc.proto.protocol.{Downstream, Upstream}
/** Companion of the [[FrameCodec]] actor. */
object FrameCodec {

  /** `Props` for creating a [[FrameCodec]] through its no-argument constructor. */
  def props(): Props = Props(classOf[FrameCodec])
}
/**
 * Connection handler that sits between an Akka TCP connection and a session actor.
 *
 * Inbound bytes are decoded into `Upstream` messages and forwarded to the session
 * child; `Downstream` messages received by this actor are encoded and written back
 * to the connection. The actor stops itself when the connection closes.
 */
class FrameCodec extends Actor {

  import akka.io.Tcp._

  private val logger = Logging(context.system, this)
  private val codec = Varint32ProtoCodec[Upstream, Downstream](Upstream)
  private val sessionActor = context.actorOf(Session.props())

  /**
   * Initial behaviour: the sender of the first `Received` is remembered as the
   * connection to write to.
   */
  def receive: Receive = {
    case Received(data) =>
      // Decode right away instead of re-sending the message to self: a re-sent
      // message is queued behind any `Received` already waiting in the mailbox,
      // which would reorder the byte stream.
      consume(sender(), codec.emptyAcc, data)
    case _: ConnectionClosed => connectionClosed()
  }

  /**
   * Steady-state behaviour.
   *
   * @param ioActor connection actor that outbound writes are sent to
   * @param acc     decoder state carried over from the previous chunk
   */
  private def stream(ioActor: ActorRef, acc: Accumulator): Receive = {
    case Received(data) =>
      consume(ioActor, acc, data)
    case d: Downstream =>
      val buffer = codec.encode(d)
      ioActor ! Write(ByteString(buffer))
    case _: ConnectionClosed => connectionClosed()
  }

  /** Decodes one inbound chunk, forwards the resulting messages and stores the new decoder state. */
  private def consume(ioActor: ActorRef, acc: Accumulator, data: ByteString): Unit = {
    val (next, output) = codec.decodeStream(data.toArray).run(acc)
    output.foreach { up =>
      sessionActor ! up
    }
    context become stream(ioActor, next)
  }

  /**
   * Stops this actor once the connection is gone. Invoked for every
   * `ConnectionClosed` event, not only `PeerClosed`, so the actor does not linger
   * after an error close.
   */
  private def connectionClosed(): Unit = {
    logger.info("Connection closed")
    context stop self
  }
}