-1

这个实现很糟糕,因为它会disposingSubscriberSocket第一个订阅终止时结束。当我运行它时,似乎什么都没有发布。[这使用 Rx 3.0.0]

如何修改Receive函数来解决这个问题?

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;

using NetMQ;
using NetMQ.Sockets;

using System.Reactive.Linq;

namespace App1
{
    class MainClass
    {
        // publisher for testing, should be an external data publisher in real environment
        public static Thread StartPublisher(PublisherSocket s)
        {
            s.Bind("inproc://test");
            var thr = new Thread(() =>
            {
                Console.WriteLine("Start publishing...");
                while (true)
                {
                    Thread.Sleep(500);
                    bool more = false;
                    s.SendFrame("hello", more);
                }
            });

            thr.Start();
            return thr;
        }

        public static IObservable<string> Receive(SubscriberSocket subp)
        {
            return Observable
                .Create<string>(o =>
                    Observable.Using<string, SubscriberSocket>(() =>
                    {
                        subp.Connect("inproc://test");
                        subp.Subscribe("");
                        return subp;
                    }, sub =>
                        Observable
                        .FromEventPattern<EventHandler<NetMQSocketEventArgs>, NetMQSocketEventArgs>(
                            h => sub.ReceiveReady += h,
                            h => sub.ReceiveReady -= h)
                         .Select(x => sub.ReceiveFrameString()))
                .Subscribe(o));
        }

        public static void Main(string[] args)
        {
            var sub = new SubscriberSocket();
            var pub = new PublisherSocket();

            StartPublisher(pub);
            Receive(sub).Subscribe(Console.WriteLine);

            Console.ReadLine();
        }
    }
}
4

1 回答 1

0

在我看来,你应该做这样的事情:

void Main()
{
    var address = "inproc://test";

    var pub = new PublisherSocket();
    pub.Bind(address);

    var pubSubscription =
        Observable
            .Interval(TimeSpan.FromMilliseconds(500.0))
            .Subscribe(n => pub.SendFrame("hello", false));

    Receive(address).Subscribe(Console.WriteLine);

    Console.ReadLine();
}

public static IObservable<string> Receive(string address)
{
    return Observable
        .Create<string>(o =>
            Observable.Using<string, SubscriberSocket>(() =>
            {
                var ss = new SubscriberSocket();
                ss.Connect(address);
                ss.Subscribe("");
                return ss;
            }, ss =>
                Observable
                .FromEventPattern<EventHandler<NetMQSocketEventArgs>, NetMQSocketEventArgs>(
                    h => ss.ReceiveReady += h,
                    h => ss.ReceiveReady -= h)
                 .Select(x => ss.ReceiveFrameString()))
        .Subscribe(o));
}

首先,这意味着创建SubscriberSocket内部Observable.Create- 这是它所属的地方。

于 2017-10-15T00:01:00.570 回答