14

假设我有这个 Python 代码:

from itertools import count, tee
original = count()     # just an example, can be another iterable
a, b = tee(original)

问题是,如果我开始在一个线程中迭代“a”,同时在另一个线程中迭代“b”,会有什么问题吗?显然,a 和 b 共享一些数据(原始可迭代,+ 一些额外的东西,内部缓冲区或其他东西)。那么,当 a.next() 和 b.next() 访问这些共享数据时,它们会做适当的锁定吗?

4

4 回答 4

17

更新!由 tee 引起的段错误已在 python 2.7、3.7、3.8 及以上任何版本的最新版本中得到修复。为了线程安全,您仍然需要自己管理并发访问,您可以使用下面的我的解决方案。

tl;博士

在 CPython 中,`itertools.tee` 是线程安全的*当且仅当*原始迭代器是在 C/C++ 中实现的,即不使用**任何** python。

如果原始迭代器it是用 python 编写的,例如类实例或生成器,则itertools.tee(it)不是线程安全的。在最好的情况下,你只会得到一个异常(你会),而在最坏的情况下,python 会崩溃。

tee这里是一个线程安全的包装类和函数,而不是 using :

class safeteeobject(object):
    """tee object wrapped to make it thread-safe"""
    def __init__(self, teeobj, lock):
        self.teeobj = teeobj
        self.lock = lock
    def __iter__(self):
        return self
    def __next__(self):
        with self.lock:
            return next(self.teeobj)
    def __copy__(self):
        return safeteeobject(self.teeobj.__copy__(), self.lock)

def safetee(iterable, n=2):
    """tuple of n independent thread-safe iterators"""
    lock = Lock()
    return tuple(safeteeobject(teeobj, lock) for teeobj in tee(iterable, n))

我现在将(大量)扩展什么时候tee是线程安全的,什么时候不是线程安全的,以及为什么。

没问题的例子

让我们运行一些代码(这是 python 3 代码,对于 python 2 使用 `itertools.izip` 而不是 `zip` 具有相同的行为):
>>> from itertools import tee, count
>>> from threading import Thread

>>> def limited_sum(it):
...     s = 0
...     for elem, _ in zip(it, range(1000000)):
...         s += elem
...     print(elem)

>>> a, b = tee(count())
>>> [Thread(target=limited_sum, args=(it,)).start() for it in [a, b]]
# prints 499999500000 twice, which is in fact the same 1+...+999999

itertools.count 在 CPython 项目的文件中完全用 C 语言编写Modules/itertoolsmodule.c,所以它工作得很好。

同样适用于:列表、元组、集合、范围、字典(键、值和项)、collections.defaultdict(键、值和项)以及其他一些。

不起作用的示例 - 生成器

一个非常简短的示例是使用生成器:
>>> gen = (i for i in range(1000000))
>>> a, b = tee(gen)
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]

Exception in thread Thread-10:
Traceback (most recent call last):
  File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.4/threading.py", line 868, in run
    self._target(*self._args, **self._kwargs)
ValueError: generator already executing

是的,tee它是用 C 编写的,而且 GIL 确实一次执行一个字节代码。但是上面的例子表明,这并不足以保证线程安全。沿着这条线的某个地方发生了这样的事情:

  1. 两个线程调用next它们的 tee_object 实例的次数相同,
  2. 线程 1 调用next(a)
  3. 它需要获取一个新元素,所以线程 1 现在调用next(gen),
  4. gen是用python编写的。比如说,gen.__next__CPython 的第一个字节码决定切换线程,
  5. 线程 2 恢复并调用next(b)
  6. 它需要获取一个新元素,所以它调用next(gen)
  7. 由于gen.__next__已经在线程 1 中运行,我们得到一个异常。

它不起作用的示例 - 迭代器对象

好的,也许在 tee 中使用生成器不是线程安全的。然后我们运行一个使用迭代器对象的上述代码的变体:
>>> from itertools import tee
>>> from threading import Thread
>>> class countdown(object):
...     def __init__(self, n):
...         self.i = n
...     def __iter__(self):
...         return self
...     def __next__(self):
...         self.i -= 1
...         if self.i < 0:
...             raise StopIteration
...         return self.i
... 
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)

