132

Node.js 非常适合我们的 Web 项目,但是我们更喜欢 Python 的计算任务很少。我们也已经为他们准备了 Python 代码。我们非常关注速度,如何以异步非阻塞方式从 node.js 调用 Python“worker”最优雅的方式是什么?

4

7 回答 7

121

这听起来像是 zeroMQ 非常适合的场景。这是一个类似于使用 TCP 或 Unix 套接字的消息传递框架,但它更加健壮(http://zguide.zeromq.org/py:all

有一个库使用 zeroMQ 来提供运行良好的 RPC 框架。它被称为 zeroRPC ( http://www.zerorpc.io/ )。这里是你好世界。

Python“你好 x”服务器:

import zerorpc

class HelloRPC(object):
    '''pass the method a name, it replies "Hello name!"'''
    def hello(self, name):
        return "Hello, {0}!".format(name)

def main():
    s = zerorpc.Server(HelloRPC())
    s.bind("tcp://*:4242")
    s.run()

if __name__ == "__main__" : main()

和 node.js 客户端:

var zerorpc = require("zerorpc");

var client = new zerorpc.Client();
client.connect("tcp://127.0.0.1:4242");
//calls the method on the python object
client.invoke("hello", "World", function(error, reply, streaming) {
    if(error){
        console.log("ERROR: ", error);
    }
    console.log(reply);
});

反之亦然,node.js 服务器:

var zerorpc = require("zerorpc");

var server = new zerorpc.Server({
    hello: function(name, reply) {
        reply(null, "Hello, " + name, false);
    }
});

server.bind("tcp://0.0.0.0:4242");

和python客户端

import zerorpc, sys

c = zerorpc.Client()
c.connect("tcp://127.0.0.1:4242")
name = sys.argv[1] if len(sys.argv) > 1 else "dude"
print c.hello(name)
于 2013-03-27T13:54:55.757 回答
89

对于 node.js 和 Python 服务器之间的通信,如果两个进程在同一服务器上运行,我将使用 Unix 套接字,否则使用 TCP/IP 套接字。对于封送协议,我会采用 JSON 或协议缓冲区。如果线程化 Python 成为瓶颈,请考虑使用Twisted Python,它提供与 node.js 相同的事件驱动并发。

如果您喜欢冒险,请学习clojureclojurescriptclojure-py),您将获得与 Java、JavaScript(包括 node.js)、CLR 和 Python 上的现有代码运行和互操作的相同语言。您只需使用 clojure 数据结构即可获得出色的编组协议。

于 2012-05-27T16:17:28.130 回答
7

如果您安排将 Python 工作者放在一个单独的进程中(长时间运行的服务器类型进程或按需生成的子进程),那么您与它的通信将在 node.js 端是异步的。UNIX/TCP 套接字和标准输入/输出/错误通信在节点中本质上是异步的。

于 2012-05-27T16:17:03.947 回答
6

我使用thoonk.jsthoonk.py取得了很大的成功。Thoonk 利用 Redis(内存中键值存储)为您提供供稿(想想发布/订阅)、队列和作业模式以进行通信。

为什么这比 unix 套接字或直接 tcp 套接字更好?整体性能可能会有所降低,但是 Thoonk 提供了一个非常简单的 API,可以简化手动处理套接字的过程。Thoonk 还有助于使实现分布式计算模型变得非常简单,该模型允许您扩展您的 python worker 以提高性能,因为您只需启动 python worker 的新实例并将它们连接到同一个 redis 服务器。

于 2012-10-18T13:27:29.660 回答
6

我也会考虑 Apache Thrift http://thrift.apache.org/

它可以在多种编程语言之间架起桥梁,效率很高,并且支持异步或同步调用。在此处查看完整功能http://thrift.apache.org/docs/features/

多语言对未来的计划很有用,例如,如果您以后想用 C++ 完成部分计算任务,可以很容易地使用 Thrift 将其添加到混合中。

于 2013-02-06T07:08:20.387 回答
3

我建议使用一些工作队列,例如优秀的Gearman,它将为您提供一种调度后台作业的好方法,并在处理完它们后异步获取它们的结果。

在 Digg(以及许多其他公司)大量使用的这一优势在于,它提供了一种强大、可扩展和稳健的方式,使任何语言的工作人员都能以任何语言与客户交谈。

于 2012-05-27T16:19:50.467 回答
1

2019 年更新

有几种方法可以实现这一点,这里是复杂度递增顺序的列表

  1. Python Shell,您将向 python 控制台写入流,它会回写给您
  2. Redis Pub Sub,当你的节点 js 发布者推送数据时,你可以在 Python 中监听一个频道
  3. Websocket 连接,其中 Node 充当客户端,Python 充当服务器,反之亦然
  4. 与 Express/Flask/Tornado 等的 API 连接与公开的 API 端点单独工作以供对方查询

方法一 Python Shell 最简单的方法

source.js 文件

const ps = require('python-shell')
// very important to add -u option since our python script runs infinitely
var options = {
    pythonPath: '/Users/zup/.local/share/virtualenvs/python_shell_test-TJN5lQez/bin/python',
    pythonOptions: ['-u'], // get print results in real-time
    // make sure you use an absolute path for scriptPath
    scriptPath: "./subscriber/",
    // args: ['value1', 'value2', 'value3'],
    mode: 'json'
};

const shell = new ps.PythonShell("destination.py", options);

function generateArray() {
    const list = []
    for (let i = 0; i < 1000; i++) {
        list.push(Math.random() * 1000)
    }
    return list
}

setInterval(() => {
    shell.send(generateArray())
}, 1000);

shell.on("message", message => {
    console.log(message);
})

目的地.py 文件

import datetime
import sys
import time
import numpy
import talib
import timeit
import json
import logging
logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)

size = 1000
p = 100
o = numpy.random.random(size)
h = numpy.random.random(size)
l = numpy.random.random(size)
c = numpy.random.random(size)
v = numpy.random.random(size)

def get_indicators(values):
    # Return the RSI of the values sent from node.js
    numpy_values = numpy.array(values, dtype=numpy.double) 
    return talib.func.RSI(numpy_values, 14)

for line in sys.stdin:
    l = json.loads(line)
    print(get_indicators(l))
    # Without this step the output may not be immediately available in node
    sys.stdout.flush()

注意:创建一个名为subscriber 的文件夹,它与source.js 文件处于同一级别,并将destination.py 放入其中。不要忘记更改您的 virtualenv 环境

于 2019-04-20T16:57:06.320 回答