更新!由 tee 引起的段错误已在 python 2.7、3.7、3.8 及以上任何版本的最新版本中得到修复。为了线程安全,您仍然需要自己管理并发访问,您可以使用下面的我的解决方案。
tl;博士
在 CPython 中,`itertools.tee` 是线程安全的*当且仅当*原始迭代器是在 C/C++ 中实现的,即不使用**任何** python。
如果原始迭代器it
是用 python 编写的,例如类实例或生成器,则itertools.tee(it)
不是线程安全的。在最好的情况下,你只会得到一个异常(你会),而在最坏的情况下,python 会崩溃。
tee
这里是一个线程安全的包装类和函数,而不是 using :
class safeteeobject(object):
"""tee object wrapped to make it thread-safe"""
def __init__(self, teeobj, lock):
self.teeobj = teeobj
self.lock = lock
def __iter__(self):
return self
def __next__(self):
with self.lock:
return next(self.teeobj)
def __copy__(self):
return safeteeobject(self.teeobj.__copy__(), self.lock)
def safetee(iterable, n=2):
"""tuple of n independent thread-safe iterators"""
lock = Lock()
return tuple(safeteeobject(teeobj, lock) for teeobj in tee(iterable, n))
我现在将(大量)扩展什么时候tee
是线程安全的,什么时候不是线程安全的,以及为什么。
没问题的例子
让我们运行一些代码(这是 python 3 代码,对于 python 2 使用 `itertools.izip` 而不是 `zip` 具有相同的行为):
>>> from itertools import tee, count
>>> from threading import Thread
>>> def limited_sum(it):
... s = 0
... for elem, _ in zip(it, range(1000000)):
... s += elem
... print(elem)
>>> a, b = tee(count())
>>> [Thread(target=limited_sum, args=(it,)).start() for it in [a, b]]
# prints 499999500000 twice, which is in fact the same 1+...+999999
itertools.count 在 CPython 项目的文件中完全用 C 语言编写Modules/itertoolsmodule.c
,所以它工作得很好。
同样适用于:列表、元组、集合、范围、字典(键、值和项)、collections.defaultdict
(键、值和项)以及其他一些。
不起作用的示例 - 生成器
一个非常简短的示例是使用生成器:
>>> gen = (i for i in range(1000000))
>>> a, b = tee(gen)
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Exception in thread Thread-10:
Traceback (most recent call last):
File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
self.run()
File "/usr/lib/python3.4/threading.py", line 868, in run
self._target(*self._args, **self._kwargs)
ValueError: generator already executing
是的,tee
它是用 C 编写的,而且 GIL 确实一次执行一个字节代码。但是上面的例子表明,这并不足以保证线程安全。沿着这条线的某个地方发生了这样的事情:
- 两个线程调用
next
它们的 tee_object 实例的次数相同,
- 线程 1 调用
next(a)
,
- 它需要获取一个新元素,所以线程 1 现在调用
next(gen)
,
gen
是用python编写的。比如说,gen.__next__
CPython 的第一个字节码决定切换线程,
- 线程 2 恢复并调用
next(b)
,
- 它需要获取一个新元素,所以它调用
next(gen)
- 由于
gen.__next__
已经在线程 1 中运行,我们得到一个异常。
它不起作用的示例 - 迭代器对象
好的,也许在 tee 中使用生成器不是线程安全的。然后我们运行一个使用迭代器对象的上述代码的变体:
>>> from itertools import tee
>>> from threading import Thread
>>> class countdown(object):
... def __init__(self, n):
... self.i = n
... def __iter__(self):
... return self
... def __next__(self):
... self.i -= 1
... if self.i < 0:
... raise StopIteration
... return self.i
...
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)
上面的代码在 Ubuntu、Windows 7 和 OSX 上的 python 2.7.13 和 3.6(可能还有所有 cpython 版本)中崩溃。我还不想透露原因,先一步再说。
如果我在迭代器中使用锁怎么办?
也许上面的代码崩溃是因为我们的迭代器本身不是线程安全的。让我们添加一个锁,看看会发生什么:
>>> from itertools import tee
>>> from threading import Thread, Lock
>>> class countdown(object):
... def __init__(self, n):
... self.i = n
... self.lock = Lock()
... def __iter__(self):
... return self
... def __next__(self):
... with self.lock:
... self.i -= 1
... if self.i < 0:
... raise StopIteration
... return self.i
...
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)
在我们的迭代器中添加锁不足以使tee
线程安全。
为什么 tee 不是线程安全的
问题的关键在于CPython文件中的getitem
方法。的实现真的很酷,通过优化可以节省 RAM 调用:返回“tee 对象”,每个对象都保存对 head 的引用。这些反过来就像链表中的链接,但不是包含单个元素 - 它们包含 57。这对我们的目的来说并不重要,但它就是这样。这是 的功能:teedataobject
Modules/itertoolsmodule.c
tee
tee
teedataobject
getitem
teedataobject
static PyObject *
teedataobject_getitem(teedataobject *tdo, int i)
{
PyObject *value;
assert(i < LINKCELLS);
if (i < tdo->numread)
value = tdo->values[i];
else {
/* this is the lead iterator, so fetch more data */
assert(i == tdo->numread);
value = PyIter_Next(tdo->it);
if (value == NULL)
return NULL;
tdo->numread++;
tdo->values[i] = value;
}
Py_INCREF(value);
return value;
}
当被要求提供一个元素时,teedataobject
检查它是否已经准备好了。如果是,则返回它。如果没有,则调用next
原始迭代器。这就是,如果迭代器是用 python 编写的,代码可能会挂起。所以问题来了:
- 两个线程调用
next
了相同的次数,
- 线程 1 调用
next(a)
,C 代码到达PyIter_Next
上面的调用。例如,在 的第一个字节码上next(gen)
,CPython 决定切换线程。
- 线程 2 调用
next(b)
,因为它仍然需要一个新元素,所以 C 代码开始PyIter_Next
调用,
此时,两个线程都在同一个地方,具有相同的i
和值tdo->numread
。请注意,这tdo->numread
只是一个变量,用于跟踪下teedataobject
一个应该写入的 57 个单元链接中的位置。
线程 2 完成对元素的调用PyIter_Next
并返回一个元素。在某个时候,CPython 决定再次切换线程,
线程 1 恢复,完成对 的调用PyIter_Next
,然后运行以下两行:
tdo->numread++;
tdo->values[i] = value;
但是线程 2 已经设置好了tdo->values[i]
!
这已经足以表明它tee
不是线程安全的,因为我们丢失了线程 2 放入的值tdo->values[i]
。但这并不能解释崩溃。
说i
是 56。由于两个线程都调用tdo->numread++
,它现在达到 58 - 高于 57,分配的大小为tdo->values
。在线程 1 也继续运行之后,该对象tdo
没有更多的引用并准备好被删除。这是明确的功能teedataobject
:
static int
teedataobject_clear(teedataobject *tdo)
{
int i;
PyObject *tmp;
Py_CLEAR(tdo->it);
for (i=0 ; i<tdo->numread ; i++)
Py_CLEAR(tdo->values[i]); // <----- PROBLEM!!!
tmp = tdo->nextlink;
tdo->nextlink = NULL;
teedataobject_safe_decref(tmp);
return 0;
}
在标记为“问题”的行,CPython 将尝试清除tdo->values[57]
. 这就是崩溃发生的地方。嗯,有些时候。崩溃的地方不止一个,我只想展示一个。
现在你知道了 -itertools.tee
不是线程安全的。
一种解决方案 - 外部锁
__next__
我们可以在迭代器周围加一个锁,而不是锁定在迭代器中tee.__next__
。这意味着teedataobject.__getitem__
每次都将由单个线程调用整个方法。我在这个答案的开头给出了一个简短的实现。tee
它是线程安全的替代品。它唯一没有实现的tee
就是酸洗。由于锁不可腌制,因此添加它并非易事。但是,当然,这是可以做到的。