我正在尝试将 python ReactiveX 流(使用 RxPy 库)发送到 Web UI 组件上的 javascript,但我似乎找不到这样做的方法。此外,我可能需要将进入 Javascript 的数据流放入各种 RxJS Observable 中以进行进一步处理。你能帮我理解如何实现这一目标吗?我仍然掌握 ReactiveX,所以也许我缺少一些基本概念,但我正在努力在网上找到类似的东西。
这个问题出现在我正在开发一个桌面应用程序时,该应用程序从 csv 或 zeromq 端点获取数据,并将其流式传输到将动态绘制数据的 UI(随着新数据的到来更新绘图)。我正在使用 Electron 构建我的应用程序,使用 python 作为我的后端代码。Python 是必须的,因为我将使用一些 TensorFlow 模型扩展应用程序。
多年来,作为初始结构的示例非常好,我编写了一些示例代码来玩,但我似乎无法让它工作。我设法从 UI 按钮一直到 python 脚本,但我陷入了 PriceApi.get_stream(...) 方法的返回。
索引.html
前端是笔直的。
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Electron Application</title>
</head>
<body>
<button id="super-button">Trigger Python Code</button>
<div id="py-output">
</div>
</body>
<script src="renderer.js" ></script>
</html>
api.py:
ZeroRPC 服务器文件与上述链接中的文件类似。
import gevent
import json
import signal
import zerorpc
from core_operator import stream
class PricesApi(object):
def get_stream(self, filename):
return stream(filename)
def stop(self):
print('Stopping strategy.')
def echo(self, text):
"""echo any text"""
return text
def load_settings():
with open('settings.json') as json_settings:
settings_dictionary = json.load(json_settings)
return settings_dictionary
def main():
settings = load_settings()
s = zerorpc.Server(PricesApi())
s.bind(settings['address'])
print(f"Initialising server on {settings['address']}")
s.run()
if __name__ == '__main__':
main()
core_operator.py
这是主要逻辑将从 zeroMQ 订阅获取价格的文件,但目前只是从 csv 创建一个 Observable。
import sys
import rx
from csv import DictReader
def prepare_csv_timeseries_stream(filename):
return rx.from_(DictReader(open(filename, 'r')))
def stream(filename):
price_observable = prepare_csv_timeseries_stream(filename)
return price_observable
渲染.js
最后,应该接收流的 javascript:
const zerorpc = require('zerorpc');
const fs = require('fs')
const settings_block = JSON.parse(fs.readFileSync('./settings.json').toString());
let client = new zerorpc.Client();
client.connect(settings_block['address']);
let button = document.querySelector('#super-button');
let pyOutput = document.querySelector('#py-output');
let filename = '%path-to-file%'
button.addEventListener('click', () => {
let line_to_write = '1'
console.log('button click received.')
client.invoke('get_stream', filename, (error, result) => {
var messages = pyOutput;
message = document.createElement('li'),
content = document.createTextNode(error.data);
message.appendChild(content);
messages.appendChild(message);
if(error) {
console.error(error);
} else {
var messages = pyOutput;
message = document.createElement('li'),
content = document.createTextNode(result.data);
message.appendChild(content);
messages.appendChild(message);
}
})
})
我一直在研究使用 WebSockets,但未能理解如何实现它。我确实找到了一些使用 Tornado 服务器的示例,但是我试图使其尽可能纯净,而且,我已经有了来自 Electron 的客户端/服务器结构,我无法直接使用它,这感觉很奇怪。此外,我正在尝试将整个系统维护为 PUSH 结构,因为数据要求不允许 PULL 类型的模式,以及定期轮询等。
非常感谢您随时可以为此付出代价,如果您需要任何进一步的详细信息或解释,请告诉我。