0

我目前正在编写一个 IRC 机器人。我想避免过多的洪水,所以我决定创建一个消息队列,每隔 X 毫秒发送下一条消息,但我的尝试失败了。第 43 行:

unset.Add((string)de.Key);

引发 OutOfMemory 异常。我完全不知道我做错了什么。

也许我还应该解释这种(可能很复杂)排队方式背后的一般想法。

首先,主要Hashtable queueht存储ConcurrentQueue<string>类型,其中消息的目标用作键。我希望机器人遍历哈希表,从每个队列发送一条消息(如果队列被清空,则删除密钥)。我想不出一个合适的方法来处理哈希表本身,所以我决定创建另一个队列,ConcurrentQueue<string> queue它会在清空队列时存储键和它们的使用顺序。

假设一个队列中有数百个项目的假设情况(这是可能的),任何新请求都会被天知道延迟多长时间(消息之间的内置延迟加上延迟),所以我有方法 Add() rebuild queue。我创建了一个深层副本queueht(或者我希望如此)并queue基于这个一次性副本生成一个新副本,并在此过程中摆脱它。

我认为我的思路和/或代码是非常错误的,因为我几乎没有线程经验,集合比简单数组和 OOP 习惯/约定更复杂。我非常感谢通过解释解决我的问题。提前致谢!

编辑:发布整个班级。

class SendQueue
{
    Hashtable queueht;
    ConcurrentQueue<string> queue;
    Timer tim;
    IRCBot host;
    public SendQueue(IRCBot host)
    {
        this.host = host;
        this.tim = new Timer();
        this.tim.Elapsed += new ElapsedEventHandler(this.SendNewMsg);
        this.queueht = new Hashtable();
        this.queue = new ConcurrentQueue<string>();
    }
    public void Add(string target, string msg)
    {
        try
        {
            this.queueht.Add(target, new ConcurrentQueue<string>());
        }
        finally
        {
            ((ConcurrentQueue<string>)this.queueht[target]).Enqueue(msg);
        }
        Hashtable ht = new Hashtable(queueht);
        List<string> unset = new List<string>();
        while (ht.Count > 0)
        {
            foreach (DictionaryEntry de in ht)
            {
                ConcurrentQueue<string> cq = (ConcurrentQueue<string>)de.Value;
                string res;
                if (cq.TryDequeue(out res))
                    this.queue.Enqueue((string)de.Key);
                else
                    unset.Add((string)de.Key);
            }
        }
        if (unset.Count > 0)
            foreach (string item in unset)
                ht.Remove(item);
    }
    private void SendNewMsg(object sender, ElapsedEventArgs e)
    {
        string target;
        if (queue.TryDequeue(out target))
        {
            string message;
            if (((ConcurrentQueue<string>)queueht[target]).TryDequeue(out message))
                this.host.Say(target, message);
        }
    }
}

EDIT2:我知道这while (ht.Count > 0)将无限期执行。它只是以前版本的一部分,看起来像这样:

while (ht.Count > 0)
{
    foreach (DictionaryEntry de in ht)
    {
        ConcurrentQueue<string> cq = (ConcurrentQueue<string>)de.Value;
        string res;
        if (cq.TryDequeue(out res))
            this.queue.Enqueue((string)de.Key);
        else
            ht.Remove((string)de.Key);
    }
}

但是集合在评估时不能被修改(我发现很难),所以它不再那样了。我只是忘了更改条件while

我冒昧地尝试了 TheThing 的解决方案。虽然它似乎实现了它的目的,但它并没有发送任何消息......这是它的最终形式:

