3

我想出了这个解决方案。(尚未测试)通过网络上的大量弹跳。

Private Function ObserveUDP() As IObservable(Of bytes())


    Dim f = Function(observer)
                Dim endpoint = New IPEndPoint(IPAddress.Parse(Me.IpAdress), Me.IPPort)
                Dim client = New UdpClient(endpoint)

                Dim obs = observable.*emphasized text*Generate(Of Task(Of UdpReceiveResult), UdpReceiveResult) _
                      ( Nothing _
                      , Function(task As Task(Of UdpReceiveResult)) task Is Nothing Or Not task.IsCompleted() _
                      , Function(task As Task(Of UdpReceiveResult)) client.ReceiveAsync() _
                      , Function(task As Task(Of UdpReceiveResult)) task.Result)

                Dim observable = obs.Select(Function(r) r.Buffer)

                dim handle = observable.Subscribe(observer)

                Dim df = Sub() 
                    client.Close()
                    handle.Dispose()
                End Sub

                Return Disposable.Create(df)

    End Function

    Return observable.Create(f)

End Function

我的要求是确保在订阅被删除时关闭 UDP 客户端。我很确定上面的代码很接近,但我认为它不太正确。任何输入将不胜感激。

* 编辑 *

其实上面的例子是完全错误的,只会同步创建大量的任务对象,而不是等待它们。经过一番反复试验,我提出了以下通用函数,用于展开一个被一遍又一遍地调用的等待对象。任何意见?

