我正在尝试做的事情:当我对对象的状态进行更新时,应该通过 gRPC 流向所有 gRPC 客户端提供更新。重要的是每个客户端都获得每次更新,并且他们只获得一次。
我期望发生的事情:当我立即执行 event.set() 和 event.clear() 时,所有客户端都将运行一次,产生新的状态。
实际发生的情况:客户端缺少更新。例如,我的服务功能发送了 10 个版本更新。在客户端,它缺少这些更新,我会查看它在哪里更新 1 2 然后错过 3 或其他更新,然后再次开始获取它们。
服务器版本 1,这不起作用,因为客户端缺少一些更新:
class StatusStreamer(pb2_grpc.StatusServiceServicer):
def __init__(self, status, event):
self.continue_running = True
self.status = status
self.event = event
def StatusSubscribe(self, request, context):
while self.continue_running:
self.event.wait()
yield self.status
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
status = status_builder()
event = threading.Event()
status_streamer = StatusStreamer(status, event)
pb2_grpc.add_StatusServiceServicer_to_server(status_streamer, server)
server.add_insecure_port('[::]:50051')
server.start()
print('server started')
try:
while True:
_ = input('enter a key to update')
for _ in range(10):
#make an update and send it out to all clients
status.version = str(int(status.version) + 1)
print('update:',status.version)
event.set()
event.clear()
except KeyboardInterrupt:
print('\nstopping...')
event.set()
status_streamer.continue_running = False
server.stop(0)
服务器版本 2,此版本有效,但我认为存在竞争条件: 在第二个版本中,我没有使用 threading.Event,而是使用布尔值 new_update,它在所有线程之间共享。在 serve 函数中,我将其设置为 true,然后所有线程都将其设置为 False。
class StatusStreamer(pb2_grpc.StatusServiceServicer):
def __init__(self, status):
self.continue_running = True
self.new_update = False
self.status = status
def StatusSubscribe(self, request, context):
while self.continue_running:
if self.new_update:
yield self.status
self.new_update = False #race condition I believe, that maybe doesn't occur because of the GIL.
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
status = status_builder()
status_streamer = StatusStreamer(status)
pb2_grpc.add_StatusServiceServicer_to_server(status_streamer, server)
server.add_insecure_port('[::]:50051')
server.start()
print('server started')
try:
while True:
_ = input('enter a key to update')
for _ in range(10):
#make an update and send it out to all clients
status.version = str(int(status.version) + 1)
print('update:', status.version)
status_streamer.new_update = True #Also a race condition I believe.
except KeyboardInterrupt:
print('\nstopping...')
status_streamer.continue_running = False
server.stop(0)
我相信第二个版本之所以有效,是因为它依赖于 CPython 的全局解释器锁,确保没有线程会同时改变 new_update。我不喜欢这个解决方案,我有什么选择?另外,我知道我可以创建一个队列或列表并存储所有更改,然后跟踪每个连接的客户端所在的位置,我不想分配内存来执行此操作。