Score: 45

Do you know whether there is a way to get Python's random.sample to work with a generator object? I am trying to get a random sample from a very large text corpus. The problem is that random.sample() raises the following error:

TypeError: object of type 'generator' has no len()

I was thinking that maybe there is a way to do this with something from itertools, but I couldn't find anything through searching.

A somewhat made-up example:

import random
def list_item(ls):
    for item in ls:
        yield item

random.sample( list_item(range(100)), 20 )


UPDATE


At Martijn Pieters' request, I did some timings of the three methods proposed so far. The results are as follows.

Sampling 1000 from 10000
Using iterSample 0.0163 s
Using sample_from_iterable 0.0098 s
Using iter_sample_fast 0.0148 s

Sampling 10000 from 100000
Using iterSample 0.1786 s
Using sample_from_iterable 0.1320 s
Using iter_sample_fast 0.1576 s

Sampling 100000 from 1000000
Using iterSample 3.2740 s
Using sample_from_iterable 1.9860 s
Using iter_sample_fast 1.4586 s

Sampling 200000 from 1000000
Using iterSample 7.6115 s
Using sample_from_iterable 3.0663 s
Using iter_sample_fast 1.4101 s

Sampling 500000 from 1000000
Using iterSample 39.2595 s
Using sample_from_iterable 4.9994 s
Using iter_sample_fast 1.2178 s

Sampling 2000000 from 5000000
Using iterSample 798.8016 s
Using sample_from_iterable 28.6618 s
Using iter_sample_fast 6.6482 s

So it turns out that array.insert has a serious drawback when it comes to large sample sizes. Here is the code I used to time the methods:

from heapq import nlargest
import random
import timeit


def iterSample(iterable, samplesize):
    results = []
    for i, v in enumerate(iterable):
        r = random.randint(0, i)
        if r < samplesize:
            if i < samplesize:
                results.insert(r, v) # add first samplesize items in random order
            else:
                results[r] = v # at a decreasing rate, replace random items

    if len(results) < samplesize:
        raise ValueError("Sample larger than population.")

    return results

def sample_from_iterable(iterable, samplesize):
    return (x for _, x in nlargest(samplesize, ((random.random(), x) for x in iterable)))

def iter_sample_fast(iterable, samplesize):
    results = []
    iterator = iter(iterable)
    # Fill in the first samplesize elements:
    for _ in xrange(samplesize):
        results.append(iterator.next())
    random.shuffle(results)  # Randomize their positions
    for i, v in enumerate(iterator, samplesize):
        r = random.randint(0, i)
        if r < samplesize:
            results[r] = v  # at a decreasing rate, replace random items

    if len(results) < samplesize:
        raise ValueError("Sample larger than population.")
    return results

if __name__ == '__main__':
    pop_sizes = [int(10e+3),int(10e+4),int(10e+5),int(10e+5),int(10e+5),int(10e+5)*5]
    k_sizes = [int(10e+2),int(10e+3),int(10e+4),int(10e+4)*2,int(10e+4)*5,int(10e+5)*2]

    for pop_size, k_size in zip(pop_sizes, k_sizes):
        pop = xrange(pop_size)
        k = k_size
        t1 = timeit.Timer(stmt='iterSample(pop, %i)'%(k_size), setup='from __main__ import iterSample,pop')
        t2 = timeit.Timer(stmt='sample_from_iterable(pop, %i)'%(k_size), setup='from __main__ import sample_from_iterable,pop')
        t3 = timeit.Timer(stmt='iter_sample_fast(pop, %i)'%(k_size), setup='from __main__ import iter_sample_fast,pop')

        print 'Sampling', k, 'from', pop_size
        print 'Using iterSample', '%1.4f s'%(t1.timeit(number=100) / 100.0)
        print 'Using sample_from_iterable', '%1.4f s'%(t2.timeit(number=100) / 100.0)
        print 'Using iter_sample_fast', '%1.4f s'%(t3.timeit(number=100) / 100.0)
        print ''

I also ran a test to check that all of the methods indeed take an unbiased sample of the generator. So for all methods I sampled 1000 elements from 10000, 100000 times, and computed the average frequency of occurrence of each item in the population, which turns out to be ~.1 as one would expect for all three methods.
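
For reference, here is a rough sketch of that kind of bias check; the helper name check_uniformity and the smaller default run count are mine (the full 100000 runs described above take considerably longer):

from collections import Counter

def check_uniformity(sampler, pop_size=10000, k=1000, runs=1000):
    # Sample k of pop_size items `runs` times and record how often each item
    # is picked; every frequency should be close to k / pop_size.
    counts = Counter()
    for _ in range(runs):
        counts.update(sampler(iter(range(pop_size)), k))
    freqs = [counts[i] / float(runs) for i in range(pop_size)]
    return min(freqs), max(freqs)

# e.g. check_uniformity(iterSample) should give frequencies near 0.1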


8 Answers

Score: 27

While the answer of Martijn Pieters is correct, it does slow down when samplesize becomes large, because using list.insert in a loop may have quadratic complexity.

Here is an alternative that, in my opinion, preserves the uniformity while increasing performance:

def iter_sample_fast(iterable, samplesize):
    results = []
    iterator = iter(iterable)
    # Fill in the first samplesize elements:
    try:
        for _ in xrange(samplesize):
            results.append(iterator.next())
    except StopIteration:
        raise ValueError("Sample larger than population.")
    random.shuffle(results)  # Randomize their positions
    for i, v in enumerate(iterator, samplesize):
        r = random.randint(0, i)
        if r < samplesize:
            results[r] = v  # at a decreasing rate, replace random items
    return results

The difference slowly starts to show for samplesize values above 10000. Times for a call with (1000000, 100000):

  • iterSample: 5.05 s
  • iter_sample_fast: 2.64 s
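
Since that code uses Python 2 idioms (xrange, iterator.next()), here is a rough Python 3 sketch of the same reservoir approach; the name iter_sample_fast_py3 is mine:

import random
from itertools import islice

def iter_sample_fast_py3(iterable, samplesize):
    iterator = iter(iterable)
    # Fill the reservoir with the first samplesize elements.
    results = list(islice(iterator, samplesize))
    if len(results) < samplesize:
        raise ValueError("Sample larger than population.")
    random.shuffle(results)  # randomize their positions
    # Replace items at a decreasing rate, keeping the sample uniform.
    for i, v in enumerate(iterator, samplesize):
        r = random.randint(0, i)
        if r < samplesize:
            results[r] = v
    return results

# e.g. iter_sample_fast_py3((x for x in range(1000000)), 100000)
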
Answered 2012-09-25T12:56:13.753
Score: 21

You can't.

You have two options: read the whole generator into a list and sample from that list, or use a method that reads the generator one item at a time and picks the sample from it as it goes:

import random

def iterSample(iterable, samplesize):
    results = []

    for i, v in enumerate(iterable):
        r = random.randint(0, i)
        if r < samplesize:
            if i < samplesize:
                results.insert(r, v) # add first samplesize items in random order
            else:
                results[r] = v # at a decreasing rate, replace random items

    if len(results) < samplesize:
        raise ValueError("Sample larger than population.")

    return results

This method adjusts the chance that the next item becomes part of the sample based on the number of items seen in the iterable so far. It doesn't need to hold more than samplesize items in memory.

The solution isn't mine; it was provided as part of another answer here on SO.
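
A small usage sketch (the corpus file name here is hypothetical): drawing 5 random lines from a large file this way never holds more than 5 lines in memory:

with open('corpus.txt') as f:                    # hypothetical corpus file
    lines = (line.rstrip('\n') for line in f)    # a generator over the file
    print(iterSample(lines, 5))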

Answered 2012-09-25T10:53:34.763
Score: 8

Just for the heck of it, here's a one-liner that samples k elements without replacement from the n items generated, in O(n lg k) time:

from heapq import nlargest
import random

def sample_from_iterable(it, k):
    return (x for _, x in nlargest(k, ((random.random(), x) for x in it)))
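
A quick usage sketch (the generator here is purely illustrative); note that the one-liner itself returns a generator, so the result has to be materialized:

squares = (x * x for x in range(10**6))
print(list(sample_from_iterable(squares, 5)))  # 5 random squares, without replacement
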
Answered 2012-09-25T12:24:42.197
Score: 3

"I am trying to get a random sample from a very large text corpus."

Your excellent synthesis answer currently shows iter_sample_fast(gen, pop) as the winner. However, I tried Katriel's recommendation of random.sample(list(gen), pop), and it's blazingly fast by comparison!

import random

def iter_sample_easy(iterable, samplesize):
    return random.sample(list(iterable), samplesize)

Sampling 1000 from 10000
Using iter_sample_fast 0.0192 s
Using iter_sample_easy 0.0009 s

Sampling 10000 from 100000
Using iter_sample_fast 0.1807 s
Using iter_sample_easy 0.0103 s

Sampling 100000 from 1000000
Using iter_sample_fast 1.8192 s
Using iter_sample_easy 0.2268 s

Sampling 200000 from 1000000
Using iter_sample_fast 1.7467 s
Using iter_sample_easy 0.3297 s

Sampling 500000 from 1000000
Using iter_sample_easy 0.5628 s

Sampling 2000000 from 5000000
Using iter_sample_easy 2.7147 s

Now, as your corpus gets very large, materializing the whole iterable into a list will use prohibitively large amounts of memory. But we can still exploit Python's blazing-fast list sampling if we can chunk up the problem: basically, we pick a CHUNKSIZE that is "reasonably small", do random.sample on chunks of that size, and then use random.sample again to merge them together. We just have to get the boundary conditions right.

I see how to do it if the length of list(iterable) is an exact multiple of CHUNKSIZE and not bigger than samplesize*CHUNKSIZE:

import itertools
import random

def iter_sample_dist_naive(iterable, samplesize):   # relies on iter_sample_easy above
    CHUNKSIZE = 10000
    samples = []
    it = iter(iterable)
    try:
        while True:
            first = next(it)
            chunk = itertools.chain([first], itertools.islice(it, CHUNKSIZE-1))
            samples += iter_sample_easy(chunk, samplesize)
    except StopIteration:
        return random.sample(samples, samplesize)

However, the above code produces a non-uniform sampling when len(list(iterable)) % CHUNKSIZE != 0, and it runs out of memory as len(list(iterable)) * samplesize / CHUNKSIZE gets "very large". Fixing these bugs is above my pay grade, I'm afraid, but a solution is described in this blog post and sounds quite reasonable to me. (Search terms: "distributed random sampling", "distributed reservoir sampling".)

Sampling 1000 from 10000
Using iter_sample_fast 0.0182 s
Using iter_sample_dist_naive 0.0017 s
Using iter_sample_easy 0.0009 s

Sampling 10000 from 100000
Using iter_sample_fast 0.1830 s
Using iter_sample_dist_naive 0.0402 s
Using iter_sample_easy 0.0103 s

Sampling 100000 from 1000000
Using iter_sample_fast 1.7965 s
Using iter_sample_dist_naive 0.6726 s
Using iter_sample_easy 0.2268 s

Sampling 200000 from 1000000
Using iter_sample_fast 1.7467 s
Using iter_sample_dist_naive 0.8209 s
Using iter_sample_easy 0.3297 s

Where we really win big is when samplesize is very small relative to len(list(iterable)):

Sampling 20 from 10000
Using iterSample 0.0202 s
Using sample_from_iterable 0.0047 s
Using iter_sample_fast 0.0196 s
Using iter_sample_easy 0.0001 s
Using iter_sample_dist_naive 0.0004 s

Sampling 20 from 100000
Using iterSample 0.2004 s
Using sample_from_iterable 0.0522 s
Using iter_sample_fast 0.1903 s
Using iter_sample_easy 0.0016 s
Using iter_sample_dist_naive 0.0029 s

Sampling 20 from 1000000
Using iterSample 1.9343 s
Using sample_from_iterable 0.4907 s
Using iter_sample_fast 1.9533 s
Using iter_sample_easy 0.0211 s
Using iter_sample_dist_naive 0.0319 s

Sampling 20 from 10000000
Using iterSample 18.6686 s
Using sample_from_iterable 4.8120 s
Using iter_sample_fast 19.3525 s
Using iter_sample_easy 0.3162 s
Using iter_sample_dist_naive 0.3210 s

Sampling 20 from 100000000
Using iter_sample_easy 2.8248 s
Using iter_sample_dist_naive 3.3817 s
Answered 2018-12-27T16:31:28.793
Score: 1

If the population size n is known, here is some memory-efficient code that loops over a generator and extracts only the target samples:

from random import sample
from itertools import count, compress

# n is the known population size; pop is the population generator.
targets = set(sample(range(n), k=10))
for selection in compress(pop, map(targets.__contains__, count())):
    print(selection)

This outputs the selections in the same order in which the population generator produces them.

The technique is to use the standard library's random.sample() to randomly select the target indices for the selections. The second line (the compress()/count() pairing) then determines whether a given index is among the targets and, if so, gives the corresponding value from the generator.

For example, given the targets {6, 2, 4}:

0  1  2  3  4  5  6  7  8  9  10   ...  output of count()
F  F  T  F  T  F  T  F  F  F  F    ...  is the count in targets?
A  B  C  D  E  F  G  H  I  J  K    ...  output of the population generator
-  -  C  -  E  -  G  -  -  -  -    ...  selections emitted by compress

This technique is suitable for looping over a corpus that is too large to fit in memory (otherwise, you could just use sample() directly on the population).
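
As a concrete instantiation of the idea (Python 3; the generator and its size here are stand-ins for the real corpus):

from random import sample
from itertools import count, compress

n = 1000                                    # population size, known in advance
pop = ("doc-%d" % i for i in range(n))      # stand-in for the real corpus generator

targets = set(sample(range(n), k=10))       # choose 10 random indices up front
selections = list(compress(pop, map(targets.__contains__, count())))
print(selections)                           # 10 items, in generator order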

Answered 2019-10-30T07:47:56.060
Score: 0

If the number of items in the iterator is known (from counting the items elsewhere), another approach is:

import random

def iter_sample(iterable, iterlen, samplesize):
    if iterlen < samplesize:
        raise ValueError("Sample larger than population.")
    indexes = set()
    while len(indexes) < samplesize:
        indexes.add(random.randint(0, iterlen - 1))  # valid indices run 0..iterlen-1
    indexesiter = iter(sorted(indexes))
    current = indexesiter.next()
    ret = []
    for i, item in enumerate(iterable):
        if i == current:
            ret.append(item)
            try:
                current = indexesiter.next()
            except StopIteration:
                break
    random.shuffle(ret)
    return ret
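
A minimal call sketch, written in Python 2 to match the answer's use of indexesiter.next(); the generator here is purely illustrative:

gen = (x * x for x in xrange(1000000))
print iter_sample(gen, 1000000, 10)  # 10 random squares, returned in shuffled order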

I find this to be much faster, especially when samplesize is small relative to iterlen. When a sample of the whole, or close to the whole, population is requested, however, problems arise.

iter_sample (iterlen=10000, samplesize=100) time: (1, 'ms'); iter_sample_fast (iterlen=10000, samplesize=100) time: (15, 'ms')

iter_sample (iterlen=1000000, samplesize=100) time: (65, 'ms'); iter_sample_fast (iterlen=1000000, samplesize=100) time: (1477, 'ms')

iter_sample (iterlen=1000000, samplesize=1000) time: (64, 'ms'); iter_sample_fast (iterlen=1000000, samplesize=1000) time: (1459, 'ms')

iter_sample (iterlen=1000000, samplesize=10000) time: (86, 'ms'); iter_sample_fast (iterlen=1000000, samplesize=10000) time: (1480, 'ms')

iter_sample (iterlen=1000000, samplesize=100000) time: (388, 'ms'); iter_sample_fast (iterlen=1000000, samplesize=100000) time: (1521, 'ms')

iter_sample (iterlen=1000000, samplesize=1000000) time: (25359, 'ms'); iter_sample_fast (iterlen=1000000, samplesize=1000000) time: (2178, 'ms')

Answered 2014-09-14T00:21:27.683
Score: 0

The fastest method, until proven otherwise, when you have an idea of how long the generator is (and the sample will be asymptotically uniformly distributed):

import numpy

def gen_sample(generator_list, sample_size, iterlen):
    num = 0
    # Pre-draw inclusion flags so that, on average, sample_size items are kept.
    inds = numpy.random.random(iterlen) <= (sample_size * 1.0 / iterlen)
    results = []
    iterator = iter(generator_list)
    gotten = 0
    while gotten < sample_size:
        try:
            b = iterator.next()
            if inds[num]:
                results.append(b)
                gotten += 1
            num += 1
        except StopIteration:
            # Ran short: restart the iterable and re-draw flags for the remaining slots.
            num = 0
            iterator = iter(generator_list)
            inds = numpy.random.random(iterlen) <= ((sample_size - gotten) * 1.0 / iterlen)
    return results

It is the fastest on both small and large iterables (and probably everything in between).

# Huge
res = gen_sample(xrange(5000000), 200000, 5000000)
timing: 1.22s

# Small
z = gen_sample(xrange(10000), 1000, 10000) 
timing: 0.000441    
Answered 2014-10-19T12:47:06.783
Score: 0

Here is a radically different variation that uses a set as a bucket of items. It starts by priming the bucket with pool items, then yields samples from the bucket while replacing them with new items from the iterator, and finally it drains what is left of the bucket.

HashWrapper serves to hide unhashable types from the set.

import random
from typing import Iterator


class HashWrapper(tuple):
    """Wrap unhashable type."""
    def __hash__(self):
        return id(self)


def randomize_iterator(data: Iterator, pool=100) -> Iterator:
    """
    Randomize an iterator.
    """

    bucket = set()
    iterator = iter(data)

    # Prime the bucket
    for _ in range(pool):
        try:
            bucket.add(HashWrapper(next(iterator)))
        except StopIteration:
            # We've drained the iterator
            break

    # Start picking from the bucket and replacing new items from the iterator
    for item in iterator:
        sample, = random.sample(tuple(bucket), 1)  # use a sequence; Python 3.11+ no longer samples sets
        yield sample
        bucket.remove(sample)
        bucket.add(HashWrapper(item))

    # Drain the bucket
    yield from random.sample(tuple(bucket), len(bucket))
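
A small usage sketch of my own: because HashWrapper subclasses tuple, each item fed in must itself be iterable, and the yielded samples come back as tuples. Slicing the randomized stream gives a quick, though not strictly uniform, sample:

import itertools

rows = ((i, i * i) for i in range(100000))   # stand-in for a large stream of records
shuffled = randomize_iterator(rows, pool=1000)
print(list(itertools.islice(shuffled, 10)))  # 10 roughly random rows, as tuples
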
Answered 2020-06-11T04:44:04.713