3

在应用程序退出时,我无法正确处理使用 RX 创建的线程。我在 Process Explorer 中看到,应用程序关闭后,线程仍在运行,导致 IO 异常。

class Program
{
    static void Main(string[] args)
    {
        CompositeDisposable subsriptions = new CompositeDisposable();

       subscriptions.Add(Observable.Interval(TimeSpan.FromSeconds(15))
                .Subscribe(_ =>
                {
                    getData();

                }));
        Thread.Sleep(TimeSpan.FromSeconds(20));     
            subscriptions.Dispose();
         }   
    }
}

如果您看到我是否取消了对 subscription.Dispose() 的注释,则线程将终止而不会获得任何数据。任何帮助,将不胜感激。谢谢

4

3 回答 3

1

您正在寻找的模式与此类似

class Program {

   public string GetData(){
       return "Hello";
   }

   public string async GetDataAsync(){

       return await Observable
            .Interval(TimeSpan.FromSeconds(15))
            .Take(1)
            .Select(()=>GetData());


   }

   static void Main(string[]args){

       var s = GetDataAsync().Wait();
   }

}

原因Wait是入口点 ,Main在这种情况下不能标记为asyncWait阻塞当前线程,直到返回的 TaskGetDataAsync产生一个值。

另请注意,IObservable 与 async/await 兼容,并将返回序列生成的最后一个值。这就是我添加的原因,Take(1)因为它只会产生 1 个刻度。

另一种选择是直接在 IObservable 上调用 Wait ,如

class Program {

   public string GetData(){
       return "Hello";
   }

   public IObservable<string> GetDataObservable(){

       return Observable
            .Interval(TimeSpan.FromSeconds(15))
            .Take(1)
            .Select(()=>GetData());


   }

   static void Main(string[]args){

       var s = GetDataObservable().Wait();
   }

}
于 2013-07-26T06:23:40.020 回答
1

subsriptions.Add(...)您需要和之间的某种延迟subscriptions.Dispose()。两者之间没有延迟,您的应用程序只是立即订阅和处置它们,没有时间让线程完成它们的工作。(并且Thread.Sleep(1000)不起作用,因为它在订阅函数内部,而不是主函数的一部分。)

于 2013-07-25T17:12:03.833 回答
0

你可以订阅你的 observable ,CancellationToken这将取消底层任务的执行:

static void Main(string[] args)
{
    var cts = new CancellationTokenSource();
    Observable.
        Interval(TimeSpan.FromSeconds(15)).
        Subscribe(_ => getData(), cts.Token));
    cts.CancelAfter(TimeSpan.FromSeconds(20));
}
于 2013-07-31T11:51:45.587 回答