We have up to 30 GB of GZipped log files per day. Each file holds 100.000 lines and is between 6 and 8 MB when compressed. The simplified code in which the parsing logic has been stripped out, utilises the Parallel.ForEach loop.
The maximum number of lines processed peaks at MaxDegreeOfParallelism of 8 on the two-NUMA node, 32 logical CPU box (Intel Xeon E7-2820 @ 2 GHz):
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.IO;
using System.IO.Compression;
using System.Threading.Tasks;
namespace ParallelLineCount
{
public class ScriptMain
{
static void Main(String[] args)
{
int maxMaxDOP = (args.Length > 0) ? Convert.ToInt16(args[0]) : 2;
string fileLocation = (args.Length > 1) ? args[1] : "C:\\Temp\\SomeFiles" ;
string filePattern = (args.Length > 1) ? args[2] : "*2012-10-30.*.gz";
string fileNamePrefix = (args.Length > 1) ? args[3] : "LineCounts";
Console.WriteLine("Start: {0}", DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ss.fffffffZ"));
Console.WriteLine("Processing file(s): {0}", filePattern);
Console.WriteLine("Max MaxDOP to be used: {0}", maxMaxDOP.ToString());
Console.WriteLine("");
Console.WriteLine("MaxDOP,FilesProcessed,ProcessingTime[ms],BytesProcessed,LinesRead,SomeBookLines,LinesPer[ms],BytesPer[ms]");
for (int maxDOP = 1; maxDOP <= maxMaxDOP; maxDOP++)
{
// Construct ConcurrentStacks for resulting strings and counters
ConcurrentStack<Int64> TotalLines = new ConcurrentStack<Int64>();
ConcurrentStack<Int64> TotalSomeBookLines = new ConcurrentStack<Int64>();
ConcurrentStack<Int64> TotalLength = new ConcurrentStack<Int64>();
ConcurrentStack<int> TotalFiles = new ConcurrentStack<int>();
DateTime FullStartTime = DateTime.Now;
string[] files = System.IO.Directory.GetFiles(fileLocation, filePattern);
var options = new ParallelOptions() { MaxDegreeOfParallelism = maxDOP };
// Method signature: Parallel.ForEach(IEnumerable<TSource> source, Action<TSource> body)
Parallel.ForEach(files, options, currentFile =>
{
string filename = System.IO.Path.GetFileName(currentFile);
DateTime fileStartTime = DateTime.Now;
using (FileStream inFile = File.Open(fileLocation + "\\" + filename, FileMode.Open))
{
Int64 lines = 0, someBookLines = 0, length = 0;
String line = "";
using (var reader = new StreamReader(new GZipStream(inFile, CompressionMode.Decompress)))
{
while (!reader.EndOfStream)
{
line = reader.ReadLine();
lines++; // total lines
length += line.Length; // total line length
if (line.Contains("book")) someBookLines++; // some special lines that need to be parsed later
}
TotalLines.Push(lines); TotalSomeBookLines.Push(someBookLines); TotalLength.Push(length);
TotalFiles.Push(1); // silly way to count processed files :)
}
}
}
);
TimeSpan runningTime = DateTime.Now - FullStartTime;
// Console.WriteLine("MaxDOP,FilesProcessed,ProcessingTime[ms],BytesProcessed,LinesRead,SomeBookLines,LinesPer[ms],BytesPer[ms]");
Console.WriteLine("{0},{1},{2},{3},{4},{5},{6},{7}",
maxDOP.ToString(),
TotalFiles.Sum().ToString(),
Convert.ToInt32(runningTime.TotalMilliseconds).ToString(),
TotalLength.Sum().ToString(),
TotalLines.Sum(),
TotalSomeBookLines.Sum().ToString(),
Convert.ToInt64(TotalLines.Sum() / runningTime.TotalMilliseconds).ToString(),
Convert.ToInt64(TotalLength.Sum() / runningTime.TotalMilliseconds).ToString());
}
Console.WriteLine();
Console.WriteLine("Finish: " + DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ss.fffffffZ"));
}
}
}
Here's a summary of the results, with a clear peak at MaxDegreeOfParallelism = 8:
The CPU load (shown aggregated here, most of the load was on a single NUMA node, even when DOP was in 20 to 30 range):
The only way I've found to make CPU load cross 95% mark was to split the files across 4 different folders and execute the same command 4 times, each one targeting a subset of all files.
Can someone find a bottleneck?