一种选择是在控制台应用程序中进行处理。你所拥有的看起来像是一个标准的生产者-消费者问题,一个生产者(获取电子邮件的线程)和多个消费者。这很容易用BlockingCollection处理。
我假设您的消息类型(您从邮件服务器获得的内容)称为MailMessage
.
因此,您创建了一个BlockingCollection<MailMessage>
at 类范围。我还将假设您有一个每 60 秒计时一次的计时器来收集消息并将它们排入队列:
private BlockingCollection<MailMessage> MailMessageQueue =
new BlockingCollection<MailMessage>();
// Timer is created as a one-shot and re-initialized at each tick.
// This prevents the timer proc from being re-entered if it takes
// longer than 60 seconds to run.
System.Threading.Timer ProducerTimer = new System.Threading.Timer(
TimerProc, null, TimeSpan.FromSeconds(60), TimeSpan.FromMilliseconds(-1));
void TimerProc(object state)
{
var newMessages = GetMessagesFromServer();
foreach (var msg in newMessages)
{
MailMessageQueue.Add(msg);
}
ProducerTimer.Change(TimeSpan.FromSeconds(60), TimeSpan.FromMilliseconds(-1));
}
您的消费者线程只是读取队列:
void MessageProcessor()
{
foreach (var msg in MailMessageQueue.GetConsumingEnumerable())
{
ProcessMessage();
}
}
计时器将使生产者每分钟运行一次。要启动消费者(假设您想要其中两个):
var t1 = Task.Factory.StartNew(MessageProcessor, TaskCreationOptions.LongRunning);
var t2 = Task.Factory.StartNew(MessageProcessor, TaskCreationOptions.LongRunning);
因此,您将有两个线程处理消息。
拥有比可用 CPU 内核更多的处理线程是没有意义的。生产者线程可能不需要大量 CPU 资源,因此您不必为它专门分配一个线程。每当它在做它的事情时,它只会短暂地减慢消息处理。
我在上面的描述中跳过了一些细节,特别是线程的取消。当您想停止程序,但让消费者完成处理消息时,只需终止生产者计时器并将队列设置为完成添加:
MailMessageQueue.CompleteAdding();
消费者将清空队列并退出。您当然希望等待任务完成(请参阅 参考资料Task.Wait
)。
如果您希望能够在不清空队列的情况下杀死消费者,则需要查看Cancellation。
的默认后备存储BlockingCollection
是 a ConcurrentQueue
,这是一个严格的 FIFO。如果您想对事物进行优先级排序,则需要提出一个实现IProducerConsumerCollection接口的并发优先级队列。.NET 没有这样的东西(甚至没有优先级队列类),但是在您的情况下,使用锁来防止并发访问的简单二进制堆就足够了;你不是在说很难打这个东西。
当然,您需要某种方式来确定消息的优先级。可能按附件数量排序,以便更快地处理没有附件的邮件。另一种选择是有两个单独的队列:一个用于带有 0 或 1 个附件的消息,另一个用于带有大量附件的消息。您可以让您的一个消费者专用于 0 或 1 队列,这样简单的消息总是有很好的机会首先被处理,其他消费者从 0 或 1 队列中取出,除非它是空的,然后从另一个队列中取出. 它会让你的消费者变得更复杂一点,但不会那么复杂。
如果您选择将消息处理移动到单独的程序中,则需要某种方式将数据从生产者持久保存到消费者。有很多可能的方法可以做到这一点,但我只是没有看到它的优势。