9

假设我有两个返回整数 1 到 5 的序列。

第一个返回 1、2 和 3 非常快,但 4 和 5 每个需要 200 毫秒。

public static IEnumerable<int> FastFirst()
{
    for (int i = 1; i < 6; i++)
    {
        if (i > 3) Thread.Sleep(200);
        yield return i;
    }
}

第二个以 200 毫秒的延迟返回 1、2 和 3,但快速返回 4 和 5。

public static IEnumerable<int> SlowFirst()
{
    for (int i = 1; i < 6; i++)
    {
        if (i < 4) Thread.Sleep(200);
        yield return i;
    }
}

联合这两个序列给我的只是数字 1 到 5。

FastFirst().Union(SlowFirst());

我不能保证这两种方法中的哪一种在什么时候有延迟,所以执行的顺序不能保证我的解决方案。因此,我想并行化联合,以最大限度地减少我的示例中的(人为)延迟。

一个真实的场景:我有一个返回一些实体的缓存和一个返回所有实体的数据源。我希望能够从内部并行化对缓存和数据源的请求的方法返回一个迭代器,以便缓存结果尽可能快地产生。

注 1:我意识到这仍然在浪费 CPU 周期;我不是在问如何防止序列迭代它们的慢元素,而是问我如何尽可能快地合并它们。

更新 1:我已经定制了 achitaka-san 的出色响应以接受多个生产者,并使用 ContinueWhenAll 设置 BlockingCollection 的 CompleteAdding 一次。我只是把它放在这里,因为它会因为缺少评论格式而丢失。任何进一步的反馈都会很棒!

public static IEnumerable<TResult> SelectAsync<TResult>(
    params IEnumerable<TResult>[] producer)
{
    var resultsQueue = new BlockingCollection<TResult>();

    var taskList = new HashSet<Task>();
    foreach (var result in producer)
    {
        taskList.Add(
            Task.Factory.StartNew(
                () =>
                    {
                        foreach (var product in result)
                        {
                            resultsQueue.Add(product);
                        }
                    }));
    }

    Task.Factory.ContinueWhenAll(taskList.ToArray(), x => resultsQueue.CompleteAdding());

    return resultsQueue.GetConsumingEnumerable();
}
4

2 回答 2

3

看看这个。第一种方法只是按照结果的顺序返回所有内容。第二个检查唯一性。如果你把它们链接起来,你会得到你想要的结果。

public static class Class1
{
    public static IEnumerable<TResult> SelectAsync<TResult>(
        IEnumerable<TResult> producer1,
        IEnumerable<TResult> producer2,
        int capacity)
    {
        var resultsQueue = new BlockingCollection<TResult>(capacity);
        var producer1Done = false;
        var producer2Done = false;

        Task.Factory.StartNew(() =>
        {
            foreach (var product in producer1)
            {
                resultsQueue.Add(product);
            }
            producer1Done = true;
            if (producer1Done && producer2Done) { resultsQueue.CompleteAdding(); }
        });

        Task.Factory.StartNew(() =>
        {
            foreach (var product in producer2)
            {
                resultsQueue.Add(product);
            }
            producer2Done = true;
            if (producer1Done && producer2Done) { resultsQueue.CompleteAdding(); }
        });

        return resultsQueue.GetConsumingEnumerable();
    }


    public static IEnumerable<TResult> SelectAsyncUnique<TResult>(this IEnumerable<TResult> source)
    {
        HashSet<TResult> knownResults = new HashSet<TResult>();
        foreach (TResult result in source)
        {
            if (knownResults.Contains(result)) {continue;}
            knownResults.Add(result);
            yield return result;
        }
    }
}
于 2011-11-09T14:04:59.510 回答
0

与从数据库中获取相比,缓存几乎是即时的,因此您可以先从缓存中读取并返回这些项目,然后从数据库中读取并返回除了在缓存中找到的项目之外的项目。

如果您尝试将其并行化,您将增加很多复杂性,但获得的收益却很小。

编辑:

如果源的速度没有可预测的差异,您可以在线程中运行它们并使用同步哈希集来跟踪您已经获得的项目,将新项目放入队列中,然后让主线程读取从队列中:

public static IEnumerable<TItem> GetParallel<TItem, TKey>(Func<TItem, TKey> getKey, params IEnumerable<TItem>[] sources) {
  HashSet<TKey> found = new HashSet<TKey>();
  List<TItem> queue = new List<TItem>();
  object sync = new object();
  int alive = 0;
  object aliveSync = new object();
  foreach (IEnumerable<TItem> source in sources) {
    lock (aliveSync) {
      alive++;
    }
    new Thread(s => {
      foreach (TItem item in s as IEnumerable<TItem>) {
        TKey key = getKey(item);
        lock (sync) {
          if (found.Add(key)) {
            queue.Add(item);
          }
        }
      }
      lock (aliveSync) {
        alive--;
      }
    }).Start(source);
  }
  while (true) {
    lock (sync) {
      if (queue.Count > 0) {
        foreach (TItem item in queue) {
          yield return item;
        }
        queue.Clear();
      }
    }
    lock (aliveSync) {
      if (alive == 0) break;
    }
    Thread.Sleep(100);
  }
}

测试流:

public static IEnumerable<int> SlowRandomFeed(Random rnd) {
  int[] values = new int[100];
  for (int i = 0; i < 100; i++) {
    int pos = rnd.Next(i + 1);
    values[i] = i;
    int temp = values[pos];
    values[pos] = values[i];
    values[i] = temp;
  }
  foreach (int value in values) {
    yield return value;
    Thread.Sleep(rnd.Next(200));
  }
}

测试:

Random rnd = new Random();
foreach (int item in GetParallel(n => n, SlowRandomFeed(rnd), SlowRandomFeed(rnd), SlowRandomFeed(rnd), SlowRandomFeed(rnd))) {
  Console.Write("{0:0000 }", item);
}
于 2011-11-09T13:41:17.720 回答