我正在尝试使用 protobuf.js,并自行为它提供传输层(rpcImpl),因为 protobuf.js 本身与传输方式无关。
我已经能够通过 protobuf.js(loadSync、lookup)成功加载所有 proto 文件,并通过 grpc(loadObject)将它们直接转换为 grpc 客户端和服务器。这样我就得到了经过测试的服务器和客户端的具体 grpc 实现。下一步是测试用 protobuf.js 客户端(尚不稳定)去调用 grpc 服务器(稳定)。这更多是出于好奇,想看看我们能否不依赖 grpc 库本身,而只使用 protobuf.js。
我那个失败的测试总是向 grpc 服务器发送一个空缓冲区,从而导致 “illegal buffer”(非法缓冲区)错误。
Client.js
const http2 = require('http2');
const {
// HTTP2_HEADER_AUTHORITY,
HTTP2_HEADER_CONTENT_TYPE,
HTTP2_HEADER_CONTENT_LENGTH,
HTTP2_HEADER_METHOD,
HTTP2_HEADER_PATH,
// HTTP2_HEADER_SCHEME,
HTTP2_HEADER_TE
// HTTP2_HEADER_USER_AGENT
} = http2.constants;
// Lower bound, in milliseconds, applied to the connection timeout computed in clientTimeout.
const MIN_CONNECT_TIMEOUT_MS = 20000;
const { Duplex } = require('stream');
/**
 * Wraps a buffer in a stream that yields the buffer once and then ends.
 *
 * @param {Buffer} buffer - data to queue on the stream's readable side
 * @returns {Duplex} a stream with `buffer` already queued, followed by end-of-stream
 */
function bufferToStream(buffer) {
  const source = new Duplex();
  // Queue the payload first, then `null` to mark end-of-stream.
  [buffer, null].forEach((chunk) => {
    source.push(chunk);
  });
  return source;
}
// taken from googles grpc-js channel
// https://github.com/grpc/grpc-node/blob/master/packages/grpc-js-core/src/channel.ts#L181-L190
/**
 * Arms a timer that fails a pending connection attempt.
 *
 * The delay is the time remaining until `deadline`, but never less than
 * MIN_CONNECT_TIMEOUT_MS. When the timer fires, `cb` receives a timeout
 * error and the client is closed.
 *
 * @param {{client: object, deadline: Date}} options - the client to close on
 *   timeout and the deadline the delay is measured against
 * @param {function(Error)} cb - invoked with the timeout error
 * @returns {*} the timer handle returned by setTimeout, usable with clearTimeout
 */
function clientTimeout({ client, deadline }, cb) {
  const nowMs = new Date().getTime();
  const remainingMs = deadline.getTime() - nowMs;
  const delayMs = Math.max(remainingMs, MIN_CONNECT_TIMEOUT_MS);
  const onTimeout = () => {
    cb(new Error('connection timed out!'));
    client.close();
  };
  return setTimeout(onTimeout, delayMs);
}
/**
 * Opens an HTTP/2 session to `connString` and reports the outcome exactly once.
 *
 * The callback receives either an error (connection error or timeout) or
 * `(null, client)` once the session has connected. Whichever happens first
 * wins: the connect timer is cleared and later events are not reported
 * through `cb` again. Errors emitted after the callback has fired are only
 * absorbed by the listeners registered here; callers that care about them
 * should listen on the returned session or on its streams.
 *
 * @param {string} connString - authority passed to http2.connect
 * @param {function(?Error, object=)} cb - node-style completion callback
 */
function makeClient(connString, cb) {
  const deadline = new Date();
  const client = http2.connect(connString, {});
  let settled = false;
  let connectionTimerId;

  const settle = (err, session) => {
    if (settled) {
      return;
    }
    settled = true;
    // Without this the timer would fire later and report a bogus timeout
    // after a connection error had already been delivered.
    clearTimeout(connectionTimerId);
    if (err) {
      cb(err);
    } else {
      cb(null, session);
    }
  };
  const onError = (err) => settle(err);

  // NOTE(review): 'socketError' is kept from the original code; confirm
  // whether the targeted Node.js versions still emit it.
  client.on('socketError', onError);
  client.on('error', onError);
  connectionTimerId = clientTimeout({ deadline, client }, onError);
  client.on('connect', () => {
    settle(null, client);
  });
}
/**
 * Converts a fully-qualified protobuf method name into a request path.
 *
 * The first dot and the last dot of the name are each replaced by `/`, so
 * `.helloworld.Greeter.SayHello` becomes `/helloworld.Greeter/SayHello`.
 * Names containing fewer than two dots are returned unchanged.
 *
 * @param {string} fullName - fully-qualified method name
 * @returns {string} the derived path
 */
