我想将数据从一个演员的流发送到另一个演员的流。最后,这应该与远程参与者一起工作。使用Akka.NET Streams这应该是一件容易的事,但也许我误解了它。
这是SenderActor的一部分:
var stream = new FileStream(FILENAME, FileMode.Open);
var materializer = Context.Materializer();
var source = StreamConverters.FromInputStream(() => stream, CHUNK_SIZE);
var result = source.To(Sink.ActorRef<ByteString>(this.receiver, new Messages.StreamComplete()))
.Run(materializer);
注意:this.receiver
是IActorRef
数据应该发送到的地方。
ReceiverActor 现在在流结束后获取所有ByteString
消息。Messages.StreamCompleted
这怎么能很容易地放在 ReceiverActor 中呢?在最好的情况下Stream
再次。
在ReceiverActor中,我尝试将所有ByteString
消息发送到Source
应该填写 a 的 a MemoryStream
:
class ReceiverActor : ReceiveActor
{
private readonly IActorRef streamReceiver;
private readonly MemoryStream stream;
public ReceiverActor()
{
this.stream = new MemoryStream();
this.streamReceiver = Source.ActorRef<ByteString>(128, Akka.Streams.OverflowStrategy.Fail)
.To(StreamConverters.FromOutputStream(() => this.stream, true))
.Run(Context.Materializer());
Context.Watch(this.streamReceiver);
Receive<ByteString>((message) => ReceivedStreamChunk(message));
Receive<Messages.StreamComplete>((message) => ReceivedStreamComplete(message));
Receive<Terminated>((message) => ReceivedTerminated(message));
ReceiveAny((message) => ReceivedAnyMessage(message));
}
private void ReceivedTerminated(Terminated message)
{
Console.WriteLine($"[receiver] {message.ActorRef.Path.ToStringWithoutAddress()} terminated, local stream length {(this.stream.CanRead ? this.stream.Length : -1)}");
}
private void ReceivedStreamComplete(Messages.StreamComplete message)
{
Console.WriteLine($"[receiver] got signaled that the stream completed, local stream length {this.stream.Length}");
}
private void ReceivedStreamChunk(object message)
{
Console.WriteLine($"[receiver] got chunk, previous stream length {this.stream.Length}");
this.streamReceiver.Forward(message);
}
private void ReceivedAnyMessage(object message)
{
Console.WriteLine($"[receiver] got message {message.GetType().FullName}");
}
}
但是它MemoryStream
是异步填充的,当streamReceiver
终止时它会关闭流,所以我无法获取数据。
如何正确检索流?
更新我在本地工作:
感谢来自 Akka.NET 的 gitter 频道中 Horusiath 的输入,我能够直接接收ByteStrings
,而不必在那里使用 Akka.Stream 。
Receive<ByteString>((message) => ReceivedStreamChunk(message));
和:
private void ReceivedStreamChunk(ByteString message)
{
var bytes = message.ToArray();
targetStream.Write(bytes, 0, bytes.Length);
Console.WriteLine($"[receiver] got chunk, now stream length {this.stream.Length}");
}
请注意,Akka.NET 1.3 会将方法WriteTo(Stream)
和添加WriteToAsync(Stream, CancellationToken)
到ByteString
.
这仍然不适用于远程参与者,因为接收参与者系统收到此错误(序列化程序是 Hyperion):
错误 [没有为此对象定义无参数构造函数。]
实际上ByteString
有一个无参数的构造函数,但它是protected
.
我认为它ByteString
不可序列化?