我对数据并发处理有疑问。我的电脑很快就用完了 RAM。关于如何修复我的并发实现的任何建议?
普通类:
public class CalculationResult
{
public int Count { get; set; }
public decimal[] RunningTotals { get; set; }
public CalculationResult(decimal[] profits)
{
this.Count = 1;
this.RunningTotals = new decimal[12];
profits.CopyTo(this.RunningTotals, 0);
}
public void Update(decimal[] newData)
{
this.Count++;
// summ arrays
for (int i = 0; i < 12; i++)
this.RunningTotals[i] = this.RunningTotals[i] + newData[i];
}
public void Update(CalculationResult otherResult)
{
this.Count += otherResult.Count;
// summ arrays
for (int i = 0; i < 12; i++)
this.RunningTotals[i] = this.RunningTotals[i] + otherResult.RunningTotals[i];
}
}
代码的单核实现如下:
Dictionary<string, CalculationResult> combinations = new Dictionary<string, CalculationResult>();
foreach (var i in itterations)
{
// do the processing
// ..
string combination = "1,2,3,4,42345,52,523"; // this is determined during the processing
if (combinations.ContainsKey(combination))
combinations[combination].Update(newData);
else
combinations.Add(combination, new CalculationResult(newData));
}
多核实现:
ConcurrentBag<Dictionary<string, CalculationResult>> results = new ConcurrentBag<Dictionary<string, CalculationResult>>();
Parallel.ForEach(itterations, (i, state) =>
{
Dictionary<string, CalculationResult> combinations = new Dictionary<string, CalculationResult>();
// do the processing
// ..
// add combination to combinations -> same logic as in single core implementation
results.Add(combinations);
});
Dictionary<string, CalculationResult> combinationsReal = new Dictionary<string, CalculationResult>();
foreach (var item in results)
{
foreach (var pair in item)
{
if (combinationsReal.ContainsKey(pair.Key))
combinationsReal[pair.Key].Update(pair.Value);
else
combinationsReal.Add(pair.Key, pair.Value);
}
}
我遇到的问题是几乎每个combinations
字典都以其中的930k
记录结尾,这平均会消耗400 [MB]
RAM内存。
现在,在单核实现中只有一个这样的字典。所有检查都是针对一本字典执行的。但这是一种缓慢的方法,我想使用多核优化。
在多核实现中,ConcurrentBag
创建了一个包含所有combinations
字典的实例。多线程作业完成后 - 所有字典都聚合为一个。这种方法适用于少量并发迭代。例如,对于 4 次迭代,我的RAM使用量为~ 1.5 [GB]
. 当我设置并行迭代的全部数量时,问题就出现了,即 200!再多的 PCRAM
也不足以容纳所有字典,每本字典都有数百万条记录!
我一直在考虑使用ConcurrentDictioanary
,直到我发现“TryAdd”方法在我的情况下不能保证添加数据的完整性,因为我还需要对运行总计进行更新。
唯一真正的多线程选项是,而不是将全部添加combinations
到字典中 - 是将它们保存到某个数据库中。数据聚合将是一个select
带有group by
子句的 1 个 SQL 语句的问题......但我不喜欢为此创建一个临时表并运行数据库实例的想法......
是否有解决如何同时处理数据而不用完 RAM 的方法?
编辑:也许真正的问题应该是 - 如何RunningTotals
在使用时更新线程安全ConcurrentDictionary
?我刚刚遇到了这个线程,遇到了类似的问题ConcurrentDictionary
,但我的情况似乎更复杂,因为我有一个需要更新的数组。我还在调查这件事。
EDIT2:这是一个有效的解决方案ConcurrentDictionary
。我需要做的就是为字典键添加一个锁。
ConcurrentDictionary<string, CalculationResult> combinations = new ConcurrentDictionary<string, CalculationResult>();
Parallel.ForEach(itterations, (i, state) =>
{
// do the processing
// ..
string combination = "1,2,3,4,42345,52,523"; // this is determined during the processing
if (combinations.ContainsKey(combination)) {
lock(combinations[combination])
combinations[combination].Update(newData);
}
else
combinations.TryAdd(combination, new CalculationResult(newData));
});
单线程代码执行时间为1m 48s
,而此解决方案执行时间1m 7s
为 4 次迭代(性能提升 37%)。我仍然想知道 SQL 方法是否会更快,有数百万条记录?我可能会在明天对其进行测试并更新。
编辑 3:对于那些想知道ConcurrentDictionary
更新值有什么问题的人 - 在有和没有锁定的情况下运行此代码。
public class Result
{
public int Count { get; set; }
}
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Start");
List<int> keys = new List<int>();
for (int i = 0; i < 100; i++)
keys.Add(i);
ConcurrentDictionary<int, Result> dict = new ConcurrentDictionary<int, Result>();
Parallel.For(0, 8, i =>
{
foreach(var key in keys)
{
if (dict.ContainsKey(key))
{
//lock (dict[key]) // uncomment this
dict[key].Count++;
}
else
dict.TryAdd(key, new Result());
}
});
// any output here is incorrect behavior. best result = no lines
foreach (var item in dict)
if (item.Value.Count != 7) { Console.WriteLine($"{item.Key}; {item.Value.Count}"); }
Console.WriteLine($"Finish");
Console.ReadKey();
}
}
编辑 4:经过反复试验,我无法优化 SQL 方法。结果证明这是最糟糕的主意:) 我使用了SQL Lite
数据库。内存中和文件中。带有事务和可重用的 SQL 命令参数。由于需要插入大量记录 - 性能不足。数据聚合是最简单的部分,但仅仅插入 400 万行就需要大量时间,我什至无法想象如何有效地处理这 2.4 亿条数据.. 到目前为止(也很奇怪) ,ConcurrentBag
方法似乎是我电脑上最快的。其次是一个ConcurrentDictionary
方法。ConcurrentBag
不过,在内存上有点重。感谢@Alisson的工作——现在可以将它用于更大的迭代集!