我的存根使用了很多方法,我无法在每次使用它们时重新创建通道并关闭它们,而且我无法将它们全部使用,因为它们是随机的。所以我希望我创建的频道在超时时自动关闭。我将为我的根方法创建通道,其他使用根方法的通道,如果不再使用此根方法,我希望能够自动关闭通道,或者如果此根方法不在长时间使用会自动关闭频道
@staticmethod
def _create_client(workspace):
workspace._client = TreeLabClient(token=workspace.token)
def __init__(self, token: str):
self.metadata = [('access_token', token)]
if not os.getenv('ip'):
os.environ['ip'] = get_env_ip()
self.channel = grpc.insecure_channel(os.getenv('ip'))
self.client = treelab_service.TreeLabApiServiceStub(self.channel)
self.connectivity_state = _ChannelConnectivityState(self.channel)
self.stub = treelab_service.TreeLabApiServiceStub(self.channel)
def close(self):
self.channel.close()
@wait("WorkspaceCreated")
def create_workspace(self, create_workspace_input: CreateWorkspaceInput):
return self.stub.CreateWorkspace, create_workspace_input, self.metadata
@wait("WorkspaceUpdated")
def update_workspace(self, update_workspace_input: UpdateWorkspaceInput):
return self.stub.UpdateWorkspace, update_workspace_input, self.metadata
@wait("CoreCreated")
def add_core(self, add_core_input):
return self.stub.AddCore, add_core_input, self.metadata
@wait("CoreRemoved")
def remove_core(self, remove_core_input: RemoveCoreInput):
return self.stub.RemoveCore, remove_core_input, self.metadata
@wait("CoreUpdated")
def update_core(self, update_core_input: UpdateCoreInput):
return self.stub.UpdateCore, update_core_input, self.metadata
@wait("TableCreated")
def add_table(self, add_table_input: AddTableInput):
return self.stub.AddTable, add_table_input, self.metadata
我希望能够让 GRPC 的通道设置过期时间