10

我无法使用FSharp.ChartingFSharp.Control.Reactive创建我的一些数据的动画可视化。

本质上,我有一个无限的点生成器。我的真实生成器比下面的例子更复杂,但是这个简化的例子重现了这个问题:

let move points =
    Seq.initInfinite (fun i -> points |> List.map (fun (x, y) -> x + i, y + i))

这个move函数有类型(int * int) list -> seq<(int * int) list>。对于每次迭代,它将输入列表中的所有点平移 1,以便这些点向上和向右移动。

我想在LiveChart.Point. 生成的序列move可以转换为适当的Observable,但它本身运行得很快,所以我先放慢一点:

// Slow down any sequence
let delay (duration : TimeSpan) xs = seq {
    for x in xs do
    duration.TotalMilliseconds |> int |> Async.Sleep |> Async.RunSynchronously
    yield x }

这使我能够从点序列中创建一个 Observable:

let obs =
    [(1, 0); (0, 1)]
    |> move
    |> delay (TimeSpan.FromSeconds 0.5)
    |> Observable.toObservable

我还可以看到,如果我打印到控制台,它可以工作:

obs.Subscribe (fun x -> printfn "%O" x)

通过打印到控制台,也很清楚这会阻塞执行环境;例如,如果您将脚本发送到F# Interactive (FSI),它会继续打印,您必须取消评估或重置会话才能停止它。

我的理论是,这是因为它obs与执行环境在同一线程上运行。

如果我尝试利用LiveChart.Point它,也会发生同样的情况:

let lc = obs |> LiveChart.Point
lc.ShowChart()

如果我将此发送给 FSI,则不会发生任何事情(未显示图表),并且 FSI 会阻塞。

这似乎与我的理论一致,即观察者与图表在同一线程上运行。

如何让 Observer 在不同的线程上运行?

我发现Observable.observeOn,这需要一个IScheduler. 浏览 MSDN,我发现了NewThreadSchedulerThreadPoolSchedulerTaskPoolScheduler,它们都实现了IScheduler. 这些类的名称听起来很有希望,但我找不到它们!

根据文档,它们都定义在 中System.Reactive.dll,但是虽然我拥有FSharp.Control.Reactive的所有依赖项,但我在任何地方都没有该程序集。搜索互联网也没有透露从哪里得到它。

它是反应式扩展的旧版本还是新版本?我什至在寻找正确的方向吗?

如何在 a 上可视化我的无限点序列LiveChart


这是重现该问题的完整脚本:

#r @"../packages/Rx-Interfaces.2.2.5/lib/net45/System.Reactive.Interfaces.dll"
#r @"../packages/Rx-Core.2.2.5/lib/net45/System.Reactive.Core.dll"
#r @"../packages/Rx-Linq.2.2.5/lib/net45/System.Reactive.Linq.dll"
#r @"../packages/FSharp.Control.Reactive.3.2.0/lib/net40/FSharp.Control.Reactive.dll"
#r @"../packages/FSharp.Charting.0.90.12/lib/net40/FSharp.Charting.dll"
#r "System.Windows.Forms.DataVisualization"

open System
open FSharp.Control.Reactive
open FSharp.Charting

let move points =
    Seq.initInfinite (fun i -> points |> List.map (fun (x, y) -> x + i, y + i))

// Slow down any sequence
let delay (duration : TimeSpan) xs = seq {
    for x in xs do
    duration.TotalMilliseconds |> int |> Async.Sleep |> Async.RunSynchronously
    yield x }

let obs =
    [(1, 0); (0, 1)]
    |> move
    |> delay (TimeSpan.FromSeconds 0.5)
    |> Observable.toObservable

//obs.Subscribe (fun x -> printfn "%O" x)

let lc = obs |> LiveChart.Point
lc.ShowChart()

已安装的 NuGet 包:

Id                                  Versions
--                                  --------
FSharp.Charting                     {0.90.12}
FSharp.Control.Reactive             {3.2.0}
FSharp.Core                         {3.1.2}
Rx-Core                             {2.2.5}
Rx-Interfaces                       {2.2.5}
Rx-Linq                             {2.2.5}
4

2 回答 2

6

诀窍是使用 subscribeOn。来自introtorx.com上的 subscribeOn 和 observeOn:

我想在这里指出的一个陷阱是,最初几次我使用这些重载时,我对它们实际上做了什么感到困惑。您应该使用 SubscribeOn 方法来描述您希望如何安排任何预热和后台处理代码。例如,如果您将 SubscribeOn 与 Observable.Create 一起使用,则传递给 Create 方法的委托将在指定的调度程序上运行。

ObserveOn 方法用于声明您希望将通知安排到的位置。我建议 ObserveOn 方法在使用 STA 系统时最有用,最常见的是 UI 应用程序。

完整脚本如下:

#r @"packages/Rx-Interfaces.2.2.5/lib/net45/System.Reactive.Interfaces.dll"
#r @"packages/Rx-PlatformServices.2.2.5/lib/net45/System.Reactive.PlatformServices.dll"
#r @"packages/Rx-Core.2.2.5/lib/net45/System.Reactive.Core.dll"
#r @"packages/Rx-Linq.2.2.5/lib/net45/System.Reactive.Linq.dll"
#r @"packages/FSharp.Control.Reactive.3.2.0/lib/net40/FSharp.Control.Reactive.dll"
#r @"packages/FSharp.Charting.0.90.12/lib/net40/FSharp.Charting.dll"
#r "System.Windows.Forms.DataVisualization"

open System
open FSharp.Control.Reactive
open FSharp.Charting
open System.Reactive.Concurrency

let move points =
    Seq.initInfinite (fun i -> points |> List.map (fun (x, y) -> x + i, y + i))

// Slow down any sequence
let delay (duration : TimeSpan) xs = seq {
    for x in xs do
    duration.TotalMilliseconds |> int |> Async.Sleep |> Async.RunSynchronously
    yield x }

let obs =
    [(1, 0); (0, 1)]
    |> move
    |> delay (TimeSpan.FromSeconds 0.5)
    |> Observable.toObservable
    |> Observable.subscribeOn NewThreadScheduler.Default 

let lc = obs |> LiveChart.Point
lc.ShowChart()

通常在 UI 应用程序中,您会将 subscribeOn 和 observeOn 配对,以确保将结果传递回 UI 线程。这里似乎不需要,因为看起来图表为您处理了这个(对我有用)。

于 2015-08-21T12:52:12.533 回答
1

调度程序在 System.Reactive.PlatformServices.dll 中定义(由 Rx-PlatformServices 包安装)。

我在这里找到了它们: https ://github.com/Reactive-Extensions/Rx.NET/tree/master/Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency

例如使用新线程调度器: (observable).ObserveOn(System.Reactive.Concurrency.NewThreadScheduler.Default)

于 2015-08-21T08:17:10.833 回答