0

首先,有 1500 多个“声誉”的人可以为“ContinueWith”创建一个标签(并用它标记这个问题)吗?谢谢!

很抱歉这篇文章的长度,但我不想浪费任何人试图帮助我的时间,因为我遗漏了相关细节。也就是说,它仍然可能发生。:)

现在是细节。我正在开发一个订阅几个 ActiveMQ 队列主题的服务。其中两个主题有些相关。一种是“公司更新”,一种是“产品更新”。两者的“ID”都是 CompanyID。公司主题包括产品主题中的数据。必需,因为其他订阅者需要产品数据但不想/不需要订阅产品主题。由于我的服务是多线程的(超出我们判断的要求),当消息到达时,我添加了一个任务以使用 AddOrUpdate处理ConcurrentDictionary中的每一个,其中更新参数只是一个ContinueWith(见下文)。这样做是为了防止可能发生的同时更新,因为这些主题和订阅者是“持久的”,因此如果我的侦听器服务离线(无论出于何种原因),我们可能会以同一 CompanyID 的多条消息(公司和/或产品)结束。

现在,我的实际问题(终于!)在任务(无论是一个任务,还是 ConcurrentWith 任务链中的最后一个)完成后,我想将它从 ConcurrentDictionary 中删除(显然)。如何?我已经想到并从同事那里得到了一些想法,但我并不真正喜欢其中任何一个。我不会列出这些想法,因为您的答案可能是我有但不喜欢的想法之一,但它最终可能是最好的。

与我的描述不同,我试图压缩代码片段以防止您不得不上下滚动太多。:)

nrtq = 与问题无关

public interface IMessage
{
  long CompantId { get; set; }
  void Process();
}
public class CompanyMessage : IMessage
{ //implementation, nrtq }
public class ProductMessage : IMessage
{ //implementation, nrtq }

public class Controller
{
  private static ConcurrentDictionary<long, Task> _workers = new ConcurrentDictionary<long, Task>();
  //other needed declarations, nrtq

  public Controller(){//constructor stuff, nrtq }

  public StartSubscribers()
  {
    //other code, nrtq
    _companySubscriber.OnMessageReceived += HandleCompanyMsg;
    _productSubscriber.OnMessageReceived += HandleProductMsg;
  }

  private void HandleCompanyMsg(string msg)
  {
    try {
      //other code, nrtq
      QueueItUp(new CompanyMessage(message));
    } catch (Exception ex) { //other code, nrtq }
  }

  private void HandleProductMsg(string msg)
  {
    try {
      //other code, nrtq
      QueueItUp(new ProductMessage(message));
    } catch (Exception ex) { //other code, nrtq }
  }

  private static void QueueItUp(IMessage message)
  {
    _workers.AddOrUpdate(message.CompanyId,
      x => {
        var task = new Task(message.Process);
        task.Start();
        return task;
      },
      (x, y) => y.ContinueWith((z) => message.Process())
    );
  }

谢谢!

4

1 回答 1

0

我暂时不会“接受”这个答案,因为我很想看看其他人是否能提出更好的解决方案。

一位同事想出了一个解决方案,我稍微调整了一下。是的,我知道将lock语句与ConcurrentDictionary. 我现在真的没有时间看看是否有更好的集合类型可以使用。基本上,我们不只是ContinueWith()对现有任务执行 a ,而是将任务替换为自身加上另一个任务,最后使用ContinueWith().

这有什么区别?很高兴你问!:) 如果我们刚刚做了 aContinueWith()那么一旦链中的第一个任务完成!worker.Value.IsCompleted就会返回。true但是,通过将任务替换为两个(或更多)链式任务,那么就集合而言,只有一个任务并且在链中的所有任务完成之前!worker.Value.IsCompleted不会返回。true

我承认我有点担心用它自己+(新任务)替换任务,因为如果任务在被替换时恰好正在运行怎么办。好吧,我对此进行了测试,并没有遇到任何问题。我相信正在发生的事情是,由于任务在自己的线程中运行并且集合只是持有指向它的指针,因此正在运行的任务不受影响。通过将其替换为自身+(新任务),我们维护了指向执行线程的指针,并在它完成时获得“通知”,以便下一个任务可以“继续”或IsCompleted返回true

此外,“清理”循环的工作方式以及它的位置,意味着我们将“完成”任务在集合中徘徊,但直到下一次“清理”运行,也就是下一次收到消息。同样,我做了很多测试,看看是否会因此导致内存问题,但我的服务从未使用超过 20 MB 的 RAM,即使每秒处理数百条消息也是如此。我们将不得不收到一些相当大的消息并且有很多长时间运行的任务才能导致问题,但请记住这一点,因为您的情况可能会有所不同。

如上,在下面的代码中,nrtq = 与问题无关。

public interface IMessage
{
  long CompantId { get; set; }
  void Process();
}
public class CompanyMessage : IMessage
{ //implementation, nrtq }
public class ProductMessage : IMessage
{ //implementation, nrtq }

public class Controller
{
  private static ConcurrentDictionary<long, Task> _workers = new ConcurrentDictionary<long, Task>();
  //other needed declarations, nrtq

  public Controller(){//constructor stuff, nrtq }

  public StartSubscribers()
  {
    //other code, nrtq
    _companySubscriber.OnMessageReceived += HandleCompanyMsg;
    _productSubscriber.OnMessageReceived += HandleProductMsg;
  }

  private void HandleCompanyMsg(string msg)
  {
    //other code, nrtq
    QueueItUp(new CompanyMessage(message));
  }

  private void HandleProductMsg(string msg)
  {
    //other code, nrtq
    QueueItUp(new ProductMessage(message));
  }

  private static void QueueItUp(IMessage message)
  {
    lock(_workers)
    {
      foreach (var worker in Workers)
      {
        if (!worker.Value.IsCompleted) continue;
        Task task;
        Workers.TryRemove(worker.Key, out task);
      }
      var id = message.CompanyId;
      if (_workers.ContainsKey(id))
        _workers[id] = _workers[id].ContinueWith(x => message.Process());
      else
      {
        var task = new Task(y => message.Process(), id);
        _workers.TryAdd(id, task);
        task.Start();
      }
    }
  }
于 2012-06-19T19:34:32.123 回答