0

我正在使用 RESTful Web 服务的烧瓶服务器和 python-socketio 来实现服务器和客户端之间的双向通信,以跟踪后端的下载进度。

我获取在 server.py 文件中声明的变量 sio 并将其作为参数传递到一个新对象中,该对象将使用它向客户端发出某些关于它在服务器上下载文件的消息。

sio = socketio.Server(async_mode='threading')
omics_env = None

@sio.on('init', namespace='/guardiome')
def init(sid, data):
    global omics_env

    if omics_env == None:
        omics_env = Environment(socket=sio)
        omics_env.conda.download_conda()
        omics_env.data_management.download_omics_data()

问题是当文件在 python 服务器中下载时,每次将 1% 的数据写入文件时,它都会向客户端发出一条消息。但它并不总是在每次下载/写入 1% 的数据到文件时向客户端发出。

它通常会报告进度到 18%,暂停一段时间,然后报告 40%,跳过 18% 到 40% 之间的排放。

有人可能会说它的互联网可能滞后,但我确实在发出函数顶部的下载函数中打印了语句,这表明它正在写入/下载每 1% 的数据。

我还在线检查了其他资源。有些人提到使用 eventlet 并在服务器的最高代码级别执行类似的操作。

import eventlet
evenlet.monkey_patch()

但这根本不会导致代码发射。

其他人提到使用像 redis 这样的消息队列,但我不能使用 redis,我计划将整个 python 代码转换为二进制可执行文件,以便在 linux 平台上完全可移植以与本地客户端通信。

这是我的 server.py

import socketio
import eventlet.wsgi

from environment import Environment

from flask import Flask, jsonify, request, send_file
from flask_cors import CORS

omics_env = None

sio = socketio.Server(async_mode='threading')
app = Flask(__name__)
CORS(app)

@sio.on('init', namespace='/guardiome')
def init(sid, data):
    global omics_env

    if omics_env == None:
        omics_env = Environment(socket=sio)
        omics_env.conda.download_conda()
        omics_env.data_management.download_omics_data()

    omics_env.logger.info('_is_ready()')

    sio.emit(
        event='init',
        data={'status': True, 'information': None},
        namespace='/guardiome')


try:
    # wrap Flask application with engineio's middleware
    app.wsgi_app = socketio.Middleware(sio, app.wsgi_app)
    # Launch the server with socket integration
    app.run(port=8008, debug=False, threaded=True)

finally:
    pass
    # LOGGER.info('Exiting ...')

这是我将 sio 作为报告参数传入的 download_w_progress 函数

def download_w_progress(url , path, reporter=None):

    ssl._create_default_https_context = ssl._create_unverified_context
    r = requests.get(url, stream=True)

    # Helper lambda functions
    progress_report = lambda current, total: int((current/total)*100)
    raw_percent = lambda current, total: (current/total)*100

    # TODO(mak3): Write lambda function for reporting amount of file downloaded
    # in MB, KB, GB, or whatever

    with open(path, 'wb') as f:

        total_length = int(r.headers.get('content-length'))
        progress_count = 0
        chunk_size = 1024

        # Used to cut down on emit the same rounded percentage number
        previous_percent = -1

        # Read and write the file in chunks to its destination
        for chunk in r.iter_content(chunk_size=1024):
            progress_dict = {
                "percent": progress_report(progress_count, total_length)
            }

            if reporter != None:
                # Limit the number of emits sent to prevent
                # to socket from overworking
                if progress_dict["percent"] != previous_percent:
                    reporter.emit(event="environment", namespace="/guardiome", data=progress_dict)


            # TODO(mak3): Remove or uncomment in production
            if progress_dict["percent"] != previous_percent:
                print(progress_dict["percent"], end='\r')

            progress_count += chunk_size
            previous_percent = progress_dict["percent"]

            if chunk:
                f.write(chunk)
                f.flush()
4

1 回答 1

1

抱歉,当您发布此问题时,我错过了这个问题。

您的代码中有几个问题。您正在选择async_mode='threading. 一般来说,最好省略这个参数,让服务器根据您使用的服务器选择最佳异步模式。当你添加eventlet时,比如threading模式是不行的,其实eventlet有一个特定的async模式。

所以我的建议是:

  1. 删除构造函数async_mode中的参数socketio.Server()
  2. 在您的虚拟环境中安装 eventlet
  3. 用启动 eventlet 服务器的代码替换app.run()脚本中的部分,或者如果您使用的是 Flask,请使用 Flask-SocketIO 扩展,它已经内置了此代码。
  4. 在读取文件的循环内添加一个sio.sleep(0)调用。这将使 eventlet 有机会保持所有任务顺利运行。
于 2019-03-15T11:54:52.497 回答