4

统计累加器允许执行增量计算。例如,为了计算在任意时间给定的数字流的算术平均值,可以创建一个对象来跟踪给定项目的当前数量n及其总和,sum. 当一个人请求平均值时,对象只是返回sum/n

像这样的累加器允许您进行增量计算,即当给定一个新数字时,您不需要重新计算整个总和和计数。

可以为其他统计信息编写类似的累加器(参见C++ 实现的boost 库)。

你将如何在 Python 中实现累加器?我想出的代码是:

class Accumulator(object):
    """
    Used to accumulate the arithmetic mean of a stream of
    numbers. This implementation does not allow to remove items
    already accumulated, but it could easily be modified to do
    so. also, other statistics could be accumulated.
    """
    def __init__(self):
     # upon initialization, the numnber of items currently
     # accumulated (_n) and the total sum of the items acumulated
     # (_sum) are set to zero because nothing has been accumulated
     # yet.
     self._n = 0
     self._sum = 0.0

    def add(self, item):
     # the 'add' is used to add an item to this accumulator
     try:
        # try to convert the item to a float. If you are
        # successful, add the float to the current sum and
        # increase the number of accumulated items
        self._sum += float(item)
        self._n += 1
     except ValueError:
        # if you fail to convert the item to a float, simply
        # ignore the exception (pass on it and do nothing)
        pass

    @property
    def mean(self):
     # the property 'mean' returns the current mean accumulated in
     # the object
     if self._n > 0:
        # if you have more than zero items accumulated, then return
        # their artithmetic average
        return self._sum / self._n
     else:
        # if you have no items accumulated, return None (you could
        # also raise an exception)
        return None

# using the object:

# Create an instance of the object "Accumulator"
my_accumulator = Accumulator()
print my_accumulator.mean
# prints None because there are no items accumulated

# add one (a number)
my_accumulator.add(1)
print my_accumulator.mean
# prints 1.0

# add two (a string - it will be converted to a float)
my_accumulator.add('2')
print my_accumulator.mean
# prints 1.5

# add a 'NA' (will be ignored because it cannot be converted to float)
my_accumulator.add('NA')
print my_accumulator.mean
# prints 1.5 (notice that it ignored the 'NA')

有趣的设计问题出现了:

  1. 如何使累加器线程安全?
  2. 如何安全取出物品?
  3. 如何以允许轻松插入其他统计数据的方式进行架构(统计工厂)
4

2 回答 2

3

对于通用的、线程安全的高级函数,您可以将以下内容与Queue.Queue类和其他一些位结合使用:

from Queue import Empty

def Accumulator(f, q, storage):
    """Yields successive values of `f` over the accumulation of `q`.

    `f` should take a single iterable as its parameter.

    `q` is a Queue.Queue or derivative.

    `storage` is a persistent sequence that provides an `append` method.
    `collections.deque` may be particularly useful, but a `list` is quite acceptable.

    >>> from Queue import Queue
    >>> from collections import deque
    >>> from threading import Thread
    >>> def mean(it):
    ...     vals = tuple(it)
    ...     return sum(it) / len(it)
    >>> value_queue = Queue()
    >>> LastThreeAverage = Accumulator(mean, value_queue, deque((), 3))
    >>> def add_to_queue(it, queue):
    ...     for value in it:
    ...         value_queue.put(value)
    >>> putting_thread = Thread(target=add_to_queue,
    ...                         args=(range(0, 12, 2), value_queue))
    >>> putting_thread.start()
    >>> list(LastThreeAverage)
    [0, 1, 2, 4, 6, 8]
    """
    try:
        while True:
            storage.append(q.get(timeout=0.1))
            q.task_done()
            yield f(storage)
    except Empty:
        pass

这个生成器函数通过将其委托给其他实体来逃避其声称的大部分责任:

  • 它依赖于Queue.Queue以线程安全的方式提供其源元素
  • 一个collections.deque对象可以作为storage参数的值传入;除其他外,这提供了一种仅使用最后一个n(在本例中为 3)值的便捷方式
  • 函数本身(在这种情况下mean)作为参数传递。在某些情况下,这将导致代码效率低于最优,但很容易应用于各种情况。

