
I have an efficient C# application that receives 80-byte records at a rate of 5k to 10k records per second on a multi-threaded CPU.

I now need to set up an in-memory cache to detect and filter duplicate records so I can suppress them from travelling further in the pipeline.

Cache specs (maximum thresholds)

  • 80 bytes of data per record
  • 10,000 records / second
  • 60 seconds of cache = key quantity = 600,000
  • (subtotal: 48,000,000 bytes = 48 MB)
  • Ideal cache size = 5 minutes (or 240 MB)
  • Acceptable runtime cache size bloat = 1 GB

Question

What is the best way to set up an in-memory cache (dictionary, hashtable, array, etc.) that allows the most efficient lookups, purges old cache data, and prevents expiration of entries that are hit?

I looked at the ASP.NET Cache and System.Runtime.Caching.MemoryCache, but I think I need something more lightweight and customized to achieve the required throughput. I'm also looking at System.Collections.Concurrent as an alternative, along with this related whitepaper.

Does anyone have suggestions on what the best approach would be?


3 Answers


Remember, don't prematurely optimise!

There may be a reasonably concise way of doing this without resorting to unmanaged code, pointers and the like.

A quick test on my old, ordinary laptop shows that you can add 1,000,000 entries to a HashSet, while removing 100,000 entries, in ~100 ms. You can then repeat that with the same 1,000,000 values in ~60 ms. This is for working with just longs; 80-byte data structures are obviously larger, but a simple benchmark is in order.
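A throwaway benchmark along those lines might look like the sketch below; the exact add/remove mix and class name are illustrative, not a reconstruction of the original test.

using System;
using System.Collections.Generic;
using System.Diagnostics;

class HashSetBench
{
    static void Main()
    {
        var set = new HashSet<long>();
        var sw = Stopwatch.StartNew();

        // Add 1,000,000 longs, removing one entry for every ten added
        // (100,000 removals in total).
        for (long i = 0; i < 1000000; i++)
        {
            set.Add(i);
            if (i % 10 == 0) set.Remove(i / 2);
        }
        Console.WriteLine("first pass:  {0} ms", sw.ElapsedMilliseconds);

        // Repeat with the same values: now mostly adds that find a duplicate.
        sw.Restart();
        for (long i = 0; i < 1000000; i++)
        {
            set.Add(i);
            if (i % 10 == 0) set.Remove(i / 2);
        }
        Console.WriteLine("second pass: {0} ms", sw.ElapsedMilliseconds);
    }
}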

My recommendations:

  • Implement the 'lookup' and 'duplicate detection' as a HashSet, which is extremely fast for inserting, removing and finding (a sketch combining this with the ring buffer appears after this list).

  • Implement the actual buffer (that receives new events and expires old ones) as a suitably large circular/ring buffer. This avoids memory allocations and deallocations, and can add entries to the front and remove them from the back. Here are some helpful links, including one (the second) that describes algorithms for expiring items in the cache:

Circular Buffer for .NET

Fast calculation of min, max, and average of incoming numbers

Generic C# RingBuffer

How would you code an efficient Circular Buffer in Java or C#

  • Note that the circular buffer is even better if you want your cache to be bounded by number of elements (say 100,000) rather than time of events (say the last 5 minutes).

  • When items are removed from the buffer (which searches from the end first), they can be removed from the HashSet also. No need to make both data structures the same.

  • Avoid multithreading until you need it! You have a naturally 'serial' workload. Unless you know one of your CPU threads can't handle the speed, keep it in a single thread. This avoids contention, locks, CPU cache misses and other multithreading headaches that tend to slow things down for workloads that are not embarrassingly parallel. My main caveat here is that you may want to offload the 'receiving' of the events to a different thread from the processing of them.

  • The above recommendation is the main idea behind Staged event-driven architecture (SEDA) that is used as the basis for high-performance and stable-behaviour event-driven systems (such as messaging queues).
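Putting the first two bullets together, a minimal single-threaded sketch might look like this (DedupWindow and its members are illustrative names; integer hashes stand in for the 80-byte records):

using System;
using System.Collections.Generic;

// DedupWindow is an illustrative name, not an existing type.
public sealed class DedupWindow
{
    private readonly int[] _hashes;       // ring buffer of record hashes
    private readonly DateTime[] _stamps;  // arrival time of each entry
    private readonly HashSet<int> _seen;  // O(1) duplicate lookups
    private readonly TimeSpan _window;
    private int _head;    // next write position
    private int _tail;    // oldest live entry
    private int _count;

    public DedupWindow(int capacity, TimeSpan window)
    {
        _hashes = new int[capacity];
        _stamps = new DateTime[capacity];
        _seen = new HashSet<int>();
        _window = window;
    }

    // Returns true if the record is new; false if an identical hash is
    // already inside the time window (and the record should be suppressed).
    public bool TryAdd(int recordHash, DateTime now)
    {
        Evict(now);

        if (!_seen.Add(recordHash))
            return false;   // duplicate within the window

        if (_count == _hashes.Length)
        {
            // Buffer full: drop the oldest entry to make room.
            _seen.Remove(_hashes[_tail]);
            _tail = (_tail + 1) % _hashes.Length;
            _count--;
        }

        _hashes[_head] = recordHash;
        _stamps[_head] = now;
        _head = (_head + 1) % _hashes.Length;
        _count++;
        return true;
    }

    // Expire entries older than the window, oldest first.
    private void Evict(DateTime now)
    {
        while (_count > 0 && now - _stamps[_tail] > _window)
        {
            _seen.Remove(_hashes[_tail]);
            _tail = (_tail + 1) % _hashes.Length;
            _count--;
        }
    }
}

Note that because duplicates are suppressed rather than re-buffered, a hit does not refresh the entry's timestamp; satisfying the 'prevent expiration of data that is hit' requirement would mean re-enqueuing the entry on a hit, at the cost of allowing the same hash to appear twice in the ring.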

The above design can be wrapped cleanly, and attempts to achieve the raw performance required with a minimum of complexity. This only provides a decent baseline from which efficiency can now be extracted and measured.
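The 'receive on one thread, process on another' caveat maps naturally onto a producer/consumer queue. A minimal sketch using BlockingCollection, reusing the DedupWindow type from above (Pipeline and ComputeHash are hypothetical names; the capacities are arbitrary):

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class Pipeline
{
    static void Main()
    {
        var queue = new BlockingCollection<byte[]>(boundedCapacity: 100000);

        // Consumer: a single thread owns the dedup cache, so no locking is needed.
        var worker = Task.Run(() =>
        {
            var window = new DedupWindow(600000, TimeSpan.FromMinutes(5));
            foreach (var record in queue.GetConsumingEnumerable())
            {
                int hash = ComputeHash(record);
                if (window.TryAdd(hash, DateTime.UtcNow))
                {
                    // forward the record down the pipeline
                }
            }
        });

        // Producer(s): the receiving thread just enqueues raw records, e.g.
        // queue.Add(incomingRecord);

        queue.CompleteAdding();
        worker.Wait();
    }

    static int ComputeHash(byte[] record)
    {
        // Placeholder; any stable hash over the 80 bytes will do.
        return BitConverter.ToInt32(record, 0);
    }
}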

(Note: If you need persistence for the cache, look at Kyoto Cabinet. If you need the cache to be visible to other users or distributed, look at Redis.)

answered 2012-05-12T22:19:24.850

I don't have anything to back this up, but I do like some weekend practice :)

To solve the purging, you could use a circular cache where the latest values overwrite the oldest (you won't have an exact n-minute cache that way, of course), so you only need to remember the offset where your last record went. You could initialize the cache by filling it with copies of the first record, to prevent an all-zero record from matching the cache's uninitialized data.

You could then simply start matching from the first byte; if the record doesn't match, skip that record's remaining bytes and attempt to match the next one, until the end of the cache.

If the records contain a header followed by data, you may want to match backwards to increase the speed at which you find non-matching data.
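A rough sketch of that scheme, assuming records arrive as 80-byte arrays (CircularRecordCache is an illustrative name, and the capacity is left as a constructor parameter):

using System;

public sealed class CircularRecordCache
{
    private const int RecordSize = 80;   // record size from the question
    private readonly byte[] _cache;
    private readonly int _capacity;
    private int _offset;                 // slot where the last record went

    public CircularRecordCache(int capacity, byte[] firstRecord)
    {
        _capacity = capacity;
        _cache = new byte[RecordSize * capacity];

        // Prime every slot with the first record so that an all-zero
        // record is not falsely matched against uninitialized memory.
        for (int i = 0; i < capacity; i++)
            Buffer.BlockCopy(firstRecord, 0, _cache, i * RecordSize, RecordSize);
    }

    // Linear scan: compare byte by byte and skip to the next slot as
    // soon as a byte differs.
    public bool Contains(byte[] record)
    {
        for (int slot = 0; slot < _capacity; slot++)
        {
            int basePos = slot * RecordSize;
            int b = 0;
            while (b < RecordSize && _cache[basePos + b] == record[b])
                b++;
            if (b == RecordSize)
                return true;
        }
        return false;
    }

    // Overwrite the oldest slot with the new record.
    public void Add(byte[] record)
    {
        _offset = (_offset + 1) % _capacity;
        Buffer.BlockCopy(record, 0, _cache, _offset * RecordSize, RecordSize);
    }
}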

answered 2012-05-12T14:58:50.440

Here is a sample that works with a single thread. The code uses two dictionaries to track the data: hashDuplicateTracker tracks the total count of each record hash, and hashesByDate groups hashes by timestamp so old entries can be aged out.

Some improvements I should make

  • Replace the LINQ operator ElementAt(x) with something cheaper
  • Make sure CheckDataFreshness runs no more frequently than once per interval

To make this multithreaded

  • Replace Dictionary with ConcurrentDictionary for FrequencyOfMatchedHash and HashDuplicateTracker
  • Get a sorted version of ConcurrentDictionary, or use locks around HashesByDate

using System;
using System.Collections.Generic;
using System.Linq;

// Tracks how many times each record hash appears within one time bucket.
public class FrequencyOfMatchedHash : Dictionary<int, int>
{
    public void AddRecordHash(int hashCode)
    {
        if (this.ContainsKey(hashCode))
        {
            this[hashCode]++;
        }
        else
        {
            this.Add(hashCode, 1);
        }
    }
    public void DecrementRecordHash(int hashCode)
    {
        if (this.ContainsKey(hashCode))
        {
            var val = this[hashCode];
            if (val <= 1)
            {
                this.Remove(hashCode);
            }
            else
            {
                // Decrement, keeping the entry while other occurrences remain.
                this[hashCode] = val - 1;
            }
        }
    }

    public override string ToString()
    {
        return this.Count + " records";
    }
}

// Tracks the total count of each record hash across the whole window.
public class HashDuplicateTracker : Dictionary<int, int>
{

    internal void AddRecord(int recordHash)
    {
        if (this.ContainsKey(recordHash))
        {
            this[recordHash]++;
        }
        else
        {
            this.Add(recordHash, 1);
        }
    }
}


public class HashesByDate : SortedDictionary<DateTime, FrequencyOfMatchedHash>
{
    internal void AddRecord(DateTime dt, int recordHash)
    {
        if (this.ContainsKey(dt))
        {
            this[dt].AddRecordHash(recordHash);
        }
        else
        {
            var tmp = new FrequencyOfMatchedHash();
            tmp.AddRecordHash(recordHash);
            this.Add(dt, tmp);
        }
    }
}
public class DuplicateTracker
{
    HashDuplicateTracker hashDuplicateTracker = new HashDuplicateTracker();

    // track all the hashes by date
    HashesByDate hashesByDate = new HashesByDate();


    private TimeSpan maxRange;

    public DuplicateTracker(TimeSpan range)
    {
        this.maxRange = range;
    }

    public void AddRecordHash(DateTime dt, int recordHash)
    {
        if (hashesByDate.Count == 0)
        {
            hashDuplicateTracker.AddRecord(recordHash);
            hashesByDate.AddRecord(dt, recordHash);

            return;
        }
        else
        {
            // Work out the oldest timestamp still inside the window,
            // relative to the newest bucket seen so far.
            DateTime maxDate = hashesByDate.ElementAt(hashesByDate.Count - 1).Key;
            DateTime oldestPermittedEntry = maxDate - maxRange;

            if (dt >= oldestPermittedEntry)
            {
                // Both helpers increment the count when the key already
                // exists, so no exception handling is needed here.
                hashDuplicateTracker.AddRecord(recordHash);
                hashesByDate.AddRecord(dt, recordHash);

                CheckDataFreshness(oldestPermittedEntry);
            }
        }
    }


    /// <summary>
    /// This should be called anytime data is added to the collection
    /// 
    /// If threading issues are addressed, a more optimal solution would be to run this on an independent thread.
    /// </summary>
    /// <param name="oldestEntry"></param>
    private void CheckDataFreshness(DateTime oldestEntry)
    {
        while (hashesByDate.Count > 0)
        {
            // SortedDictionary keeps its keys ordered, so First() is
            // always the oldest bucket.
            var oldestBucket = hashesByDate.First();

            if (oldestBucket.Key >= oldestEntry)
                break;

            // Decrement the global counter for every hash in the expired bucket.
            foreach (var entry in oldestBucket.Value)
            {
                int currentHash = entry.Key;
                int expiredCount = entry.Value;

                int remaining = hashDuplicateTracker[currentHash] - expiredCount;
                if (remaining == 0)
                {
                    // Prevent endless memory growth.
                    // For performance this removal might be deferred.
                    hashDuplicateTracker.Remove(currentHash);
                }
                else
                {
                    hashDuplicateTracker[currentHash] = remaining;
                }
            }

            hashesByDate.Remove(oldestBucket.Key);
        }
    }

 }
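For completeness, a hypothetical driver for the tracker could look like the following (TrackerDemo and HashRecord are my names; FNV-1a merely stands in for whatever hash the pipeline already computes). Note that DuplicateTracker would still need to expose its counts, e.g. an IsDuplicate(hash) lookup against hashDuplicateTracker, before it can actually suppress records.

public static class TrackerDemo
{
    // FNV-1a over the 80-byte payload (an assumption, not from the answer).
    private static int HashRecord(byte[] record)
    {
        unchecked
        {
            uint hash = 2166136261;
            foreach (byte b in record)
                hash = (hash ^ b) * 16777619;
            return (int)hash;
        }
    }

    public static void Main()
    {
        var tracker = new DuplicateTracker(TimeSpan.FromMinutes(5));
        var record = new byte[80];   // an incoming 80-byte record

        // Per incoming record:
        tracker.AddRecordHash(DateTime.UtcNow, HashRecord(record));
    }
}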
answered 2012-05-13T19:39:40.530