3

我正在尝试使用 GRPC 从客户端调用服务器方法,但出现以下错误:从对等 ipv4 收到错误

我试图为此找到解决方案,但现在已经超过一天了,无法弄清楚,请有人帮忙。非常感谢任何帮助

服务器原型文件(chunk.proto):

syntax = "proto3";

service FileServer {
  rpc upload_chunk_stream(stream Chunk) returns (Reply) {}
  rpc upload_single_chunk(Chunk) returns (Reply) {}
  rpc download_chunk_stream(Request) returns (stream Chunk) {}
  rpc get_available_memory_bytes(Empty_request) returns (Reply_double) {}
  rpc get_stored_hashes_list_iterator(Empty_request) returns (stream Reply_string) {}
  rpc hash_id_exists_in_memory(Request) returns (Reply) {}
}

message Chunk {
  bytes buffer = 1;
}

message Request {
  string hash_id = 1;
}

message Reply {
  bool success = 1;
}

message Reply_double {
  double bytes = 1;
}

message Empty_request {}

message Reply_string {
  string hash_id = 1;
}

在服务器端生成代码,生成文件:chunk_pb2_grpc.py 和 chunk_pb2.py

以下是服务器文件:

存储管理器.py

import sys

sys.path.append('./')

import grpc
import time
import chunk_pb2, chunk_pb2_grpc
from MemoryManager import MemoryManager

CHUNK_SIZE_ = 1024

class StorageManagerServer(chunk_pb2_grpc.FileServerServicer):

    def __init__(self, memory_node_bytes, page_memory_size_bytes):
        self.memory_manager = MemoryManager(memory_node_bytes, page_memory_size_bytes)

    def upload_chunk_stream(self, request_iterator, context):
        hash_id = ""
        chunk_size = 0
        number_of_chunks = 0
        print("inside")

        for key, value in context.invocation_metadata():
            if key == "key-hash-id":
                hash_id = value
            elif key == "key-chunk-size":
                chunk_size = int(value)
            elif key == "key-number-of-chunks":
                number_of_chunks = int(value)

        assert hash_id != ""
        assert chunk_size != 0
        assert number_of_chunks != 0

        success = self.memory_manager.put_data(request_iterator, hash_id, number_of_chunks, False)
        return chunk_pb2.Reply(success=success)

StartNodeExample.py

import sys
sys.path.append('./')

from StorageManager import StorageManagerServer
import grpc
import time
import chunk_pb2, chunk_pb2_grpc
from concurrent import futures

if __name__ == '__main__':
    print("Starting Storage Manager.")
    server_grpc = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
    total_memory_node_bytes = 1 * 1024 * 1024 * 1024  # start with 1 GB
    CHUNK_SIZE_ = 1024
    total_page_memory_size_bytes = CHUNK_SIZE_
    chunk_pb2_grpc.add_FileServerServicer_to_server(StorageManagerServer(total_memory_node_bytes, total_page_memory_size_bytes), server_grpc)
    # port = 9999
    # server_grpc.add_insecure_port(f'[::]:{port}')
    server_grpc.add_insecure_port('[::]:9999')
    server_grpc.start()

    print("Storage Manager is READY.")

    try:
        while True:
            time.sleep(60 * 60 * 24)  # should infinity
    except KeyboardInterrupt:
        server_grpc.stop(0)

客户端原型文件(chunk.proto):

syntax = "proto3";

service FileServer {
    rpc upload_chunk_stream(stream Chunk) returns (Reply) {}
    rpc upload_single_chunk(Chunk) returns (Reply) {}
    rpc download_chunk_stream(Request) returns (stream Chunk) {}
}

message Chunk {
  bytes buffer = 1;
}

message Request {
  string hash_id = 1;
}

message Reply {
  bool success = 1;
}

客户端生成代码,生成文件:chunk_pb2_grpc.py 和 chunk_pb2.py

