使用 ReadUntilClosedObservable1 从 NetworkStream 读取数据时,返回的数据已损坏,就像某些读取数据块重叠一样。
但是,当我使用 ReadUntilClosedObservable2 读取数据时,数据到达时没有问题。
我想使用 ReadUntilClosedObservable1,因为在 ReadUntilClosedObservable2 中重复读取流正在消耗 CPU。
如何按同步顺序获取消息?
更新:
return Observable.Timer(TimeSpan.Zero, interval, TaskPoolScheduler.Default)
.SelectMany(_ => readToEnd)
.Where(dataChunk => dataChunk.Length > 0);
我只是注意到readToEnd
to 在完成上一份工作之前被一次又一次地解雇。不需要同步吗?如果Observable.Timer
是问题,如果没有它,我怎样才能达到相同的效果,间隔阅读但不等待就开始?
public static IObservable<int> ReadObservable(this Stream stream, byte[] buffer
,int offset, int count)
{
return stream.ReadAsync(buffer, offset, count)
.ToObservable();
}
public static IObservable<byte[]> ReadObservable(this Stream stream,
int bufferSize)
{
var buffer = new byte[bufferSize];
return stream.ReadObservable(buffer, 0, buffer.Length)
.Select(cbRead =>
{
if (cbRead == 0)
{
return new byte[0];
}
if (cbRead == buffer.Length)
{
return buffer;
}
var dataChunk = new byte[cbRead];
Buffer.BlockCopy(buffer, 0, dataChunk,
0, cbRead);
return dataChunk;
});
}
public static IObservable<byte[]> ReadUntilClosedObservable1(this NetworkStream
stream, int bufferSize, TimeSpan interval)
{
var readToEnd = Observable.Defer(() => stream.ReadObservable(bufferSize))
.DoWhile(() => stream.DataAvailable)
.ToList()
.Select(dataChunks =>
{
var buffer = new List<byte>();
foreach (var dataChunk in dataChunks)
{
buffer.AddRange(dataChunk);
}
return buffer.ToArray();
});
return Observable.Timer(TimeSpan.Zero, interval, TaskPoolScheduler.Default)
.SelectMany(_ => readToEnd)
.Where(dataChunk => dataChunk.Length > 0);
}
public static IObservable<byte[]> ReadUntilClosedObservable2(this Stream stream
,int bufferSize)
{
return Observable.Defer(() => stream.ReadObservable(bufferSize))
.Repeat()
.Where(dataChunk => dataChunk.Length > 0);
}