3

我有一类范围

public class avl_range
{
    public long start { get; set; }
    public long end { get; set; }
}

如果我使用正常的FOR作品完美,但必须等待每个命令完成并且每个查询需要 8 秒,所以 10 个查询需要 80 秒。

在并行版本中,如果我只打印范围完美,但如果尝试执行命令说已经在进行中。

{“操作已在进行中。”}

我该如何解决这个问题?

var numbers = new List<avl_range>();
using (var conn = new NpgsqlConnection(strConnection))
    {
        conn.Open();

        Action<avl_range> forEachLoop = number => //Begin definition of forLoop
        {
             // only the console write line works ok
            Console.WriteLine(number.start + " - " + number.end);

            using (var cmd = new NpgsqlCommand())
            {
                cmd.Connection = conn;                            
                cmd.CommandText = String.Format( "SELECT * FROM avl_db.process_near_link({0}, {1});"
                                                 , number.start
                                                 , number.end);
                // here cause the error.
                using (var reader = cmd.ExecuteReader())
                {
                    while (reader.Read())
                    {
                        Console.WriteLine(reader.GetString(0));
                    }
                }
            }
        };

        Parallel.ForEach(numbers, forEachLoop);
    }
 );

仅供参考:我试图解决这个问题我之前发布

4

2 回答 2

4

不能同时使用 Npgsql 连接 - 在任何给定时间点只能运行一个命令(换句话说,不支持 MARS)。

打开多个连接以并行执行查询肯定是有意义的。尽管建立一个新的物理连接很昂贵,但连接池非常轻量级,因此重用物理连接的开销很小。不这样做的主要原因是如果您需要多个操作在同一个事务中。

于 2017-01-27T20:33:44.360 回答
3

即使你可以让它与 MARS 一起工作,连接对象也几乎从来都不是线程安全的,你需要每个线程都有一个连接。Parallel.ForEach 有重载使这变得容易,它具有在线程开始和结束时运行的函数。

var numbers = new List<avl_range>();

Func<NpgsqlConnection> localInit => () => 
{
    var conn = new NpgsqlConnection(strConnection);
    conn.Open();
};

Action<NpgsqlConnection> localFinally = (conn) => conn.Dispose();

Func<avl_range, ParallelLoopState, NpgsqlConnection, NpgsqlConnection> forEachLoop = (number, loopState, conn) => //Begin definition of forLoop
{
     // only the console write line works ok
    Console.WriteLine(number.start + " - " + number.end);

    using (var cmd = new NpgsqlCommand())
    {
        cmd.Connection = conn;                            
        cmd.CommandText = String.Format( "SELECT * FROM avl_db.process_near_link({0}, {1});"
                                         , number.start
                                         , number.end);
        // here cause the error.
        using (var reader = cmd.ExecuteReader())
        {
            while (reader.Read())
            {
                Console.WriteLine(reader.GetString(0));
            }
        }
    }
    return conn;
};

Parallel.ForEach(numbers, localInit, forEachLoop, localFinally);

话虽如此,在大多数情况下,与数据库进行并发连接并不是正确的想法,瓶颈可能在其他地方,您应该使用分析器来查看真正减慢程序速度的原因并将您的精力集中在那里。


注释示例代码:

var numbers = GetDataForNumbers();
List<string> results = new List<string>();

Func<List<string>> localInit => () => new List<string>();

Func<avl_range, ParallelLoopState, List<string>, List<string>> forEachLoop = (number, loopState, localList) => //Begin definition of forLoop
{
    using (var conn = new NpgsqlConnection(strConnection))
    {
        conn.Open();

        //This line is going to slow your program down a lot, so i commented it out.
        //Console.WriteLine(number.start + " - " + number.end);

        using (var cmd = new NpgsqlCommand())
        {
            cmd.Connection = conn;                            
            cmd.CommandText = String.Format( "SELECT * FROM avl_db.process_near_link({0}, {1});"
                                             , number.start
                                             , number.end);
            using (var reader = cmd.ExecuteReader())
            {
                while (reader.Read())
                {
                    //Add a object to the thread local list, we don't need to lock here because we are the only thread with access to it.
                    localList.Add(reader.GetString(0));
                }
            }
        }
    }
    return localList;
};

Action<List<String>> localFinally = localList => 
{
    //Combine the local list to the main results, we need to lock here as more than one thread could be merging at once.
    lock(results)
    {
        results.AddRange(localList);
    }
};

Parallel.ForEach(numbers, localInit, forEachLoop, localFinally);

//results now contains strings from all the threads here.
于 2017-01-27T20:35:18.220 回答