I have a fast Node.js application and a machine learning algorithm written in Python. I'm using RabbitMQ to integrate Node.js and Python, and it works; I mean, it performs better than calling the Python script through the child_process library with spawn. But I'm having trouble matching each worker's response to the request that produced it.

My code is similar to the toy example below and is based on this article. The implementation has two main problems: it either sends the wrong answer back to the client (Postman) or never completes the request.

This code should accept a request (an image and a type) from the client, put the task on the tasks queue, wait for a worker to finish its job, and send the (correct) result back to the client.
import express from "express";
import bodyParser from 'body-parser';
import cors from 'cors';
import path from 'path';
import multer from "multer";
import amqp from "amqplib/callback_api.js";

const port = 3000;
const app = express();

app.use(express.json());
app.use(bodyParser.urlencoded({ extended: true }));
app.use(cors());

// Keep uploaded images in memory so they can be forwarded to the queue.
const storage = multer.memoryStorage();
const upload = multer({ storage: storage });

app.get('/', (req, res) => {
    res.status(200).send({ message: "ok" });
});

app.post("/classify", upload.single("image"), async (req, res) => {
    const { type } = req.body;
    const task = Buffer.from(
        JSON.stringify({
            type: type,
            image: req.file.buffer.toString("base64")
        })
    );
    amqp.connect("amqp://localhost", (err, conn) => {
        conn.createChannel((err, ch) => {
            ch.assertQueue("tasks", { durable: false });
            ch.assertQueue("results", { durable: false });
            ch.sendToQueue("tasks", task);
            // Answer with whatever message arrives on the shared "results" queue.
            ch.consume("results", (msg) => {
                const predictions = JSON.parse(msg.content.toString());
                res.status(200).send({ message: predictions });
            }, { noAck: true });
        });
        // Close the per-request connection after 500 ms.
        setTimeout(() => { conn.close(); }, 500);
    });
});

console.clear();
app.listen(port, () => {
    console.log(`Server listening at http://localhost:${port}`);
});
This code should take tasks from the tasks queue, run them, and put the results on the results queue.
#!/usr/bin/env python
import pika
import json
import time
import random

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
channel.queue_declare(queue="tasks")
channel.queue_declare(queue="results")

# Simulate the execution of a machine learning model
def run_model(image, type):
    time.sleep(random.randint(1, 4))
    return random.choice(["Iris setosa", "Iris virginica", "Iris versicolor"])

def callback(ch, method, properties, body):
    params = json.loads(body.decode('utf-8'))
    type = str(params["type"])
    image = params['image']
    print("Worker received a new task...")
    results = run_model(image, type)
    # send a message back
    channel.basic_publish(
        exchange="",
        routing_key="results",
        body=json.dumps(results, ensure_ascii=False)
    )
    # connection.close()

# receive messages and complete the tasks
channel.basic_consume("tasks", callback, auto_ack=True)
channel.start_consuming()
How can I solve these problems?
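For reference, the direction I've been considering (untested) is the RPC pattern from the RabbitMQ tutorials: give each request its own correlationId and an exclusive reply queue, and have the Python worker publish its result to the reply_to queue with the same correlation_id it received. Below is a rough sketch of how the /classify handler above might change, assuming the worker is adapted accordingly; the randomUUID import and the per-request reply queue are my additions, not part of the code above.

// Untested sketch: per-request reply queue + correlationId (RabbitMQ RPC pattern).
// Assumes the Python worker is changed to publish its result to
// properties.reply_to with the same correlation_id it received.
import { randomUUID } from "crypto";

app.post("/classify", upload.single("image"), (req, res) => {
    const correlationId = randomUUID();
    const task = Buffer.from(
        JSON.stringify({
            type: req.body.type,
            image: req.file.buffer.toString("base64")
        })
    );

    amqp.connect("amqp://localhost", (err, conn) => {
        conn.createChannel((err, ch) => {
            ch.assertQueue("tasks", { durable: false });
            // Exclusive, server-named queue: only this request consumes from it.
            ch.assertQueue("", { exclusive: true }, (err, q) => {
                ch.consume(q.queue, (msg) => {
                    // Ignore anything that doesn't carry this request's correlationId.
                    if (msg.properties.correlationId === correlationId) {
                        const predictions = JSON.parse(msg.content.toString());
                        res.status(200).send({ message: predictions });
                        conn.close();
                    }
                }, { noAck: true });

                ch.sendToQueue("tasks", task, {
                    correlationId: correlationId,
                    replyTo: q.queue
                });
            });
        });
    });
});

Is something like this the right way to keep each response tied to its request, or is there a better approach?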