4

我希望你能忍受我。我想提供尽可能多的信息。主要问题是如何创建将由多个线程使用的结构(如堆栈),该结构将弹出一个值并使用它来处理一个大平面文件,并可能一次又一次地循环直到处理整个文件。如果一个文件有 100.000 条记录,可由 5 个线程使用 2.000 个行块处理,那么每个线程将获得 10 个块来处理。

我的目标是在平面文件中移动数据(带有 Header...Subheader...Detail, Detail, Detail, ...Detail, SubFooter, Subheader...Detail, Detail, Detail, ...Detail, SubFooter, Subheader...Detail, Detail, Detail, ...Detail, SubFooter, Footer structure) 到具有简单恢复模式的 OLTP DB 中(可能是完整的)到 3 个表中:第一个表示 Subheader 行中存在的 Subheader 唯一键,第二个中间表 SubheaderGroup,表示 2000 条记录块中的详细信息行的分组(需要将 Subheader 的 Identity PK 作为其 FK,第三个表示 FK 指向 Subheader PK 的详细信息行。

我正在执行手动事务管理,因为我可以拥有数万个详细信息行,并且我在加载期间使用在目标表中设置为 0 的特殊字段,然后在文件处理结束时我正在执行事务更新更改此值为 1 可以指示其他应用程序加载完成。

我想将此平面文件切成多个相等的部分(相同的行数),可以使用多个线程处理并使用 SqlBulkCopy 使用 IDataReader 导入,该 IDataReader 是从目标表元数据创建的)。

我想使用生产者/消费者模式(如下面的链接中所述 - pdf 分析和代码示例)将 SqlBulkCopy 与 SqlBulkCopyOptions.TableLock 选项一起使用。 http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspx 这种模式可以创建多个生产者,并且等量的消费者需要订阅生产者才能消费该行。

在TestSqlBulkCopy 项目中,DataProducer.cs 文件中有一个方法可以模拟生产数千条记录。

public void Produce (DataConsumer consumer, int numberOfRows) {
    int bufferSize = 100000;
    int numberOfBuffers = numberOfRows / bufferSize;

    for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
        DataTable buffer = consumer.GetBufferDataTable ();

        for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
            object[] values = GetRandomRow (consumer);
            buffer.Rows.Add (values);
        }
        consumer.AddBufferDataTable (buffer);
    }
}

此方法将在新线程的上下文中执行。我希望这个新线程只读取原始平面文件的唯一块,另一个线程将开始处理下一个块。然后,消费者将使用 SqlBulkCopy ADO.NET 类将数据(泵送给他们)移动到 SQL Server DB。

所以这里的问题是关于主程序规定每个线程应该处理什么 lineFrom 到 lineTo ,我认为这应该在线程创建期间发生。第二种解决方案可能是让线程共享一些结构并使用它们独有的东西(如线程号或序列号)来查找共享结构(可能是堆栈并弹出一个值(在执行此操作时锁定堆栈),然后下一个线程将然后选取下一个值。主程序将选取平面文件并确定块的大小并创建堆栈。

那么有人可以提供一些代码片段,关于多个线程如何处理一个文件并且只获得该文件的唯一部分的伪代码吗?

谢谢,拉德

4

1 回答 1

3

对我来说效果很好的是使用队列来保存未处理的工作和字典来跟踪正在进行的工作:

  1. 创建一个工作类,该类接受文件名、起始行和行数,并具有执行数据库插入的更新方法。传递一个回调方法,worker 用来在完成时发出信号。
  2. 加载一个带有工作类实例的队列,每个块一个。
  3. 生成一个调度程序线程,该线程将工作实例出列,启动其更新方法,并将工作实例添加到字典中,由其线程的 ManagedThreadId 键入。执行此操作,直到达到最大允许线程数,如 Dictionary.Count 所述。调度程序等待一个线程完成,然后启动另一个。有几种方法让它等待。
  4. 当每个线程完成时,它的回调会从 Dictionary 中删除它的 ManagedThreadId。如果线程由于错误(例如连接超时)而退出,则回调可以将工作线程重新插入队列。这是更新 UI 的好地方。
  5. 您的 UI 可以显示活动线程、总进度和每个块的时间。它可以让用户调整活动线程的数量、暂停处理、显示错误或提前停止。
  6. 当队列和字典为空时,您就完成了。

作为控制台应用程序的演示代码:

using System;
using System.Collections.Generic;
using System.Threading;

namespace threadtest
{
    public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);

    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supv = new Supervisor();
            supv.LoadQueue();
            supv.Dispatch();
        }
    }

    public class Supervisor
    {
        public Queue<Worker> pendingWork = new Queue<Worker>();
        public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();

        private object pendingLock = new object();
        private object activeLock = new object();

        private int maxThreads = 200;

        public void LoadQueue()
        {
            for (int i = 0; i < 1000; i++)
            {
                Worker worker = new Worker();
                worker.Callback = new DoneCallbackDelegate(WorkerFinished);
                lock (pendingLock)
                {
                    pendingWork.Enqueue(worker);
                }
            }
        }

        public void Dispatch()
        {
            int activeThreadCount;

            while (true)
            {
                lock (activeLock) { activeThreadCount = activeWork.Count; }
                while (true)
                {
                    lock (activeLock)
                    {
                        if (activeWork.Count == maxThreads) break;
                    }
                    lock (pendingWork)
                    {
                        if (pendingWork.Count > 0)
                        {
                            Worker worker = pendingWork.Dequeue();
                            Thread thread = new Thread(new ThreadStart(worker.DoWork));
                            thread.IsBackground = true;
                            worker.ThreadId = thread.ManagedThreadId;
                            lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
                            thread.Start();
                        }
                        else
                        {
                            break;
                        }
                    }
                }
                Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)

                lock (pendingLock)
                    lock (activeLock)
                    {
                        if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
                    }
            }
        }

        // remove finished threads from activeWork, resubmit if necessary, and update UI
        public void WorkerFinished(int idArg, bool successArg, string messageArg)
        {
            lock (pendingLock)
                lock (activeLock)
                {
                    Worker worker = activeWork[idArg];
                    activeWork.Remove(idArg);
                    if (!successArg)
                    {
                        // check the message or something to see if you should resubmit thread
                        pendingWork.Enqueue(worker);
                    }
                    // update UI
                    int left = Console.CursorLeft;
                    int top = Console.CursorTop;
                    Console.WriteLine(string.Format("pending:{0} active:{1}        ", pendingWork.Count, activeWork.Count));
                    Console.SetCursorPosition(left, top);
                }
        }
    }

    public class Worker
    {
        // this is where you put in your problem-unique stuff
        public int ThreadId { get; set; }

        DoneCallbackDelegate callback;
        public DoneCallbackDelegate Callback { set { callback = value; } }

        public void DoWork()
        {
            try
            {
                Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
                callback(ThreadId, true, null);
            }
            catch (Exception ex)
            {
                callback(ThreadId, false, ex.ToString());
            }
        }
    }
}
于 2010-01-14T19:46:47.423 回答