4

我正在编写一个 python 脚本,以每隔一段时间通过已建立的 ssh 隧道查询一些远程数据库。我对 paramiko 库相当熟悉,所以这是我选择的路线。我更愿意将它保存在完整的 python 中,这样我就可以使用 paramiko 来处理关键问题,以及使用 python 来启动、控制和关闭 ssh 隧道。

这里有一些关于这个主题的相关问题,但大多数问题的答案似乎都不完整。我下面的解决方案是我迄今为止找到的解决方案的组合。

现在解决问题:我可以很容易地创建第一个隧道(在一个单独的线程中)并执行我的 DB/python 工作,但是当尝试关闭隧道时,本地主机不会释放我绑定到的本地端口。下面,我在流程的每个步骤中都包含了我的来源和相关的 netstat 数据。

#!/usr/bin/python

import select
import SocketServer
import sys
import paramiko
from threading import Thread
import time



class ForwardServer(SocketServer.ThreadingTCPServer):
    daemon_threads = True
    allow_reuse_address = True

class Handler (SocketServer.BaseRequestHandler):
    def handle(self):
        try:
            chan = self.ssh_transport.open_channel('direct-tcpip', (self.chain_host, self.chain_port), self.request.getpeername())
        except Exception, e:
            print('Incoming request to %s:%d failed: %s' % (self.chain_host, self.chain_port, repr(e)))
            return
        if chan is None:
            print('Incoming request to %s:%d was rejected by the SSH server.' % (self.chain_host, self.chain_port))
            return
        print('Connected!  Tunnel open %r -> %r -> %r' % (self.request.getpeername(), chan.getpeername(), (self.chain_host, self.chain_port)))
        while True:
            r, w, x = select.select([self.request, chan], [], [])
            if self.request in r:
                data = self.request.recv(1024)
                if len(data) == 0:
                    break
                chan.send(data)
            if chan in r:
                data = chan.recv(1024)
                if len(data) == 0:
                    break
                self.request.send(data)
        chan.close()
        self.request.close()
        print('Tunnel closed from %r' % (self.request.getpeername(),))

class DBTunnel():

    def __init__(self,ip):
        self.c = paramiko.SSHClient()
        self.c.load_system_host_keys()
        self.c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.c.connect(ip, username='someuser')
        self.trans = self.c.get_transport()

    def startTunnel(self):
        class SubHandler(Handler):
            chain_host = '127.0.0.1'
            chain_port = 5432
            ssh_transport = self.c.get_transport()
        def ThreadTunnel():
            global t
            t = ForwardServer(('', 3333), SubHandler)
            t.serve_forever()
        Thread(target=ThreadTunnel).start()

    def stopTunnel(self):
        t.shutdown()
        self.trans.close()
        self.c.close()

虽然我最终将使用 stopTunnel() 类型的方法,但我意识到代码并不完全正确,但更多的是尝试让隧道正确关闭并测试我的结果的实验​​。

当我第一次调用创建 DBTunnel 对象并调用 startTunnel() 时,netstat 产生以下结果:

tcp4       0      0 *.3333                 *.*                    LISTEN
tcp4       0      0 MYIP.36316      REMOTE_HOST.22                ESTABLISHED
tcp4       0      0 127.0.0.1.5432         *.*                    LISTEN

一旦我调用了 stopTunnel(),甚至删除了 DBTunnel 对象本身……我就留下了这个连接,直到我一起退出 python,我假设的垃圾收集器会处理它:

tcp4       0      0 *.3333                 *.*                    LISTEN

弄清楚为什么这个打开的套接字独立于 DBConnect 对象,以及如何在我的脚本中正确关闭它,这将是一件好事。如果我在完全退出 python 之前尝试使用相同的本地端口将不同的连接绑定到不同的 IP(time_wait 不是问题),那么我会得到臭名昭著的 bind err 48 地址在使用中。提前致谢 :)

4

4 回答 4

1

看来 SocketServer 的关闭方法没有正确关闭/关闭套接字。通过对我的代码进行以下更改,我保留了对 SocketServer 对象的访问权限并直接访问套接字以将其关闭。请注意, socket.close() 在我的情况下有效,但如果其他资源正在访问该套接字,其他人可能会对 socket.shutdown() 后跟 socket.close() 感兴趣。

[参考:socket.shutdown 与 socket.close

def ThreadTunnel():
    self.t = ForwardServer(('127.0.0.1', 3333), SubHandler)
    self.t.serve_forever()
Thread(target=ThreadTunnel).start()

def stopTunnel(self):
    self.t.shutdown()
    self.trans.close()
    self.c.close()
    self.t.socket.close()
于 2010-12-13T05:09:38.663 回答
1

请注意,您没有进行演示代码中所示的 Subhandler hack。评论是错误的。处理程序确实可以访问其服务器的数据。在处理程序中,您可以使用self.server.instance_data.

如果您使用以下代码,在您的处理程序中,您将使用

  • self.server.chain_host
  • self.server.chain_port
  • self.server.ssh_transport

class ForwardServer(SocketServer.ThreadingTCPServer):
    daemon_threads = True
    allow_reuse_address = True

    def __init__(
          self, connection, handler, chain_host, chain_port, ssh_transport):
        SocketServer.ThreadingTCPServer.__init__(self, connection, handler)
        self.chain_host = chain_host
        self.chain_port = chain_port
        self.ssh_transport = ssh_transport
...

server = ForwardServer(('', local_port), Handler, 
                       remote_host, remote_port, transport)
server.serve_forever()
于 2012-02-15T23:46:02.160 回答
0

您可能希望在生成的线程和调用者之间添加一些同步,这样您就不会在隧道准备好之前尝试使用它。就像是:

    from threading import Event   
    def startTunnel(self):
        class SubHandler(Handler):
            chain_host = '127.0.0.1'
            chain_port = 5432
            ssh_transport = self.c.get_transport()
        mysignal = Event()
        mysignal.clear()
        def ThreadTunnel():
            global t
            t = ForwardServer(('', 3333), SubHandler)
            mysignal.set() 
            t.serve_forever()
        Thread(target=ThreadTunnel).start()
        mysignal.wait()
于 2011-05-02T02:11:54.293 回答
0

您也可以尝试sshtunnel.stop()如果您想等到所有活动连接结束或.stop(force=True)关闭所有活动连接,它有两种情况可以关闭隧道。

如果您不想使用它,可以在此处查看此逻辑的源代码:https ://github.com/pahaz/sshtunnel/blob/090a1c1/sshtunnel.py#L1423-L1456

于 2020-11-02T12:10:01.557 回答