4

我对数据并发处理有疑问。我的电脑很快就用完了 RAM。关于如何修复我的并发实现的任何建议?

普通类:

public class CalculationResult
{
    public int Count { get; set; }
    public decimal[] RunningTotals { get; set; }

    public CalculationResult(decimal[] profits)
    {
        this.Count = 1;
        this.RunningTotals = new decimal[12];
        profits.CopyTo(this.RunningTotals, 0);
    }

    public void Update(decimal[] newData)
    {
        this.Count++;

        // summ arrays
        for (int i = 0; i < 12; i++)
            this.RunningTotals[i] = this.RunningTotals[i] + newData[i];
    }

    public void Update(CalculationResult otherResult)
    {
        this.Count += otherResult.Count;

        // summ arrays
        for (int i = 0; i < 12; i++)
            this.RunningTotals[i] = this.RunningTotals[i] + otherResult.RunningTotals[i];
    }
}

代码的单核实现如下:

Dictionary<string, CalculationResult> combinations = new Dictionary<string, CalculationResult>();
foreach (var i in itterations)
{
    // do the processing
    // ..
    string combination = "1,2,3,4,42345,52,523"; // this is determined during the processing

    if (combinations.ContainsKey(combination))
        combinations[combination].Update(newData);
    else
        combinations.Add(combination, new CalculationResult(newData));
}

多核实现:

ConcurrentBag<Dictionary<string, CalculationResult>> results = new ConcurrentBag<Dictionary<string, CalculationResult>>();
Parallel.ForEach(itterations, (i, state) => 
{
    Dictionary<string, CalculationResult> combinations = new Dictionary<string, CalculationResult>();
    // do the processing
    // ..
    // add combination to combinations -> same logic as in single core implementation
    results.Add(combinations);
});
Dictionary<string, CalculationResult> combinationsReal = new Dictionary<string, CalculationResult>();
foreach (var item in results)
{
    foreach (var pair in item)
    {
        if (combinationsReal.ContainsKey(pair.Key))
            combinationsReal[pair.Key].Update(pair.Value);
        else
            combinationsReal.Add(pair.Key, pair.Value);
    }
}

我遇到的问题是几乎每个combinations字典都以其中的930k记录结尾,这平均会消耗400 [MB]RAM内存。

现在,在单核实现中只有一个这样的字典。所有检查都是针对一本字典执行的。但这是一种缓慢的方法,我想使用多核优化。

在多核实现中,ConcurrentBag创建了一个包含所有combinations字典的实例。多线程作业完成后 - 所有字典都聚合为一个。这种方法适用于少量并发迭代。例如,对于 4 次迭代,我的RAM使用量为~ 1.5 [GB]. 当我设置并行迭代的全部数量时,问题就出现了,即 200!再多的 PCRAM也不足以容纳所有字典,每本字典都有数百万条记录!

我一直在考虑使用ConcurrentDictioanary,直到我发现“TryAdd”方法在我的情况下不能保证添加数据的完整性,因为我还需要对运行总计进行更新。

唯一真正的多线程选项是,而不是将全部添加combinations到字典中 - 是将它们保存到某个数据库中。数据聚合将是一个select带有group by子句的 1 个 SQL 语句的问题......但我不喜欢为此创建一个临时表并运行数据库实例的想法......

是否有解决如何同时处理数据而不用完 RAM 的方法?

编辑:也许真正的问题应该是 - 如何RunningTotals在使用时更新线程安全ConcurrentDictionary?我刚刚遇到了这个线程,遇到了类似的问题ConcurrentDictionary,但我的情况似乎更复杂,因为我有一个需要更新的数组。我还在调查这件事。

EDIT2:这是一个有效的解决方案ConcurrentDictionary。我需要做的就是为字典键添加一个锁。

ConcurrentDictionary<string, CalculationResult> combinations = new ConcurrentDictionary<string, CalculationResult>();
Parallel.ForEach(itterations, (i, state) => 
{
    // do the processing
    // ..
    string combination = "1,2,3,4,42345,52,523"; // this is determined during the processing

    if (combinations.ContainsKey(combination)) {
        lock(combinations[combination])
            combinations[combination].Update(newData);
    }
    else
        combinations.TryAdd(combination, new CalculationResult(newData));
});

单线程代码执行时间为1m 48s,而此解决方案执行时间1m 7s为 4 次迭代(性能提升 37%)。我仍然想知道 SQL 方法是否会更快,有数百万条记录?我可能会在明天对其进行测试并更新。

编辑 3:对于那些想知道ConcurrentDictionary更新值有什么问题的人 - 在有和没有锁定的情况下运行此代码。

public class Result
{
    public int Count { get; set; }
}

class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine("Start");

        List<int> keys = new List<int>();
        for (int i = 0; i < 100; i++)
            keys.Add(i);

        ConcurrentDictionary<int, Result> dict = new ConcurrentDictionary<int, Result>();
        Parallel.For(0, 8, i =>
        {
            foreach(var key in keys)
            {
                if (dict.ContainsKey(key))
                {
                    //lock (dict[key]) // uncomment this
                        dict[key].Count++;
                }
                else
                    dict.TryAdd(key, new Result());
            }
        });

        // any output here is incorrect behavior. best result = no lines
        foreach (var item in dict)
            if (item.Value.Count != 7) { Console.WriteLine($"{item.Key}; {item.Value.Count}"); }

        Console.WriteLine($"Finish");
        Console.ReadKey();
    }
}

