1

我正在使用这个问题的公认答案。相关代码如下。

import multiprocessing

def query_with_timeout(dbc, timeout, query, *a, **k):
  conn1, conn2 = multiprocessing.Pipe(False)
  subproc = multiprocessing.Process(target=do_query,
                                    args=(dbc, query, conn2)+a, 
                                    kwargs=k)
  subproc.join(timeout)
  if conn1.poll():
    return conn1.recv()
  subproc.terminate()
  raise Exception("Query %r ran for >%r" % (query, timeout))

def do_query(dbc, query, conn, *a, **k):
  cu = dbc.cursor()
  cu.execute(query, *a, **k)
  return cu.fetchall()

我的电话看起来像这样:

res = query_with_timeout(dbconn, 30, "SELECT * FROM `table`)
print res

dbconn是一个连接对象。它在应用程序的其他地方(在此超时功能之外)使用没有任何问题。

它似乎加入了子进程并等待了 30 秒。尽管它在轮询结果时从不进入if conn1.poll(): return conn1.recv()代码块。相反,我总是收到异常。

我知道查询运行(这是一个简单的选择),它运行不到一秒钟。

我错过了什么?

4

1 回答 1

1

好吧,有几件事基于您发布的代码。首先,你从来没有start()这个过程。但也许这只是你在输入这个例子时犯的一个错误?

其次,您不会send()通过连接进行任何操作。

这是上面的简化版本,具有预期的行为:

import multiprocessing
import time

def simple_example(wait, timeout, query):
    conn1, conn2 = multiprocessing.Pipe(False)
    subproc = multiprocessing.Process(target=do_query, args=(wait, query, conn2))
    subproc.start()
    subproc.join(timeout)
    if conn1.poll():
        return conn1.recv()
    subproc.terminate()
    raise Exception('Query %r ran for >%r' % (query, timeout))

def do_query(wait, query, conn):
    print query
    time.sleep(wait)
    conn.send(query)

res = simple_example(0, 2, 'foo')
res = simple_example(3, 2, 'foo')

我担心这可能只是您的示例代码的问题。

于 2012-06-15T20:43:24.370 回答