请注意,如果您的生产者线程每个值花费的时间超过 0.1 秒,则累加器可能会超时。这很容易通过传递更长的超时时间或完全删除超时参数来解决。在后一种情况下,函数将在队列末尾无限期地阻塞;这种用法在子线程(通常是daemon线程)中使用的情况下更有意义。当然,您也可以将传递给q.get的参数作为第四个参数参数化Accumulator

如果您想从生产者线程(此处)传达队列结束,即没有更多的值putting_thread,您可以传递并检查哨兵值或使用其他方法。此线程中有更多信息;我选择编写一个名为CloseableQueue的 Queue.Queue 子类,它提供了一种close方法。

您可以通过多种其他方式自定义此类函数的行为,例如通过限制队列大小;这只是一个使用示例。

编辑

如上所述,由于重新计算的必要性,这会降低一些效率,而且我认为这并不能真正回答您的问题。

生成器函数也可以通过其send方法接受值。所以你可以写一个平均生成器函数,比如

def meangen():
    """Yields the accumulated mean of sent values.

    >>> g = meangen()
    >>> g.send(None) # Initialize the generator
    >>> g.send(4)
    4.0
    >>> g.send(10)
    7.0
    >>> g.send(-2)
    4.0
    """
    sum = yield(None)
    count = 1
    while True:
        sum += yield(sum / float(count))
        count += 1

这里的 yield 表达式既将值(参数send)带入函数,同时将计算的值作为 的返回值传递出去send

您可以将调用该函数返回的生成器传递给更优化的累加器生成器函数,如下所示:

def EfficientAccumulator(g, q):
    """Similar to Accumulator but sends values to a generator `g`.

    >>> from Queue import Queue
    >>> from threading import Thread
    >>> value_queue = Queue()
    >>> g = meangen()
    >>> g.send(None)
    >>> mean_accumulator = EfficientAccumulator(g, value_queue)
    >>> def add_to_queue(it, queue):
    ...     for value in it:
    ...         value_queue.put(value)
    >>> putting_thread = Thread(target=add_to_queue,
    ...                         args=(range(0, 12, 2), value_queue))
    >>> putting_thread.start()
    >>> list(mean_accumulator)
    [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]
    """
    try:
        while True:
            yield(g.send(q.get(timeout=0.1)))
            q.task_done()
    except Empty:
        pass
于 2010-09-23T05:04:36.347 回答
1

如果我在 Python 中这样做,有两件事我会做不同的事情:

  1. 分离出每个累加器的功能。
  2. 不要以任何方式使用@property。

对于第一个,我可能想提出一个用于执行累积的 API,可能类似于:

def add(self, num) # add a number
def compute(self) # compute the value of the accumulator

然后我会创建一个 AccumulatorRegistry 来保存这些累加器,并允许用户调用操作并添加到所有这些累加器中。代码可能如下所示:

class Accumulators(object):
    _accumulator_library = {}

    def __init__(self):
        self.accumulator_library = {}
        for key, value in Accumulators._accumulator_library.items():
            self.accumulator_library[key] = value()

    @staticmethod
    def register(name, accumulator):
        Accumulators._accumulator_library[name] = accumulator

    def add(self, num):
        for accumulator in self.accumulator_library.values():
            accumulator.add(num)

    def compute(self, name):
        self.accumulator_library[name].compute()

    @staticmethod
    def register_decorator(name):
        def _inner(cls):
            Accumulators.register(name, cls)
            return cls


@Accumulators.register_decorator("Mean")
class Mean(object):
    def __init__(self):
        self.total = 0
        self.count = 0

    def add(self, num):
        self.count += 1
        self.total += num

    def compute(self):
        return self.total / float(self.count)

我可能应该谈谈你的线程安全问题。Python 的 GIL 可以保护您免受许多线程问题的影响。不过,您可以采取一些措施来保护自己:

  • 如果这些对象被本地化到一个线程,请使用 threading.local
  • 如果没有,您可以将操作包装在锁中,使用 with context 语法为您处理持有锁。
于 2010-09-22T23:35:15.777 回答