首先,有 1500 多个“声誉”的人可以为“ContinueWith”创建一个标签(并用它标记这个问题)吗?谢谢!
很抱歉这篇文章的长度,但我不想浪费任何人试图帮助我的时间,因为我遗漏了相关细节。也就是说,它仍然可能发生。:)
现在是细节。我正在开发一个订阅几个 ActiveMQ 队列主题的服务。其中两个主题有些相关。一种是“公司更新”,一种是“产品更新”。两者的“ID”都是 CompanyID。公司主题包括产品主题中的数据。必需,因为其他订阅者需要产品数据但不想/不需要订阅产品主题。由于我的服务是多线程的(超出我们判断的要求),当消息到达时,我添加了一个任务以使用 AddOrUpdate处理ConcurrentDictionary中的每一个,其中更新参数只是一个ContinueWith(见下文)。这样做是为了防止可能发生的同时更新,因为这些主题和订阅者是“持久的”,因此如果我的侦听器服务离线(无论出于何种原因),我们可能会以同一 CompanyID 的多条消息(公司和/或产品)结束。
现在,我的实际问题(终于!)在任务(无论是一个任务,还是 ConcurrentWith 任务链中的最后一个)完成后,我想将它从 ConcurrentDictionary 中删除(显然)。如何?我已经想到并从同事那里得到了一些想法,但我并不真正喜欢其中任何一个。我不会列出这些想法,因为您的答案可能是我有但不喜欢的想法之一,但它最终可能是最好的。
与我的描述不同,我试图压缩代码片段以防止您不得不上下滚动太多。:)
nrtq = 与问题无关
public interface IMessage
{
long CompantId { get; set; }
void Process();
}
public class CompanyMessage : IMessage
{ //implementation, nrtq }
public class ProductMessage : IMessage
{ //implementation, nrtq }
public class Controller
{
private static ConcurrentDictionary<long, Task> _workers = new ConcurrentDictionary<long, Task>();
//other needed declarations, nrtq
public Controller(){//constructor stuff, nrtq }
public StartSubscribers()
{
//other code, nrtq
_companySubscriber.OnMessageReceived += HandleCompanyMsg;
_productSubscriber.OnMessageReceived += HandleProductMsg;
}
private void HandleCompanyMsg(string msg)
{
try {
//other code, nrtq
QueueItUp(new CompanyMessage(message));
} catch (Exception ex) { //other code, nrtq }
}
private void HandleProductMsg(string msg)
{
try {
//other code, nrtq
QueueItUp(new ProductMessage(message));
} catch (Exception ex) { //other code, nrtq }
}
private static void QueueItUp(IMessage message)
{
_workers.AddOrUpdate(message.CompanyId,
x => {
var task = new Task(message.Process);
task.Start();
return task;
},
(x, y) => y.ContinueWith((z) => message.Process())
);
}
谢谢!