3

我有一个用 python 实现的 gRPC 服务器,我正在从 NodeJS 调用一个 RPC,但它给出了一个错误“找不到方法”。当我使用python客户端调用时,请求成功。

stream_csv.proto

syntax = "proto3";

package csv;

service Stream {
  rpc csvToObject(CSVDataRequest) returns (stream CSVDataResponse) {};
  rpc sayHello(HelloRequest) returns (HelloReply);
}

message CSVDataRequest{
  string url = 1;
  enum Protocol {
    HTTP = 0;
    HTTPS = 1;
    FTP = 2;
    SFTP = 3;

}

  Protocol protocol = 2;
}

message CSVDataResponse{
  repeated string row = 1;
}

message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

客户端.js

var PROTO_PATH = '../stream_csv.proto';

var grpc = require('grpc');
var protoLoader = require('@grpc/proto-loader');
var packageDefinition = protoLoader.loadSync(
    PROTO_PATH,
    {keepCase: true,
     longs: String,
     enums: String,
     defaults: true,
     oneofs: true
    });
var proto = grpc.loadPackageDefinition(packageDefinition).csv;

function main() {
  var client = new proto.Stream('localhost:5000',
                                       grpc.credentials.createInsecure());
  var user;
  if (process.argv.length >= 3) {
    user = process.argv[2];
  } else {
    user = 'world';
  }
  console.log(user);
  client.sayHello({name: user}, function(err, response) {
    console.log(err);
  });
}

main();

服务器.py

import grpc import stream_csv_pb2 import urllib.request from urllib.error import HTTPError, URLError from concurrent import futures

class DataService:

    def csv_to_object(self, request, context):
        url = request.url
        protocol = stream_csv_pb2.CSVDataRequest.Protocol.Name(
            request.protocol)
        fetch_url = protocol.lower() + "://"+url

        try:
            with urllib.request.urlopen(fetch_url) as data:
                for line in data:
                    decoded_line = line.decode()
                    val = decoded_line.split(',')
                    print(val)
                    print("Data send")
                    yield stream_csv_pb2.CSVDataResponse(row=val)

            print("Sending finished!")

        except URLError as e:
            context.abort(grpc.StatusCode.UNKNOWN,
                        'Randomly injected failure.')
            # return stream_csv_pb2.CSVDataResponse(row=[], error=e.reason)

    def SayHello(self, request, context):
        name = request.name
        print(name)
        return stream_csv_pb2.HelloReply(message='Hello %s' % (name))

def add_DataServicer_to_server(servicer, server):
    rpc_method_handlers = {
        'CSVToObject': grpc.unary_stream_rpc_method_handler(
            servicer.csv_to_object,
            request_deserializer=stream_csv_pb2.CSVDataRequest.FromString,
            response_serializer=stream_csv_pb2.CSVDataResponse.SerializeToString,
        ),
        'SayHello': grpc.unary_unary_rpc_method_handler(
            servicer.SayHello,
            request_deserializer=stream_csv_pb2.HelloRequest.FromString,
            response_serializer=stream_csv_pb2.HelloReply.SerializeToString,
        )
    }
    generic_handler = grpc.method_handlers_generic_handler(
        'stream_csv.Stream', rpc_method_handlers)
    server.add_generic_rpc_handlers((generic_handler,))


def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    add_DataServicer_to_server(DataService(), server)
    server.add_insecure_port('[::]:5000')
    server.start()
    server.wait_for_termination()


if __name__ == '__main__':
    serve()

错误

Error: 12 UNIMPLEMENTED: Method not found!
    at Object.exports.createStatusError (/home/shantam/Documents/grepsr/client/node_modules/grpc/src/common.js:91:15)
    at Object.onReceiveStatus (/home/shantam/Documents/grepsr/client/node_modules/grpc/src/client_interceptors.js:1209:28)
    at InterceptingListener._callNext (/home/shantam/Documents/grepsr/client/node_modules/grpc/src/client_interceptors.js:568:42)
    at InterceptingListener.onReceiveStatus (/home/shantam/Documents/grepsr/client/node_modules/grpc/src/client_interceptors.js:618:8)
    at callback (/home/shantam/Documents/grepsr/client/node_modules/grpc/src/client_interceptors.js:847:24) {
  code: 12,
  metadata: Metadata { _internal_repr: {}, flags: 0 },
  details: 'Method not found!'
}
4

1 回答 1

4

我为您的 Python(和 Golang)服务器实现编写了一个更简单的变体。

两者都按原样与您的 Node.JS 客户端一起工作。

我认为您的问题可能就像需要命名 rpc sayHello(而不是SayHello)一样简单。

from concurrent import futures
import logging

import grpc

import stream_csv_pb2
import stream_csv_pb2_grpc


class Stream(stream_csv_pb2_grpc.StreamServicer):

    def sayHello(self, request, context):
        logging.info("[sayHello]")
        return stream_csv_pb2.HelloReply(message='Hello, %s!' % request.name)


def serve():
    logging.info("[serve]")
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    stream_csv_pb2_grpc.add_StreamServicer_to_server(Stream(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()


if __name__ == '__main__':
    logging.basicConfig()
    serve()

和:

node client.js
Greeting: Hello, Freddie!
null

通常(!)gRPC 遵循 CamelCasing 服务、rpc 和消息名称的 Protobufs 样式指南,并在其他地方使用下划线,请参阅样式指南。编译时protoc,结果并不总是完全匹配(例如 Python 的 CamelCased 函数,而不是小写的下​​划线)。

按照惯例,您的原型将是:

service Stream {
  rpc CsvToObject(CSVDataRequest) returns (stream CSVDataResponse) {};
  rpc SayHello(HelloRequest) returns (HelloReply);
}

...然后 Python 生成的函数也将是 (!) SayHello

于 2020-10-19T18:54:44.353 回答