2

我正在编写一个多线程 Python 程序,其中我正在启动工作线程来处理输入数据列表。然后他们处理的数据需要网络操作,这有效地使他们受 I/O 限制(所以 GIL 对我来说不是问题)。

我遇到了一个问题,多个工作线程显然接收到相同的输入,但我不知道为什么。据我所知,我没有在线程之间共享任何线程不安全的数据。

我已经创建了我正在尝试做的最小化版本。该程序在不执行任何 I/O 或任何操作的情况下显示问题:

#!/usr/bin/env python

import threading
import logging
import time

logging.basicConfig(level=logging.DEBUG,
                    format="%(threadName)-10s %(levelname)-7s %(message)s")

sema = threading.Semaphore(10)

# keep track of already-visited data in worker threads
seen = []
seenlock = threading.Lock()

def see(num):
    try:
        logging.info("see: look at %d", num)

        with seenlock:
            if num in seen:
                # this should be unreachable if each thread processes a unique number
                logging.error("see: already saw %d", num)
            else:
                seen.append(num)

        time.sleep(0.3)

    finally:
        sema.release()


def main():
    # start at 1, so that the input number matches the log's "Thread-#"
    for i in xrange(1, 100):
        sema.acquire() # prevent more than 10 simultaneous threads
        logging.info("process %d", i)
        threading.Thread(target=lambda: see(i)).start()

if __name__ == '__main__': main()

还有一些输出:

MainThread INFO    process 1
MainThread INFO    process 2
Thread-1   INFO    see: look at 2
Thread-2   INFO    see: look at 2
MainThread INFO    process 3
Thread-2   ERROR   see: already saw 2
MainThread INFO    process 4
Thread-3   INFO    see: look at 4
Thread-4   INFO    see: look at 4
MainThread INFO    process 5
Thread-4   ERROR   see: already saw 4
Thread-5   INFO    see: look at 5
MainThread INFO    process 6
Thread-6   INFO    see: look at 6
MainThread INFO    process 7
Thread-7   INFO    see: look at 7
MainThread INFO    process 8
Thread-8   INFO    see: look at 8
MainThread INFO    process 9
MainThread INFO    process 10

我觉得我正在做的唯一可能奇怪的事情是在一个线程上获得一个信号量许可,而不是在它被释放的地方,但是信号量应该是线程安全的,并且不关心谁获得和释放许可,只要有每个相同的数量。

确认时间:

  • Python 2.7.3(Windows;从 python.org 构建)
  • Python 2.6.7(Windows;cygwin dist)
  • Python 2.6.6 (Linux; Debian dist)

我在做什么来使我的线程共享数据?

4

1 回答 1

3

这与线程无关。它与闭包的行为有关。

>>> funcs = []
>>> for x in range(10):
...     def foo():
...         return x
...     funcs.append(foo)
... 
>>> [f() for f in funcs]
[9, 9, 9, 9, 9, 9, 9, 9, 9, 9]

当您定义一个函数并引用封闭范围内的变量时,该变量的值始终是封闭范围内该变量在函数调用时的值。由于这些函数都是在for循环结束后调用的,因此x == 9对于所有 10 次调用。

解决此问题的一种简单方法是使用默认值。简而言之,改变这个:

    threading.Thread(target=lambda: see(i)).start()

对此:

    threading.Thread(target=lambda x=i: see(x)).start()

或者,更好的是,使用Thread构造函数的全部功能(感谢 Joel Cornett 提醒我):

    threading.Thread(target=see, args=(i,)).start()
于 2012-07-14T19:36:45.167 回答