11

我试图找出Rx的 C++版本中的调度模型。

了解 C# 版本,其中有一个带有一个 Schedule 方法的简单接口;C++ 版本似乎相当复杂,包含调度程序、工作程序和协调等内容。

对我来说,一个主要的缺失部分是线程池调度程序的实现,它是否存在其他名称?我将如何实现它自己?我应该把它写在PPL (Windows) 之上吗?如果我需要在它上面有一个序列化的(Actor like)观察者,我应该使用什么?在这里这里偷看可以表明这不是一项微不足道的任务。

这确实有助于获得有关该主题的某种概述,因为官方文档是自动生成的并且仍然非常稀疏。

4

1 回答 1

8

是的,生成的文档是新的,并且计划尚未记录。

rxcpp v2 中的调度程序基于 RxJava 使用的调度程序和工作程序构造(涉及 Eric Meijer)RxJava 的文档将解释调度程序和工作程序。rxcpp 增加了 schedulable、coordination 和 coordinator。

scheduler拥有由该now()方法公开的时间线。scheduler也是worker那个时间线中 s 的工厂。由于调度程序拥有时间线,因此可以构建时间旅行的调度程序。virtual-scheduler 是 test-scheduler 的基础,它使用它在毫秒内完成多秒测试。

worker拥有schedulable时间线的待处理队列并具有生命周期。当schedulable到达an 的时间时,schedulable运行。队列维护插入顺序,以便当 Nschedulable具有相同的目标时间时,它们会按照它们插入队列的顺序运行。worker保证每个都在下schedulable一个schedulable开始之前完成。当worker的生命周期被取消订阅时,所有挂起schedulable的 s 都将被丢弃。

schedulable拥有一个函数,拥有一个worker和一个lifetime。当schedulable的生命周期被取消订阅时,该schedulable函数将不会被调用。被schedulable传递给函数并允许函数重新调度自己或在同一个工作人员上安排其他事情。

新概念是协调和协调。我添加了这些以简化运营商实现并在运营商实现中引入按使用付费。具体来说,在 Rx.NET 和 RxJava 中,操作员使用原子操作和同步原语来协调来自多个流的消息,即使所有流都在同一个线程上(如 UI 事件)。默认情况下使用 rxcpp 中的identity_. . .协调,并且没有开销。和协调分别使用互斥锁syncronize_. . .observe_on_. . .队列到工作人员,以安全地交错多个流。

coordinationcoordinators 的工厂并且有scheduler.

coordinatorworker, 并且是协调observables、subscribers 和schedulable函数的工厂。

所有接受多个流或及时处理的操作符(甚至 subscribe_on 和 observe_on)都采用协调参数,而不是调度程序。

以下是一些提供的函数,它们将使用特定的调度程序产生协调。

  • 身份立即()
  • identity_current_thread()
  • identity_same_worker(worker w)
  • 序列化事件循环()
  • serialize_new_thread()
  • serialize_same_worker(worker w)
  • 观察_on_event_loop()
  • 观察_on_new_thread()

目前还没有线程池调度程序。线程池调度程序需要依赖于线程池实现,因为我不想编写线程池。我的计划是为 windows 线程池、apple 线程池和 boost asio 执行器池创建一个调度程序。要回答的一个问题是这些特定于平台的构造是否应该存在于 rxcpp 存储库中或具有特定于平台的存储库。

欢迎贡献,意见和想法!

于 2015-05-18T01:52:51.423 回答