我想出了这个解决方案。(尚未测试)通过网络上的大量弹跳。
Private Function ObserveUDP() As IObservable(Of bytes())
Dim f = Function(observer)
Dim endpoint = New IPEndPoint(IPAddress.Parse(Me.IpAdress), Me.IPPort)
Dim client = New UdpClient(endpoint)
Dim obs = observable.*emphasized text*Generate(Of Task(Of UdpReceiveResult), UdpReceiveResult) _
( Nothing _
, Function(task As Task(Of UdpReceiveResult)) task Is Nothing Or Not task.IsCompleted() _
, Function(task As Task(Of UdpReceiveResult)) client.ReceiveAsync() _
, Function(task As Task(Of UdpReceiveResult)) task.Result)
Dim observable = obs.Select(Function(r) r.Buffer)
dim handle = observable.Subscribe(observer)
Dim df = Sub()
client.Close()
handle.Dispose()
End Sub
Return Disposable.Create(df)
End Function
Return observable.Create(f)
End Function
我的要求是确保在订阅被删除时关闭 UDP 客户端。我很确定上面的代码很接近,但我认为它不太正确。任何输入将不胜感激。
* 编辑 *
其实上面的例子是完全错误的,只会同步创建大量的任务对象,而不是等待它们。经过一番反复试验,我提出了以下通用函数,用于展开一个被一遍又一遍地调用的等待对象。任何意见?
''' initializer - a function that initializes and returns the state object
''' generator - a function that asynchronously using await generates each value
''' finalizer - a function for cleaning up the state object when the sequence is unsubscribed
Private Function ObservableAsyncSeq(Of T, I)( _
initializer As Func(Of I), _
generator As Func(Of I, Task(Of T)), _
finalizer As Action(Of I)) As IObservable(Of T)
Dim q = Function(observer As IObserver(Of T))
Dim go = True
Try
Dim r = Async Sub()
Dim ii As I = initializer()
While go
Dim result = Await generator(ii)
observer.OnNext(result)
End While
finalizer(ii)
observer.OnCompleted()
End Sub
Task.Run(r)
Catch ex As Exception
observer.OnError(ex)
End Try
' Disposable for stopping the sequence as per
' the observable contract
Return Sub() go = False
End Function
Return Observable.Create(q)
End Function
以及与 UDP 一起使用的示例
Private Function ObserveMeasurementPoints2() As IObservable(Of ProcessedDate)
Dim initializer = Function()
Dim endpoint = New IPEndPoint(IPAddress.Parse(Me.IpAdress), Me.IPPort)
Return New UdpClient(endpoint)
End Function
Dim finalizer = Function(client As UdpClient)
client.Close()
End Function
Dim generator = Function(client As UdpClient) As Task(Of UdpReceiveResult)
Return client.ReceiveAsync()
End Function
Return ObservableAsyncSeq(initializer, generator, finalizer).Select(Function(r) ProcessBytes(r.Buffer))
End Function