2

需要:具有 TCP 连接的长时间运行程序

AC# 4.0 (VS1010, XP) 程序需要使用 TCP 连接到主机,发送和接收字节,有时会正确关闭连接并稍后重新打开。周围的代码是使用 Rx.NetObservable风格编写的。数据量很低,但程序应该连续运行(通过妥善处理资源来避免内存泄漏)。

下面的文字很长,因为我解释了我搜索和发现的内容。它现在似乎起作用了。

总体问题是:由于 Rx 有时不直观,解决方案是否良好?那会可靠吗(比如说,它可以运行多年而不会出现问题)吗?

到目前为止的解决方案

发送

该程序获得NetworkStream这样的:

TcpClient tcpClient = new TcpClient();
LingerOption lingerOption = new LingerOption(false, 0); // Make sure that on call to Close(), connection is closed immediately even if some data is pending.
tcpClient.LingerState = lingerOption;

tcpClient.Connect(remoteHostPort);
return tcpClient.GetStream();

异步发送很容易。Rx.Net 允许使用比传统解决方案更短、更简洁的代码来处理这个问题。我用EventLoopScheduler. 需要发送的操作用 表示IObservable。使用ObserveOn(sendRecvThreadScheduler)保证所有发送操作都在该线程上完成。

sendRecvThreadScheduler = new EventLoopScheduler(
    ts =>
    {
        var thread = new System.Threading.Thread(ts) { Name = "my send+receive thread", IsBackground = true };
        return thread;
    });
        // Loop code for sending not shown (too long and off-topic).

到目前为止,这是出色且完美的。

收到

看起来,为了接收数据,Rx.Net 还应该允许比传统解决方案更短、更简洁的代码。在阅读了几个资源(例如http://www.introtorx.com/)和 stackoverflow 之后,似乎一个非常简单的解决方案是将异步编程桥接到 Rx.Net 就像在https://stackoverflow.com/a/14464068 /1429390

public static class Ext
{
    public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize)
    {
        // to hold read data
        var buffer = new byte[bufferSize];
        // Step 1: async signature => observable factory
        var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
            stream.BeginRead,
            stream.EndRead);
        return Observable.While(
            // while there is data to be read
            () => stream.CanRead,
            // iteratively invoke the observable factory, which will
            // "recreate" it such that it will start from the current
            // stream position - hence "0" for offset
            Observable.Defer(() => asyncRead(buffer, 0, bufferSize))
                .Select(readBytes => buffer.Take(readBytes).ToArray()));
    }
}

它主要工作。我可以发送和接收字节。

关闭时间

这是事情开始出错的时候。

有时我需要关闭流并保持清洁。基本上这意味着:停止读取,结束接收字节的 observable,打开一个新的连接。

一方面,当远程主机强制关闭连接时,BeginRead()/EndRead()立即循环消耗所有返回零字节的 CPU。我让更高级别的代码注意到这一点(在高级元素可用的上下文中)和清理(包括关闭和处理流)Subscribe()ReadObservable这也很有效,我负责处理Subscribe().

    someobject.readOneStreamObservableSubscription = myobject.readOneStreamObservable.Subscribe(buf =>
    {
        if (buf.Length == 0)
        {
                MyLoggerLog("Read explicitly returned zero bytes.  Closing stream.");
                this.pscDestroyIfAny();
        }
    });

有时,我只需要关闭流。但显然这必须导致在异步读取中抛出异常。c# - 过早中止 BeginRead 和 BeginWrite 的正确方法?- 堆栈溢出

我添加了一个CancellationToken导致Observable.While()序列结束的。这对避免这些异常没有多大帮助,因为BeginRead()可以睡很长时间。

observable 中未处理的异常导致程序退出。搜索提供的.net - 出现异常后继续使用订阅 - 堆栈溢出建议添加一个Catch以有效地恢复损坏Observable的空的。

代码如下所示:

public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize, CancellationToken token)
{
    // to hold read data
    var buffer = new byte[bufferSize];
    // Step 1: async signature => observable factory
    var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
        stream.BeginRead,
        stream.EndRead);
    return Observable.While(
        // while there is data to be read
        () =>
        {
            return (!token.IsCancellationRequested) && stream.CanRead;
        },
        // iteratively invoke the observable factory, which will
        // "recreate" it such that it will start from the current
        // stream position - hence "0" for offset
        Observable.Defer(() =>
            {
                if ((!token.IsCancellationRequested) && stream.CanRead)
                {
                    return asyncRead(buffer, 0, bufferSize);
                }
                else
                {
                    return Observable.Empty<int>();
                }
            })
            .Catch(Observable.Empty<int>()) // When BeginRead() or EndRead() causes an exception, don't choke but just end the Observable.
        .Select(readBytes => buffer.Take(readBytes).ToArray()));
}

现在怎么办?问题

这似乎运作良好。检测到远程主机强行关闭连接或不再可达的情况,导致更高级别的代码关闭连接并重试。到目前为止,一切都很好。

我不确定事情是否完全正确。

一方面,该行:

.Catch(Observable.Empty<int>()) // When BeginRead() or EndRead() causes an exception, don't choke but just end the Observable.

感觉就像命令式代码中空 catch 块的不好做法。实际代码确实记录了异常,更高级别的代码检测到没有回复并正确处理,所以应该认为它还可以(见下文)?

.Catch((Func<Exception, IObservable<int>>)(ex =>
{
    MyLoggerLogException("On asynchronous read from network.", ex);
    return Observable.Empty<int>();
})) // When BeginRead() or EndRead() causes an exception, don't choke but just end the Observable.

此外,这确实比大多数传统解决方案要短。

解决方案是正确的还是我错过了一些更简单/更清洁的方法?

是否有一些可怕的问题对于响应式扩展的向导来说是显而易见的?

感谢您的关注。

4

0 回答 0