我有一个计算量很大的过程,需要几分钟才能在服务器中完成。所以我想通过 websockets 将每次迭代的结果发送给客户端。

整个应用程序有效,但我的问题是,在整个模拟完成后,所有消息都以一大块形式到达客户端。我必须在这里遗漏一些东西,因为我希望await websocket.send_json()在此过程中发送消息,而不是最后发送所有消息。

服务器 python (FastAPI)

# A very simplified abstraction of the actual app.

def simulate_intervals(data):
  for t in range(data.n_intervals):
    state = interval(data) # returns a JAX NumPy array
    yield state

def simulate(data):
  for key in range(data.n_trials):
     trial = simulate_intervals(data)
     yield trial

async def socket(websocket: WebSocket):

  await websocket.accept()
  while True:
    # Get model inputs from client
    data = await websocket.receive_text()
    # Minimal computation
    nodes = distributions(data)

    nodosJson = json.dumps(nodes, cls=NumpyEncoder)
    # I expect this message to be sent early on,
    # but the client gets it at the end with all the other messages. 
    await websocket.send_json({"tipo": "nodos", "datos": json.loads(nodosJson)})
    # Heavy computation
    trials = simulate(data)

    for trialI, trial in enumerate(trials):
      for stateI, state in enumerate(trial):
        stateString = json.dumps(state, cls=NumpyEncoder)

        await websocket.send_json(
            "tipo": "estado",
            "datos": json.loads(stateString),
            "trialI": trialI,
            "stateI": stateI,

    await websocket.send_json({"tipo": "estado", "msg": "fin"})



const ws = new WebSocket('ws://localhost:8000/ws');

ws.onopen = () => {
  console.log('Conexión exitosa');

ws.onmessage = (e) => {
  const mensaje = JSON.parse(e.data);

botonEnviarDatos.onclick = () => {

1 回答 1








# A very simplified abstraction of the actual app.

def simulate_intervals(data):
  for t in range(data.n_intervals):
    state = interval(data) # returns a JAX NumPy array
    yield state

def simulate(data):
  for key in range(data.n_trials):
     trial = simulate_intervals(data)
     yield trial

async def socket(websocket: WebSocket):

  await websocket.accept()
  while True:
    # Get messages from client
    data = await websocket.receive_text()
    # "tipo" is basically the type of data being sent from client or server to the other one.
    # In this case, "tipo": "inicio" is the client sending inputs and requesting for a certain data in response.
    if data["tipo"] == "inicio":
      # Minimal computation
      nodes = distributions(data)

      nodosJson = json.dumps(nodes, cls=NumpyEncoder)
      # In this first interaction, the client gets the first message without delay. 
      await websocket.send_json({"tipo": "nodos", "datos": json.loads(nodosJson)})

      # Since this is a generator (def returns yield) it does not actually
      # trigger that actual computationally heavy process. 
      trials = simulate(data)
      # define some initial variables to count the iterations
      trialI = 0
      stateI = 0
      trialsLen = args.number_trials
      statesLen = 600
      # load the first trial (also a generator)
      # without the for loop used before, the counters and next()
      # allow us to do the same as being done before in the for loop
      trial = next(trials)

      # With the use of generators and next() it is possible to keep
      # this first message light on the server and send the first response
      # as quickly as possible.
    # This type of message asks for the next instance of the simluation
    # without processing the entire model.
    elif data["tipo"] == "sim":
      # check if we are within the limits (before this was a nested for loop)
      if trialI < trialsLen and stateI < statesLen:
        # Trigger the next instance of the simulation
        state = next(trial)
        # update counter
        stateI = stateI + 1
        # Send the message with 1 instance of the simulation.
        stateString = json.dumps(state, cls=NumpyEncoder)
        await websocket.send_json(
             "tipo": "estado",
             "datos": json.loads(stateString),
             "trialI": trialI,
             "stateI": stateI,
        # Check if the second loop is done
        if stateI == statesLen:
          # update counter of first loop
          trialI = trialI + 1
          # update counter of second loop
          stateI = 0
          # Check if there are more pending trials,
          # otherwise stop and notify the client we are done.
            trial = next(trials)
          except StopIteration:
            await websocket.send_json({"tipo": "fin"})



ws.onmessage = (e) => {
  const mensaje = JSON.parse(e.data);
  // Simply check the type of incoming message so it can be processed
  if (mensaje.tipo === 'fin') {
  } else if (mensaje.tipo === 'nodos') {
  } else if (mensaje.tipo === 'estado') {

  // After receiving a message, ping the server for the next one 
      tipo: 'sim',


于 2021-08-25T18:17:45.480 回答