0

我正在尝试利用 protobuf.js 并为其提供传输层(rpcimpl),因为它与传输无关。

我可以通过protobuf(loadSync,lookup)将所有proto文件成功转换为直接grpc客户端和服务器到grpc(loadObject)。这使我能够通过测试获得服务器和客户端的具体 grpc 实现。下一步是测试 protobuf 客户端(不稳定)到 grpc 服务器稳定。这更多是出于好奇,看看我们是否可以独立于 grpc 的库本身而只使用 protobuf.js 。

我失败的测试总是向 grpc 服务器发送一个空缓冲区。导致“非法缓冲区错误”。

客户端.js

const http2 = require('http2');

const {
  // HTTP2_HEADER_AUTHORITY,
  HTTP2_HEADER_CONTENT_TYPE,
  HTTP2_HEADER_CONTENT_LENGTH,
  HTTP2_HEADER_METHOD,
  HTTP2_HEADER_PATH,
  // HTTP2_HEADER_SCHEME,
  HTTP2_HEADER_TE
  // HTTP2_HEADER_USER_AGENT
} = http2.constants;

const MIN_CONNECT_TIMEOUT_MS = 20000;

const { Duplex } = require('stream');

function bufferToStream(buffer) {
  const stream = new Duplex();
  stream.push(buffer);
  stream.push(null);
  return stream;
} 

// taken from googles grpc-js channel
// https://github.com/grpc/grpc-node/blob/master/packages/grpc-js-core/src/channel.ts#L181-L190
function clientTimeout({ client, deadline }, cb) {
  const now = new Date();

  const connectionTimeout = Math.max(
    deadline.getTime() - now.getTime(),
    MIN_CONNECT_TIMEOUT_MS
  );

  const id = setTimeout(() => {
    cb(new Error('connection timed out!'));
    client.close();
  }, connectionTimeout);
  return id;
}

function makeClient(connString, cb) {
  const deadline = new Date();
  const client = http2.connect(connString, {});
  client.on('socketError', cb);
  client.on('error', cb);

  const connectionTimerId = clientTimeout({ deadline, client }, cb);

  client.on('connect', () => {
    clearTimeout(connectionTimerId);
    cb(null, client);
  });
}

function fullNameToPath(fullName) {
  return fullName.replace(/(\.)(.*)(\.)/, /$2/);
}

module.exports = class Client {
  constructor(connString) {
    // this.options = url.parse(connString); needed for http2.js
    this.connString = connString;
    this.rpcImpl = this.rpcImpl.bind(this);
  }


  rpcImpl(pbMethod, payload, cb) {
    makeClient(this.connString, (err, client) => {
      try {
        const path = fullNameToPath(pbMethod.fullName);
        const req = client.request({
          [HTTP2_HEADER_METHOD]: 'POST',
          [HTTP2_HEADER_PATH]: path,
          [HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc',
          // [HTTP2_HEADER_CONTENT_LENGTH]: payload.length,
          [HTTP2_HEADER_TE]: 'trailers'
        });

        // req.once('response', (headers) => {
          // const grpcStatus = parseInt(headers['grpc-status'], 10);
          // 0 IS OK
          // if (grpcStatus) {
            // cb(new Error(headers['grpc-message']));
          // }
        // });

        const data = [];
        // req.once('error', cb);
        req.on('data', (chunk) => {
          data.push(chunk);
        });
        req.once('end', () => {
          if (data.length) {
            cb(null, Buffer.from(data.join()));
          }
          client.destroy();
        });
        // bufferToStream(payload).pipe(req);
        req.write(payload, null, cb);
        req.end();
      } catch (e) {
        cb(e);
      }
    });
  }
};

getProtoPath.js

module.exports = (_paths = [__dirname, 'src']) => (...args) => {
  let paths = _paths;
  if (!Array.isArray(paths)) {
    paths = [paths];
  }
  return path.join.apply(null, paths.concat(args));
};

helloworld.proto

// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";

package helloworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

helloworld.spec.js

const sinon = require('sinon');
const { expect } = require('chai');
const { Server, ServerCredentials } = require('grpc');
const Client = require('../../../src/protobufjs/transports/http2/Client');

const getProtoPath = require('./getProtoPath');
const { loadSync } = require('google-proto-files');

const protPath = getProtoPath(__dirname)('../../protos/helloworld.proto');
const URI = '127.0.0.1:50061';

const client = new Client(`http://${URI}`);

const makeServiceImpl = () => ({
  sayHello: sinon.stub().yields(null, {
    message: 'Hello James'
  })
});

// const debug = require('../../debug').spawn('test:protobufjs');
describe('Client protobufjs (rpcimpl)', () => {
  let protobufSvc, api, server, grpcServerSvcImpl;

  // using GRPC server as a baseline of a real grpc server
  function initServer() {
    server = new Server();

    server.bind(URI, ServerCredentials.createInsecure());
    server.addService(
      api.toGrpc().Greeter.service,
      (grpcServerSvcImpl = makeServiceImpl())
    );
    server.start();
  }

  describe('client', () => {
    beforeEach(() => {
      api = loadSync(protPath).lookup('helloworld');
      protobufSvc = api.Greeter.create(client.rpcImpl);

      initServer();
    });

    afterEach(() => {
      if (server) server.forceShutdown();
      // eslint-disable-next-line no-multi-assign
      protobufSvc = undefined;
      api = undefined;
      server = undefined;
      grpcServerSvcImpl = undefined;
    });

    it('created', () => {
      expect(protobufSvc).to.be.ok;
    });

    describe('SayHello', () => {
      it('callback', (done) => {
        // eslint-disable-next-line
        protobufSvc.sayHello({ name: 'Bond' }, (err, resp) => {
          if (err) {
            return done(err); // YAY ILLEGAL BUFFER, HELP!!
          }
          expect(grpcServerSvcImpl.sayHello.called).to.be.ok;
          expect(resp.message).to.equal('Hello James');
          done();
        });
      });
    });
  });
});
4

0 回答 0