1

我有一个快速的 Node.js 应用程序和一个使用 python 的机器学习算法。例如,我正在使用 RabbitMQ 来集成 Node.Js 和 Python,并且它可以工作,我的意思是,这比使用带有 spawn 的库 child_process 更具性能。但是我在同步工人对他们各自请求的响应时遇到了麻烦。

我的代码类似于下面的示例(玩具示例)并基于这篇文章1。这个实现有两个主要问题,它向客户端(邮递员)发送错误的答案或者没有完成请求。

这段代码应该接受来自客户端的请求(图像和类型),将此任务放入队列(任务队列),等待工作人员完成其工作并将结果发送给客户端(正确的)。

import express from "express";
import bodyParser from 'body-parser';
import cors from 'cors';
import path from 'path';
import multer from "multer";
import amqp from "amqplib/callback_api.js";

const port = 3000;

const app = express();

app.use(express.json());
app.use(bodyParser.urlencoded({extended: true}));
app.use(cors());


const storage = multer.memoryStorage();
const upload = multer({ storage: storage });

app.get('/', (req, res) => {
    res.status(200).send({message: "ok"});
});

app.post("/classify", upload.single("image"), async(req, res) => {

    const { type } = req.body;

    const task = Buffer.from(
        JSON.stringify({
            type: type, 
            image: req.file.buffer.toString("base64")
        })
    );

    amqp.connect("amqp://localhost", (err, conn) => {
        conn.createChannel((err, ch) => {
            ch.assertQueue("tasks", {durable: false});
            ch.assertQueue("results", {durable: false});
            ch.sendToQueue("tasks", task);

            ch.consume("results", (msg) => {
                const predictions = JSON.parse(msg.content.toString());
                res.status(200).send({message: predictions });
                
            },{noAck: true});
        });
        setTimeout(() => {conn.close();}, 500)
    });
});

console.clear();

app.listen(port, () => {
    console.log(`Server listening at http://localhost:${port}`)
})

此代码应从队列(任务队列)中获取任务,运行它们并将结果放入结果队列。

#!/usr/bin/env python
import pika
import json
import time 
import random

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
channel.queue_declare(queue="tasks")
channel.queue_declare(queue="results")


# Simulate the execution of a machine learning model
def run_model(image, type):
    time.sleep(random.randint(1, 4))
    return random.choice([" Iris setosa", "Iris virginica", "Iris versicolor"])

   
def callback(ch, method, properties, body):
    params = json.loads(body.decode('utf-8'))
    type = str(params["type"])
    image = params['image']

    print("Worker received a new task...")

    results = run_model(image, type)

    # send a message back
    channel.basic_publish(
        exchange="", 
        routing_key="results", 
        body=json.dumps(results, ensure_ascii=False)
    )
  
    #connection.close()

# receive message and complete the task
channel.basic_consume("tasks", callback, auto_ack=True)
channel.start_consuming()

如何解决这些问题?

4

1 回答 1

0

我对 RabbitMQ 不是很熟悉,但我认为您需要一种方法来唯一标识发送给 Python 工作者的每个任务。

尝试在 Node.js 端生成一个 UUID(使用uuid包),并将其与作业数据一起传递。然后,使用 Node.js 中的results-${uuid}通道。

当 Python 工作者完成时,让它使用results-${uuid}传入参数中的路由键。这样,每个请求只监听其特定的结果通道。

于 2020-11-27T03:30:06.397 回答