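I want to implement a method that the client can call to end the server's stream. Is there something in gRPC Python for this, or how should I implement it? In short, I want the client to be able to say "no more streaming". Also, I want my server not to fire a callback once it has finished streaming.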

My proto file looks like this:

syntax = "proto3";

service TestService {
  rpc GetOneToOne(TestRequest) returns (TestResponse) {}
  rpc GetOneToStream(TestRequest) returns (stream TestResponse) {}
  rpc GetStreamToOne(stream TestRequest) returns (TestResponse) {}
  rpc GetStreamToStream(stream TestRequest) returns (stream TestResponse) {}
}

message TestRequest {
  string message = 1;
}

message TestResponse {
  string message = 1;
}

client.py:

import grpc
import string
import random
from system_test.sw_test.vstars.lib.low_level._endpoint_lib.test import(
    test_pb2, test_pb2_grpc
)

# open a gRPC channel
channel = grpc.insecure_channel('localhost:50052')

# create a stub (client)
stub = test_pb2_grpc.TestServiceStub(channel)

# create a valid request message
number = test_pb2.TestRequest(message="pri")

# One to One
response1 = stub.GetOneToOne(number)

print(response1.message)

# stream to stream
iterator = iter([test_pb2.TestRequest(message=x) for x in ["gri", "tree", "dree", "bri"]])
response4 = stub.GetStreamToStream(iterator)
for resp in response4:
    print("StreamToStream", resp)

server.py:

from system_test.sw_test.vstars.lib.low_level._endpoint_lib.test import(
    test_pb2, test_pb2_grpc
)
from system_test.sw_test.vstars.lib.low_level import endpoint_lib

import grpc
from concurrent import futures
import time
from queue import Empty
import queue


class _Servicer(test_pb2_grpc.TestServiceServicer):
    def GetOneToOne(self, request, context):
        return test_pb2.TestResponse(message='response: {}'.format(request.message))

    def GetStreamToStream(self, request_iterator, context):
        # yield from map(
        #     lambda d: test_pb2.TestResponse(message='response: {}'.format(d.message)),
        #     request_iterator
        # )
        def remove_feed():
            print("fsdfdsf",context)

        def stop_stream():
            remove_feed()

        # context.add_callback(stop_stream)
        i=0
        while True:
            try:
                print("in here",i)
                yield from map(
                    lambda d: test_pb2.TestResponse(message='response: {}'.format(d.message)),
                    request_iterator
                )
                i+=1
            except KeyboardInterrupt:
                print("Key")
                context.add_callback(stop_stream)

# create a gRPC server
rx_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
test_pb2_grpc.add_TestServiceServicer_to_server(_Servicer(), rx_server)

# listen on port 50052 (must match the port the client connects to)
print('Starting server. Listening on port 50052.')
rx_server.add_insecure_port('[::]:50052')
rx_server.start()

# since server.start() will not block,
# a sleep-loop is added to keep alive
try:
    while True:
        time.sleep(86400)
except KeyboardInterrupt:
    rx_server.stop(0)

1 Answer

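Based on the gRPC Python examples, you should be able to cancel the stream from the client. The object returned by a response-streaming stub method is both an iterator over the responses and a grpc.Call, so you can call cancel() on it.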

client.py:

...
...
response4 = stub.GetStreamToStream(iterator)
response4.cancel()
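
To see what cancel() does on both sides, here is a minimal self-contained sketch. It is not the asker's setup: so that it runs without the generated test_pb2 modules, it registers a generic handler and exchanges raw bytes, and the names demo.TestService, METHOD, get_one_to_stream and run_demo are hypothetical stand-ins. The client cancels after three responses; the server notices through context.is_active() and through a callback registered with context.add_callback().

```python
import threading
import time
from concurrent import futures

import grpc

# Hypothetical service/method names; with generated code these come from the .proto.
METHOD = "/demo.TestService/GetOneToStream"
cancelled = threading.Event()  # set by the server once the RPC has terminated


def get_one_to_stream(request, context):
    """Server-streaming handler; request and responses are raw bytes here."""
    # The callback runs when the RPC terminates, including client cancellation.
    context.add_callback(cancelled.set)
    i = 0
    # is_active() becomes False after the client cancels, which ends the loop.
    while context.is_active():
        yield request + b" #%d" % i
        i += 1
        time.sleep(0.05)


def run_demo():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=4))
    handler = grpc.method_handlers_generic_handler(
        "demo.TestService",
        {"GetOneToStream": grpc.unary_stream_rpc_method_handler(get_one_to_stream)},
    )
    server.add_generic_rpc_handlers((handler,))
    port = server.add_insecure_port("127.0.0.1:0")  # let the OS pick a free port
    server.start()

    received, status = [], None
    with grpc.insecure_channel("127.0.0.1:%d" % port) as channel:
        call = channel.unary_stream(METHOD)(b"pri")
        try:
            for response in call:
                received.append(response)
                if len(received) == 3:
                    call.cancel()  # the client says "no more streaming"
        except grpc.RpcError as err:
            # Iterating a cancelled call raises; the status code is CANCELLED.
            status = err.code()
        server_saw_cancel = cancelled.wait(timeout=5)
    server.stop(0)
    return received, status, server_saw_cancel


if __name__ == "__main__":
    print(run_demo())
```

With the generated stubs from the question, the same pattern should apply: keep the object returned by stub.GetStreamToStream(iterator), call cancel() on it when the client is done, and catch grpc.RpcError around the loop that reads responses.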
Answered 2022-02-01T19:22:31.163