上面的代码在 Ubuntu、Windows 7 和 OSX 上的 python 2.7.13 和 3.6(可能还有所有 cpython 版本)中崩溃。我还不想透露原因,先一步再说。

如果我在迭代器中使用锁怎么办?

也许上面的代码崩溃是因为我们的迭代器本身不是线程安全的。让我们添加一个锁,看看会发生什么:
>>> from itertools import tee
>>> from threading import Thread, Lock
>>> class countdown(object):
...     def __init__(self, n):
...         self.i = n
...         self.lock = Lock()
...     def __iter__(self):
...         return self
...     def __next__(self):
...         with self.lock:
...             self.i -= 1
...             if self.i < 0:
...                 raise StopIteration
...             return self.i
... 
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)

在我们的迭代器中添加锁不足以使tee线程安全。

为什么 tee 不是线程安全的

问题的关键在于CPython文件中的getitem方法。的实现真的很酷,通过优化可以节省 RAM 调用:返回“tee 对象”,每个对象都保存对 head 的引用。这些反过来就像链表中的链接,但不是包含单个元素 - 它们包含 57。这对我们的目的来说并不重要,但它就是这样。这是 的功能:teedataobjectModules/itertoolsmodule.cteeteeteedataobjectgetitemteedataobject

static PyObject *
teedataobject_getitem(teedataobject *tdo, int i)
{
    PyObject *value;

    assert(i < LINKCELLS);
    if (i < tdo->numread)
        value = tdo->values[i];
    else {
        /* this is the lead iterator, so fetch more data */
        assert(i == tdo->numread);
        value = PyIter_Next(tdo->it);
        if (value == NULL)
            return NULL;
        tdo->numread++;
        tdo->values[i] = value;
    }
    Py_INCREF(value);
    return value;
}

当被要求提供一个元素时,teedataobject检查它是否已经准备好了。如果是,则返回它。如果没有,则调用next原始迭代器。这就是,如果迭代器是用 python 编写的,代码可能会挂起。所以问题来了:

  1. 两个线程调用next了相同的次数,
  2. 线程 1 调用next(a),C 代码到达PyIter_Next上面的调用。例如,在 的第一个字节码上next(gen),CPython 决定切换线程。
  3. 线程 2 调用next(b),因为它仍然需要一个新元素,所以 C 代码开始PyIter_Next调用,

此时,两个线程都在同一个地方,具有相同的i和值tdo->numread。请注意,这tdo->numread只是一个变量,用于跟踪下teedataobject一个应该写入的 57 个单元链接中的位置。

  1. 线程 2 完成对元素的调用PyIter_Next并返回一个元素。在某个时候,CPython 决定再次切换线程,

  2. 线程 1 恢复,完成对 的调用PyIter_Next,然后运行以下两行:

         tdo->numread++;
         tdo->values[i] = value;
    
  3. 但是线程 2 已经设置好了tdo->values[i]

这已经足以表明它tee不是线程安全的,因为我们丢失了线程 2 放入的值tdo->values[i]。但这并不能解释崩溃。

i是 56。由于两个线程都调用tdo->numread++,它现在达到 58 - 高于 57,分配的大小为tdo->values。在线程 1 也继续运行之后,该对象tdo没有更多的引用并准备好被删除。这是明确的功能teedataobject

static int
teedataobject_clear(teedataobject *tdo)
{
    int i;
    PyObject *tmp;

    Py_CLEAR(tdo->it);
    for (i=0 ; i<tdo->numread ; i++)
        Py_CLEAR(tdo->values[i]); // <----- PROBLEM!!!
    tmp = tdo->nextlink;
    tdo->nextlink = NULL;
    teedataobject_safe_decref(tmp);
    return 0;
}

在标记为“问题”的行,CPython 将尝试清除tdo->values[57]. 这就是崩溃发生的地方。嗯,有些时候。崩溃的地方不止一个,我只想展示一个。

