2

So something weird has started happening in my program. So I have multiple threads queued up with ThreadPool.QueueUserWorkItem. Until recently each thread in the threadpool would receive a List<MyObject> from an array and each insert the list into my MySql database. Recent changes to the handling of the list forced me to create one big list containing all the small lists (am using List.AddRange). I then split this list into smaller parts and sent each part to the same insert thread from before. Now the everything works perfectly with 7 threads or less. My program originally was using 8 threads to perform everything since I recall reading something that threads should be 2 * number of cores.

Now when I run the program with 8 threads it starts throwing all kinds of MySqlExceptions which just seem to be thrown at random. Example: All threads query a certain table that is never changed. Some threads will return the select * from table fine, while 1 random thread will suddenly say the table doesn't exist or that my DataTable is empty which is impossible since it's the same table and query. I've had all kinds of other exceptions such as OpenDataReader, Connection must valid and open, NullReferenceException when trying to close the Connection etc..

Note: Each thread holds a separate connection to the database and therefore shouldn't be conflicting with each other.

I'm not sure about pasting the code here since the code is pretty long. Plus I'm not so sure its something with my code since as I stated the problem arises when I use 8 threads or more. I don't want to take the easy road and just use 7 threads since there might be something underneath that I have no clue about that will cause problems in the future when I implement extra changes. I can edit my question accordingly if anyone asks me to show specific pieces.

public MySqlDb 
{ 
    MySqlConnection mySqlConnection;
    MySqlDataAdapter mySqlDataAdapter;
    public int ExecuteInsert(string query)
    {
        using (MySqlCommand mySqlCommand = new MySqlCommand())
        {
            mySqlCommand.CommandText = query;
            using (mySqlCommand.Connection = GetConnection())
            {
                  mySqlCommand.Connection.Open();
                  using (mySqlDataAdapter = new MySqlDataAdapter())
                  {
                       try
                       {
                           mySqlDataAdapter.InsertCommand = mySqlCommand;
                           return mySqlDataAdapter.InsertCommand.ExecuteNonQuery();
                       }
                       catch (MySqlException mse)
                       {
                            logger.Warn("MySqlException : " + mse.Message);
                            throw;
                       }
                       finally
                       {
                            mySqlCommand.Connection.Close();
                       }
                  }
             }
         }
    }

    public MySqlConnection GetConnection()
    {
        connectionString.DefaultCommandTimeout = 3600;
        myConnection = new MySqlConnection(connectionString.GetConnectionString(true));
        return myConnection;
    }
}

My connection string is loaded from my .xml file and is just a basic connection string

<add name="MySqlConnectionString" providerName="System.Data.SqlClient" connectionString="Server=server_name;Database=db_name; Uid=my_username;Pwd=my_pass;charset=utf8;port=3306;" </add>

Update 1: So after trying to figure out the problem I've cut back the amount of inserts I do to 1 at a time. Each thread contains an Object that represents my MySql table. From one of the exceptions when I print out the query it seems that Insert() is called twice without doing the actual mySql.ExecuteInsert(query.ToString()). I don't see any reason it should do that unless somehow some of the variables are getting shared amongst the threads even though each has its own separate Object. Below is the relevant code. If this is getting too long to be readable as a proper question please comment on this and how it should be done.

class PartsInventories 
{
    MySqlDb mySql;

    public const String PARTS_INVENTORIES = "parts_inventories";

    // find the optimum of best perfomance while not raising MAX_PACKETS_ALLOWED exception - right now 6000 - 2012-06-05
    public const int MAX_PARTS_IN_QUERY = 1;
    public StringBuilder query;
    bool havePartsToInsert = false;
    int insertedCount = 0;

    public PartsInventories()
    {
        mySql = new MySqlDb();
    }

    public void Init(int usersCompanyId, string mailId,
            string appendDate, bool toInsertPrices, string fileId)
    {
        // init query string and shared query variables here
        insertedCount = 0;
        query = new StringBuilder(GetQueryString());
    }

    private string GetQueryString()
    {
        return @"INSERT INTO  parts_inventories (part_number,base_part_number,
            date_code,quantity,price,currency_types_id,delivery,rohs,parts_pkg_type_id,parts_mfg_id,
            users_company_id,notes,in_stock,mail_id,append_date,file_id,ad_price) VALUES ";
    }

    private void InsertPart(ParsedInformation part, bool toInsertPrices)
    {
        // assign all specific query variables here
         query.AppendFormat("({0},{1},{2},{3},{4},{5},{6},{7},{8},{9},{10},{11},{12},{13},'{14}',{15},{16}),",
                    PartNumber, BasePartNumber, DateCode, Quantity, Price, CurrencyTypesId, Delivery,
                    Rohs, PartsPkgTypeId, PartsMfgId, UsersCompanyId, Notes, InStock, MailId, AppendDate, FileId, adPrice);
    }

    public void InsertPart(ParsedInformation part)
    {
        InsertPart(part, toInsertPrices);
        insertedCount++;
        if (insertedCount % MAX_PARTS_IN_QUERY == 0)
        {
            Insert();
            query = new StringBuilder(GetQueryString());
        }                
    }

    public void Insert()
    {
        if (!havePartsToInsert) return;

        query.Remove(query.Length - 1, 1);

        query.Append(@" ON DUPLICATE KEY UPDATE part_number = VALUES(part_number), date_code = VALUES(date_code),
        quantity = VALUES(quantity), price = VALUES(price), currency_types_id = VALUES(currency_types_id),
        delivery = VALUES(delivery),rohs = VALUES(rohs), parts_pkg_type_id = VALUES(parts_pkg_type_id),
        parts_mfg_id = VALUES(parts_mfg_id), notes = VALUES(notes), in_stock = VALUES(in_stock),
        mail_id = VALUES(mail_id), append_date = VALUES(append_date), sent_rfq = NULL, from_website = NULL, from_lti = NULL, 
        file_id = VALUES(file_id), ad_price = VALUES(ad_price);");

        try
        {
            logger.DebugFormat("Inserted/Updated {0} parts.", mySql.ExecuteInsert(query.ToString()));
            havePartsToInsert = false;
        }
        catch
        {
            logger.Debug("problem inserting");
            havePartsToInsert = false;
            throw;
        }
    }
}

Each thread is a class and receives the list from the class creating and invoking the threads.

class ThreadInserter 
{
    ManualResetEvent doneEvent;
    PartsInventories partsInventories;
    public ThreadSafeInserter(List<ParsedInformation> allParts, ManualResetEvent doneEvent, string mailId, int usersCompanyId,
            string appendDate, bool toInsertPrices, bool hasStockInBody, string fileId)
    {
        this.doneEvent = doneEvent;
        partsInventories = new PartsInventories();
    }

    private void InsertAllParts(int threadIndex)
    {
        partsInventories.Init(usersCompanyId, mailId, appendDate, toInsertPrices, fileId);

        if (allParts != null)
        {
            foreach (ParsedInformation part in allParts)
            {
                // another thread has thrown an exception so aborting all threads
                // set from the main invoking the threadcallback 
                if (doneEvent.WaitOne(0)) 
                {
                    logger.Warn("Thread aborted.");
                    return; 
                }
                partsInventories.InsertPart(part);
            }
        }
        partsInventories.Insert(); 
        doneEvent.Set();
    }
}
4

0 回答 0