2

我想将数据从一个演员的流发送到另一个演员的流。最后,这应该与远程参与者一起工作。使用Akka.NET Streams这应该是一件容易的事,但也许我误解了它。

这是SenderActor的一部分:

var stream = new FileStream(FILENAME, FileMode.Open);
var materializer = Context.Materializer();
var source = StreamConverters.FromInputStream(() => stream, CHUNK_SIZE);
var result = source.To(Sink.ActorRef<ByteString>(this.receiver, new Messages.StreamComplete()))
    .Run(materializer);

注意:this.receiverIActorRef数据应该发送到的地方。

ReceiverActor 现在在流结束后获取所有ByteString消息。Messages.StreamCompleted

这怎么能很容易地放在 ReceiverActor 中呢?在最好的情况下Stream再次。

ReceiverActor中,我尝试将所有ByteString消息发送到Source应该填写 a 的 a MemoryStream

class ReceiverActor : ReceiveActor
{
    private readonly IActorRef streamReceiver;
    private readonly MemoryStream stream;

    public ReceiverActor()
    {
        this.stream = new MemoryStream();
        this.streamReceiver = Source.ActorRef<ByteString>(128, Akka.Streams.OverflowStrategy.Fail)
            .To(StreamConverters.FromOutputStream(() => this.stream, true))
            .Run(Context.Materializer());
        Context.Watch(this.streamReceiver);

        Receive<ByteString>((message) => ReceivedStreamChunk(message));
        Receive<Messages.StreamComplete>((message) => ReceivedStreamComplete(message));
        Receive<Terminated>((message) => ReceivedTerminated(message));
        ReceiveAny((message) => ReceivedAnyMessage(message));
    }
    private void ReceivedTerminated(Terminated message)
    {
        Console.WriteLine($"[receiver] {message.ActorRef.Path.ToStringWithoutAddress()} terminated, local stream length {(this.stream.CanRead ? this.stream.Length : -1)}");
    }
    private void ReceivedStreamComplete(Messages.StreamComplete message)
    {
        Console.WriteLine($"[receiver] got signaled that the stream completed, local stream length {this.stream.Length}");
    }
    private void ReceivedStreamChunk(object message)
    {
        Console.WriteLine($"[receiver] got chunk, previous stream length {this.stream.Length}");
        this.streamReceiver.Forward(message);
    }
    private void ReceivedAnyMessage(object message)
    {
        Console.WriteLine($"[receiver] got message {message.GetType().FullName}");
    }
}

但是它MemoryStream是异步填充的,当streamReceiver终止时它会关闭流,所以我无法获取数据。

如何正确检索流?


更新我在本地工作:

感谢来自 Akka.NET 的 gitter 频道中 Horusiath 的输入,我能够直接接收ByteStrings,而不必在那里使用 Akka.Stream 。

Receive<ByteString>((message) => ReceivedStreamChunk(message));

和:

private void ReceivedStreamChunk(ByteString message)
{
    var bytes = message.ToArray();
    targetStream.Write(bytes, 0, bytes.Length);
    Console.WriteLine($"[receiver] got chunk, now stream length {this.stream.Length}");
}

请注意,Akka.NET 1.3 会将方法WriteTo(Stream)和添加WriteToAsync(Stream, CancellationToken)ByteString.

这仍然不适用于远程参与者,因为接收参与者系统收到此错误(序列化程序是 Hyperion):

错误 [没有为此对象定义无参数构造函数。]

实际上ByteString有一个无参数的构造函数,但它是protected.

我认为它ByteString不可序列化?

4

1 回答 1

0

通过在源和接收器之间插入转换流ByteString来转换为时,我让它与远程参与者一起工作。byte[]

例子:

FileIO.FromFile(...).Via(Flow.Create<ByteString>().Select(x => x.ToArray())).To(...)

于 2018-04-12T13:51:02.490 回答