我想实现一个可由客户端调用、用来结束服务器端流式传输的方法。gRPC Python 中是否已有现成的机制?如果没有,我该如何实现?简而言之,我希望客户端能够告诉服务器“不再流式传输”。另外,我希望我的服务器在完成流式传输后不会触发回调。
我的 proto 文件如下所示:
syntax = "proto3";
// TestService declares one RPC for each combination of single/streamed
// request and single/streamed response. Every RPC takes TestRequest
// messages and returns TestResponse messages.
service TestService {
// Single request, single response.
rpc GetOneToOne(TestRequest) returns (TestResponse) {}
// Single request, stream of responses.
rpc GetOneToStream(TestRequest) returns (stream TestResponse) {}
// Stream of requests, single response.
rpc GetStreamToOne(stream TestRequest) returns (TestResponse) {}
// Stream of requests, stream of responses.
rpc GetStreamToStream(stream TestRequest) returns (stream TestResponse) {}
}
// Request message accepted by every TestService RPC.
message TestRequest {
// Text payload of the request (field number 1).
string message = 1;
}
// Response message returned by every TestService RPC.
message TestResponse {
// Text payload of the response (field number 1).
string message = 1;
}
客户端.py:
import grpc
import string
import random
from system_test.sw_test.vstars.lib.low_level._endpoint_lib.test import(
test_pb2, test_pb2_grpc
)
# Open a gRPC channel to the local test server. The channel is used as a
# context manager so that it is closed once the calls below have finished,
# rather than being left open until the interpreter exits.
with grpc.insecure_channel('localhost:50052') as channel:
    # create a stub (client)
    stub = test_pb2_grpc.TestServiceStub(channel)

    # create a valid request message
    number = test_pb2.TestRequest(message="pri")

    # One to One: a single request is answered with a single response.
    response1 = stub.GetOneToOne(number)
    print(response1.message)

    # Stream to stream: send four requests and print every response the
    # server streams back. The loop ends when the server finishes the RPC.
    iterator = iter(
        [test_pb2.TestRequest(message=x) for x in ["gri", "tree", "dree", "bri"]]
    )
    response4 = stub.GetStreamToStream(iterator)
    for resp in response4:
        print("StreamToStream",resp)
服务器.py:
from system_test.sw_test.vstars.lib.low_level._endpoint_lib.test import(
test_pb2, test_pb2_grpc
)
from system_test.sw_test.vstars.lib.low_level import endpoint_lib
import grpc
from concurrent import futures
import time
from queue import Empty
import queue
class _Servicer(test_pb2_grpc.TestServiceServicer):
    """Test servicer that echoes each request message back to the caller.

    Only ``GetOneToOne`` and ``GetStreamToStream`` are overridden here.
    """

    @staticmethod
    def _make_response(request):
        """Return the echo ``TestResponse`` for a single ``TestRequest``."""
        return test_pb2.TestResponse(message='response: {}'.format(request.message))

    def GetOneToOne(self, request, context):
        """Answer one request with one echoed response."""
        return self._make_response(request)

    def GetStreamToStream(self, request_iterator, context):
        """Yield one echoed response for every request received.

        The generator returns as soon as ``request_iterator`` is exhausted,
        which lets the RPC complete. ``stop_stream`` is registered on
        ``context`` so that it runs when the RPC terminates.

        Args:
            request_iterator: Iterator of incoming ``TestRequest`` messages.
            context: The servicer context for this RPC.

        Yields:
            A ``TestResponse`` for each request, in arrival order.
        """
        def remove_feed():
            print("fsdfdsf",context)

        def stop_stream():
            remove_feed()

        # Register the termination callback up front. This handler runs on
        # a thread-pool worker, and KeyboardInterrupt is only ever raised in
        # the main thread, so registering from an ``except KeyboardInterrupt``
        # branch would never happen.
        context.add_callback(stop_stream)

        # A single pass over the requests: looping again over an exhausted
        # iterator would spin forever without yielding and the RPC would
        # never finish.
        for request in request_iterator:
            yield self._make_response(request)
# create a gRPC server backed by a pool of 10 worker threads
rx_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
test_pb2_grpc.add_TestServiceServicer_to_server(_Servicer(), rx_server)

# Listen on port 50052. The log message and the bound address are both
# derived from this one value so that they cannot disagree.
listen_port = 50052
print('Starting server. Listening on port {}.'.format(listen_port))
rx_server.add_insecure_port('[::]:{}'.format(listen_port))
rx_server.start()

# since server.start() will not block,
# a sleep-loop is added to keep alive
try:
    while True:
        time.sleep(86400)
except KeyboardInterrupt:
    # Ctrl-C: stop immediately (grace period of 0 seconds).
    rx_server.stop(0)