这个实现很糟糕,因为它会disposing
在SubscriberSocket
第一个订阅终止时结束。当我运行它时,似乎什么都没有发布。[这使用 Rx 3.0.0]
如何修改Receive
函数来解决这个问题?
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using NetMQ;
using NetMQ.Sockets;
using System.Reactive.Linq;
namespace App1
{
class MainClass
{
// publisher for testing, should be an external data publisher in real environment
public static Thread StartPublisher(PublisherSocket s)
{
s.Bind("inproc://test");
var thr = new Thread(() =>
{
Console.WriteLine("Start publishing...");
while (true)
{
Thread.Sleep(500);
bool more = false;
s.SendFrame("hello", more);
}
});
thr.Start();
return thr;
}
public static IObservable<string> Receive(SubscriberSocket subp)
{
return Observable
.Create<string>(o =>
Observable.Using<string, SubscriberSocket>(() =>
{
subp.Connect("inproc://test");
subp.Subscribe("");
return subp;
}, sub =>
Observable
.FromEventPattern<EventHandler<NetMQSocketEventArgs>, NetMQSocketEventArgs>(
h => sub.ReceiveReady += h,
h => sub.ReceiveReady -= h)
.Select(x => sub.ReceiveFrameString()))
.Subscribe(o));
}
public static void Main(string[] args)
{
var sub = new SubscriberSocket();
var pub = new PublisherSocket();
StartPublisher(pub);
Receive(sub).Subscribe(Console.WriteLine);
Console.ReadLine();
}
}
}