Node.js 非常适合我们的 Web 项目,但是我们更喜欢 Python 的计算任务很少。我们也已经为他们准备了 Python 代码。我们非常关注速度,如何以异步非阻塞方式从 node.js 调用 Python“worker”最优雅的方式是什么?
7 回答
这听起来像是 zeroMQ 非常适合的场景。这是一个类似于使用 TCP 或 Unix 套接字的消息传递框架,但它更加健壮(http://zguide.zeromq.org/py:all)
有一个库使用 zeroMQ 来提供运行良好的 RPC 框架。它被称为 zeroRPC ( http://www.zerorpc.io/ )。这里是你好世界。
Python“你好 x”服务器:
import zerorpc
class HelloRPC(object):
'''pass the method a name, it replies "Hello name!"'''
def hello(self, name):
return "Hello, {0}!".format(name)
def main():
s = zerorpc.Server(HelloRPC())
s.bind("tcp://*:4242")
s.run()
if __name__ == "__main__" : main()
和 node.js 客户端:
var zerorpc = require("zerorpc");
var client = new zerorpc.Client();
client.connect("tcp://127.0.0.1:4242");
//calls the method on the python object
client.invoke("hello", "World", function(error, reply, streaming) {
if(error){
console.log("ERROR: ", error);
}
console.log(reply);
});
反之亦然,node.js 服务器:
var zerorpc = require("zerorpc");
var server = new zerorpc.Server({
hello: function(name, reply) {
reply(null, "Hello, " + name, false);
}
});
server.bind("tcp://0.0.0.0:4242");
和python客户端
import zerorpc, sys
c = zerorpc.Client()
c.connect("tcp://127.0.0.1:4242")
name = sys.argv[1] if len(sys.argv) > 1 else "dude"
print c.hello(name)
对于 node.js 和 Python 服务器之间的通信,如果两个进程在同一服务器上运行,我将使用 Unix 套接字,否则使用 TCP/IP 套接字。对于封送协议,我会采用 JSON 或协议缓冲区。如果线程化 Python 成为瓶颈,请考虑使用Twisted Python,它提供与 node.js 相同的事件驱动并发。
如果您喜欢冒险,请学习clojure(clojurescript、clojure-py),您将获得与 Java、JavaScript(包括 node.js)、CLR 和 Python 上的现有代码运行和互操作的相同语言。您只需使用 clojure 数据结构即可获得出色的编组协议。
如果您安排将 Python 工作者放在一个单独的进程中(长时间运行的服务器类型进程或按需生成的子进程),那么您与它的通信将在 node.js 端是异步的。UNIX/TCP 套接字和标准输入/输出/错误通信在节点中本质上是异步的。
我使用thoonk.js和thoonk.py取得了很大的成功。Thoonk 利用 Redis(内存中键值存储)为您提供供稿(想想发布/订阅)、队列和作业模式以进行通信。
为什么这比 unix 套接字或直接 tcp 套接字更好?整体性能可能会有所降低,但是 Thoonk 提供了一个非常简单的 API,可以简化手动处理套接字的过程。Thoonk 还有助于使实现分布式计算模型变得非常简单,该模型允许您扩展您的 python worker 以提高性能,因为您只需启动 python worker 的新实例并将它们连接到同一个 redis 服务器。
我也会考虑 Apache Thrift http://thrift.apache.org/
它可以在多种编程语言之间架起桥梁,效率很高,并且支持异步或同步调用。在此处查看完整功能http://thrift.apache.org/docs/features/
多语言对未来的计划很有用,例如,如果您以后想用 C++ 完成部分计算任务,可以很容易地使用 Thrift 将其添加到混合中。
我建议使用一些工作队列,例如优秀的Gearman,它将为您提供一种调度后台作业的好方法,并在处理完它们后异步获取它们的结果。
在 Digg(以及许多其他公司)大量使用的这一优势在于,它提供了一种强大、可扩展和稳健的方式,使任何语言的工作人员都能以任何语言与客户交谈。
2019 年更新
有几种方法可以实现这一点,这里是复杂度递增顺序的列表
- Python Shell,您将向 python 控制台写入流,它会回写给您
- Redis Pub Sub,当你的节点 js 发布者推送数据时,你可以在 Python 中监听一个频道
- Websocket 连接,其中 Node 充当客户端,Python 充当服务器,反之亦然
- 与 Express/Flask/Tornado 等的 API 连接与公开的 API 端点单独工作以供对方查询
方法一 Python Shell 最简单的方法
source.js 文件
const ps = require('python-shell')
// very important to add -u option since our python script runs infinitely
var options = {
pythonPath: '/Users/zup/.local/share/virtualenvs/python_shell_test-TJN5lQez/bin/python',
pythonOptions: ['-u'], // get print results in real-time
// make sure you use an absolute path for scriptPath
scriptPath: "./subscriber/",
// args: ['value1', 'value2', 'value3'],
mode: 'json'
};
const shell = new ps.PythonShell("destination.py", options);
function generateArray() {
const list = []
for (let i = 0; i < 1000; i++) {
list.push(Math.random() * 1000)
}
return list
}
setInterval(() => {
shell.send(generateArray())
}, 1000);
shell.on("message", message => {
console.log(message);
})
目的地.py 文件
import datetime
import sys
import time
import numpy
import talib
import timeit
import json
import logging
logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)
size = 1000
p = 100
o = numpy.random.random(size)
h = numpy.random.random(size)
l = numpy.random.random(size)
c = numpy.random.random(size)
v = numpy.random.random(size)
def get_indicators(values):
# Return the RSI of the values sent from node.js
numpy_values = numpy.array(values, dtype=numpy.double)
return talib.func.RSI(numpy_values, 14)
for line in sys.stdin:
l = json.loads(line)
print(get_indicators(l))
# Without this step the output may not be immediately available in node
sys.stdout.flush()
注意:创建一个名为subscriber 的文件夹,它与source.js 文件处于同一级别,并将destination.py 放入其中。不要忘记更改您的 virtualenv 环境