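The class below is designed to manipulate a Cisco-like device interface in order to execute commands and update configuration elements. As it currently stands, I can instantiate the class, call the ssh_to_aos_expsh function, and get back valid output (e.g. the config when the command is "show running-config"). However, when I call the ssh_to_aos_config function (which itself calls ssh_to_aos_expsh), I get a pexpect timeout error.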
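I have compared the pexpect object ("child" in _ssh_connect, ssh_to_aos_expsh and ssh_to_aos_config) returned from _ssh_connect to ssh_to_aos_expsh with the object returned from ssh_to_aos_expsh to ssh_to_aos_config, and it appears to be at the same memory location, so I'm not clear why I can't continue to manipulate the object with pexpect.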
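I'm not the most sophisticated Python coder, so I may have made some inadvertent mistake while passing the pexpect object between functions. If so, I would appreciate someone pointing it out.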
#!/usr/bin/env python

import os
import traceback

import pexpect


class SSHTool():

    def __init__(self):
        self.aos_user = 'some_user'
        self.aos_passwd = 'some_passwd'
        self.aos_init_prompt = 'accelerator>'
        self.aos_enable_prompt = 'accelerator#'
        self.aos_lnxsh_prompt = 'ACC#'
        self.linux_passwd = 'linux_passwd'
        self.root_prompt = ''

    def _timeout_error(self, child):
        print 'SSH could not login. Timeout error.'
        print child.before, child.after
        return None

    def _password_error(self, child):
        print 'SSH could not login. Password error.'
        print child.before, child.after
        return None

    def _ssh_connect(self, user, address, passwd):
        self.root_prompt = "root@%s's password: " % address
        ssh_newkey = "Are you sure you want to continue connecting"
        child = pexpect.spawn('ssh -l %s %s' % (user, address))
        i = child.expect([pexpect.TIMEOUT,
                          ssh_newkey,
                          'Password: ',
                          self.root_prompt])
        if i == 0:  # Timeout
            return self._timeout_error(child)
        elif i == 1:  # SSH does not have the public key. Just accept it.
            child.sendline('yes')
            i = child.expect([pexpect.TIMEOUT,
                              'Password: ',
                              self.root_prompt])
            if i == 0:  # Timeout
                return self._timeout_error(child)
            else:
                child.sendline(passwd)
                return child
        elif i == 2 or i == 3:
            child.sendline(passwd)
            return child
        else:
            return self._password_error(child)

    def ssh_to_aos_expsh(self, ip_address, command=''):
        child = self._ssh_connect(self.aos_user,
                                  ip_address,
                                  self.aos_passwd)
        i = child.expect([pexpect.TIMEOUT,
                          self.aos_init_prompt])
        if i == 0:
            return self._timeout_error(child)
        child.sendline('enable')
        i = child.expect([pexpect.TIMEOUT,
                          self.aos_enable_prompt])
        if i == 0:
            return self._timeout_error(child)
        if command:
            child.sendline(command)
            i = child.expect([pexpect.TIMEOUT,
                              self.aos_enable_prompt])
            if i == 0:
                return self._timeout_error(child)
            else:
                return child.before
        else:
            return child

    def ssh_to_aos_config(self, ip_address, command):
        child = self.ssh_to_aos_expsh(ip_address)
        i = child.expect([pexpect.TIMEOUT,
                          self.aos_enable_prompt])
        if i == 0:
            return self._timeout_error(child)
        child.sendline('config')
        i = child.expect([pexpect.TIMEOUT,
                          self.aos_config_prompt])
        if i == 0:
            return self._timeout_error(child)
        child.sendline(command)
        i = child.expect([pexpect.TIMEOUT,
                          self.aos_config_prompt])
        if i == 0:
            return self._timeout_error(child)
        else:
            return child.before
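
One possible explanation, offered as a guess rather than a diagnosis: passing the object between functions looks fine, but ssh_to_aos_expsh has already matched the enable prompt by the time it returns child, and ssh_to_aos_config then expects that same prompt again without sending anything first. Since a successful expect consumes the output up to and including the match, the second expect may have nothing left to find and would wait until the timeout. The sketch below imitates that behaviour with a made-up FakeChild class. It is a hypothetical stand-in, not the pexpect API: its expect returns True or False instead of a pattern index, and it returns False immediately where real pexpect would block and then time out.

```python
class FakeChild(object):
    """Hypothetical stand-in for a pexpect child (NOT the real pexpect API)."""

    def __init__(self, prompt):
        self.prompt = prompt
        self.buffer = prompt  # assume the device printed one prompt after login
        self.before = ''

    def sendline(self, line):
        # Pretend the device echoes the command, prints one line of output,
        # and then shows a fresh prompt.
        self.buffer += line + '\r\noutput of ' + line + '\r\n' + self.prompt

    def expect(self, pattern):
        # Consume everything up to and including the match, as pexpect does.
        # Return False where real pexpect would eventually time out.
        pos = self.buffer.find(pattern)
        if pos < 0:
            return False
        self.before = self.buffer[:pos]
        self.buffer = self.buffer[pos + len(pattern):]
        return True


child = FakeChild('accelerator#')
first = child.expect('accelerator#')   # last expect in ssh_to_aos_expsh
second = child.expect('accelerator#')  # first expect in ssh_to_aos_config
child.sendline('config')               # sending a command yields a new prompt
third = child.expect('accelerator#')
print('first=%s second=%s third=%s' % (first, second, third))
```

If this is what is happening, the thing to try would be dropping the first expect in ssh_to_aos_config and sending 'config' right away. Separately, the posted code reads self.aos_config_prompt, which __init__ never sets, so that would need a value as well.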