4

我正在调用一个工作方法,该方法调用数据库,然后迭代并产生返回值以进行并行处理。为了防止它破坏数据库,我在那里有一个 Thread.Sleep 来暂停对数据库的执行。但是,这似乎会阻止仍在 Parallel.ForEach 中发生的执行。实现此目的以防止阻塞的最佳方法是什么?

private void ProcessWorkItems()
{
    _cancellation = new CancellationTokenSource();
    _cancellation.Token.Register(() => WorkItemRepository.ResetAbandonedWorkItems());

    Task.Factory.StartNew(() =>
        Parallel.ForEach(GetWorkItems().AsParallel().WithDegreeOfParallelism(10), workItem =>
        {
            var x = ItemFactory(workItem);
            x.doWork();
        }), _cancellation.Token);
}

private IEnumerable<IAnalysisServiceWorkItem> GetWorkItems()
{
    while (!_cancellation.IsCancellationRequested)
    {
        var workItems = WorkItemRepository.GetItemList(); //database call

        workItems.ForEach(item =>
        {
            item.QueueWorkItem(WorkItemRepository);
        });

        foreach (var item in workItems)
        {
            yield return item;
        }

        if (workItems.Count == 0)
        {
            Thread.Sleep(30000); //sleep this thread for 30 seconds if no work items.
        }
    }

    yield break;
}

编辑:我将其更改为包含答案,但它仍然无法按我的预期工作。我将 .AsParallel().WithDegreeOfParallelism(10) 添加到 GetWorkItems() 调用中。当我认为 Parallel 即使在基本线程处于休眠状态时也应该继续执行时,我的期望是否不正确?

示例:我有 15 个项目,它迭代并抓取 10 个项目并启动它们。每个完成后,它都会从 GetWorkItems 请求另一个,直到它尝试请求第 16 个项目。此时,它应该停止尝试抓取更多项目,但应该继续处理项目 11-15,直到这些项目完成。并行应该如何工作?因为它目前没有这样做。它目前正在做的是当它完成 6 时,它锁定后续 10 仍然在 Parallel.ForEach 中运行。

4

4 回答 4

8

我建议您创建一个工作项的BlockingCollection(一个队列),以及一个每 30 秒调用一次数据库以填充它的计时器。就像是:

BlockingCollection<WorkItem> WorkItems = new BlockingCollection<WorkItem>();

在初始化时:

System.Threading.Timer WorkItemTimer = new Timer((s) =>
    {
        var items = WorkItemRepository.GetItemList(); //database call
        foreach (var item in items)
        {
            WorkItems.Add(item);
        }
    }, null, 30000, 30000);

这将每 30 秒查询一次数据库中的项目。

为了安排要处理的工作项,您有许多不同的解决方案。最接近您所拥有的是:

WorkItem item;

while (WorkItems.TryTake(out item, Timeout.Infinite, _cancellation))
{
    Task.Factory.StartNew((s) =>
        {
            var myItem = (WorkItem)s;
            // process here
        }, item);
}

这消除了任何线程中的阻塞,并让 TPL 决定如何最好地分配并行任务。

编辑:

实际上,更接近您所拥有的是:

foreach (var item in WorkItems.GetConsumingEnumerable(_cancellation))
{
    // start task to process item
}

您也许可以使用:

Parallel.Foreach(WorkItems.GetConsumingEnumerable(_cancellation).AsParallel ...

我不知道这是否会奏效或效果如何。也许值得尝试一下 。. .

编辑结束

一般来说,我的建议是将其视为生产者/消费者应用程序,生产者是定期查询数据库以获取新项目的线程。我的示例每 N(在本例中为 30)秒查询一次数据库,如果平均而言,您可以每 30 秒清空一次工作队列,这将很有效。这将使从项目发布到数据库到您获得结果的平均延迟不到一分钟。

您可以降低轮询频率(从而降低延迟),但这会导致更多的数据库流量。

你也可以用它变得更漂亮。例如,如果您在 30 秒后轮询数据库并获得大量项目,那么您可能很快就会得到更多,并且您希望在 15 秒(或更短)内再次轮询。相反,如果您在 30 秒后轮询数据库但没有得到任何结果,那么您可能需要等待更长时间才能再次轮询。

您可以使用一次性计时器设置这种自适应轮询。也就是说,您在创建计时器时为最后一个参数指定 -1,这会导致它只触发一次。您的计时器回调计算出在下一次轮询之前等待多长时间,并调用Timer.Change以使用新值初始化计时器。

于 2011-09-26T23:22:27.153 回答
3

您可以使用.WithDegreeOfParallelism()扩展方法来强制 Plinq 同时运行任务。C# Threading Handbook的Call Blocking or I/O Intensive部分有一个很好的例子

于 2011-09-26T20:58:18.963 回答
2

您可能会与分区程序发生冲突。

因为您正在传递一个 IEnumerable,Parallel.ForEach 将使用一个块分区器,它可以尝试一次从块中的枚举中获取一些元素。但是你的 IEnumerable.MoveNext 可以睡觉,这会让事情变得不愉快。

您可以编写自己的分区器,一次返回一个元素,但无论如何,我认为像 Jim Mischel 的建议这样的生产者/消费者方法会更好。

于 2011-09-27T13:04:14.263 回答
0

你想用睡眠来完成什么?据我所知,您正试图避免冲击数据库调用。我不知道有更好的方法来做到这一点,但理想情况下,您的GetItemList呼叫似乎会被阻塞,直到数据可供处理。

于 2011-09-26T22:15:12.633 回答