
We have up to 30 GB of GZipped log files per day. Each file holds 100,000 lines and is between 6 and 8 MB when compressed. The simplified code below, in which the parsing logic has been stripped out, uses a Parallel.ForEach loop.

Throughput (lines processed per millisecond) peaks at a MaxDegreeOfParallelism of 8 on a two-NUMA-node, 32-logical-CPU box (Intel Xeon E7-2820 @ 2 GHz):

using System;
using System.Collections.Concurrent;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Threading.Tasks;
namespace ParallelLineCount
{
    public class ScriptMain
    {
        static void Main(String[] args)
        {
            int    maxMaxDOP      = (args.Length > 0) ? Convert.ToInt16(args[0]) : 2;
            string fileLocation   = (args.Length > 1) ? args[1] : "C:\\Temp\\SomeFiles";
            string filePattern    = (args.Length > 2) ? args[2] : "*2012-10-30.*.gz";
            string fileNamePrefix = (args.Length > 3) ? args[3] : "LineCounts";

            Console.WriteLine("Start:                 {0}", DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ss.fffffffZ"));
            Console.WriteLine("Processing file(s):    {0}", filePattern);
            Console.WriteLine("Max MaxDOP to be used: {0}", maxMaxDOP.ToString());
            Console.WriteLine("");

            Console.WriteLine("MaxDOP,FilesProcessed,ProcessingTime[ms],BytesProcessed,LinesRead,SomeBookLines,LinesPer[ms],BytesPer[ms]");

            for (int maxDOP = 1; maxDOP <= maxMaxDOP; maxDOP++)
            {

                // Construct ConcurrentStacks for resulting strings and counters
                ConcurrentStack<Int64> TotalLines = new ConcurrentStack<Int64>();
                ConcurrentStack<Int64> TotalSomeBookLines = new ConcurrentStack<Int64>();
                ConcurrentStack<Int64> TotalLength = new ConcurrentStack<Int64>();
                ConcurrentStack<int>   TotalFiles = new ConcurrentStack<int>();

                DateTime FullStartTime = DateTime.Now;

                string[] files = System.IO.Directory.GetFiles(fileLocation, filePattern);

                var options = new ParallelOptions() { MaxDegreeOfParallelism = maxDOP };

                //  Method signature: Parallel.ForEach(IEnumerable<TSource> source, Action<TSource> body)
                Parallel.ForEach(files, options, currentFile =>
                    {
                        string filename = System.IO.Path.GetFileName(currentFile);
                        DateTime fileStartTime = DateTime.Now;

                        using (FileStream inFile = File.Open(fileLocation + "\\" + filename, FileMode.Open))
                        {
                            Int64 lines = 0, someBookLines = 0, length = 0;
                            String line = "";

                            using (var reader = new StreamReader(new GZipStream(inFile, CompressionMode.Decompress)))
                            {
                                while (!reader.EndOfStream)
                                {
                                    line = reader.ReadLine();
                                    lines++; // total lines
                                    length += line.Length;  // total line length

                                    if (line.Contains("book")) someBookLines++; // some special lines that need to be parsed later
                                }

                                TotalLines.Push(lines); TotalSomeBookLines.Push(someBookLines); TotalLength.Push(length);
                                TotalFiles.Push(1); // silly way to count processed files :)
                            }
                        }
                    }
                );

                TimeSpan runningTime = DateTime.Now - FullStartTime;

                // Console.WriteLine("MaxDOP,FilesProcessed,ProcessingTime[ms],BytesProcessed,LinesRead,SomeBookLines,LinesPer[ms],BytesPer[ms]");
                Console.WriteLine("{0},{1},{2},{3},{4},{5},{6},{7}",
                    maxDOP.ToString(),
                    TotalFiles.Sum().ToString(),
                    Convert.ToInt32(runningTime.TotalMilliseconds).ToString(),
                    TotalLength.Sum().ToString(),
                    TotalLines.Sum(),
                    TotalSomeBookLines.Sum().ToString(),
                    Convert.ToInt64(TotalLines.Sum() / runningTime.TotalMilliseconds).ToString(),
                    Convert.ToInt64(TotalLength.Sum() / runningTime.TotalMilliseconds).ToString());

            }
            Console.WriteLine();
            Console.WriteLine("Finish:                " + DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ss.fffffffZ"));
        }
    }
}

Here's a summary of the results, with a clear peak at MaxDegreeOfParallelism = 8:

[results chart]

The CPU load (shown aggregated here; most of the load was on a single NUMA node even when DOP was in the 20 to 30 range):

[CPU load chart]

The only way I've found to push CPU load above the 95% mark was to split the files across 4 different folders and run the same command 4 times, each targeting a subset of all files.

Can someone find a bottleneck?

4 Answers


It's likely that one problem is the small buffer size used by the default FileStream constructor. I suggest you use a larger input buffer, such as:

using (FileStream infile = new FileStream(
    name, FileMode.Open, FileAccess.Read, FileShare.None, 65536))

The default buffer size is 4 kilobytes, which means the thread makes many calls to the I/O subsystem to fill its buffer. A 64K buffer means you make those calls much less frequently.

I've found that a buffer size of between 32K and 256K gives the best performance, with 64K being the "sweet spot" when I did some detailed testing a while back. A buffer size larger than 256K actually begins to reduce performance.

Also, although this is unlikely to have a major effect on performance, you probably should replace those ConcurrentStack instances with 64-bit integers and use Interlocked.Add or Interlocked.Increment to update them. It simplifies your code and removes the need to manage the collections.
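
For illustration, a minimal sketch of that simplification with plain 64-bit counters and Interlocked updates; the path, pattern, and degree of parallelism below are placeholders, not values from your setup:

using System;
using System.IO;
using System.IO.Compression;
using System.Threading;
using System.Threading.Tasks;

class InterlockedCountersSketch
{
    static void Main()
    {
        string[] files = Directory.GetFiles("C:\\Temp\\SomeFiles", "*.gz"); // placeholder path/pattern
        var options = new ParallelOptions { MaxDegreeOfParallelism = 8 };   // placeholder DOP

        // Plain shared totals; the lambda captures them and updates them atomically.
        long totalLines = 0, totalSomeBookLines = 0, totalLength = 0;
        int totalFiles = 0;

        Parallel.ForEach(files, options, currentFile =>
        {
            long lines = 0, someBookLines = 0, length = 0;
            string line;

            using (var inFile = File.OpenRead(currentFile))
            using (var reader = new StreamReader(new GZipStream(inFile, CompressionMode.Decompress)))
            {
                while ((line = reader.ReadLine()) != null)
                {
                    lines++;
                    length += line.Length;
                    if (line.Contains("book")) someBookLines++;
                }
            }

            // One atomic update per file and per counter, instead of a ConcurrentStack push.
            Interlocked.Add(ref totalLines, lines);
            Interlocked.Add(ref totalSomeBookLines, someBookLines);
            Interlocked.Add(ref totalLength, length);
            Interlocked.Increment(ref totalFiles);
        });

        Console.WriteLine("{0} files, {1} lines, {2} 'book' lines, {3} chars",
            totalFiles, totalLines, totalSomeBookLines, totalLength);
    }
}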

Update:

Re-reading your problem description, I was struck by this statement:

The only way I've found to make CPU load cross 95% mark was to split the files across 4 different folders and execute the same command 4 times, each one targeting a subset of all files.

That, to me, points to a bottleneck in opening files, as though the OS is using a mutual-exclusion lock on the directory. Even if all the data is in the cache and no physical I/O is required, processes still have to wait on this lock. It's also possible that the file system is writing to the disk: remember, it has to update the Last Access Time for a file whenever it is opened.

If I/O really is the bottleneck, then you might consider having a single thread that does nothing but load files and stuff them into a BlockingCollection or similar data structure so that the processing threads don't have to contend with each other for a lock on the directory. Your application becomes a producer/consumer application with one producer and N consumers.
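
As a rough sketch of that shape (the bounded capacity, the placeholder path, and the use of File.ReadAllBytes are my assumptions, not requirements):

using System;
using System.Collections.Concurrent;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class ProducerConsumerSketch
{
    static void Main()
    {
        string[] files = Directory.GetFiles("C:\\Temp\\SomeFiles", "*.gz"); // placeholder path/pattern

        // Bounded so the producer cannot run arbitrarily far ahead of the consumers.
        var queue = new BlockingCollection<byte[]>(boundedCapacity: 16);
        long totalLines = 0;

        // Single producer: only this thread touches the directory and the disk.
        var producer = Task.Run(() =>
        {
            foreach (string file in files)
                queue.Add(File.ReadAllBytes(file)); // raw compressed bytes
            queue.CompleteAdding();
        });

        // N consumers: decompress and count entirely in memory.
        var consumers = Enumerable.Range(0, Environment.ProcessorCount).Select(_ => Task.Run(() =>
        {
            foreach (byte[] compressed in queue.GetConsumingEnumerable())
            {
                long lines = 0;
                using (var gz = new GZipStream(new MemoryStream(compressed), CompressionMode.Decompress))
                using (var reader = new StreamReader(gz))
                {
                    while (reader.ReadLine() != null) lines++;
                }
                Interlocked.Add(ref totalLines, lines);
            }
        })).ToArray();

        Task.WaitAll(consumers.Concat(new[] { producer }).ToArray());
        Console.WriteLine("Total lines: {0}", totalLines);
    }
}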

Answered 2012-11-01T13:30:27.877

The reason for this is usually that threads synchronize too much.

Looking for synchronization in your code, I can see heavy syncing on the collections. Your threads are pushing the lines individually. This means each line incurs, at best, an interlocked operation and, at worst, a kernel-mode lock wait. The interlocked operations will contend heavily because all threads race to get their current line into the collection. They all try to update the same memory locations, which causes cache-line ping-pong.

Change this to push lines in bigger chunks. Push line-arrays of 100 lines or more. The more the better.

In other words, collect results in a thread-local collection first and only rarely merge into the global results.
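
If you stay with Parallel.ForEach, one way to express that is the overload that takes localInit/localFinally delegates, so each worker thread accumulates privately and merges once. A minimal sketch (the path and the single counter are illustrative):

using System;
using System.IO;
using System.IO.Compression;
using System.Threading;
using System.Threading.Tasks;

class ThreadLocalMergeSketch
{
    static void Main()
    {
        string[] files = Directory.GetFiles("C:\\Temp\\SomeFiles", "*.gz"); // placeholder path/pattern
        long grandTotalLines = 0;

        // localInit gives each worker thread its own counter; localFinally merges it
        // into the shared total exactly once per thread.
        Parallel.ForEach(
            files,
            () => 0L,
            (currentFile, loopState, localLines) =>
            {
                using (var inFile = File.OpenRead(currentFile))
                using (var reader = new StreamReader(new GZipStream(inFile, CompressionMode.Decompress)))
                {
                    while (reader.ReadLine() != null) localLines++;
                }
                return localLines; // no shared state touched inside the loop body
            },
            localLines => Interlocked.Add(ref grandTotalLines, localLines));

        Console.WriteLine("Total lines: {0}", grandTotalLines);
    }
}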

You might even want to get rid of the manual data pushing altogether. This is what PLINQ is made for: streaming data concurrently. PLINQ abstracts away the concurrent-collection manipulation in a well-performing way.
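
A minimal PLINQ sketch of the same counting job (the path and the degree of parallelism are placeholders):

using System;
using System.IO;
using System.IO.Compression;
using System.Linq;

class PlinqSketch
{
    static void Main()
    {
        string[] files = Directory.GetFiles("C:\\Temp\\SomeFiles", "*.gz"); // placeholder path/pattern

        // Each file is reduced to a single count on a worker thread;
        // PLINQ merges the per-thread results, so there is no shared collection to contend on.
        long totalLines = files
            .AsParallel()
            .WithDegreeOfParallelism(8) // illustrative; tune as needed
            .Select(file =>
            {
                long lines = 0;
                using (var inFile = File.OpenRead(file))
                using (var reader = new StreamReader(new GZipStream(inFile, CompressionMode.Decompress)))
                {
                    while (reader.ReadLine() != null) lines++;
                }
                return lines;
            })
            .Sum();

        Console.WriteLine("Total lines: {0}", totalLines);
    }
}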

Answered 2012-11-01T12:58:11.187

I don't think parallelizing the disk reads is helping you. In fact, it could be seriously hurting your performance by creating contention from reading multiple areas of storage at the same time.

I would restructure the program to first do a single-threaded read of the raw file data into in-memory byte[] buffers. Then do a Parallel.ForEach() over those buffers to decompress and count the lines (sketched below).

You take an initial I/O read hit up front, but you let the OS/hardware optimize the (hopefully mostly sequential) reads, then decompress and parse in memory.
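
A rough sketch of that two-phase shape; it assumes the whole batch fits in memory, so with 30 GB of compressed input per day you would read and process in smaller batches:

using System;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class ReadThenParseSketch
{
    static void Main()
    {
        string[] files = Directory.GetFiles("C:\\Temp\\SomeFiles", "*.gz"); // placeholder path/pattern

        // Phase 1: single-threaded, mostly sequential I/O.
        byte[][] rawFiles = files.Select(File.ReadAllBytes).ToArray();

        // Phase 2: decompress and count purely in memory, in parallel.
        long totalLines = 0;
        Parallel.ForEach(rawFiles, compressed =>
        {
            long lines = 0;
            using (var gz = new GZipStream(new MemoryStream(compressed), CompressionMode.Decompress))
            using (var reader = new StreamReader(gz))
            {
                while (reader.ReadLine() != null) lines++;
            }
            Interlocked.Add(ref totalLines, lines);
        });

        Console.WriteLine("Total lines: {0}", totalLines);
    }
}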

Keep in mind that operations like decompression, Encoding.UTF8.GetString(), String.Split(), etc. will use large amounts of memory, so clear references to / dispose of old buffers as soon as you no longer need them.

I'd be surprised if you can't get the machine to generate some serious waste heat this way.

Hope this helps.

Answered 2013-01-03T08:24:35.673

The problem, I think, is that you are using blocking I/O, so your threads cannot fully take advantage of parallelism.

If I understand your algorithm right (sorry, I'm more of a C++ guy), this is what you are doing in each thread (pseudocode):

while (there is data in the file)
    read data
    gunzip data

Instead, a better approach would be something like this:

N = 0
read data block N
while (there is data in the file)
    asyncRead data block N+1
    gunzip data block N
    N = N + 1
gunzip data block N

The asyncRead call does not block, so the decoding of block N happens concurrently with the reading of block N+1; by the time you are done decoding block N, block N+1 may already be ready (or close to ready if I/O is slower than decoding).
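
In C#, that double-buffering pattern could look roughly like the sketch below; Process() is a placeholder for the gunzip-and-parse step, and the block size and async FileStream options are assumptions:

using System.IO;
using System.Threading.Tasks;

class OverlappedReadSketch
{
    // Placeholder for "gunzip data block N" plus parsing; not implemented here.
    static void Process(byte[] buffer, int count) { }

    static async Task ProcessFileAsync(string path)
    {
        const int BlockSize = 1 << 20; // 1 MB blocks; tune for best throughput
        byte[] current = new byte[BlockSize];
        byte[] next = new byte[BlockSize];

        using (var stream = new FileStream(path, FileMode.Open, FileAccess.Read,
                                           FileShare.Read, 4096, useAsync: true))
        {
            int read = await stream.ReadAsync(current, 0, current.Length);
            while (read > 0)
            {
                // Kick off the read of block N+1 before touching block N.
                Task<int> nextRead = stream.ReadAsync(next, 0, next.Length);

                Process(current, read);                           // "gunzip data block N"

                read = await nextRead;                            // wait for block N+1
                byte[] tmp = current; current = next; next = tmp; // swap buffers
            }
        }
    }

    static void Main(string[] args)
    {
        ProcessFileAsync(args[0]).Wait();
    }
}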

Then it's just a matter of finding the block size that gives you the best throughput.

Good luck.

Answered 2013-01-04T05:58:12.197