我想出了这个解决方案。(尚未测试)通过网络上的大量弹跳。
Private Function ObserveUDP() As IObservable(Of bytes())
    Dim f = Function(observer)
                Dim endpoint = New IPEndPoint(IPAddress.Parse(Me.IpAdress), Me.IPPort)
                Dim client = New UdpClient(endpoint)
                Dim obs = observable.*emphasized text*Generate(Of Task(Of UdpReceiveResult), UdpReceiveResult) _
                      ( Nothing _
                      , Function(task As Task(Of UdpReceiveResult)) task Is Nothing Or Not task.IsCompleted() _
                      , Function(task As Task(Of UdpReceiveResult)) client.ReceiveAsync() _
                      , Function(task As Task(Of UdpReceiveResult)) task.Result)
                Dim observable = obs.Select(Function(r) r.Buffer)
                dim handle = observable.Subscribe(observer)
                Dim df = Sub() 
                    client.Close()
                    handle.Dispose()
                End Sub
                Return Disposable.Create(df)
    End Function
    Return observable.Create(f)
End Function
我的要求是确保在订阅被删除时关闭 UDP 客户端。我很确定上面的代码很接近,但我认为它不太正确。任何输入将不胜感激。
* 编辑 *
其实上面的例子是完全错误的,只会同步创建大量的任务对象,而不是等待它们。经过一番反复试验,我提出了以下通用函数,用于展开一个被一遍又一遍地调用的等待对象。任何意见?
''' initializer - a function that initializes and returns the state object
''' generator   - a function that asynchronously using await generates each value
''' finalizer   - a function for cleaning up the state object when the sequence is unsubscribed
Private Function ObservableAsyncSeq(Of T, I)( _
    initializer As Func(Of I), _
    generator As Func(Of I, Task(Of T)), _
    finalizer As Action(Of I))  As IObservable(Of T)
    Dim q = Function(observer As IObserver(Of T))
                Dim go = True
                Try
                    Dim r = Async Sub()
                                Dim ii As I = initializer()
                                While go
                                    Dim result = Await generator(ii)
                                    observer.OnNext(result)
                                End While
                                finalizer(ii)
                                observer.OnCompleted()
                            End Sub
                    Task.Run(r)
                Catch ex As Exception
                    observer.OnError(ex)
                End Try
                ' Disposable for stopping the sequence as per
                ' the observable contract
                Return Sub() go = False
            End Function
    Return Observable.Create(q)
End Function
以及与 UDP 一起使用的示例
Private Function ObserveMeasurementPoints2() As IObservable(Of ProcessedDate)
    Dim initializer = Function()
                          Dim endpoint = New IPEndPoint(IPAddress.Parse(Me.IpAdress), Me.IPPort)
                          Return New UdpClient(endpoint)
                      End Function
    Dim finalizer = Function(client As UdpClient)
                        client.Close()
                    End Function
    Dim generator = Function(client As UdpClient) As Task(Of UdpReceiveResult)
                        Return client.ReceiveAsync()
                    End Function
    Return ObservableAsyncSeq(initializer, generator, finalizer).Select(Function(r) ProcessBytes(r.Buffer))
End Function