class User
{
    public User(string username)
    {
        this.Username = username;
        this.RequestQueue = new Queue<string>();
    }
    public User(string username, string message)
        : this(username)
    {
        this.RequestQueue.Enqueue(message);
    }
    public string Username { get; set; }
    public Queue<string> RequestQueue { get; private set; }
}
class SendQueue
{
    Timer tim;
    IRCBot host;
    public bool shouldRun = false;
    public Dictionary<string, User> Users;  //Dictionary of users currently being processed
    public ConcurrentQueue<User> UserQueue; //List of order for which users should be processed
    public SendQueue(IRCBot launcher)
    {
        this.Users = new Dictionary<string, User>();
        this.UserQueue = new ConcurrentQueue<User>();
        this.tim = new Timer(WorkerTick, null, Timeout.Infinite, 450);
        this.host = launcher;
    }
    public void Add(string username, string request)
    {
        lock (this.UserQueue) //For threadsafety
        {
            if (this.Users.ContainsKey(username))
            {
                //The user is in the user list. That means he has previously sent request that are awaiting to be processed.
                //As such, we can safely add his new message at the end of HIS request list.

                this.Users[username].RequestQueue.Enqueue(request); //Add users new message at the end of the list
                return;
            }
            //User is not in the user list. Means it's his first request. Create him in the user list and add his message
            var user = new User(username, request);
            this.Users.Add(username, user); //Create the user and his message
            this.UserQueue.Enqueue(user); //Add the user to the last of the precessing users.
        }
    }
    public void WorkerTick(object sender)
    {
        if (shouldRun)
        {
            //This tick runs every 400ms and processes next message to be sent.
            lock (this.UserQueue) //For threadsafety
            {
                User user;
                if (this.UserQueue.TryDequeue(out user))            //Pop the next user to be processed.
                {
                    string message = user.RequestQueue.Dequeue();   //Pop his request
                    this.host.Say(user.Username, message);
                    if (user.RequestQueue.Count > 0)                //If user has more messages waiting to be processed
                    {
                        this.UserQueue.Enqueue(user);               //Add him at the end of the userqueue
                    }
                    else
                    {
                        this.Users.Remove(user.Username);           //User has no more messages, we can safely remove him from the user list
                    }
                }
            }
        }
    }
}

我尝试切换到ConcurrentQueue,它应该也可以工作(尽管以更线程安全的方式,并不是说我对线程安全一无所知)。我也尝试切换到System.Threading.Timer,但这也无济于事。我很久以前就没有想法了。

编辑:作为一个彻头彻尾的白痴,我没有设置 Timer 启动的时间。将 bool 部分更改为更改计时器的到期时间和间隔的 Start() 方法使其工作。问题解决了。

4

3 回答 3

0

据我所知,您希望能够按顺序排列用户以及他们的每个请求。

这意味着,如果一个用户请求像 1000 个请求,其他人仍然可以发送他们的请求,并且机器人以 FIFO 方式为每个用户提供 1 个请求。

如果是这样,那么您需要的是一种类似于此功能的方式:

class User
{
    public User(string username)
    {
        this.Username = username;
        this.RequestQueue = new Queue<string>();
    }

    public User(string username, string message)
        : this(username)
    {
        this.RequestQueue.Enqueue(message);
    }

    public string Username { get; set; }
    public Queue<string> RequestQueue { get; private set; }
}


///......................

public class MyClass
{
    public MyClass()
    {
        this.Users = new Dictionary<string, User>();
        this.UserQueue = new Queue<User>();
    }

    public Dictionary<string, User> Users; //Dictionary of users currently being processed
    public Queue<User> UserQueue; //List of order for which users should be processed

    public void OnMessageRecievedFromIrcChannel(string username, string request)
    {
        lock (this.UserQueue) //For threadsafety
        {
            if (this.Users.ContainsKey(username))
            {
                //The user is in the user list. That means he has previously sent request that are awaiting to be processed.
                //As such, we can safely add his new message at the end of HIS request list.

                this.Users[username].RequestQueue.Enqueue(request); //Add users new message at the end of the list
                return;
            }

            //User is not in the user list. Means it's his first request. Create him in the user list and add his message
            var user = new User(username, request);
            this.Users.Add(username, user); //Create the user and his message
            this.UserQueue.Enqueue(user); //Add the user to the last of the precessing users.
        }
    }