以下是客户端文件:

grpc_client.py

import grpc
import chunk_pb2
import chunk_pb2_grpc
import threading
import io
import hashlib

CHUNK_SIZE = 1024 * 1024 * 4  # 4MB

def get_file_byte_chunks(f):
    while True:
        piece = f.read(CHUNK_SIZE)
        if len(piece) == 0:
            return
        yield chunk_pb2.Chunk(buffer=piece)

class Client:
    def __init__(self, address):
            channel = grpc.insecure_channel(address)
            self.stub = chunk_pb2_grpc.FileServerStub(channel)

    def upload(self, f, f_name):
        print("Inside here")
        hash_object = hashlib.sha1(f_name.encode())
        hex_dig = hash_object.hexdigest()
        print(hex_dig)
        chunks_generator = get_file_byte_chunks(f)
        metadata = (
            ('key-hash-id', hex_dig),
            ('key-chunk-size', str(CHUNK_SIZE))
        )
        response = self.stub.upload_chunk_stream(chunks_generator, metadata=metadata)

server.py(Flask REST API 服务器)

from flask import Flask, url_for, send_from_directory, request
import logging, os
from werkzeug.utils import secure_filename
from flask import jsonify, make_response
# import worker
import grpc_client

app = Flask(__name__)
@app.route('/', methods = ['POST'])
def api_root():
    app.logger.info(PROJECT_HOME)
    if request.method == 'POST' and request.files['image']:
        app.logger.info(app.config['UPLOAD_FOLDER'])
        img = request.files['image']
        img_name = secure_filename(img.filename)
        client = grpc_client.Client('127.0.0.1:9999')
        client.upload(img, img_name)
        return make_response(jsonify({"success":True}),200)
    else:
        return "Where is the file?"

收到错误:

[2019-12-15 03:08:55,973] ERROR in app: Exception on / [POST]
Traceback (most recent call last):
  File "/Users/wamiqueansari/Documents/275_gash/project/final/Tracking/venv/lib/python3.7/site-packages/flask/app.py", line 1982, in wsgi_app
    response = self.full_dispatch_request()
  File "/Users/wamiqueansari/Documents/275_gash/project/final/Tracking/venv/lib/python3.7/site-packages/flask/app.py", line 1614, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/Users/wamiqueansari/Documents/275_gash/project/final/Tracking/venv/lib/python3.7/site-packages/flask/app.py", line 1517, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/Users/wamiqueansari/Documents/275_gash/project/final/Tracking/venv/lib/python3.7/site-packages/flask/_compat.py", line 33, in reraise
    raise value
  File "/Users/wamiqueansari/Documents/275_gash/project/final/Tracking/venv/lib/python3.7/site-packages/flask/app.py", line 1612, in full_dispatch_request
    rv = self.dispatch_request()
  File "/Users/wamiqueansari/Documents/275_gash/project/final/Tracking/venv/lib/python3.7/site-packages/flask/app.py", line 1598, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "server.py", line 32, in api_root
    client.upload(img, img_name)
  File "/Users/wamiqueansari/Documents/275_gash/project/final/Tracking/grpc_client.py", line 32, in upload
    response = self.stub.upload_chunk_stream(chunks_generator, metadata=metadata)
  File "/Users/wamiqueansari/Documents/275_gash/project/final/Tracking/venv/lib/python3.7/site-packages/grpc/_channel.py", line 871, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/Users/wamiqueansari/Documents/275_gash/project/final/Tracking/venv/lib/python3.7/site-packages/grpc/_channel.py", line 592, in _end_unary_response_blocking
    raise _Rendezvous(state, None, None, deadline)
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
        status = StatusCode.UNKNOWN
        details = "Exception calling application: "
        debug_error_string = "{"created":"@1576408135.973233000","description":"Error received from peer ipv4:127.0.0.1:9999","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Exception calling application: ","grpc_status":2}"
4

0 回答 0