0

我正在尝试做的事情:当我对对象的状态进行更新时,应该通过 gRPC 流向所有 gRPC 客户端提供更新。重要的是每个客户端都获得每次更新,并且他们只获得一次。

我期望发生的事情:当我立即执行 event.set() 和 event.clear() 时,所有客户端都将运行一次,产生新的状态。

实际发生的情况:客户端缺少更新。例如,我的服务功能发送了 10 个版本更新。在客户端,它缺少这些更新,我会查看它在哪里更新 1 2 然后错过 3 或其他更新,然后再次开始获取它们。

服务器版本 1,这不起作用,因为客户端缺少一些更新:

class StatusStreamer(pb2_grpc.StatusServiceServicer):
    def __init__(self, status, event):
        self.continue_running = True
        self.status = status
        self.event = event


    def StatusSubscribe(self, request, context):
        while self.continue_running:
            self.event.wait()
            yield self.status


def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    status = status_builder()
    event = threading.Event()
    status_streamer = StatusStreamer(status, event)
    pb2_grpc.add_StatusServiceServicer_to_server(status_streamer, server)
    server.add_insecure_port('[::]:50051')
    server.start()
    print('server started')
    try:
        while True:
            _ = input('enter a key to update')
            for _ in range(10):
                #make an update and send it out to all clients
                status.version = str(int(status.version) + 1)
                print('update:',status.version)
                event.set()
                event.clear()
    except KeyboardInterrupt:
        print('\nstopping...')
        event.set()
        status_streamer.continue_running = False
        server.stop(0)

服务器版本 2,此版本有效,但我认为存在竞争条件: 在第二个版本中,我没有使用 threading.Event,而是使用布尔值 new_update,它在所有线程之间共享。在 serve 函数中,我将其设置为 true,然后所有线程都将其设置为 False。

class StatusStreamer(pb2_grpc.StatusServiceServicer):
    def __init__(self, status):
        self.continue_running = True
        self.new_update = False
        self.status = status


    def StatusSubscribe(self, request, context):
        while self.continue_running:
            if self.new_update:
                yield self.status
                self.new_update = False #race condition I believe, that maybe doesn't occur because of the GIL.  




def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    status = status_builder()
    status_streamer = StatusStreamer(status)
    pb2_grpc.add_StatusServiceServicer_to_server(status_streamer, server)
    server.add_insecure_port('[::]:50051')
    server.start()
    print('server started')
    try:
        while True:
            _ = input('enter a key to update')
            for _ in range(10):
                #make an update and send it out to all clients
                status.version = str(int(status.version) + 1)
                print('update:', status.version)
                status_streamer.new_update = True #Also a race condition I believe.
    except KeyboardInterrupt:
        print('\nstopping...')
        status_streamer.continue_running = False
        server.stop(0)

我相信第二个版本之所以有效,是因为它依赖于 CPython 的全局解释器锁,确保没有线程会同时改变 new_update。我不喜欢这个解决方案,我有什么选择?另外,我知道我可以创建一个队列或列表并存储所有更改,然后跟踪每个连接的客户端所在的位置,我不想分配内存来执行此操作。

4

1 回答 1

1

对于服务器版本 1,缺少更新的原因是主线程一旦持有 GIL,它可能会在将 GILevent.set()让给其他线程之前执行多次。所以其他线程可能不会被 阻塞event.wait(),并导致丢失更新。一个潜在的解决方法是保留一个连接计数器,并阻止版本更新,直到服务器向所有连接发送更新。

对于服务器版本 2,使用threading.Lockorthreading.RLock可以解决您的竞争条件。此外,这个版本会在标志检查中消耗大量的 CPU 周期,可能会损害您在其他线程中的业务逻辑。也可能是主线程持有 GIL 的时间过长,以至于服务器尚未向所有连接发送消息。

不幸的是,我没有完美的解决方案来满足您的要求。gRPC 团队在https://github.com/grpc/grpc/blob/v1.18.x/src/python/grpcio_health_checking/grpc_health/v1/health.py有一个具有类似功能的服务实现。

在实现中,服务端会保留返回的响应迭代器的引用。当状态更新时,服务者将显式地添加消息到相应的响应迭代器。因此,状态更新不会错过。

希望这可以回答你的问题。

于 2019-03-06T20:24:00.423 回答