3

我需要在 Azure 环境中将处理序列(如如何使用 .net RX 组织数据处理器序列)拆分为多个计算单元。
这个想法是将 Observable 序列序列化到 Azure 队列(或服务总线)并将其反序列化。
如果生产者或消费者失败,另一方应该能够继续生产/消费。

任何人都可以提出一种优雅的方法以及使用什么(Azure Queues 或 Service Bus)吗?
有没有人使用过 TCP Observable 提供程序 - http://rxx.codeplex.com/wikipage?title=TCP%20Qbservable%20Provider来解决此类问题对于其中一方的失败是否安全?

4

2 回答 2

2

想象一下,您有一个消息队列,其中包含以下 API

class MQ {

    public MQ();

    // send a single message from your message queue
    public void send(string keyPath, string msg);

    // Receive a single message from your message queue
    public async Task<string> receive(keyPath);

}

为了使这个 RX 兼容

class MQRX: IObserver<string> {
    MQ _mq;
    string _keyPath

    MQRX(string keyPath){
        _mq = mq;
        _keyPath = keyPath;
    }

    IObservable<string> Observe(){
        return Observable.Defer(()=> mq.receive(keyPath).ToObservable() ).Repeat();
    }

    void OnNext(string msg){
        _mq.send(msg);
    }

    void OnError(Exception e){
        // The message queue might not
        // support serializing exceptions
        // or it might or you might build
        // a protocol for it.
    }
}

以容错方式使用它。注意 如果 OnError 传递的上游抛出异常,Retry 将重新订阅

new MQRX("users/1/article/2").
    Retry().
    Subscribe((msg)=>Console.Writeln(msg));

例如,在写作方面,您可以每两秒发送一条消息,如果出现错误,则重试订阅生成器。请注意,在 Observable.Interval 中不太可能出现错误,它只会在每个时间间隔生成一条消息,但想象一下从文件或其他消息队列中读取。

var mq = new MQRX("users/1/article/2");

Observable.Interval(TimeSpan.FromSeconds(2)).
    Select((x)=>x.ToString()).

请注意,您可能应该使用 IObservable Catch 扩展方法,而不是盲目地重试,因为您可能会一遍又一遍地遇到相同的错误。重试()。订阅(MQ);

于 2012-11-13T13:20:51.990 回答
0

我用 30 行 VB 代码编写了自己的 UDP 到 RX 包装器,它可以处理超时。我猜 TCP 包装器会是类似的。

Imports System.Reactive
Imports System.Reactive.Linq
Imports System.Reactive.Threading.Tasks
Imports System.Threading
Imports System.Net
Imports System.Net.Sockets

Public Module UDP
    ''' <summary>
    ''' Generates an IObservable as a UDP stream on the IP endpoint with an optional
    ''' timeout value between results.
    ''' </summary>
    ''' <param name="timeout"></param>
    ''' <returns></returns>
    ''' <remarks></remarks>
    Public Function StreamObserver(
                localPort As Integer, 
                Optional timeout As Nullable(Of TimeSpan) = Nothing
                ) As IObservable(Of UdpReceiveResult)

        Return Linq.Observable.Using(Of UdpReceiveResult, UdpClient)(
            Function() New UdpClient(localPort),
            Function(udpClient)
                Dim o = Observable.Defer(Function() udpClient.
                                                                ReceiveAsync().
                                                                ToObservable())
                If Not timeout Is Nothing Then
                    Return Observable.Timeout(o.Repeat(), timeout.Value)
                Else
                    Return o.Repeat()
                End If
            End Function
        )
    End Function
End Module

如果需要,您可以对写入端执行相同的操作。然后,您只需使用普通的 RX 技术将数据序列化为 UDP 帧。

new UDP.StreamObserver(8080, TimeSpan.FromSeconds(2)).
        Select( function(udpResult) MyDeserializeFunction(udpResult)).
        Subscribe( sub (result) DoSomething(result), 
                   sub(error) DoSomethingWithError ) 
于 2012-11-12T12:56:21.887 回答