2

我目前正在尝试使用 gRPC Node.js API 设置服务器流。为此,当我在服务器端写入客户端立即接收数据事件的流时,我想实现这一点。

目前,如果我只在服务器端调用 write,我不会在客户端收到任何东西。但是,只要我在服务器上调用 end 函数,客户端就会收到所有数据事件。

为了测试这一点,我使用了一个无限的 while 循环在服务器端编写消息。然后客户端不接收消息(数据事件)。相反,如果我使用 for 循环并在之后调用 end ,则客户端在调用 end 时会收到所有消息(数据事件)。

我的 .proto 文件:

syntax = "proto3";

message ControlMessage {
  enum Control {
    Undefined = 0;
    Start = 1;
    Stop = 2;
  }
  Control control = 1;
}

message ImageMessage {
  enum ImageType {
    Raw = 0;
    Mono8 = 1;
    RGB8 = 2;
  }
  ImageType type = 1;
  int32 width = 2;
  int32 height = 3;
  bytes image = 4;
}

service StartImageTransmission {
  rpc Start(ControlMessage) returns (stream ImageMessage);
}

在服务器端,我实现了 start 函数并尝试无休止地向调用写入消息:

function doStart(call) {
  var imgMsg = {type: "Mono8", width: 600, height: 600, image: new ArrayBuffer(600*600)};
  //for(var i = 0; i < 10; i++) {
  while(true) {
    call.write(imgMsg);
    console.log("Message sent");
  }
  call.end();
}

我将该功能注册为服务器中的服务:

var server = new grpc.Server();
server.addService(protoDescriptor.StartImageTransmission.service, {Start: doStart});

在客户端,我生成一个适当的调用并注册数据和结束事件:

var call = client.Start({control: 0});
call.on('data', (imgMessage) => {
  console.log('received image message');
});
call.read();
call.on('end', () => {console.log('end');});

我还尝试用python编写服务器端。在这种情况下,节点客户端会立即接收消息,而不仅仅是在服务器端结束流之后。所以我想这对于使用 Node API 编写的服务器也应该是可能的。

4

2 回答 2

1

似乎问题在于无休止的while循环阻塞了节点中的所有后台任务。一个可能的解决方案是使用setTimeout创建循环。以下代码对我有用:

首先在 gRPC 调用中将call对象存储在一个数组中:

function doStart(call) {
  calls.push(call);
}

为了发送给所有客户,我使用了 setTimeout:

function sendToAllClients() {
  calls.forEach((call) => {
    call.write(imgMsg);
  });
  setTimeout(sendToAllClients, 10);
}

setTimeout(sendToAllClients, 10);

有用的stackoverflow atricle:为什么while循环会阻塞事件循环?

于 2019-10-09T09:56:58.160 回答
0

我能够使用uncork来自 Node.js 的Writable.

这是一个例子。伪代码,但从一个工作实现中提取:

import * as grpc from '@grpc/grpc-js';
import * as proto from './src/proto/generated/organizations'; // via protoc w/ ts-proto

const OrganizationsGrpcServer: proto.OrganizationsServer = {
  async getMany(call: ServerWritableStream<proto.Empty, proto.OrganizationCollection>) {
    call.write(proto.OrganizationCollection.fromJSON({ value: [{}] }));
    call.uncork();
    // do some blocking stuff
    call.write(proto.OrganizationCollection.fromJSON({ value: [{}] }));
    call.uncork();
    // call.end(), or client.close() below, at some point?
  },
  ping(call, callback) {
    callback(null);
  }
};

const client = new proto.OrganizationsClient('127.0.0.1:5000', grpc.credentials.createInsecure());
const stream = client.getMany(null);
stream.on('data', data => {
//  this cb should run twice
});
export default OrganizationsGrpcServer;
//.proto
service Organizations {
  rpc GetMany (google.protobuf.Empty) returns (stream OrganizationCollection) {}
}

message OrganizationCollection {
  repeated Organization value = 1;
}

版本:

  • @grpc/grpc-js 1.4.4
  • @grpc/proto-loader 0.6.7
  • ts-proto 1.92.1
  • npm 8.1.4
  • node 17
于 2021-12-02T21:36:44.637 回答