function fullNameToPath(fullName) {
  // The replacement is written as a string. The previous RegExp literal only
  // worked because String.prototype.replace coerced it to this same string.
  return fullName.replace(/(\.)(.*)(\.)/, '/$2/');
}
module.exports = class Client {
constructor(connString) {
// this.options = url.parse(connString); needed for http2.js
this.connString = connString;
this.rpcImpl = this.rpcImpl.bind(this);
}
rpcImpl(pbMethod, payload, cb) {
makeClient(this.connString, (err, client) => {
try {
const path = fullNameToPath(pbMethod.fullName);
const req = client.request({
[HTTP2_HEADER_METHOD]: 'POST',
[HTTP2_HEADER_PATH]: path,
[HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc',
// [HTTP2_HEADER_CONTENT_LENGTH]: payload.length,
[HTTP2_HEADER_TE]: 'trailers'
});
// req.once('response', (headers) => {
// const grpcStatus = parseInt(headers['grpc-status'], 10);
// 0 IS OK
// if (grpcStatus) {
// cb(new Error(headers['grpc-message']));
// }
// });
const data = [];
// req.once('error', cb);
req.on('data', (chunk) => {
data.push(chunk);
});
req.once('end', () => {
if (data.length) {
cb(null, Buffer.from(data.join()));
}
client.destroy();
});
// bufferToStream(payload).pipe(req);
req.write(payload, null, cb);
req.end();
} catch (e) {
cb(e);
}
});
}
};
getProtoPath.js
module.exports = (_paths = [__dirname, 'src']) => (...args) => {
let paths = _paths;
if (!Array.isArray(paths)) {
paths = [paths];
}
return path.join.apply(null, paths.concat(args));
};
helloworld.proto
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file uses the proto3 language version.
syntax = "proto3";
// File-level options named for Java and Objective-C code generation.
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";
// Package that namespaces the service and messages declared below.
package helloworld;
// The greeting service definition.
service Greeter {
// Sends a greeting: takes a single HelloRequest and returns a single HelloReply.
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
// Name of the user to greet (field number 1).
string name = 1;
}
// The response message containing the greeting.
message HelloReply {
// Greeting text returned to the caller (field number 1).
string message = 1;
}
helloworld.spec.js
const sinon = require('sinon');
const { expect } = require('chai');
const { Server, ServerCredentials } = require('grpc');
const Client = require('../../../src/protobufjs/transports/http2/Client');
const getProtoPath = require('./getProtoPath');
const { loadSync } = require('google-proto-files');
// Path to helloworld.proto, resolved relative to this spec's directory.
const protPath = getProtoPath(__dirname)('../../protos/helloworld.proto');
// Address the test gRPC server binds to and the client connects to.
const URI = '127.0.0.1:50061';
// Client under test, pointed at the test server over plain http.
const client = new Client(`http://${URI}`);
/**
 * Builds a fresh Greeter implementation whose `sayHello` is a sinon stub
 * that yields a fixed reply.
 */
const makeServiceImpl = () => {
  const reply = { message: 'Hello James' };
  const sayHello = sinon.stub().yields(null, reply);
  return { sayHello };
};
// const debug = require('../../debug').spawn('test:protobufjs');
// Exercises the protobuf.js service (using Client.rpcImpl as transport)
// against a real grpc Server started on URI.
describe('Client protobufjs (rpcimpl)', () => {
// Fixtures shared by the hooks and tests below; reset in afterEach.
let protobufSvc, api, server, grpcServerSvcImpl;
// using GRPC server as a baseline of a real grpc server
// Binds an insecure server on URI, registers a stubbed Greeter
// implementation (kept in grpcServerSvcImpl) and starts it.
function initServer() {
server = new Server();
server.bind(URI, ServerCredentials.createInsecure());
server.addService(
api.toGrpc().Greeter.service,
(grpcServerSvcImpl = makeServiceImpl())
);
server.start();
}
describe('client', () => {
// Load the proto, create the protobuf.js service on top of the client's
// rpcImpl, then start the server; `api` must be set before initServer runs.
beforeEach(() => {
api = loadSync(protPath).lookup('helloworld');
protobufSvc = api.Greeter.create(client.rpcImpl);
initServer();
});
// Stop the server and drop all fixtures so each test starts clean.
afterEach(() => {
if (server) server.forceShutdown();
// eslint-disable-next-line no-multi-assign
protobufSvc = undefined;
api = undefined;
server = undefined;
grpcServerSvcImpl = undefined;
});
it('created', () => {
expect(protobufSvc).to.be.ok;
});
describe('SayHello', () => {
// Round trip: the stubbed server method must be called and its reply
// must reach the protobuf.js callback.
it('callback', (done) => {
// eslint-disable-next-line
protobufSvc.sayHello({ name: 'Bond' }, (err, resp) => {
if (err) {
return done(err); // surfaces transport/decoding failures such as "illegal buffer"
}
expect(grpcServerSvcImpl.sayHello.called).to.be.ok;
expect(resp.message).to.equal('Hello James');
done();
});
});
});
});
});