2

在做了一些研究之后,我求助于有关如何有效地从并发集合中删除两个项目的任何反馈。我的情况涉及通过 UDP 传入的消息,这些消息当前被放入 BlockingCollection。一旦集合中有两个用户,我需要安全地取两个用户并处理它们。我见过几种不同的技术,包括下面列出的一些想法。我当前的实现如下,但我认为有一种更简洁的方法可以做到这一点,同时确保用户以两个为一组进行处理。这是这种情况下的唯一限制。

当前实施:

    private int userQueueCount = 0;
    public BlockingCollection<User> UserQueue = new BlockingCollection<User>();

    public void JoinQueue(User u)
    {
           UserQueue.Add(u);
           Interlocked.Increment(ref userQueueCount);

           if (userQueueCount > 1)
           {
               IEnumerable<User> users = UserQueue.Take(2);
               if(users.Count==2) {
                 Interlocked.Decrement(ref userQueueCount);
                 Interlocked.Decrement(ref userQueueCount);
                 ... do some work with users but if only one 
                 is removed I'll run into problems
               }

           }
    }

我想做的是这样的事情,但我目前无法在生产环境中测试它以确保完整性。

 Parallel.ForEach(UserQueue.Take(2), (u) => { ... });

或者更好:

    public void JoinQueue(User u)
    {
           UserQueue.Add(u);
           // if needed? increment
           Interlocked.Increment(ref userQueueCount);
           UserQueue.CompleteAdding();
    }

然后在某处实现:

        Task.Factory.StartNew(() =>
        {
            while (userQueueCount > 1) OR (UserQueue.Count > 1) If it's safe?
            {
                IEnumerable<User> users = UserQueue.Take(2);
                ... do stuff
            }

        });

问题是我不确定我能否保证在条件(Count > 1)和 Take(2) 之间,我确保 UserQueue 至少有两个项目要处理?传入的 UDP 消息是并行处理的,因此我需要一种方法来安全地将项目从阻塞/并发集合中以两个成对的方式拉出。

有没有更好/更安全的方法来做到这一点?

修改后的评论: 这个问题的目的实际上只是为了实现一种稳定/线程安全的方法来处理 .Net 4.0 中并发集合中的项目。它不必很漂亮,它只需要在并行环境中以无序的二对二的形式处理项目的任务中保持稳定。

4

4 回答 4

2

这是我在粗略代码中所做的:

ConcurrentQueuequeue = new ConcurrentQueue(); //can use a BlockingCollection too (as it's just a blocking ConcurrentQueue by default anyway)

public void OnUserStartedGame(User joiningUser)
{
   User waitingUser;
   if (this.gameQueue.TryDequeue(out waitingUser)) //if there's someone waiting, we'll get him
      this.MatchUsers(waitingUser, joiningUser);
   else
      this.QueueUser(joiningUser); //it doesn't matter if there's already someone in the queue by now because, well, we are using a queue and it will sort itself out.
}

private void QueueUser(User user)
{
   this.gameQueue.Enqueue(user);
}

private void MatchUsers(User first, User second)
{
   //not sure what you do here
}

基本思想是,如果有人想要开始游戏并且您的队列中有人,您匹配他们并开始游戏 - 如果没有人,将他们添加到队列中。充其量你一次只能有一个用户在队列中,但如果没有,那也不错,因为当其他用户开始游戏时,等待的用户将逐渐删除,并且在队列为空之前不会添加新用户再次。

于 2012-06-04T20:58:20.737 回答
1

如果由于某种原因我无法将成对的用户放入集合中,我会使用 ConcurrentQueue 并尝试一次 TryDequeue 2 个项目,如果我只能得到一个 - 把它放回去。根据需要等待。

于 2012-06-04T20:22:20.197 回答
1

我认为这里最简单的解决方案是使用锁定:您将为所有消费者拥有一把锁(生产者不会使用任何锁),这将确保您始终以正确的顺序获取用户:

User firstUser;
User secondUser;

lock (consumerLock)
{
    firstUser = userQueue.Take();
    secondUser = userQueue.Take();
}

Process(firstUser, secondUser);

另一种选择是有两个队列:一个用于单个用户,一个用于成对用户,并有一个将它们从第一个队列转移到第二个队列的过程。

如果你不介意浪费另一个线程,你可以用两个BlockingCollections 做到这一点:

while (true)
{
    var firstUser = incomingUsers.Take();
    var secondUser = incomingUsers.Take();

    userPairs.Add(Tuple.Create(firstUser, secondUser));
}

您不必担心这里的锁定,因为单个用户的队列将只有一个消费者,而对的消费者现在可以Take()安全地使用 simple。

如果您确实关心浪费线程并且可以使用 TPL 数据流,则可以使用BatchBlock<T>,它将传入的项目组合成n 个项目的批次,其中n在创建块时配置,因此您可以将其设置为 2。

于 2012-06-05T09:27:55.517 回答
0

愿这能有所帮助

public static IList<T> TakeMulti<T>(this BlockingCollection<T> me, int count = 100) where T : class
{
    T last = null;
    if (me.Count == 0)
    {
        last = me.Take(); // blocking when queue is empty
    }

    var result = new List<T>(count);

    if (last != null)
    {
        result.Add(last);
    }

    //if you want to take more item on this time.
    //if (me.Count < count / 2)
    //{
    //    Thread.Sleep(1000);
    //}

    while (me.Count > 0 && result.Count <= count)
    {
        result.Add(me.Take());
    }

    return result;
}
于 2015-08-17T06:39:31.500 回答