0

我在处理我一直在处理的守护进程中的数据库连接时遇到问题,我首先连接到我的 postgres 数据库:

try:
  psycopg2.apilevel = '2.0'
  psycopg2.threadsafety = 3
  cnx = psycopg2.connect( "host='192.168.10.36' dbname='db' user='vas' password='vas'")
  except Exception, e:
  print "Unable to connect to DB. Error [%s]" % ( e,)
  exit( )

之后,我选择数据库中状态 = 0 的所有行:

try:
  cursor = cnx.cursor( cursor_factory = psycopg2.extras.DictCursor)
  cursor.execute( "SELECT * FROM table WHERE status = 0")
  rows = cursor.fetchall( )
  cursor.close( )
except Exception, e:
  print "Error on sql query [%s]" % ( e,)

然后如果有选择的行程序叉成:

while 1:
  try:
    psycopg2.apilevel = '2.0'
    psycopg2.threadsafety = 3
    cnx = psycopg2.connect( "host='192.168.10.36' dbname='sms' user='vas' password='vas'")
  except Exception, e:
    print "Unable to connect to DB. Error [%s]" % ( e,)
    exit( )

  if rows:
    daemonize( )
    for i in rows:
      try:
        global q, l
        q = Queue.Queue( max_threads)
        for i in rows:
          cursor = cnx.cursor( cursor_factory = psycopg2.extras.DictCursor)
          t = threading.Thread( target=sender, args=(i, cursor))
          t.setDaemon( True)
          t.start( )

          for i in rows:
            q.put( i)
            q.join( )
      except Exception, e:
        print "Se ha producido el siguente error [%s]" % ( e,)
        exit( )
  else:
    print "No rows where selected\n"
    time.sleep( 5)

我的守护程序函数如下所示:

def daemonize( ):
  try:
    pid = os.fork()
    if pid > 0:
      sys.exit(0)
  except OSError, e:
    print >>sys.stderr, "fork #1 failed: %d (%s)" % (e.errno, e.strerror)
    sys.exit(1)

  os.chdir("/")
  os.umask(0)

  try:
    pid = os.fork()
    if pid > 0:
      sys.exit(0)
  except OSError, e:
    print >>sys.stderr, "fork #2 failed: %d (%s)" % (e.errno, e.strerror)
    sys.exit(1)

线程以发送者函数为目标:

def sender( row, db):
  while 1:
  item = q.get( )
  if send_to( row['to'], row['text']):
    db.execute( "UPDATE table SET status = 1 WHERE id = %d" % sms['id'])
  else:
    print "UPDATE table SET status = 2 WHERE id = %d" % sms['id']
    db.execute( "UPDATE table SET status = 2 WHERE id = %d" % sms['id'])
  db.close( )
  q.task_done( )

send_to函数只是打开一个 url 并在成功时返回 true 或 false

从昨天开始,我不断收到这些错误,无法通过:

UPDATE outbox SET status = 2 WHERE id = 36
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 525, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 477, in run
    self.__target(*self.__args, **self.__kwargs)
  File "sender.py", line 30, in sender
    db.execute( "UPDATE table SET status = 2 WHERE id = %d" % sms['id'])
  File "/usr/lib/python2.6/dist-packages/psycopg2/extras.py", line 88, in execute
    return _cursor.execute(self, query, vars, async)
OperationalError: server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.
4

1 回答 1

1

数据库句柄无法跨fork(). 您需要在每个子进程中打开一个新的数据库句柄,即在您调用daemonize()call之后psycopg2.connect

我没有使用过 postgres,但我知道这对于 MySQL 来说绝对正确。

于 2009-10-13T19:46:51.553 回答