2

我有一个哈希集。有时,新值会添加到此哈希集中。我想要做的是让计时器在添加后一分钟从集合中删除每个元素。

我还是 rx 的新手,但这似乎是使用它的理想场合。

我试过这样的事情:

AddItem(string item)
{
  _mySet.Add(item);
  var timer = Observable.Timer(TimeSpan.FromSeconds(60), _scheduler);
  timer
      .Take(1)
      .Do(item => RemoveItem(item))
      .Subscribe(_ => Console.WriteLine("Removed {0}", item));
}

它似乎工作正常(通过单元测试)。

有人认为这种方法有什么问题吗?

4

4 回答 4

3
  1. 您在调用中的 lambdaDo看起来不正确 -Observable.Timer产生 int 值,但您的集合是HashSet<string>- 这不应该编译。我猜这只是一个错字。

  2. Do: 一般来说,您的订阅应该在Subscribe. Do用于副作用(我不喜欢流中副作用的想法,所以我避免使用它,但它对调试很有用)。

  3. Take:Observable.Timer在终止前只产生一个值,因此不需要Take操作符

我会把你的函数写成:

AddItem(string item)
{
    _mySet.Add(item);
    Observable.Timer(TimeSpan.FromSeconds(60), _scheduler)
        .Subscribe(_ => RemoveItem(item));
}
于 2013-03-21T20:11:19.200 回答
2

您无需创建序列即可执行此操作。您已经是一个好公民并明确使用调度程序,所以只需使用它!

您可以将其用于您的代码

AddItem(string item)
{
  _mySet.Add(item);
  //Note this does return an IDisposable if you want to cancel the subscription.
  _scheduler.Schedule(
    TimeSpan.FromSeconds(60),
    ()=>
    { 
        RemoveItem(item);
        Console.WriteLine("Removed {0}", item);
    });
}

这基本上意味着幕后工作要少得多。考虑一下 Observable.Timer 方法正在进行的所有工作,实际上您只希望它做的只是安排一个带有值(您忽略)的 OnNext。

我还假设即使对 Rx 一无所知的用户也能够阅读此计划代码。IE。“添加此项目后,我安排此删除操作在 60 秒内运行)。

于 2013-03-22T15:45:20.413 回答
0

如果你使用的是 ReactiveUI,一个名为的类ReactiveCollection在这里肯定会有所帮助,你可以像这样使用它:

theCollection.ItemsAdded
    .SelectMany(x => Observable.Timer(TimeSpan.FromSeconds(60), _scheduler).Select(_ => x))
    .Subscribe(x => theCollection.Remove(x));
于 2013-03-21T20:06:11.940 回答
-3

对不起,不是要挑剔你,但是:

总是丢弃 IDISPOSABLES!!!!!!

(编辑:好的,不知道今天早上我在咖啡里放了什么,但我回答了一大堆废话;我会留下上面的只是因为总的来说,你确实想确保处理任何东西IDisposable,但是为了弥补接下来的喋喋不休......)

该调用Subscribe创建了一个您不会处理的订阅,因此对该方法的多次调用只会排队越来越多的垃圾 - 现在在这种特定情况下,它不是世界末日,因为Timer唯一触发一次,但仍然...处置!

如果你真的想使用这种方法(我认为更好的方法是让一些正在运行的线程/任务“倾向于”你的值,当它认为有必要时删除它),至少尝试类似于:

好吧,忽略所有那些被淘汰的废话。的实现Observable.Timer是这样的:

public static IObservable<long> Timer(TimeSpan dueTime)
{
    return s_impl.Timer(dueTime);
}

这反过来又调用了这个:

public virtual IObservable<long> Timer(TimeSpan dueTime)
{
    return Timer_(dueTime, SchedulerDefaults.TimeBasedOperations);
}

这叫...

private static IObservable<long> Timer_(TimeSpan dueTime, IScheduler scheduler)
{
    return new Timer(dueTime, null, scheduler);
}

这就是事情变得有趣的地方 -Timer是 a Producer<long>,肉味的地方是:

private IDisposable InvokeStart(IScheduler self, object state)
{
    this._pendingTickCount = 1;
    SingleAssignmentDisposable disposable = new SingleAssignmentDisposable();
    this._periodic = disposable;
    disposable.Disposable = self.SchedulePeriodic<long>(1L, this._period, new Func<long, long>(this.Tock));
    try
    {
        base._observer.OnNext(0L);
    }
    catch (Exception exception)
    {
        disposable.Dispose();
        exception.Throw();
    }
    if (Interlocked.Decrement(ref this._pendingTickCount) > 0)
    {
        SingleAssignmentDisposable disposable2 = new SingleAssignmentDisposable {
            Disposable = self.Schedule<long>(1L, new Action<long, Action<long>>(this.CatchUp))
        };
        return new CompositeDisposable(2) { disposable, disposable2 };
    }
    return disposable;
}

现在,base._observer.OnNext,这是设置为在计时器滴答声上触发的内部接收器,其中Invokeon 是:

private void Invoke()
{
    base._observer.OnNext(0L);
    base._observer.OnCompleted();
    base.Dispose();
}

所以是的。它会自动处理自己 - 并且不会有任何“挥之不去的订阅”浮动。

嗯……乌鸦很好吃。:|

于 2013-03-21T16:25:43.413 回答