2

我遇到了一些阻塞问题。我试图从推到拉。即我想在这里访问我的数据,在这种情况下,它是通过我的 Observable 处理后的数组。

type HistoryBar = 
    {Open: decimal; High: decimal; Low: decimal; Close: decimal; Time: DateTime; Volume: int; RequestId: int; Index: int; Total: int}

let transformBar =
    client.HistoricalData
    |> Observable.map(fun args ->
        {
            Open = args.Open
            High = args.High
            Low =  args.Low
            Close = args.Close
            Time = args.Date
            Volume = args.Volume
            RequestId = args.RequestId
            Index = args.RecordNumber
            Total = args.RecordTotal
       }
    )

let groupByRequest (obs:IObservable<HistoryBar>) = 
    let bars = obs.GroupByUntil((fun x -> x.RequestId), (fun x -> x.Where(fun y -> y.Index = y.Total - 1)))
    bars.SelectMany(fun (x:IGroupedObservable<int, HistoryBar>) -> x.ToArray())

let obs = transformBar |> groupByRequest

client.RequestHistoricalData(1, sym, DateTime.Now, TimeSpan.FromDays(10.0), BarSize.OneDay, HistoricalDataType.Midpoint, 0)

如果我订阅了 obs,那么只要我打电话给client.RequestHistoricalData一切都很好。我想做的是将 obs 转换为基础类型,在这种情况下是HistoryBar []. 我试过使用waitToEnumberable没有运气。在这里提取我最后创建的数据的正确方法是什么?

编辑,添加人为的 C# 示例代码以显示库的正常工作方式。我在这里真正想了解的是如何从可观察到标准列表或数组。我不确定是否需要一个可变结构才能做到这一点。如果我不得不猜测,我会说不。

static void Main(string[] args)
{
...
client.HistoricalData += client_HistoricalData;
client.RequestHistoricalData(1, sym, DateTime.Today, TimeSpan.FromDays(10), BarSize.OneDay, HistoricalDataType.Midpoint, 0);
....
}

static void client_HistoricalData(object sender, HistoricalDataEventArgs e)
{

Console.WriteLine("Open: {0}, High: {1}, Low: {2}, Close: {3}, Date: {4}, RecordId: {5}, RecordIndex: {6}", e.Open, e.High, e.Low, e.Close, e.Date, e.RequestId, e.RecordNumber);
}
4

1 回答 1

2

这个问题并没有很清楚地首先如何加载数据(无论是惰性/时变等),所以我假设它是一个有时间限制的值流。

从您的代码看来,您似乎想在流完成时找到流中的最后一个值。该Last方法为您提供完成时推送的流中的最后一个值 - 然而,这是同步的并且阻塞直到流完成。非阻塞版本,LastAsync返回一个 Observable,它在源完成时产生一个值。

let from0To4 = 
    Observable.Interval(TimeSpan.FromSeconds(0.1)).Take(5)

let lastValue =
    from0To4.LastAsync()

let disposable =
    lastValue |> Observable.subscribe(log)

要将 Observable 转换为列表而不部分阻塞,您可以使用这些Buffer方法。要在 Observable 完成之前缓冲所有值,请使用ToList.

let fullBuffer =
    from0To4.ToList()

let disposable =
    fullBuffer |> Observable.subscribe(fun ls -> printfn "Buffer(%d): %A" ls.Count ls)

输出:

缓冲区(5):seq [0L; 1升;2升;3升;...]

于 2012-12-02T19:33:21.790 回答