需要:具有 TCP 连接的长时间运行程序
AC# 4.0 (VS1010, XP) 程序需要使用 TCP 连接到主机,发送和接收字节,有时会正确关闭连接并稍后重新打开。周围的代码是使用 Rx.NetObservable
风格编写的。数据量很低,但程序应该连续运行(通过妥善处理资源来避免内存泄漏)。
下面的文字很长,因为我解释了我搜索和发现的内容。它现在似乎起作用了。
总体问题是:由于 Rx 有时不直观,解决方案是否良好?那会可靠吗(比如说,它可以运行多年而不会出现问题)吗?
到目前为止的解决方案
发送
该程序获得NetworkStream
这样的:
TcpClient tcpClient = new TcpClient();
LingerOption lingerOption = new LingerOption(false, 0); // Make sure that on call to Close(), connection is closed immediately even if some data is pending.
tcpClient.LingerState = lingerOption;
tcpClient.Connect(remoteHostPort);
return tcpClient.GetStream();
异步发送很容易。Rx.Net 允许使用比传统解决方案更短、更简洁的代码来处理这个问题。我用EventLoopScheduler
. 需要发送的操作用 表示IObservable
。使用ObserveOn(sendRecvThreadScheduler)
保证所有发送操作都在该线程上完成。
sendRecvThreadScheduler = new EventLoopScheduler(
ts =>
{
var thread = new System.Threading.Thread(ts) { Name = "my send+receive thread", IsBackground = true };
return thread;
});
// Loop code for sending not shown (too long and off-topic).
到目前为止,这是出色且完美的。
收到
看起来,为了接收数据,Rx.Net 还应该允许比传统解决方案更短、更简洁的代码。在阅读了几个资源(例如http://www.introtorx.com/)和 stackoverflow 之后,似乎一个非常简单的解决方案是将异步编程桥接到 Rx.Net 就像在https://stackoverflow.com/a/14464068 /1429390:
public static class Ext
{
public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize)
{
// to hold read data
var buffer = new byte[bufferSize];
// Step 1: async signature => observable factory
var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
stream.BeginRead,
stream.EndRead);
return Observable.While(
// while there is data to be read
() => stream.CanRead,
// iteratively invoke the observable factory, which will
// "recreate" it such that it will start from the current
// stream position - hence "0" for offset
Observable.Defer(() => asyncRead(buffer, 0, bufferSize))
.Select(readBytes => buffer.Take(readBytes).ToArray()));
}
}
它主要工作。我可以发送和接收字节。
关闭时间
这是事情开始出错的时候。
有时我需要关闭流并保持清洁。基本上这意味着:停止读取,结束接收字节的 observable,打开一个新的连接。
一方面,当远程主机强制关闭连接时,BeginRead()/EndRead()
立即循环消耗所有返回零字节的 CPU。我让更高级别的代码注意到这一点(在高级元素可用的上下文中)和清理(包括关闭和处理流)Subscribe()
。ReadObservable
这也很有效,我负责处理Subscribe()
.
someobject.readOneStreamObservableSubscription = myobject.readOneStreamObservable.Subscribe(buf =>
{
if (buf.Length == 0)
{
MyLoggerLog("Read explicitly returned zero bytes. Closing stream.");
this.pscDestroyIfAny();
}
});
有时,我只需要关闭流。但显然这必须导致在异步读取中抛出异常。c# - 过早中止 BeginRead 和 BeginWrite 的正确方法?- 堆栈溢出
我添加了一个CancellationToken
导致Observable.While()
序列结束的。这对避免这些异常没有多大帮助,因为BeginRead()
可以睡很长时间。
observable 中未处理的异常导致程序退出。搜索提供的.net - 出现异常后继续使用订阅 - 堆栈溢出建议添加一个Catch以有效地恢复损坏Observable
的空的。
代码如下所示:
public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize, CancellationToken token)
{
// to hold read data
var buffer = new byte[bufferSize];
// Step 1: async signature => observable factory
var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
stream.BeginRead,
stream.EndRead);
return Observable.While(
// while there is data to be read
() =>
{
return (!token.IsCancellationRequested) && stream.CanRead;
},
// iteratively invoke the observable factory, which will
// "recreate" it such that it will start from the current
// stream position - hence "0" for offset
Observable.Defer(() =>
{
if ((!token.IsCancellationRequested) && stream.CanRead)
{
return asyncRead(buffer, 0, bufferSize);
}
else
{
return Observable.Empty<int>();
}
})
.Catch(Observable.Empty<int>()) // When BeginRead() or EndRead() causes an exception, don't choke but just end the Observable.
.Select(readBytes => buffer.Take(readBytes).ToArray()));
}
现在怎么办?问题
这似乎运作良好。检测到远程主机强行关闭连接或不再可达的情况,导致更高级别的代码关闭连接并重试。到目前为止,一切都很好。
我不确定事情是否完全正确。
一方面,该行:
.Catch(Observable.Empty<int>()) // When BeginRead() or EndRead() causes an exception, don't choke but just end the Observable.
感觉就像命令式代码中空 catch 块的不好做法。实际代码确实记录了异常,更高级别的代码检测到没有回复并正确处理,所以应该认为它还可以(见下文)?
.Catch((Func<Exception, IObservable<int>>)(ex =>
{
MyLoggerLogException("On asynchronous read from network.", ex);
return Observable.Empty<int>();
})) // When BeginRead() or EndRead() causes an exception, don't choke but just end the Observable.
此外,这确实比大多数传统解决方案要短。
解决方案是正确的还是我错过了一些更简单/更清洁的方法?
是否有一些可怕的问题对于响应式扩展的向导来说是显而易见的?
感谢您的关注。