
I'm stuck on a problem and wondering whether I've simply coded something incorrectly. The application polls every few seconds and grabs every record from a table whose sole purpose is to flag which records to act upon.

Please note that I've left out the error-handling code for space and readability.

    //Producing Thread, this is triggered every 5 seconds... UGH, I hate timers

    foreach (var Record in GetRecordsFromDataBase())  // returns a dictionary
    {
        if (!ConcurrentDictionary.Contains(Record.Key))
            ConcurrentDictionary.TryAdd(Record.Key, Record.Value);
    }

This code works, with the irritating caveat that it may select the same record multiple times until that record is processed. "Processed" means that each selected record is written to its own newly created, uniquely named file. A stored procedure is then called with that record's key to remove it from the database, at which point that key is removed from the ConcurrentDictionary.

    // Consuming Thread, located within another loop to allow
    // the below code to continue to cycle until instructed
    // to terminate

    while (!ConcurrentDictionary.IsEmpty)
    {
        var Record = ConcurrentDictionary.Take(1).First();
        WriteToNewFile(Record.Value);
        RemoveFromDatabase(Record.Key);
        ConcurrentDictionary.TryRemove(Record.Key);
    }

For a throughput test I added 20k+ records to the table and then turned the application loose. I was quite surprised to see 22k+ files, and the count kept climbing well into 100k+ territory.

What am I doing wrong??? Have I completely misunderstood what the concurrent dictionary is used for? Did I forget a semi-colon somewhere?


2 Answers


What am I doing wrong???

The foreach (add) loop tries to add every record that is in the database but not yet in the dictionary.

The while (remove) loop writes each item to a file, then removes it from the database and from the dictionary.

This logic looks correct. But there is a race:

GetRecordsFromDataBase(); // returns records 1 through 10.

switch context to remove loop.

    WriteToNewFile(Record.Value);    // write record 5
    RemoveFromDatabase(Record.Key);  // remove record 5 from db
    ConcurrentDictionary.TryRemove(Record.Key); // remove record 5 from dictionary

switch back to add loop

    ConcurrentDictionary.TryAdd(Record.Key, Record.Value); // adds record 5 again, even though it is no longer in the DB, because it was among the records returned by GetRecordsFromDataBase()

After the item is removed the foreach loop adds it again. This is why your file count is multiplying.

    foreach (var Record in GetRecordsFromDataBase())  // returns a dictionary
    {
        if (!ConcurrentDictionary.Contains(Record.Key)) // this check is not required; TryAdd alone will do
            ConcurrentDictionary.TryAdd(Record.Key, Record.Value);
    }
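The interleaving described above can be replayed deterministically on a single thread. The following is only a sketch: an in-memory `Dictionary` stands in for the database table, a counter stands in for `WriteToNewFile`, and the class name `StaleSnapshotDemo` is made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class StaleSnapshotDemo
{
    // Replays the race step by step and returns how many times record 5 is "written".
    public static int Run()
    {
        // Assumption: this dictionary plays the role of the database table.
        var database = new Dictionary<int, string> { [5] = "payload" };
        var pending = new ConcurrentDictionary<int, string>();
        int filesWritten = 0;

        // Poll 1: the producer copies the table into the dictionary.
        foreach (var record in new Dictionary<int, string>(database))
            pending.TryAdd(record.Key, record.Value);

        // Poll 2 begins: the producer reads the table while record 5 is still there.
        var staleSnapshot = new Dictionary<int, string>(database);

        // Context switch: the consumer processes record 5 and removes it everywhere.
        filesWritten++;               // stands in for WriteToNewFile
        database.Remove(5);           // stands in for RemoveFromDatabase
        pending.TryRemove(5, out _);  // the key leaves the dictionary

        // Back in the producer: the stale snapshot still holds record 5, and
        // TryAdd succeeds because the key is no longer in the dictionary.
        foreach (var record in staleSnapshot)
            pending.TryAdd(record.Key, record.Value);

        // The consumer finds record 5 again and writes a second file.
        foreach (var key in pending.Keys)
        {
            filesWritten++;
            pending.TryRemove(key, out _);
        }

        return filesWritten;
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints 2
    }
}
```

One record produces two files here. With 20k+ rows and a poll every 5 seconds, the same thing can happen on every poll that overlaps with processing.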

Try something like this instead. Add loop:

    foreach (var Record in GetRecordsFromDataBase())  // returns a dictionary
    {
        if (ConcurrentDictionary.TryAdd(Record.Key, false)) // succeeds only if this key has not been seen before
        {
            ConcurrentQueue.Enqueue(Record); // enqueue the record for the consumer
        }
    }

Remove loop:

    KeyValuePair<TKey, TValue> Record; // substitute your actual key and value types

    if (ConcurrentQueue.TryDequeue(out Record))
    {
        if (ConcurrentDictionary.TryUpdate(Record.Key, true, false)) // flip the value from false to true
        {
            WriteToNewFile(Record.Value);    // write record 5
            RemoveFromDatabase(Record.Key);  // remove record 5 from db
        }
    }

This leaves an entry in the dictionary for every record processed. You can remove those entries eventually, but multithreading that involves a database can be tricky.
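Put together, the two loops might look like the sketch below. The `int`/`string` key and value types, the class name `SeenSetPipeline`, and the `Poll`/`Drain` method names are assumptions made for illustration; the `process` callback stands in for `WriteToNewFile` followed by `RemoveFromDatabase`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public class SeenSetPipeline
{
    // Key -> "has been processed". Entries are never removed in this sketch.
    private readonly ConcurrentDictionary<int, bool> _seen =
        new ConcurrentDictionary<int, bool>();

    private readonly ConcurrentQueue<KeyValuePair<int, string>> _work =
        new ConcurrentQueue<KeyValuePair<int, string>>();

    // Producer side: enqueue only rows whose key has never been seen.
    public int Poll(IEnumerable<KeyValuePair<int, string>> rows)
    {
        int enqueued = 0;
        foreach (var row in rows)
        {
            if (_seen.TryAdd(row.Key, false))
            {
                _work.Enqueue(row);
                enqueued++;
            }
        }
        return enqueued;
    }

    // Consumer side: process each queued row once, flipping its flag to true.
    public int Drain(Action<KeyValuePair<int, string>> process)
    {
        int processed = 0;
        while (_work.TryDequeue(out var row))
        {
            if (_seen.TryUpdate(row.Key, true, false))
            {
                process(row);
                processed++;
            }
        }
        return processed;
    }

    public static void Main()
    {
        var rows = new Dictionary<int, string> { [1] = "a", [2] = "b" };
        var pipeline = new SeenSetPipeline();

        Console.WriteLine(pipeline.Poll(rows));       // prints 2
        Console.WriteLine(pipeline.Drain(r => { }));  // prints 2
        Console.WriteLine(pipeline.Poll(rows));       // prints 0: a stale snapshot is ignored
    }
}
```

Because keys stay in the dictionary, a stale snapshot can no longer re-add a processed record. The cost is that the dictionary grows with every record ever seen, and a key that is legitimately reused in the table later would be skipped.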

Answered 2014-07-28T18:51:30.030

First, get rid of the call to Contains. TryAdd already checks for duplicates and returns false if the item is already present.

    foreach (var Record in GetRecordsFromDataBase())  // returns a dictionary
    {
        ConcurrentDictionary.TryAdd(Record.Key, Record.Value);
    }

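The next problem I see is that I don't think ConcurrentDictionary.Take(1).First() is a good way to get an item out of the dictionary, since it isn't atomic. I think you want to use a BlockingCollection instead. It is designed specifically for implementing the producer-consumer pattern.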
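As a rough illustration of the producer-consumer pattern with `BlockingCollection<T>`, here is a minimal sketch. The `int`/`string` record type, the bounded capacity of 100, and the class name `BlockingCollectionDemo` are assumptions; the list of processed keys stands in for writing files. Note that this only replaces the Take(1).First() hand-off; it does not by itself stop a stale database snapshot from being queued again.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BlockingCollectionDemo
{
    // Produces `count` records, consumes them on another thread,
    // and returns the keys in the order they were processed.
    public static List<int> Run(int count)
    {
        var processed = new List<int>(); // touched only by the consumer until Wait() returns

        using (var queue = new BlockingCollection<KeyValuePair<int, string>>(boundedCapacity: 100))
        {
            var consumer = Task.Run(() =>
            {
                // Blocks while the queue is empty; the loop ends once
                // CompleteAdding has been called and the queue is drained.
                foreach (var record in queue.GetConsumingEnumerable())
                    processed.Add(record.Key); // stands in for WriteToNewFile etc.
            });

            for (int key = 1; key <= count; key++)
                queue.Add(new KeyValuePair<int, string>(key, "payload " + key));

            queue.CompleteAdding(); // tell the consumer no more items are coming
            consumer.Wait();
        }

        return processed;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", Run(5))); // prints 1,2,3,4,5
    }
}
```

Each item is handed to the consumer exactly once, and the consumer sleeps instead of spinning when there is nothing to do.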

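Finally, I think your problem isn't really related to the dictionary but to the database. The dictionary itself is thread-safe, but your dictionary and the database are not updated atomically. Suppose record A is in the database. GetRecordsFromDataBase() pulls it out and adds it to the dictionary. Then processing of record A begins (I assume on another thread). Next, the first loop calls GetRecordsFromDataBase() again and gets record A again. At the same time, record A is processed and removed from the database. But it's too late! GetRecordsFromDataBase() has already grabbed it! So the initial loop adds it to the dictionary again after it has been removed.

I think you may need to take the records that are to be processed and move them into another table entirely. That way, they won't get picked up a second time. Doing this at the C# level rather than at the database level is going to be a problem. Either that, or you don't add records to the queue while records are being processed.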
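The second option, not polling while records are still being processed, can be sketched as a single loop that finishes each batch before reading the table again. This is only an illustration: the in-memory `Dictionary` stands in for the database table, the counter stands in for `WriteToNewFile`, and the class name `BatchThenPollDemo` is made up.

```csharp
using System;
using System.Collections.Generic;

public static class BatchThenPollDemo
{
    // Processes every row exactly once and returns the number of "files" written.
    public static int Run(int rowCount)
    {
        // Assumption: this dictionary plays the role of the database table.
        var database = new Dictionary<int, string>();
        for (int key = 1; key <= rowCount; key++)
            database[key] = "payload " + key;

        int filesWritten = 0;

        while (true)
        {
            // Stands in for GetRecordsFromDataBase(): a snapshot of the table.
            var batch = new List<KeyValuePair<int, string>>(database);
            if (batch.Count == 0)
                break; // a real service would wait for the next timer tick instead

            // The whole batch is finished before the table is read again,
            // so no snapshot can contain a record that is mid-processing.
            foreach (var record in batch)
            {
                filesWritten++;              // stands in for WriteToNewFile
                database.Remove(record.Key); // stands in for RemoveFromDatabase
            }
        }

        return filesWritten;
    }

    public static void Main()
    {
        Console.WriteLine(Run(20000)); // prints 20000
    }
}
```

The trade-off is that polling and processing no longer overlap, so new rows wait until the current batch is done.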

Answered 2014-07-28T18:33:29.153