0

我有一个将消息记录到数据库(或其他地方)的 Windows 服务项目。这些消息的频率可能高达每秒十个。由于发送和处理消息不应该延迟服务的主进程,因此我启动了一个新线程来处理每条消息。这意味着如果主进程需要发送 100 条日志消息,则会启动 100 个线程来处理每条消息。我了解到,当一个线程完成后,它会被清理,所以我不必处理它。只要我在线程中处理所有使用过的对象,一切都应该可以正常工作。

服务可能会进入导致关闭服务的异常。在服务关闭之前,它应该等待所有正在记录消息的线程。为此,它会在每次启动线程时将线程添加到列表中。当调用 wait-for-threads 方法时,检查列表中的所有线程是否还活着,如果是,则使用 join 等待它。

编码:

创建线程:

/// <summary>
    /// Creates a new thread and sends the message
    /// </summary>
    /// <param name="logMessage"></param>
    private static void ThreadSend(IMessage logMessage)
    {
        ParameterizedThreadStart threadStart = new ParameterizedThreadStart(MessageHandler.HandleMessage);
        Thread messageThread = new Thread(threadStart);
        messageThread.Name = "LogMessageThread";            
        messageThread.Start(logMessage);
        threads.Add(messageThread);
    }

等待线程结束:

    /// <summary>
    /// Waits for threads that are still being processed
    /// </summary>
    public static void WaitForThreads()
    {
        int i = 0;
        foreach (Thread thread in threads)
        {
            i++;
            if (thread.IsAlive)
            {
                Debug.Print("waiting for {0} - {1} to end...", thread.Name, i);
                thread.Join();
            }
        }
    }

现在我主要担心的是,如果这项服务运行一个月,它仍然会在列表中包含所有线程(数百万)(其中大部分都死了)。这会吃掉内存,我不知道有多少。总的来说,这对我来说似乎不是一个好习惯,我想清理完成的线程,但我不知道该怎么做。有没有人对此有好的或最佳实践?

4

5 回答 5

2

如果线程已死,请从列表中删除它们?

/// <summary>
/// Waits for threads that are still being processed
/// </summary>
public static void WaitForThreads()
{
    List<Thread> toRemove = new List<int>();

    int i = 0;
    foreach (Thread thread in threads)
    {
        i++;
        if (thread.IsAlive)
        {
            Debug.Print("waiting for {0} - {1} to end...", thread.Name, i);
            thread.Join();
        }
        else
        {
            toRemove.Add(thread);
        }
    }
    threads.RemoveAll(x => toRemove.Contains(x));
}

看看任务并行

于 2013-09-23T08:16:03.783 回答
2

首先:为每个日志消息创建一个线程不是一个好主意。使用ThreadPool或创建有限数量的工作线程来处理来自公共队列(生产者/消费者)的日志项。

第二:当然,您还需要从列表中删除线程引用!当线程方法结束时,它可以删除自己,或者您甚至可以定期执行此操作。例如,让计时器每半小时运行一次,检查死线程列表并删除它们。

于 2013-09-23T08:24:55.233 回答
1

如果您在这些线程中所做的一切都是日志记录,那么您可能应该有一个日志记录线程和一个主线程将消息放入的共享队列。日志记录线程然后可以读取队列和日志。使用BlockingCollection非常容易。

在服务的主线程中创建队列:

BlockingCollection<IMessage> LogMessageQueue = new BlockingCollection<IMessage>();

您的服务的主线程创建一个Logger(见下文)实例,该实例启动一个线程来处理日志消息。主线程将项目添加到LogMessageQueue. 记录器线程从队列中读取它们。当主线程想要关闭时,它会调用LogMessageQueue.CompleteAdding. 记录器将清空队列并退出。

主线程看起来像这样:

// start the logger
Logger _loggingThread = new Logger(LogMessageQueue);

// to log a message:
LogMessageQueue.Add(logMessage);

// when the program needs to shut down:
LogMessageQueue.CompleteAdding();

和记录器类:

class Logger
{
    BlockingCollection<IMessage> _queue;
    Thread _loggingThread;

    public Logger(BlockingCollection<IMessage> queue)
    {
        _queue = queue;
        _loggingThread = new Thread(LoggingThreadProc);
    }

    private void LoggingThreadProc(object state)
    {
        IMessage msg;
        while (_queue.TryTake(out msg, TimeSpan.Infinite))
        {
            // log the item
        }
    }
}

这样你就只有一个额外的线程,保证消息按照发送的顺序被处理(不是你当前的方法),而且你不必担心跟踪线程关闭等。

更新

如果您的某些日志消息需要时间来处理(例如您描述的电子邮件),您可以异步处理它们。例如:

while (_queue.TryTake(out msg, TimeSpan.Infinite))
{
    if (msg.Type == Email)
    {
        // start asynchronous task to send email
    }
    else
    {
        // write to log file
    }
}

这样,只有那些可能需要大量时间的消息才会异步运行。如果需要,您还可以在此处为电子邮件设置辅助队列。这样您就不会被一堆电子邮件线程所困扰。相反,您将其限制为一两个,或者可能是少数。

请注意,Logger如果需要,您还可以拥有多个实例,所有实例都从同一个消息队列中读取。只要确保他们每个人都写入不同的日志文件。队列本身将支持多个消费者。

于 2013-09-23T16:22:06.553 回答
0

我认为总的来说,解决您的问题的方法可能不是最佳实践。我的意思是,您不想创建 1000 条线程,而只想在数据库中存储 1000 条消息,对吗?而且您似乎想异步执行此操作。

但是为每条消息创建一个线程并不是一个好主意,实际上并不能解决这个问题......

相反,我会尝试实现消息队列之类的东西。您可以有多个队列,每个队列都有自己的线程。如果有消息进来,您将它们发送到其中一个队列(交替)......

队列要么等待一定数量的消息,要么总是等待一定的时间(例如 1 秒,取决于在数据库中存储例如 100 条消息需要多长时间),直到它尝试将排队的消息存储在数据库中. 这样,您实际上应该始终拥有恒定数量的线程,并且您不应该看到任何性能问题......

此外,它还可以让您批量插入数据,而不仅仅是通过数据库连接等开销来一一插入...

当然,如果您的数据库速度较慢,则任务能够存储消息,越来越多的消息将被排队......但对于您当前的解决方案也是如此。

于 2013-09-23T08:30:54.920 回答
0

由于多个答案和评论导致了我的解决方案,我将在此处发布完整的代码。

我使用线程池来管理此页面中用于 wating 功能的线程和代码。

创建线程:

private static void ThreadSend(IMessage logMessage)
        {    
            ThreadPool.QueueUserWorkItem(MessageHandler.HandleMessage, logMessage);
        }

等待线程完成:

public static bool WaitForThreads(int maxWaitingTime)
        {
            int maxThreads = 0;
            int placeHolder = 0;
            int availableThreads = 0;

            while (maxWaitingTime > 0)
            {
                System.Threading.ThreadPool.GetMaxThreads(out maxThreads, out placeHolder);
                System.Threading.ThreadPool.GetAvailableThreads(out availableThreads, out placeHolder);

                //Stop if all threads are available
                if (availableThreads == maxThreads)
                {
                    return true;
                }

                System.Threading.Thread.Sleep(TimeSpan.FromMilliseconds(1000));
                --maxWaitingTime;            
            }

            return false;
        }

或者,您可以在这些方法之外的某个位置添加它以限制池中的线程数量。

System.Threading.ThreadPool.SetMaxThreads(MaxWorkerThreads, MaxCompletionPortThreads);
于 2013-09-25T13:08:43.773 回答