1

我的存根使用了很多方法,我无法在每次使用它们时重新创建通道并关闭它们,而且我无法将它们全部使用,因为它们是随机的。所以我希望我创建的频道在超时时自动关闭。我将为我的根方法创建通道,其他使用根方法的通道,如果不再使用此根方法,我希望能够自动关闭通道,或者如果此根方法不在长时间使用会自动关闭频道

@staticmethod
    def _create_client(workspace):
        workspace._client = TreeLabClient(token=workspace.token)


   def __init__(self, token: str):
        self.metadata = [('access_token', token)]
        if not os.getenv('ip'):
            os.environ['ip'] = get_env_ip()
        self.channel = grpc.insecure_channel(os.getenv('ip'))
        self.client = treelab_service.TreeLabApiServiceStub(self.channel)
        self.connectivity_state = _ChannelConnectivityState(self.channel)
        self.stub = treelab_service.TreeLabApiServiceStub(self.channel)
    def close(self):
        self.channel.close()
    @wait("WorkspaceCreated")
    def create_workspace(self, create_workspace_input: CreateWorkspaceInput):
        return self.stub.CreateWorkspace, create_workspace_input, self.metadata
    @wait("WorkspaceUpdated")
    def update_workspace(self, update_workspace_input: UpdateWorkspaceInput):
        return self.stub.UpdateWorkspace, update_workspace_input, self.metadata
    @wait("CoreCreated")
    def add_core(self, add_core_input):
        return self.stub.AddCore, add_core_input, self.metadata
    @wait("CoreRemoved")
    def remove_core(self, remove_core_input: RemoveCoreInput):
        return self.stub.RemoveCore, remove_core_input, self.metadata
    @wait("CoreUpdated")
    def update_core(self, update_core_input: UpdateCoreInput):
        return self.stub.UpdateCore, update_core_input, self.metadata
    @wait("TableCreated")
    def add_table(self, add_table_input: AddTableInput):
        return self.stub.AddTable, add_table_input, self.metadata

我希望能够让 GRPC 的通道设置过期时间

4

0 回答 0