2

经过对 StackOverflow 的大量努力和研究——其中大部分已经过时,因为 Reactive Extensions 代码最近发生了变化——我终于能够从这个从套接字读取数据的 Observable 方法中消除所有编译错误,我理解这一点代码比我一开始的要好得多。但不完全。有人可以用英语读给我听,然后回答两三个问题吗?

是否从该方法中提取了缓冲数据(或者如果我错了,应该如何提取)?是否有不再需要的部分?虽然我真的很喜欢与我的业务代码解耦,并将所有套接字代码保存在一两个方法中,但有没有更好的方法来做到这一点(解耦和可读)?

    public static IObservable<int> WhenDataReceived(this Socket socket, int byteCount, SocketFlags flags = SocketFlags.None)
    {
        Contract.Requires(byteCount > 0);

        return Observable.Create<int>(
            observer =>
            {
                byte[] buffer = new byte[byteCount];
                int remainder = byteCount;
                bool shutdown = false;

                return Observable.Defer<int>(() =>
                        Task.Factory.FromAsync<int>(socket.BeginReceive(buffer, buffer.Length - remainder, remainder, flags,
                        (result) =>
                        {
                            var read = (int)result.AsyncState;
                            remainder -= read;

                            if (read == 0)
                                shutdown = true;
                        },
                        null), socket.EndReceive).ToObservable())
                    .TakeWhile(_ => remainder > 0 && !shutdown)
                    .TakeLast(1)
                    .Subscribe(
                        observer.OnNext,
                        ex =>
                        {
                            var socketError = ex as SocketException;

                            if (socketError != null
                                && (socketError.SocketErrorCode == SocketError.Disconnecting
                                    || socketError.SocketErrorCode == SocketError.Shutdown))
                            {
                                observer.OnCompleted();
                            }
                            else { observer.OnError(ex); }
                        },
                        observer.OnCompleted);
            });
    }
}

调用它的函数仍然有我不理解的编译错误(.Do 和 .BitConverter 有一些无效的参数):

        static IObservable<string> StartClient(this IObserver<ScanInformation> observer, IPAddress ip, int port)
    {
        var client = Observable.Using(
            () => new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp),
            socket =>
            from _ in socket.WhenConnected(ip, port)
            from message in
                (from first in socket.WhenDataReceived(4)
                 let length = BitConverter.ToInt32(first, 0)
                 from message in
                     Observable.If(
                         condition: () => length > 0,
                         thenSource: from second in socket.WhenDataReceived(length)
                                     select Encoding.UTF8.GetString(second, 0, length),
                         elseSource: Observable.Return<string>(null))
                 select message)
                .Repeat()
                .TakeWhile(message => message != null)
            select message);

        return
            client.Do(observer).TakeLast(1);
    }
4

1 回答 1

5

这两个编译错误都是由于传递了错误类型的参数。

您在 Do() 上的编译错误是因为您的观察者是一个,IObserver<ScanInformation>但客户端是一个IObservable<string>. 您的意思是将字符串转换为 的实例ScanInformation吗?

您在 BitConverter 上的编译接受 abyte[]作为第一个参数(要转换的字节缓冲区),但是您传递了一个int大概在某个时候您从 WhenDataReceived 返回缓冲区;现在您正在传回读取的字节数。

Rx 并没有发生太大变化,以至于这种代码会被破坏。您的代码看起来可能遇到了一些复制/粘贴错误 - 以至于它可能比有用更令人困惑。查看这篇博客文章,了解一个合理的套接字读取实现,它使用 Rx 以相当简单的方式包装 TPL 调用。这个讨论可能也很有启发性。

ObservableSocketRxx 库中也有一个相当不错的。见这里

于 2013-10-14T07:40:24.610 回答