    //**********************************

    public void WorkerTick()
    {
        //This tick runs every 400ms and processes next message to be sent.
        lock (this.UserQueue) //For threadsafety
        {
            var user = this.UserQueue.Dequeue(); //Pop the next user to be processed.
            var message = user.RequestQueue.Dequeue(); //Pop his request

            /////PROCESSING MESSAGE GOES HERE

            if (user.RequestQueue.Count > 0) //If user has more messages waiting to be processed
            {
                this.UserQueue.Enqueue(user); //Add him at the end of the userqueue
            }
            else
            {
                this.Users.Remove(user.Username); //User has no more messages, we can safely remove him from the user list
            }
        }
    }
}

基本上,我们有一个用户队列。我们弹出下一个用户,处理他的第一个请求,如果他有更多请求等待处理,则将他添加到用户列表的末尾。

希望这能清除一些功能。作为记录,上面的代码更像是伪代码而不是功能代码 xD

于 2012-07-30T12:09:15.917 回答
0

据我所知,您永远不会逃脱,while因为您永远不会从临时哈希表中删除项目,ht直到它之外。因此,计数将始终为> 0

于 2012-07-30T12:30:23.990 回答
0

试试这个:

class User
{
    public User(string username)
    {
        this.Username = username;
        this.RequestQueue = new Queue<string>();
    }

    private static readonly TimeSpan _minPostThreshold = new TimeSpan(0,0,5); //five seconds

    public void PostMessage(string message)
    {
        var lastMsgTime = _lastMessageTime;
        _lastMessageTime = DateTime.Now;
        if (lastMsgTime != default(DateTime))
        {
            if ((_lastMessageTime - lastMsgTime) < _minPostThreshold)
            {
                return;
            }
        }

        _requestQueue.Enqueue(message);     
    }

    public string NextMessage
    {
        get
        {
            if (!HasMessages)
            {
                return null;
            }

            return _requestQueue.Dequeue();
        }
    }

    public bool HasMessages
    {
        get{return _requestQueue.Count > 0;}
    }

    public string Username { get; set; }
    private Queue<string> _requestQueue { get; private set; }
    private DateTime _lastMessageTime;
}

class SendQueue
{
    Timer tim;
    IRCBot host;
    public bool shouldRun = false;
    public Dictionary<string, User> Users;  //Dictionary of users currently being processed
    private Queue<User> _postQueue = new Queue<User>();

    public SendQueue(IRCBot launcher)
    {
        this.Users = new Dictionary<string, User>();
        this.tim = new Timer(WorkerTick, null, Timeout.Infinite, 450);
        this.host = launcher;
    }

    public void Add(string username, string request)
    {
        User targetUser;
        lock (Users) //For threadsafety
        {
            if (!Users.TryGetValue(username, out targetUser))
            {
                //User is not in the user list. Means it's his first request. Create him in the user list and add his message
                targetUser = new User(username);
                Users.Add(username, targetUser); //Create the user and his message
            }

            targetUser.PostMessage(request);
        }

        lock(_postQueue)
        {
            _postQueue.Enqueue(targetUser);
        }
    }

    public void WorkerTick(object sender)
    {
        if (shouldRun)
        {
            User nextUser = null;

            lock(_postQueue)
            {
                if (_postQueue.Count > 0)
                {
                    nextUser = _PostQueue.Dequeue();
                }
            }

            if (nextUser != null)
            {                
                host.Say(nextUser.Username, nextUser.NextMessage);
            }
        }
    }
}

更新:在更好地理解要求后更改。

这提供了每个用户的洪水控制和整体节流。它也简单得多。

请注意,这是动态编写的,甚至还没有编译,并且可能需要考虑一些围绕用户实例的线程问题,但它应该可以工作。

于 2012-08-03T19:04:29.033 回答