0

我已经有了一些想法,但如果可能的话,我想听听大家的一些不同意见和替代方案。

我有一个 Windows 控制台应用程序,它使用 Exchange Web 服务连接到 Exchange 并下载电子邮件。目标是获取每个单独的消息对象、提取元数据、解析附件等。应用程序每 60 秒检查一次收件箱。我连接到收件箱和获取消息对象没有问题。这一切都很好。

这是我接受您输入的地方: 当我收到一个消息对象时,我立即想要处理该消息并完成上面解释的所有忙碌工作。我正在考虑几种不同的方法:

  • 将电子邮件对象排列在一个表中并逐个处理它们。
  • 将电子邮件对象传递给本地 Windows 服务以完成繁忙的工作。

我认为数据库排队不是一个好方法,因为有时需要处理多个电子邮件对象。如果在处理具有 5 个附件的高优先级电子邮件之前处理具有 30 个附件的低优先级电子邮件,这是不公平的。换句话说,堆栈中较低的电子邮件不应该需要排队等待处理。这就像在商店里排队等候,只有一个寄存器让你面前的傻瓜扫描 100 件商品。这不公平。我的电子邮件对象的概念相同。

我对 Windows 服务方法有些不确定。但是,我非常有信心我可以安装一个服务来监听,等待处理新电子邮件的指令。如果我有 5 个单独的电子邮件对象,我可以对 Windows 服务和进程进行 5 次单独调用而不会发生冲突吗?

我愿意接受建议或替代方法。但是,解决方案必须使用 .NET 技术堆栈来呈现。

4

2 回答 2

1

一种选择是在控制台应用程序中进行处理。你所拥有的看起来像是一个标准的生产者-消费者问题,一个生产者(获取电子邮件的线程)和多个消费者。这很容易用BlockingCollection处理。

我假设您的消息类型(您从邮件服务器获得的内容)称为MailMessage.

因此,您创建了一个BlockingCollection<MailMessage>at 类范围。我还将假设您有一个每 60 秒计时一次的计时器来收集消息并将它们排入队列:

private BlockingCollection<MailMessage> MailMessageQueue =
    new BlockingCollection<MailMessage>();

// Timer is created as a one-shot and re-initialized at each tick.
// This prevents the timer proc from being re-entered if it takes
// longer than 60 seconds to run.
System.Threading.Timer ProducerTimer = new System.Threading.Timer(
    TimerProc, null, TimeSpan.FromSeconds(60), TimeSpan.FromMilliseconds(-1));


void TimerProc(object state)
{
    var newMessages = GetMessagesFromServer();
    foreach (var msg in newMessages)
    {
        MailMessageQueue.Add(msg);
    }
    ProducerTimer.Change(TimeSpan.FromSeconds(60), TimeSpan.FromMilliseconds(-1));
}

您的消费者线程只是读取队列:

void MessageProcessor()
{
    foreach (var msg in MailMessageQueue.GetConsumingEnumerable())
    {
        ProcessMessage();
    }
}

计时器将使生产者每分钟运行一次。要启动消费者(假设您想要其中两个):

var t1 = Task.Factory.StartNew(MessageProcessor, TaskCreationOptions.LongRunning);
var t2 = Task.Factory.StartNew(MessageProcessor, TaskCreationOptions.LongRunning);

因此,您将有两个线程处理消息。

拥有比可用 CPU 内核更多的处理线程是没有意义的。生产者线程可能不需要大量 CPU 资源,因此您不必为它专门分配一个线程。每当它在做它的事情时,它只会短暂地减慢消息处理。

我在上面的描述中跳过了一些细节,特别是线程的取消。当您想停止程序,但让消费者完成处理消息时,只需终止生产者计时器并将队列设置为完成添加:

MailMessageQueue.CompleteAdding();

消费者将清空队列并退出。您当然希望等待任务完成(请参阅 参考资料Task.Wait)。

如果您希望能够在不清空队列的情况下杀死消费者,则需要查看Cancellation

的默认后备存储BlockingCollection是 a ConcurrentQueue,这是一个严格的 FIFO。如果您想对事物进行优先级排序,则需要提出一个实现IProducerConsumerCollection接口的并发优先级队列。.NET 没有这样的东西(甚至没有优先级队列类),但是在您的情况下,使用锁来防止并发访问的简单二进制堆就足够了;你不是在说很难打这个东西。

当然,您需要某种方式来确定消息的优先级。可能按附件数量排序,以便更快地处理没有附件的邮件。另一种选择是有两个单独的队列:一个用于带有 0 或 1 个附件的消息,另一个用于带有大量附件的消息。您可以让您的一个消费者专用于 0 或 1 队列,这样简单的消息总是有很好的机会首先被处理,其他消费者从 0 或 1 队列中取出,除非它是空的,然后从另一个队列中取出. 它会让你的消费者变得更复杂一点,但不会那么复杂。

如果您选择将消息处理移动到单独的程序中,则需要某种方式将数据从生产者持久保存到消费者。有很多可能的方法可以做到这一点,但我只是没有看到它的优势。

于 2013-11-06T21:37:28.930 回答
0

我在这里有点新手,但似乎最初的方法可能是有一个单独的高优先级队列。每当一个工作人员可以获取一条新消息时,它可以执行以下操作:

If DateTime.Now - lowPriorityQueue.Peek.AddedTime < maxWaitTime Then
    ProcessMessage(lowPriorityQueue.Dequeue())
Else If highPriorityQueue.Count > 0 Then
    ProcessMessage(highPriorityQueue.Dequeue())
Else
    ProcessMessage(lowPriorityQueue.Dequeue())
End If

在单个线程中,虽然您仍然可以让一条消息阻塞其他消息,但可以更快地处理更高优先级的消息。

根据大多数消息的处理速度,如果队列变得太大或太旧,应用程序可以在新线程上创建新的工作程序。

请告诉我我是否完全不在基地。

于 2013-11-06T20:08:53.570 回答