我解决这个问题的方法是使用上下文管理器。这将确保我长时间运行的命令被中止。关键逻辑是包装以模仿 SSHClient.exec_command 但捕获创建的通道并使用Timer
如果命令运行时间过长将关闭该通道。
import paramiko
import threading
class TimeoutChannel:
def __init__(self, client: paramiko.SSHClient, timeout):
self.expired = False
self._channel: paramiko.channel = None
self.client = client
self.timeout = timeout
def __enter__(self):
self.timer = threading.Timer(self.timeout, self.kill_client)
self.timer.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
print("Exited Timeout. Timed out:", self.expired)
self.timer.cancel()
if exc_val:
return False # Make sure the exceptions are re-raised
if self.expired:
raise TimeoutError("Command timed out")
def kill_client(self):
self.expired = True
print("Should kill client")
if self._channel:
print("We have a channel")
self._channel.close()
def exec(self, command, bufsize=-1, timeout=None, get_pty=False, environment=None):
self._channel = self.client.get_transport().open_session(timeout=timeout)
if get_pty:
self._channel.get_pty()
self._channel.settimeout(timeout)
if environment:
self._channel.update_environment(environment)
self._channel.exec_command(command)
stdin = self._channel.makefile_stdin("wb", bufsize)
stdout = self._channel.makefile("r", bufsize)
stderr = self._channel.makefile_stderr("r", bufsize)
return stdin, stdout, stderr
现在使用代码非常简单,第一个示例将抛出一个TimeoutError
ssh = paramiko.SSHClient()
ssh.connect('hostname', username='user', password='pass')
with TimeoutChannel(ssh, 3) as c:
ssh_stdin, ssh_stdout, ssh_stderr = c.exec("cat") # non-blocking
exit_status = ssh_stdout.channel.recv_exit_status() # block til done, will never complete because cat wants input
此代码可以正常工作(除非主机处于疯狂的负载下!)
ssh = paramiko.SSHClient()
ssh.connect('hostname', username='user', password='pass')
with TimeoutChannel(ssh, 3) as c:
ssh_stdin, ssh_stdout, ssh_stderr = c.exec("uptime") # non-blocking
exit_status = ssh_stdout.channel.recv_exit_status() # block til done, will complete quickly
print(ssh_stdout.read().decode("utf8")) # Show results