''' initializer - a function that initializes and returns the state object
''' generator   - a function that asynchronously using await generates each value
''' finalizer   - a function for cleaning up the state object when the sequence is unsubscribed

Private Function ObservableAsyncSeq(Of T, I)( _
    initializer As Func(Of I), _
    generator As Func(Of I, Task(Of T)), _
    finalizer As Action(Of I))  As IObservable(Of T)

    Dim q = Function(observer As IObserver(Of T))
                Dim go = True
                Try
                    Dim r = Async Sub()
                                Dim ii As I = initializer()
                                While go
                                    Dim result = Await generator(ii)
                                    observer.OnNext(result)
                                End While
                                finalizer(ii)
                                observer.OnCompleted()
                            End Sub
                    Task.Run(r)
                Catch ex As Exception
                    observer.OnError(ex)
                End Try

                ' Disposable for stopping the sequence as per
                ' the observable contract
                Return Sub() go = False

            End Function

    Return Observable.Create(q)
End Function

以及与 UDP 一起使用的示例

Private Function ObserveMeasurementPoints2() As IObservable(Of ProcessedDate)
    Dim initializer = Function()
                          Dim endpoint = New IPEndPoint(IPAddress.Parse(Me.IpAdress), Me.IPPort)
                          Return New UdpClient(endpoint)
                      End Function

    Dim finalizer = Function(client As UdpClient)
                        client.Close()
                    End Function

    Dim generator = Function(client As UdpClient) As Task(Of UdpReceiveResult)
                        Return client.ReceiveAsync()
                    End Function

    Return ObservableAsyncSeq(initializer, generator, finalizer).Select(Function(r) ProcessBytes(r.Buffer))

End Function
4

3 回答 3

5

您可以使用Observable.Using提到的 Enigmativity 或简单地使用接受 an作为返回参数的常规Observable.Create方法- 这足以安全处理。IDisposable

使用迭代器或异步非常好。我列出了一种更类似于 Rx 的方法来做到这一点:

Public Shared Function UdpStream(Of T)(endpoint As IPEndPoint, processor As Func(Of Byte(), T)) As IObservable(Of T)
    Return Observable.Using(Of T, UdpClient)(
        Function() New UdpClient(endpoint),
        Function(udpClient) _
            Observable.Defer(Function() udpClient.ReceiveAsync().ToObservable()) _
            .Repeat() _
            .Select(Function(result) processor(result.Buffer))
    )
End Function

传统方式:

Public Shared Function UdpStream(Of T)(endpoint As IPEndPoint, processor As Func(Of Byte(), T)) As IObservable(Of T)
    Return Observable.Using(
        Function() New UdpClient(endpoint),
        Function(udpClient) Observable.Defer( _
        Observable.FromAsyncPattern(
            AddressOf udpClient.BeginReceive,
            Function(iar)
                Dim remoteEp = TryCast(iar.AsyncState, IPEndPoint)
                Return udpClient.EndReceive(iar, remoteEp)
            End Function)
        ).Repeat() _
         .Select(processor)
    )
End Function

测试:

Shared Sub Main()
    Using UdpStream(New IPEndPoint(IPAddress.Loopback, 13200),
                    Function(bytes) String.Join(",", bytes)
                    ).Subscribe(AddressOf Console.WriteLine)
        Console.ReadLine()
    End Using

    Console.WriteLine("Done")
    Console.ReadKey()
End Sub
于 2012-09-28T20:29:34.550 回答
2

看一下Observable.Using- 它专门用于创建一个使用一次性资源生成其值的可观察对象,并在完成后自动处理资源。

您会发现UdpClient具有相同的Close&Dispose方法实现,因此Close如果您调用Dispose.

从反射器:

void IDisposable.Dispose()
{
    this.Dispose(true);
}

public void Close()
{
    this.Dispose(true);
}

这是 的签名Using

Public Shared Function Using(Of TSource, TResource As IDisposable)(
    ByVal resourceFactory As Func(Of TResource),
    ByVal observableFactory As Func(Of TResource, IObservable(Of TSource)))
        As IObservable(Of TSource)
于 2012-09-28T00:19:55.780 回答
1

我以前没有使用过 UDPClient,但您似乎正在使用任务(基数 =1)来尝试接收数据流(基数 = 很多)。似乎可以解决这个问题,您重复了查询。这意味着您的查询将执行此操作

  1. 创建一个UDP客户端
  2. 调用数据请求
  3. 接收它获得的第一个数据
  4. 将数据推送到序列上
  5. 关闭序列
  6. 处理 UDPClient
  7. 创建 UDPClient(返回步骤 1)
  8. 调用数据请求
  9. 接收它获得的第一个数据
  10. ....直到您处理连接。

我认为您应该能够通过拉入字节流来读取套接字/网络连接。我在我的博客文章中向您展示了如何做到这一点:

http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#CreatingYourOwnIterator

这样,您只需打开一个连接并在收到字节时推送它们。

快速谷歌一下,我还发现对 UDPClient 实现的可靠性存在担忧。 http://www.codeproject.com/Articles/1938/Issues-with-UdpClient-Receive

高温高压

using System;
using System.IO;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

namespace MyLib
{
    public static class ObservableExtensions
    {
        //TODO: Could potentially upgrade to using tasks/Await-LC
        public static IObservable<byte> ToObservable(
            this Stream source,
            int buffersize,
            IScheduler scheduler)
        {
            var bytes = Observable.Create<byte>(o =>
            {
                var initialState = new StreamReaderState(source, buffersize);
                var currentStateSubscription = new SerialDisposable();
                Action<StreamReaderState, Action<StreamReaderState>> iterator =
                (state, self) =>
                    currentStateSubscription.Disposable = state.ReadNext()
                        .Subscribe(
                            bytesRead =>
                            {
                                for (int i = 0; i < bytesRead; i++)
                                {
                                    o.OnNext(state.Buffer[i]);
                                }
                                if (bytesRead > 0)
                                    self(state);
                                else
                                    o.OnCompleted();
                            },
                            o.OnError);
                var scheduledWork = scheduler.Schedule(initialState, iterator);
                return new CompositeDisposable(currentStateSubscription, scheduledWork);
            });
            return Observable.Using(() => source, _ => bytes);
        }

        private sealed class StreamReaderState
        {
            private readonly int _bufferSize;
            private readonly Func<byte[], int, int, IObservable<int>> _factory;
            public StreamReaderState(Stream source, int bufferSize)
            {
                _bufferSize = bufferSize;
                _factory = Observable.FromAsyncPattern<byte[], int, int, int>(
                source.BeginRead,
                source.EndRead);
                Buffer = new byte[bufferSize];
            }
            public IObservable<int> ReadNext()
            {
                return _factory(Buffer, 0, _bufferSize);
            }
            public byte[] Buffer { get; set; }
        }
    }
}
于 2012-10-03T09:27:39.073 回答