5

我们正在开发用于流式传输大量数据的 WCF 服务,因此我们选择使用WCF 流式传输功能与protobuf-net序列化相结合。

语境:

通常一个想法是序列化服务中的对象,将它们写入流并发送。在另一端,调用者将收到一个 Stream 对象,它可以读取所有数据。

所以目前服务方法代码看起来有点像这样:

public Result TestMethod(Parameter parameter)
{
    // Create response
    var responseObject = new BusinessResponse { Value = "some very large data"};

    // The resposne have to be serialized in advance to intermediate MemoryStream
    var stream = new MemoryStream();
    serializer.Serialize(stream, responseObject);
    stream.Position = 0;

    // ResultBody is a stream, Result is a MessageContract
    return new Result {ResultBody = stream};
}

BusinessResponse 对象被序列化为 MemoryStream 并从方法返回。在客户端,调用代码如下所示:

var parameter = new Parameter();

// Call the service method
var methodResult = channel.TestMethod(parameter);

// protobuf-net deserializer reads from a stream received from a service.
// while reading is performed by protobuf-net, 
// on the service side WCF is actually reading from a 
// memory stream where serialized message is stored
var result = serializer.Deserialize<BusinessResponse>(methodResult.ResultBody);
return result;

因此,当serializer.Deserialize()被调用时,它会从流中读取methodResult.ResultBody,同时在服务端 WCF 正在读取已从TestMethod.

问题:

我们想要实现的是立即摆脱MemoryStream服务端整个对象的初始序列化。由于我们使用流式传输,我们希望避免在发送之前将序列化对象保留在内存中。

主意:

完美的解决方案是返回一个空的、定制的 Stream 对象(来自TestMethod()),并引用一个要序列化的对象(在我的示例中为“BusinessResponse”对象)。因此,当 WCF 调用Read()我的流的方法时,我在内部使用 protobuf-net 序列化对象的一部分并将其返回给调用者,而不将其存储在内存中。

现在有一个问题,因为我们真正需要的是在读取流的那一刻,可以将一个对象逐个序列化。我知道这是完全不同的序列化方式 - 我不想将对象推送到序列化程序,而是希望逐个请求序列化内容。

使用 protobuf-net 可以以某种方式进行这种序列化吗?

4

1 回答 1

2

我编写了一些代码,可能与 Marc 的门想法一致。

public class PullStream : Stream
{
    private byte[] internalBuffer;
    private bool ended;
    private static ManualResetEvent dataAvailable = new ManualResetEvent(false);
    private static ManualResetEvent dataEmpty = new ManualResetEvent(true);

    public override bool CanRead
    {
        get { return true; }
    }

    public override bool CanSeek
    {
        get { return false; }
    }

    public override bool CanWrite
    {
        get { return true; }
    }

    public override void Flush()
    {
        throw new NotImplementedException();
    }

    public override long Length
    {
        get { throw new NotImplementedException(); }
    }

    public override long Position
    {
        get
        {
            throw new NotImplementedException();
        }
        set
        {
            throw new NotImplementedException();
        }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        dataAvailable.WaitOne();
        if ( count >= internalBuffer.Length)
        {
            var retVal = internalBuffer.Length;
            Array.Copy(internalBuffer, buffer, retVal);
            internalBuffer = null;
            dataAvailable.Reset();
            dataEmpty.Set();
            return retVal;
        }
        else
        {
            Array.Copy(internalBuffer, buffer, count);
            internalBuffer = internalBuffer.Skip(count).ToArray(); // i know
            return count;
        }
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotImplementedException();
    }

    public override void SetLength(long value)
    {
        throw new NotImplementedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        dataEmpty.WaitOne();
        dataEmpty.Reset();

        internalBuffer = new byte[count];
        Array.Copy(buffer, internalBuffer, count);

        Debug.WriteLine("Writing some data");

        dataAvailable.Set();
    }

    public void End()
    {
        dataEmpty.WaitOne();
        dataEmpty.Reset();

        internalBuffer = new byte[0];

        Debug.WriteLine("Ending writes");

        dataAvailable.Set();
    }
}

这是一个简单的流后代类,仅实现读取和写入(和结束)。没有数据可用时读取阻塞,数据可用时写入阻塞。这种方式只涉及一个字节缓冲区。其余部分的 linq 复制已打开以进行优化;-) 添加了 End 方法,因此在没有数据可用且不再写入数据时执行 Read 时不会发生阻塞。

您必须从单独的线程写入此流。我在下面展示这个:

    // create a large object
    var obj = new List<ToSerialize>();
    for(int i = 0; i <= 1000; i ++)
        obj.Add(new ToSerialize { Test = "This is my very loooong message" });
    // create my special stream to read from
    var ms = new PullStream();
    new Thread(x =>
    {
        ProtoBuf.Serializer.Serialize(ms, obj);
        ms.End();
    }).Start();
    var buffer = new byte[100];
    // stream to write back to (just to show deserialization is working too)
    var ws = new MemoryStream();
    int read;
    while ((read = ms.Read(buffer, 0, 100)) != 0)
    {
        ws.Write(buffer, 0, read);
        Debug.WriteLine("read some data");
    }
    ws.Position = 0;
    var back = ProtoBuf.Serializer.Deserialize<List<ToSerialize>>(ws);

我希望这可以解决您的问题 :-) 无论如何编写代码很有趣。

问候, 雅科

于 2013-03-13T21:52:21.390 回答