现在你知道了 -itertools.tee不是线程安全的。

一种解决方案 - 外部锁

__next__我们可以在迭代器周围加一个锁,而不是锁定在迭代器中tee.__next__。这意味着teedataobject.__getitem__每次都将由单个线程调用整个方法。我在这个答案的开头给出了一个简短的实现。tee它是线程安全的替代品。它唯一没有实现的tee就是酸洗。由于锁不可腌制,因此添加它并非易事。但是,当然,这是可以做到的。

于 2017-06-19T19:36:51.123 回答
2

如果文档中显示了等效代码,请在此处:

是正确的,那么不,它不会是线程安全的。

请注意,尽管deque被记录为具有线程安全的追加和弹出功能,但它并不为使用它的代码做任何保证。

由于主代码最终可能会向底层迭代器询问多个线程上的元素,因此您需要有一个线程安全的集合和迭代器作为输入,以使 tee 安全。

于 2011-07-15T07:36:06.130 回答
0

在 C-Python 中,itertools.tee()它返回的迭代器是用 C 代码实现的。这意味着 GIL 应该保护它不被多个线程同时调用。它可能会正常工作,并且不会使解释器崩溃,但不能保证它是线程安全的。

简而言之,不要冒险。

于 2011-07-15T07:42:06.190 回答
0

我想分享我在 Python 3.6.9 和 3.7.4 环境中使用 itertools.tee 将大型 plat 文本文件拆分为多个 csv 文件从/到 s3 的经验。

我的数据流来自 s3 zipfile、s3fs read iter、map iter for dataclass transform、tee iter、map iter for dataclass filter、循环遍历 iter 并捕获数据并使用 s3fs write 和/或 local write 以 csv 格式写入 s3 和s3fs 放入 s3。

itertools.tee 在 zipfile 进程堆栈上失败。

上面,Safetee 的 Dror Speiser 工作正常,但是由于数据集分布不佳或处理延迟,tee 对象之间的任何不平衡都会增加内存使用量。此外,它不适用于多处理日志记录,可能与此错误有关:https ://bugs.python.org/issue34410

下面的代码只是在 tee 对象之间添加简单的流控制,以防止内存增加和 OOM Killer 情况。

希望对以后的参考有所帮助。

import time
import threading
import logging
from itertools import tee
from collections import Counter

logger = logging.getLogger(__name__)


FLOW_WAIT_GAP = 1000  # flow gap for waiting
FLOW_WAIT_TIMEOUT = 60.0  # flow wait timeout


class Safetee:
    """tee object wrapped to make it thread-safe and flow controlled"""

    def __init__(self, teeobj, lock, flows, teeidx):
        self.teeobj = teeobj
        self.lock = lock
        self.flows = flows
        self.mykey = teeidx
        self.logcnt = 0

    def __iter__(self):
        return self

    def __next__(self):
        waitsec = 0.0
        while True:
            with self.lock:
                flowgap = self.flows[self.mykey] - self.flows[len(self.flows) - 1]
                if flowgap < FLOW_WAIT_GAP or waitsec > FLOW_WAIT_TIMEOUT:
                    nextdata = next(self.teeobj)
                    self.flows[self.mykey] += 1
                    return nextdata

            waitthis = min(flowgap / FLOW_WAIT_GAP, FLOW_WAIT_TIMEOUT / 3)
            waitsec += waitthis

            time.sleep(waitthis)

            if waitsec > FLOW_WAIT_TIMEOUT and self.logcnt < 5:
                self.logcnt += 1
                logger.debug(f'tee wait seconds={waitsec:.2f}, mykey={self.mykey}, flows={self.flows}')

    def __copy__(self):
        return Safetee(self.teeobj.__copy__(), self.lock, self.flows, self.teeidx)


def safetee(iterable, n=2):
    """tuple of n independent thread-safe and flow controlled iterators"""
    lock = threading.Lock()
    flows = Counter()
    return tuple(Safetee(teeobj, lock, flows, teeidx) for teeidx, teeobj in enumerate(tee(iterable, n)))


于 2020-08-28T15:03:07.290 回答