编辑 4:经过反复试验,我无法优化 SQL 方法。结果证明这是最糟糕的主意:) 我使用了SQL Lite数据库。内存中和文件中。带有事务和可重用的 SQL 命令参数。由于需要插入大量记录 -​​ 性能不足。数据聚合是最简单的部分,但仅仅插入 400 万行就需要大量时间,我什至无法想象如何有效地处理这 2.4 亿条数据.. 到目前为止(也很奇怪) ,ConcurrentBag方法似乎是我电脑上最快的。其次是一个ConcurrentDictionary方法。ConcurrentBag不过,在内存上有点重。感谢@Alisson的工作——现在可以将它用于更大的迭代集!

4

1 回答 1

2

所以,你只需要确保你不会有超过 4 个并发迭代,这是你的计算机资源的限制,并且只使用这台计算机,没有魔法。

我创建了一个类来控制并发执行和它将执行的并发任务的数量。

该类将包含以下属性:

public class ConcurrentCalculationProcessor
{
    private const int MAX_CONCURRENT_TASKS = 4;
    private readonly IEnumerable<int> _codes;
    private readonly List<Task<Dictionary<string, CalculationResult>>> _tasks;
    private readonly Dictionary<string, CalculationResult> _combinationsReal;

    public ConcurrentCalculationProcessor(IEnumerable<int> codes)
    {
        this._codes = codes;
        this._tasks = new List<Task<Dictionary<string, CalculationResult>>>();
        this._combinationsReal = new Dictionary<string, CalculationResult>();
    }
}

我将并发任务的数量设为 a const,但它可以是构造函数中的参数。

我创建了一种处理处理的方法。出于测试目的,我模拟了 900k 个元素的循环,将它们添加到字典中,最后返回它们:

private async Task<Dictionary<string, CalculationResult>> ProcessCombinations()
{
    Dictionary<string, CalculationResult> combinations = new Dictionary<string, CalculationResult>();
    // do the processing
    // here we should do something that worth using concurrency
    // like querying databases, consuming APIs/WebServices, and other I/O stuff
    for (int i = 0; i < 950000; i++)
        combinations[i.ToString()] = new CalculationResult(new decimal[] { 1, 10, 15 });
    return await Task.FromResult(combinations);
}

main 方法将并行启动任务,将它们添加到任务列表中,以便我们最近可以跟踪它们。

每次列表达到最大并发任务时,我们await都会调用一个方法ProcessRealCombinations

public async Task<Dictionary<string, CalculationResult>> Execute()
{
    ConcurrentBag<Dictionary<string, CalculationResult>> results = new ConcurrentBag<Dictionary<string, CalculationResult>>();

    for (int i = 0; i < this._codes.Count(); i++)
    {
        // start the task imediately
        var task = ProcessCombinations();
        this._tasks.Add(task);
        if (this._tasks.Count() >= MAX_CONCURRENT_TASKS)
        {
            // if we have more than MAX_CONCURRENT_TASKS in progress, we start processing some of them
            // this will await any of the current tasks to complete, them process it (and any other task which may have been completed as well)...
            await ProcessCompletedTasks().ConfigureAwait(false);
        }
    }

    // keep processing until all the pending tasks have been completed...it should be no more than MAX_CONCURRENT_TASKS
    while(this._tasks.Any())
        await ProcessCompletedTasks().ConfigureAwait(false);

    return this._combinationsReal;
}

下一个方法ProcessCompletedTasks将等待至少一个现有任务完成。之后,它将从列表中取出所有已完成的任务(已完成的任务和可能已完成的任何其他任务),并获得它们的结果(组合)。

对于 each processedCombinations,它将与this._combinationsReal(使用您在问题中提供的相同逻辑)合并。

private async Task ProcessCompletedTasks()
{
    await Task.WhenAny(this._tasks).ConfigureAwait(false);
    var completedTasks = this._tasks.Where(t => t.IsCompleted).ToArray();
    // completedTasks will have at least one task, but it may have more ;)
    foreach (var completedTask in completedTasks)
    {
        var processedCombinations = await completedTask.ConfigureAwait(false);
        foreach (var pair in processedCombinations)
        {
            if (this._combinationsReal.ContainsKey(pair.Key))
                this._combinationsReal[pair.Key].Update(pair.Value);
            else
                this._combinationsReal.Add(pair.Key, pair.Value);
        }
        this._tasks.Remove(completedTask);
    }
}

对于每个processedCombinations合并的 in _combinationsReal,它将从列表中删除其各自的任务,然后继续(再次开始添加更多任务)。这将发生,直到我们为所有迭代创建了所有任务。

最后,我们继续处理它,直到列表中没有更多的任务。


如果您监控 RAM 消耗,您会注意到它会增加到大约 1.5 GB(当我们同时处理 4 个任务时),然后减少到大约 0.8 GB(当我们从列表中删除任务时)。至少这是在我的电脑上发生的事情。

这是一个fiddle,但是我不得不将 itens 的数量从 900k 减少到 100,因为 fiddle 限制了内存使用以避免滥用。

我希望这对你有所帮助。


关于所有这些东西需要注意的一件事是,如果您的ProcessCombinations(在处理这 900k 个项目时同时执行的方法)调用外部资源,例如从 HD 读取文件,在数据库,调用 API/WebService 方法。我猜该代码可能正在从外部资源读取 900k 个项目,那么这将减少处理它所需的时间。

如果项目先前已加载并且ProcessCombinations只是读取已经在内存中的数据,那么并发将根本没有帮助(实际上我相信它会使您的代码运行得更慢)。如果是这种情况,那么我们在错误的地方应用了并发。

当所述调用要访问外部资源(获取或存储数据)时,并行使用async调用可能会更有帮助,并且取决于外部资源可以支持多少并发调用,它可能仍然不会产生这样的差异。

于 2017-12-23T03:38:56.413 回答