我正在从数据库中读取时间序列数据,但这些数据点是不同地方的分区。因此,例如,如果有 1 Hz 数据,则每 3600 个数据点(一小时)存储在不同的分区中。当我读取这些数据点时,我必须以正确的逻辑顺序将它们返回给用户。所以,我必须返回 00:00 小时的 3600 个数据点,然后返回 01:00 小时的 3600 个数据点,……直到 23:00 小时。自然,数据库查询存在延迟。假设每个查询需要 10 毫秒。这意味着如果我进行 concat,它将有 24 * 10 ms = 240 ms 的延迟。我需要的是具有许多并行性的热连接。我不能按顺序执行所有这些查询,然后执行常规 concat,因为用户可能需要 1 年的数据。目前,我所拥有的是这样的:
public IObservable<int> ReadSensorPartitions( /* Guid sensorId*/)
{
return Observable.Create<int>(async o =>
{
try
{
await Task.Delay(10).ConfigureAwait(false);
Observable.Range(1, 1000).Subscribe(o);
}
catch (Exception e)
{
o.OnError(e);
}
});
}
public IObservable<int> ReadSensorData(int partitionNum)
{
return Observable.Create<int>(async o =>
{
try
{
await Task.Delay(10).ConfigureAwait(false);
Observable.Range(partitionNum*1000, 999).Subscribe(o);
}
catch (Exception e)
{
o.OnError(e);
}
});
}
public IObservable<int> ConcatObservables(IObservable<IObservable<int>> observables)
{
return ReadSensorPartitions().Select(partitionNum => ReadSensorData(partitionNum)).Concat();
}
public async Task TestConcatObservables()
{
var taskCompletionSource = new TaskCompletionSource<bool>();
long count = 0;
var sw = Stopwatch.StartNew();
ConcatObservables(ReadSensorPartitions().Select(partitionNum => ReadSensorData(partitionNum)))
.Subscribe(x =>
{
count++;
}, ex =>
{
}, () =>
{
taskCompletionSource.SetResult(true);
});
await taskCompletionSource.Task.ConfigureAwait(false);
sw.Stop();
Console.WriteLine("TestConcatObservables Duration for {0} datapoint(s) is {1} ms", count, sw.Elapsed.TotalMilliseconds);
}
对于 1 年的数据,这至少需要 87.6 秒。我需要这样的东西
public IObservable<int> HotConcatObservables(IObservable<IObservable<int>> observables, int parallelism)
{
}
例如,如果将并行度设置为 24,则前 24 次读取会立即全部预热并缓冲。然后我们返回分区 1,然后返回 2,……直到 24,一旦分区 1 完成流式传输,我们立即预热分区 25。一旦分区 2 完成流式传输,我们预热分区 26,等等。所有这些数据点都必须返回以正确的逻辑顺序。
我花了很多时间来做这件事,但我做的最好的(使用 IConnectableObservable)比 Merge 慢 5 倍(即使并行度是相同数量的分区)。反正有没有。这可以有效地完成吗?我真的很感谢任何帮助。