干得好。
注意几点:
- 这允许用户提供适当的调度程序来控制并发运行的位置,并产生每个循环以防止它变得过于混乱(尽管在控制台上等待显然很粘...)
- 我们不应该
OnCompleted
在这个例子中调用,因为终止的唯一方法是取消订阅 - 你努力在取消后不再发送消息
- 我们也不
OnNext
在这里发送取消邮件。
这是代码:
public static IObservable<string> ConsoleInputObservable(
IScheduler scheduler = null)
{
scheduler = scheduler ?? Scheduler.Default;
return Observable.Create<string>(o =>
{
return scheduler.ScheduleAsync(async (ctrl, ct) =>
{
while(!ct.IsCancellationRequested)
{
var next = Console.ReadLine();
if(ct.IsCancellationRequested)
return;
o.OnNext(next);
await ctrl.Yield();
}
});
});
}
附录
@MartinLiversage 评论说,有多个订阅者的行为是不可取的——这促使了这个附录。您可以简单地Publish()
使用上面的代码,但鉴于控制台的性质是应用程序只有一个,并且一次只能有一个线程读取它,因此需要采用不同的方法。
我忽略了上面的这一点,因为我觉得问题可能更多是关于线程方面而不是控制台的性质。如果您真的对在控制台中输入的报告行感兴趣,那么像下面这样的某种主循环可能更实用 - 这代表了Subject
.
static void Main()
{
Subject<string> sc = new Subject<string>();
// kick off subscriptions here...
// Perhaps with `ObserveOn` if background processing is required
sc.Subscribe(x => Console.WriteLine("Subscriber1: " + x));
sc.Subscribe(x => Console.WriteLine("Subscriber2: " + x));
string input;
while((input = Console.ReadLine()) != "q")
{
sc.OnNext(input);
}
sc.OnCompleted();
Console.WriteLine("Finished");
}