在 RabbitMQ 站点的RPC教程中,有一种方法可以传递一个“相关 id”,它可以识别您的消息给队列中的用户。
我建议您将某种 id 与您的消息一起使用到前 3 个队列中,然后有另一个过程将消息从 3 个队列中取出到某种类型的存储桶中。当这些桶收到我假设完成的 3 个任务时,将最终消息发送到第 4 个队列进行处理。
如果您为一个用户向每个队列发送超过 1 个工作项,您可能需要进行一些预处理以找出特定用户放入队列中的项目数量,以便在 4 之前出队的进程知道在排队之前预期有多少项目向上。
我在 C# 中做我的 rabbitmq,所以很抱歉我的伪代码不是 php 样式
// Client
byte[] body = new byte[size];
body[0] = uniqueUserId;
body[1] = howManyWorkItems;
body[2] = command;
// Setup your body here
Queue(body)
// Server
// Process queue 1, 2, 3
Dequeue(message)
switch(message.body[2])
{
// process however you see fit
}
processedMessages[message.body[0]]++;
if(processedMessages[message.body[0]] == message.body[1])
{
// Send to queue 4
Queue(newMessage)
}
对更新 #1 的响应
与其将客户端视为终端,不如将客户端视为服务器上的进程。因此,如果您在这样的服务器上设置 RPC 客户端,那么您需要做的就是让服务器处理用户唯一 ID 的生成并将消息发送到适当的队列:
public function call($uniqueUserId, $workItem) {
$this->response = null;
$this->corr_id = uniqid();
$msg = new AMQPMessage(
serialize(array($uniqueUserId, $workItem)),
array('correlation_id' => $this->corr_id,
'reply_to' => $this->callback_queue)
);
$this->channel->basic_publish($msg, '', 'rpc_queue');
while(!$this->response) {
$this->channel->wait();
}
// We assume that in the response we will get our id back
return deserialize($this->response);
}
$rpc = new Rpc();
// Get unique user information and work items here
// Pass even more information in here, like what queue to use or you could even loop over this to send all the work items to the queues they need.
$response = rpc->call($uniqueUserId, $workItem);
$responseBuckets[array[0]]++;
// Just like above code that sees if a bucket is full or not