0

我正在从数据库中读取时间序列数据,但这些数据点是不同地方的分区。因此,例如,如果有 1 Hz 数据,则每 3600 个数据点(一小时)存储在不同的分区中。当我读取这些数据点时,我必须以正确的逻辑顺序将它们返回给用户。所以,我必须返回 00:00 小时的 3600 个数据点,然后返回 01:00 小时的 3600 个数据点,……直到 23:00 小时。自然,数据库查询存在延迟。假设每个查询需要 10 毫秒。这意味着如果我进行 concat,它将有 24 * 10 ms = 240 ms 的延迟。我需要的是具有许多并行性的热连接。我不能按顺序执行所有这些查询,然后执行常规 concat,因为用户可能需要 1 年的数据。目前,我所拥有的是这样的:

public IObservable<int> ReadSensorPartitions( /* Guid sensorId*/)
        {
            return Observable.Create<int>(async o =>
            {
                try
                {
                    await Task.Delay(10).ConfigureAwait(false);
                    Observable.Range(1, 1000).Subscribe(o);
                }
                catch (Exception e)
                {
                    o.OnError(e);
                }
            });
        }

        public IObservable<int> ReadSensorData(int partitionNum)
        {
            return Observable.Create<int>(async o =>
            {
                try
                {
                    await Task.Delay(10).ConfigureAwait(false);
                    Observable.Range(partitionNum*1000, 999).Subscribe(o);
                }
                catch (Exception e)
                {
                    o.OnError(e);
                }
            });
        }

        public IObservable<int> ConcatObservables(IObservable<IObservable<int>> observables)
        {
            return ReadSensorPartitions().Select(partitionNum => ReadSensorData(partitionNum)).Concat();
        } 

        public async Task TestConcatObservables()
        {
            var taskCompletionSource = new TaskCompletionSource<bool>();
            long count = 0;
            var sw = Stopwatch.StartNew();
            ConcatObservables(ReadSensorPartitions().Select(partitionNum => ReadSensorData(partitionNum)))
                .Subscribe(x =>
                {
                    count++;
                }, ex =>
                {

                }, () =>
                {
                    taskCompletionSource.SetResult(true);
                });
            await taskCompletionSource.Task.ConfigureAwait(false);
            sw.Stop();
            Console.WriteLine("TestConcatObservables Duration for {0} datapoint(s) is {1} ms", count, sw.Elapsed.TotalMilliseconds);
        }

对于 1 年的数据,这至少需要 87.6 秒。我需要这样的东西

public IObservable<int> HotConcatObservables(IObservable<IObservable<int>> observables, int parallelism)
        {

        }

例如,如果将并行度设置为 24,则前 24 次读取会立即全部预热并缓冲。然后我们返回分区 1,然后返回 2,……直到 24,一旦分区 1 完成流式传输,我们立即预热分区 25。一旦分区 2 完成流式传输,我们预热分区 26,等等。所有这些数据点都必须返回以正确的逻辑顺序。

我花了很多时间来做这件事,但我做的最好的(使用 IConnectableObservable)比 Merge 慢 5 倍(即使并行度是相同数量的分区)。反正有没有。这可以有效地完成吗?我真的很感谢任何帮助。

4

0 回答 0