20

我试图弄清楚使用队列的最佳方式是什么。我有一个返回 DataTable 的进程。每个 DataTable 依次与前一个 DataTable 合并。有一个问题,在最终 BulkCopy (OutOfMemory) 之前保存的记录太多。

所以,我决定我应该立即处理每个传入的 DataTable。考虑ConcurrentQueue<T>...但我不知道该WriteQueuedData()方法如何知道将表出列并将其写入数据库。

例如:

public class TableTransporter
{
    private ConcurrentQueue<DataTable> tableQueue = new ConcurrentQueue<DataTable>();

    public TableTransporter()
    {
        tableQueue.OnItemQueued += new EventHandler(WriteQueuedData);   // no events available
    }

    public void ExtractData()
    {
        DataTable table;

        // perform data extraction
        tableQueue.Enqueue(table);
    }

    private void WriteQueuedData(object sender, EventArgs e)
    {
        BulkCopy(e.Table);
    }
}

我的第一个问题是,除了我实际上没有要订阅的任何事件之外,如果我ExtractData()异步调用这将是我所需要的全部吗?其次,我是否遗漏了一些关于ConcurrentQueue<T>函数的方式以及需要某种形式的触发器来与排队的对象异步工作?

更新 我刚刚派生了一个ConcurrentQueue<T>具有 OnItemQueued 事件处理程序的类。然后:

new public void Enqueue (DataTable Table)
{
    base.Enqueue(Table);
    OnTableQueued(new TableQueuedEventArgs(Table));
}

public void OnTableQueued(TableQueuedEventArgs table)
{
    EventHandler<TableQueuedEventArgs> handler = TableQueued;

    if (handler != null)
    {
        handler(this, table);
    }
}

对此实施有任何担忧吗?

4

3 回答 3

26

根据我对问题的理解,您缺少一些东西。

并发队列是一种数据结构,旨在接受多个线程读取和写入队列,而无需显式锁定数据结构。(所有爵士乐都在幕后处理,或者集合以不需要锁定的方式实现。)

考虑到这一点,您尝试使用的模式似乎是“生产者/消费者”。首先,您有一些任务产生工作(并将项目添加到队列中)。其次,您还有第二个任务消耗队列中的东西(并从队列中取出项目)。

所以你真的想要两个线程:一个添加项目,第二个删除项目。因为您使用的是并发集合,所以可以有多个线程添加项目和多个线程删除项目。但显然,并发队列上的争用越多,成为瓶颈的速度就越快。

于 2010-12-29T02:49:30.607 回答
23

我认为ConcurrentQueue仅在极少数情况下有用。它的主要优点是它是无锁的。然而,通常生产者线程必须以某种方式通知消费者线程有数据可供处理。线程之间的这种信号需要锁,并否定了使用ConcurrentQueue. 同步线程的最快方法是使用Monitor.Pulse(),它只在锁内有效。所有其他同步工具甚至更慢。

当然,消费者可以不断地检查队列中是否有东西,这样可以不加锁地工作,但是对处理器资源的浪费是巨大的。如果消费者在检查之间等待,那就更好了。

在写入队列时引发线程是一个非常糟糕的主意。使用ConcurrentQueueto save 可能会完全浪费执行 1 微秒eventhandler,这可能需要 1000 倍的时间。

如果所有处理都在事件处理程序或异步调用中完成,那么问题是为什么还需要队列?最好将数据直接传递给处理程序,并且根本不使用队列。

请注意,ConcurrentQueue允许并发的实现相当复杂。在大多数情况下,最好使用普通Queue<>并锁定对队列的每次访问。由于队列访问只需要微秒,因此两个线程在同一微秒内访问队列的可能性极小,并且几乎不会因为锁定而出现任何延迟。使用Queue<>带锁定的法线通常会导致比ConcurrentQueue.

于 2014-07-18T08:22:26.480 回答
3

这是我想出的完整解决方案:

public class TableTransporter
{
    private static int _indexer;

    private CustomQueue tableQueue = new CustomQueue();
    private Func<DataTable, String> RunPostProcess;
    private string filename;

    public TableTransporter()
    {
        RunPostProcess = new Func<DataTable, String>(SerializeTable);
        tableQueue.TableQueued += new EventHandler<TableQueuedEventArgs>(tableQueue_TableQueued);
    }

    void tableQueue_TableQueued(object sender, TableQueuedEventArgs e)
    {
        //  do something with table
        //  I can't figure out is how to pass custom object in 3rd parameter
        RunPostProcess.BeginInvoke(e.Table,new AsyncCallback(PostComplete), filename);
    }

    public void ExtractData()
    {
        // perform data extraction
        tableQueue.Enqueue(MakeTable());
        Console.WriteLine("Table count [{0}]", tableQueue.Count);
    }

    private DataTable MakeTable()
    { return new DataTable(String.Format("Table{0}", _indexer++)); }

    private string SerializeTable(DataTable Table)
    {
        string file = Table.TableName + ".xml";

        DataSet dataSet = new DataSet(Table.TableName);

        dataSet.Tables.Add(Table);

        Console.WriteLine("[{0}]Writing {1}", Thread.CurrentThread.ManagedThreadId, file);
        string xmlstream = String.Empty;

        using (MemoryStream memstream = new MemoryStream())
        {
            XmlSerializer xmlSerializer = new XmlSerializer(typeof(DataSet));
            XmlTextWriter xmlWriter = new XmlTextWriter(memstream, Encoding.UTF8);

            xmlSerializer.Serialize(xmlWriter, dataSet);
            xmlstream = UTF8ByteArrayToString(((MemoryStream)xmlWriter.BaseStream).ToArray());

            using (var fileStream = new FileStream(file, FileMode.Create))
                fileStream.Write(StringToUTF8ByteArray(xmlstream), 0, xmlstream.Length + 2);
        }
        filename = file;

        return file;
    }

    private void PostComplete(IAsyncResult iasResult)
    {
        string file = (string)iasResult.AsyncState;
        Console.WriteLine("[{0}]Completed: {1}", Thread.CurrentThread.ManagedThreadId, file);

        RunPostProcess.EndInvoke(iasResult);
    }

    public static String UTF8ByteArrayToString(Byte[] ArrBytes)
    { return new UTF8Encoding().GetString(ArrBytes); }

    public static Byte[] StringToUTF8ByteArray(String XmlString)
    { return new UTF8Encoding().GetBytes(XmlString); }
}

public sealed class CustomQueue : ConcurrentQueue<DataTable>
{
    public event EventHandler<TableQueuedEventArgs> TableQueued;

    public CustomQueue()
    { }
    public CustomQueue(IEnumerable<DataTable> TableCollection)
        : base(TableCollection)
    { }

    new public void Enqueue (DataTable Table)
    {
        base.Enqueue(Table);
        OnTableQueued(new TableQueuedEventArgs(Table));
    }

    public void OnTableQueued(TableQueuedEventArgs table)
    {
        EventHandler<TableQueuedEventArgs> handler = TableQueued;

        if (handler != null)
        {
            handler(this, table);
        }
    }
}

public class TableQueuedEventArgs : EventArgs
{
    #region Fields
    #endregion

    #region Init
    public TableQueuedEventArgs(DataTable Table)
    {this.Table = Table;}
    #endregion

    #region Functions
    #endregion

    #region Properties
    public DataTable Table
    {get;set;}
    #endregion
}

作为概念证明,它似乎工作得很好。最多我看到 4 个工作线程。

于 2010-12-29T06:03:15.500 回答