3

我用 python 构建了一个 grpc 服务器,并尝试使用 werkzeug Local 和 LocalProxy 处理一些线程本地存储,类似于 flask 所做的。

我面临的问题是,当我从服务器拦截器将一些数据存储在本地,然后尝试从服务端检索它时,本地是空的。真正的问题是,由于某种原因,拦截器在与服务程序不同的 greenlet 中运行,因此从 werkzeug.local 开始,不可能跨请求共享数据。对于应该属于同一请求的数据,存储最终会使用不同的密钥。

使用 python 线程库也会发生同样的情况,看起来拦截器是从主线程或与服务程序不同的线程运行的。有解决方法吗?我本来希望拦截器在同一个线程中运行,从而允许这种事情。

# Define a global somewhere
from werkzeug.local import Local
local = Local()

# from an interceptor save something
local.message = "test msg"

# from the service access it
local.service_var = "test"
print local.message  # this throw a AttributeError

# print the content of local
print local.__storage__  # we have 2 entries in the storage, 2 different greenlets, but we are in the same request.
4

2 回答 2

1

拦截器确实在与处理线程不同的服务线程上运行。服务线程负责服务服务者和拦截服务者处理程序。拦截器返回 servicer 方法处理程序后,服务线程会将其提交到thread_poolat _server.py#L525

# Take unary unary call as an example.
# The method_handler is the returned object from interceptor.
def _handle_unary_unary(rpc_event, state, method_handler, thread_pool):
    unary_request = _unary_request(rpc_event, state,
                                   method_handler.request_deserializer)
    return thread_pool.submit(_unary_response_in_pool, rpc_event, state,
                              method_handler.unary_unary, unary_request,
                              method_handler.request_deserializer,
                              method_handler.response_serializer)

至于解决方法,我只能想象在初始化期间将存储实例同时传递给拦截器和服务程序。之后,存储可以用作成员变量。

class StorageServerInterceptor(grpc.ServerInterceptor):

    def __init__(self, storage):
        self._storage = storage

    def intercept_service(self, continuation, handler_call_details):
        key = ...
        value = ...
        self._storage.set(key, value)
        ...
        return continuation(handler_call_details)

class Storage(...StorageServicer):

    def __init__(self, storage):
        self._storage = storage

    ...Servicer Handlers...
于 2018-11-28T21:32:42.753 回答
0

您还可以包装所有将被调用的函数并将线程设置为本地,并返回一个带有包装函数的新处理程序。

class MyInterceptor(grpc.ServerInterceptor):
  def wrap_handler(self, original_handler: grpc.RpcMethodHandler):
    if original_handler.unary_unary is not None:
        unary_unary = original_handler.unary_unary
        def wrapped_unary_unary(*args, **kwargs):
            threading.local().my_var = "hello"
            return unary_unary(*args, **kwargs)
        new_unary_unary = wrapped_unary_unary
    else:
        new_unary_unary = None

    ...
    # do this for all the combinations to make new_unary_stream, new_stream_unary, new_stream_stream

    
    new_handler = grpc.RpcMethodHandler()
    new_handler.request_streaming=original_handler.request_streaming
    new_handler.response_streaming=original_handler.response_streaming
    new_handler.request_deserializer=original_handler.request_deserializer
    new_handler.response_serializer=original_handler.response_serializer
    new_handler.unary_unary=new_unary_unary
    new_handler.unary_stream=new_unary_stream
    new_handler.stream_unary=new_stream_unary
    new_handler.stream_stream=new_stream_stream
    return new_handler

  def intercept_service(self, continuation, handler_call_details):
    return self.wrap_handler(continuation(handler_call_details))
于 2022-03-02T01:30:55.733 回答