21

我有一堆键,每个键都有一个似然变量。我想随机选择其中一个键,但我希望不太可能(键、值)被选择而不是不太可能(更可能)的对象。我想知道您是否有任何建议,最好是我可以使用的现有 python 模块,否则我需要自己制作。

我已经检查了随机模块;它似乎没有提供这个。

我必须为 1000 组不同的对象做出数百万次这样的选择,每组包含 2,455 个对象。每个集合将相互交换对象,因此随机选择器需要是动态的。1000组2433个对象,即24.33亿个对象;低内存消耗至关重要。由于这些选择不是算法的主体,我需要这个过程非常快;CPU时间是有限的。

谢谢

更新:

好的,我试图明智地考虑您的建议,但时间是如此有限......

我查看了二叉搜索树方法,它似乎太冒险(复杂而复杂)。其他建议都类似于 ActiveState 配方。我拿了它并对其进行了一些修改,以期提高效率:

def windex(dict, sum, max):
    '''an attempt to make a random.choose() function that makes
    weighted choices accepts a dictionary with the item_key and
    certainty_value as a pair like:
    >>> x = [('one', 20), ('two', 2), ('three', 50)], the
    maximum certainty value (max) and the sum of all certainties.'''
    n = random.uniform(0, 1)
    sum = max*len(list)-sum 
    for key, certainty in dict.iteritems():
        weight = float(max-certainty)/sum
        if n < weight:
            break
        n = n - weight
    return key

我希望通过动态保持确定性和最大确定性的总和来提高效率。欢迎任何进一步的建议。你们为我节省了很多时间和精力,同时提高了我的效率,这太疯狂了。谢谢!谢谢!谢谢!

更新2:

我决定让它一次选择更多的选择来提高它的效率。这将在我的算法中导致可接受的精度损失,因为它本质上是动态的。无论如何,这就是我现在所拥有的:

def weightedChoices(dict, sum, max, choices=10):
    '''an attempt to make a random.choose() function that makes
    weighted choices accepts a dictionary with the item_key and
    certainty_value as a pair like:
    >>> x = [('one', 20), ('two', 2), ('three', 50)], the
    maximum certainty value (max) and the sum of all certainties.'''
    list = [random.uniform(0, 1) for i in range(choices)]
    (n, list) = relavate(list.sort())
    keys = []
    sum = max*len(list)-sum 
    for key, certainty in dict.iteritems():
        weight = float(max-certainty)/sum
        if n < weight:
            keys.append(key)
            if list: (n, list) = relavate(list)
            else: break
        n = n - weight
    return keys
def relavate(list):
    min = list[0]
    new = [l - min for l in list[1:]]
    return (min, new)

我还没试过。如果您有任何意见/建议,请不要犹豫。谢谢!

更新3:

我整天都在研究 Rex Logan 答案的任务定制版本。它实际上是一个特殊的字典类,而不是 2 个对象和权重数组;这使得事情变得相当复杂,因为 Rex 的代码会生成一个随机索引......我还编写了一个测试用例,它类似于我的算法中会发生的事情(但在我尝试之前我真的不知道!)。基本原则是:一个密钥被随机生成的次数越多,它再次生成的可能性就越小:

import random, time
import psyco
psyco.full()

class ProbDict():
    """
    Modified version of Rex Logans RandomObject class. The more a key is randomly
    chosen, the more unlikely it will further be randomly chosen. 
    """
    def __init__(self,keys_weights_values={}):
        self._kw=keys_weights_values
        self._keys=self._kw.keys()
        self._len=len(self._keys)
        self._findSeniors()
        self._effort = 0.15
        self._fails = 0
    def __iter__(self):
        return self.next()
    def __getitem__(self, key):
        return self._kw[key]
    def __setitem__(self, key, value):
        self.append(key, value)
    def __len__(self):
        return self._len
    def next(self):
        key=self._key()
        while key:
            yield key
            key = self._key()
    def __contains__(self, key):
        return key in self._kw
    def items(self):
        return self._kw.items()
    def pop(self, key):  
        try:
            (w, value) = self._kw.pop(key)
            self._len -=1
            if w == self._seniorW:
                self._seniors -= 1
                if not self._seniors:
                    #costly but unlikely:
                    self._findSeniors()
            return [w, value]
        except KeyError:
            return None
    def popitem(self):
        return self.pop(self._key())
    def values(self):
        values = []
        for key in self._keys:
            try:
                values.append(self._kw[key][1])
            except KeyError:
                pass
        return values
    def weights(self):
        weights = []
        for key in self._keys:
            try:
                weights.append(self._kw[key][0])
            except KeyError:
                pass
        return weights
    def keys(self, imperfect=False):
        if imperfect: return self._keys
        return self._kw.keys()
    def append(self, key, value=None):
        if key not in self._kw:
            self._len +=1
            self._kw[key] = [0, value]
            self._keys.append(key)
        else:
            self._kw[key][1]=value
    def _key(self):
        for i in range(int(self._effort*self._len)):
            ri=random.randint(0,self._len-1) #choose a random object
            rx=random.uniform(0,self._seniorW)
            rkey = self._keys[ri]
            try:
                w = self._kw[rkey][0]
                if rx >= w: # test to see if that is the value we want
                    w += 1
                    self._warnSeniors(w)
                    self._kw[rkey][0] = w
                    return rkey
            except KeyError:
                self._keys.pop(ri)
        # if you do not find one after 100 tries then just get a random one
        self._fails += 1 #for confirming effectiveness only
        for key in self._keys:
            if key in self._kw:
                w = self._kw[key][0] + 1
                self._warnSeniors(w)
                self._kw[key][0] = w
                return key
        return None
    def _findSeniors(self):
        '''this function finds the seniors, counts them and assess their age. It
        is costly but unlikely.'''
        seniorW = 0
        seniors = 0
        for w in self._kw.itervalues():
            if w >= seniorW:
                if w == seniorW:
                    seniors += 1
                else:
                    seniorsW = w
                    seniors = 1
        self._seniors = seniors
        self._seniorW = seniorW
    def _warnSeniors(self, w):
        #a weight can only be incremented...good
        if w >= self._seniorW:
            if w == self._seniorW:
                self._seniors+=1
            else:
                self._seniors = 1
                self._seniorW = w
def test():
    #test code
    iterations = 200000
    size = 2500
    nextkey = size 


    pd = ProbDict(dict([(i,[0,i]) for i in xrange(size)]))
    start = time.clock()
    for i in xrange(iterations):
        key=pd._key()
        w=pd[key][0]
        if random.randint(0,1+pd._seniorW-w):
            #the heavier the object, the more unlikely it will be removed
            pd.pop(key)
        probAppend = float(500+(size-len(pd)))/1000
        if random.uniform(0,1) < probAppend:
            nextkey+=1
            pd.append(nextkey)
    print (time.clock()-start)*1000/iterations, "msecs / iteration with", pd._fails, "failures /", iterations, "iterations"
    weights = pd.weights()
    weights.sort()
    print "avg weight:", float(sum(weights))/pd._len, max(weights), pd._seniorW, pd._seniors, len(pd), len(weights)
    print weights
test()

仍然欢迎任何意见。@Darius:你的二叉树对我来说太复杂太复杂了;而且我不认为它的叶子可以有效地去除......谢谢所有

4

12 回答 12

26

这个 activestate 配方提供了一种易于遵循的方法,特别是评论中的版本,不需要您预先规范化权重:

import random

def weighted_choice(items):
    """items is a list of tuples in the form (item, weight)"""
    weight_total = sum((item[1] for item in items))
    n = random.uniform(0, weight_total)
    for item, weight in items:
        if n < weight:
            return item
        n = n - weight
    return item

如果您有大量项目,这将很慢。在这种情况下,二分搜索可能会更好......但编写起来也会更复杂,如果你的样本量很小,收益会很小。 如果您想遵循该路线,这里是 python 中的二进制搜索方法的示例。

(我建议在您的数据集上对这两种方法进行一些快速的性能测试。这种算法的不同方法的性能通常有点不直观。)


编辑:我接受了自己的建议,因为我很好奇,并做了一些测试。

我比较了四种方法:

上面的 weighted_choice 函数。

像这样的二分搜索选择函数:

def weighted_choice_bisect(items):
    added_weights = []
    last_sum = 0

    for item, weight in items:
        last_sum += weight
        added_weights.append(last_sum)

    return items[bisect.bisect(added_weights, random.random() * last_sum)][0]

1的编译版本:

def weighted_choice_compile(items):
    """returns a function that fetches a random item from items

    items is a list of tuples in the form (item, weight)"""
    weight_total = sum((item[1] for item in items))
    def choice(uniform = random.uniform):
        n = uniform(0, weight_total)
        for item, weight in items:
            if n < weight:
                return item
            n = n - weight
        return item
    return choice

2的编译版本:

def weighted_choice_bisect_compile(items):
    """Returns a function that makes a weighted random choice from items."""
    added_weights = []
    last_sum = 0

    for item, weight in items:
        last_sum += weight
        added_weights.append(last_sum)

    def choice(rnd=random.random, bis=bisect.bisect):
        return items[bis(added_weights, rnd() * last_sum)][0]
    return choice

然后我建立了一个很大的选择列表,如下所示:

choices = [(random.choice("abcdefg"), random.uniform(0,50)) for i in xrange(2500)]

还有一个过于简单的分析功能:

def profiler(f, n, *args, **kwargs):
    start = time.time()
    for i in xrange(n):
        f(*args, **kwargs)
    return time.time() - start

结果:

(对函数进行 1000 次调用所需的秒数。)

  • 简单未编译:0.918624162674
  • 二进制未编译:1.01497793198
  • 简单编译:0.287325024605
  • 二进制编译:0.00327413797379

“编译”结果包括编译一次选择函数所花费的平均时间。(我计时了 1,000 次编译,然后将该时间除以 1,000,并将结果添加到选择函数时间。)

所以:如果你有一个很少改变的项目+权重列表,那么二进制编译方法是迄今为止最快的。

于 2009-02-08T20:26:28.310 回答
6

在对原始帖子的评论中,Nicholas Leonard 建议交换和采样都需要快速。这是该案例的一个想法;我没试过。

如果只有采样必须快速,我们可以使用一个值数组以及它们的概率的运行总和,并对运行总和进行二进制搜索(key 是一个统一的随机数) - O(log( n)) 操作。但是交换需要更新在交换条目之后出现的所有运行和值 - 一个 O(n) 操作。(你能选择只交换清单末尾附近的物品吗?我假设不会。)

因此,让我们在两个操作中都瞄准 O(log(n))。而不是数组,而是为每个要从中采样的集合保留一棵二叉树。叶子保存样本值及其(未归一化的)概率。一个分支节点保存其子节点的总概率。

采样时,生成一个x介于 0 和根的总概率之间的均匀随机数,然后下降树。在每个分支,如果左孩子的总概率为 ,则选择左孩子<= x。否则减去左孩子的概率x并向右走。返回您达到的叶子值。

要进行交换,请从其树中移除叶子并调整通向它的分支(降低它们的总概率,并删除任何单个子分支节点)。将叶子插入目标树:您可以选择放置它的位置,因此请保持平衡。在每个级别选择一个随机孩子可能就足够了——这就是我要开始的地方。增加每个父节点的概率,回溯到根节点。

现在采样和交换平均都是 O(log(n))。(如果您需要保证平衡,一个简单的方法是在分支节点中添加另一个字段来保存整个子树中的叶子数。添加叶子时,在每个级别选择叶子较少的孩子。这留下了一个可能性树仅因删除而变得不平衡;如果集合之间的流量相当均匀,这不会成为问题,但如果是,则在删除期间使用遍历中每个节点的叶子计数信息选择旋转。)

更新:根据要求,这是一个基本的实现。根本没调。用法:

>>> t1 = build_tree([('one', 20), ('two', 2), ('three', 50)])
>>> t1
Branch(Leaf(20, 'one'), Branch(Leaf(2, 'two'), Leaf(50, 'three')))
>>> t1.sample()
Leaf(50, 'three')
>>> t1.sample()
Leaf(20, 'one')
>>> t2 = build_tree([('four', 10), ('five', 30)])
>>> t1a, t2a = transfer(t1, t2)
>>> t1a
Branch(Leaf(20, 'one'), Leaf(2, 'two'))
>>> t2a
Branch(Leaf(10, 'four'), Branch(Leaf(30, 'five'), Leaf(50, 'three')))

代码:

import random

def build_tree(pairs):
    tree = Empty()
    for value, weight in pairs:
        tree = tree.add(Leaf(weight, value))
    return tree

def transfer(from_tree, to_tree):
    """Given a nonempty tree and a target, move a leaf from the former to
    the latter. Return the two updated trees."""
    leaf, from_tree1 = from_tree.extract()
    return from_tree1, to_tree.add(leaf)

class Tree:
    def add(self, leaf):
        "Return a new tree holding my leaves plus the given leaf."
        abstract
    def sample(self):
        "Pick one of my leaves at random in proportion to its weight."
        return self.sampling(random.uniform(0, self.weight))
    def extract(self):
        """Pick one of my leaves and return it along with a new tree
        holding my leaves minus that one leaf."""
        return self.extracting(random.uniform(0, self.weight))        

class Empty(Tree):
    weight = 0
    def __repr__(self):
        return 'Empty()'
    def add(self, leaf):
        return leaf
    def sampling(self, weight):
        raise Exception("You can't sample an empty tree")
    def extracting(self, weight):
        raise Exception("You can't extract from an empty tree")

class Leaf(Tree):
    def __init__(self, weight, value):
        self.weight = weight
        self.value = value
    def __repr__(self):
        return 'Leaf(%r, %r)' % (self.weight, self.value)
    def add(self, leaf):
        return Branch(self, leaf)
    def sampling(self, weight):
        return self
    def extracting(self, weight):
        return self, Empty()

def combine(left, right):
    if isinstance(left, Empty): return right
    if isinstance(right, Empty): return left
    return Branch(left, right)

class Branch(Tree):
    def __init__(self, left, right):
        self.weight = left.weight + right.weight
        self.left = left
        self.right = right
    def __repr__(self):
        return 'Branch(%r, %r)' % (self.left, self.right)
    def add(self, leaf):
        # Adding to a random branch as a clumsy way to keep an
        # approximately balanced tree.
        if random.random() < 0.5:
            return combine(self.left.add(leaf), self.right)
        return combine(self.left, self.right.add(leaf))
    def sampling(self, weight):
        if weight < self.left.weight:
            return self.left.sampling(weight)
        return self.right.sampling(weight - self.left.weight)
    def extracting(self, weight):
        if weight < self.left.weight:
            leaf, left1 = self.left.extracting(weight)
            return leaf, combine(left1, self.right)
        leaf, right1 = self.right.extracting(weight - self.left.weight)
        return leaf, combine(self.left, right1)

更新 2:回答另一个问题时,Jason Orendorff 指出二叉树可以通过像经典堆结构一样在数组中表示它们来保持完美平衡。(这也节省了在指针上花费的空间。)请参阅我对该答案的评论,了解如何使他的代码适应这个问题。

于 2009-02-09T01:12:33.923 回答
2

你想给每个物体一个重量。权重越大,发生的可能性就越大。更准确地说 probx =weight/sum_all_weights。

然后生成一个 0 到 sum_all_weights 范围内的随机数,并将其映射到每个对象。

此代码允许您生成随机索引,并在创建对象时映射以提高速度。如果您的所有对象集具有相同的分布,那么您可以只使用一个 RandomIndex 对象。

import random

class RandomIndex:
    def __init__(self, wlist):
        self._wi=[]
        self._rsize=sum(wlist)-1
        self._m={}
        i=0
        s=wlist[i]
        for n in range(self._rsize+1):
            if n == s:
                i+=1
                s+=wlist[i]
            self._m[n]=i    

    def i(self):
        rn=random.randint(0,self._rsize)
        return self._m[rn]


sx=[1,2,3,4]


wx=[1,10,100,1000] #weight list
ri=RandomIndex(wx)

cnt=[0,0,0,0]

for i in range(1000):
    cnt[ri.i()] +=1  #keep track of number of times each index was generated

print(cnt)  
于 2009-02-08T20:08:31.440 回答
2

我会用这个食谱。您需要为对象添加权重,但这只是一个简单的比率,并将它们放在元组列表中(对象,信念/(信念总和))。使用列表推导应该很容易做到这一点。

于 2009-02-08T20:16:43.853 回答
2

我建议你将这个加权随机的 PHP 实现移植到 Python。特别是,基于二分搜索的第二种算法有助于解决您的速度问题。

于 2009-02-08T20:20:31.083 回答
2

这是一种经典的方法,在伪代码中,random.random() 为您提供从 0 到 1 的随机浮点数。

let z = sum of all the convictions
let choice = random.random() * z 
iterate through your objects:
    choice = choice - the current object's conviction
    if choice <= 0, return this object
return the last object

例如:假设您有两个对象,一个重量为 2,另一个重量为 4。您生成一个从 0 到 6 的数字。如果choice介于 0 和 2 之间,这将以 2/6 = 1/3 的概率发生,那么它将被减去 2 并选择第一个对象。如果选择在 2 和 6 之间,这将以 4/6 = 2/3 的概率发生,那么第一个减法仍然会有 > 0 的选择,第二个减法将使第二个对象被选中。

于 2009-02-08T20:30:14.570 回答
2

大约3年后...

如果你使用 numpy,也许最简单的选项是使用np.random.choice,它接受一个可能值的列表,以及与每个值关联的可选概率序列:

import numpy as np

values = ('A', 'B', 'C', 'D')
weights = (0.5, 0.1, 0.2, 0.2)

print ''.join(np.random.choice(values, size=60, replace=True, p=weights))
# ACCADAACCDACDBACCADCAAAAAAADACCDCAADDDADAAACCAAACBAAADCADABA
于 2014-01-19T00:25:27.990 回答
1

最简单的做法是使用 random.choice(它使用均匀分布)并改变源集合中对象的出现频率。

>>> random.choice([1, 2, 3, 4])
4

...与:

>>> random.choice([1, 1, 1, 1, 2, 2, 2, 3, 3, 4])
2

因此,您的对象可能具有基本发生率 (n),并且根据定罪率将 1 到 n 个对象添加到源集合中。这个方法真的很简单;但是,如果不同对象的数量很大或需要非常细粒度的定罪率,它可能会产生很大的开销。

或者,如果您使用均匀分布生成多个随机数并将它们相加,则出现在平均值附近的数字比出现在极端值附近的数字更有可能(想想掷两个骰子和得到 7 对 12 或 2 的概率)。然后,您可以按定罪率对对象进行排序,并使用多个骰子生成一个数字,用于计算和索引对象。使用接近平均值的数字来索引低信念对象和接近极端的数字来索引高信念项目。您可以通过更改“边数”和“骰子”的数量来改变选择给定对象的精确概率(将对象放入桶中并使用边数较少的骰子可能更简单,而不是试图将每个对象与特定结果相关联):

>>> die = lambda sides : random.randint(1, sides)
>>> die(6)
3
>>> die(6) + die(6) + die(6)
10
于 2009-02-08T20:01:37.323 回答
1

一个非常简单的方法是为每个值设置权重,它不需要太多内存。

您可能可以使用哈希/字典来执行此操作。

您要做的是将随机数x与您想要选择的整个集合相乘和相加,然后将该结果除以集合中的对象数。

伪代码:

objectSet = [(object1, weight1), ..., (objectN, weightN)]
sum = 0
rand = random()
for obj, weight in objectSet
    sum = sum+weight*rand
choice = objectSet[floor(sum/objectSet.size())]

编辑:我只是想到我的代码在非常大的集合中会有多慢(它是 O(n))。下面的伪代码是 O(log(n)),基本上是使用二分查找。

objectSet = [(object1, weight1), ..., (objectN, weightN)]
sort objectSet from less to greater according to weights
choice = random() * N # where N is the number of objects in objectSet
do a binary search until you have just one answer

Python 中的二进制搜索的实现遍布全网,这里不再赘述。

于 2009-02-08T20:35:52.467 回答
1

这是特殊概率分布的更好答案,Rex Logan 的答案似乎是针对的。分布是这样的:每个对象都有一个介于 0 和 100 之间的整数权重,其概率与其权重成正比。由于这是目前公认的答案,我想这值得考虑。

所以保留一个包含 101 个 bin 的数组。每个 bin 都包含具有特定重量的所有对象的列表。每个箱子还知道其所有对象的重量。

取样:按其总重量的比例随机挑选一个箱子。(为此使用标准配方之一——线性或二分搜索。)然后从垃圾箱中随机均匀地挑选一个对象。

转移对象:将其从 bin 中移除,将其放入目标的 bin 中,并更新两个 bin 的权重。(如果您使用二进制搜索进行采样,您还必须更新使用的运行总和。这仍然相当快,因为​​没有多少箱。)

于 2009-02-09T20:31:49.490 回答
1

(一年后) Walker 对不同概率的随机物体的别名方法非常快,非常简单

于 2010-01-12T17:58:54.287 回答
0

对于非非常大的数字,我需要更快的功能。所以在这里,在 Visual C++ 中:

#undef _DEBUG // disable linking with python25_d.dll
#include <Python.h>
#include <malloc.h>
#include <stdlib.h>

static PyObject* dieroll(PyObject *, PyObject *args)
{
    PyObject *list;
    if (!PyArg_ParseTuple(args, "O:decompress", &list))
        return NULL;

    if (!PyList_Check(list)) 
        return PyErr_Format(PyExc_TypeError, "list of numbers expected ('%s' given)", list->ob_type->tp_name), NULL;

    int size = PyList_Size(list);

    if (size < 1)
        return PyErr_Format(PyExc_TypeError, "got empty list"), NULL;

    long *array = (long*)alloca(size*sizeof(long));

    long sum = 0;
    for (int i = 0; i < size; i++) {
        PyObject *o = PyList_GetItem(list, i);

        if (!PyInt_Check(o))
            return PyErr_Format(PyExc_TypeError, "list of ints expected ('%s' found)", o->ob_type->tp_name), NULL;
        long n = PyInt_AsLong(o);
        if (n == -1 && PyErr_Occurred())
            return NULL;
        if (n < 0)
            return PyErr_Format(PyExc_TypeError, "list of positive ints expected (negative found)"), NULL;

        sum += n; //NOTE: integer overflow
        array[i] = sum;
    }

    if (sum <= 0)
        return PyErr_Format(PyExc_TypeError, "sum of numbers is not positive"), NULL;

    int r = rand() * (sum-1) / RAND_MAX; //NOTE: rand() may be too small (0x7fff).    rand() * sum may result in integer overlow.

    assert(array[size-1] == sum);
    assert(r < sum && r < array[size-1]);
    for (int i = 0; i < size; ++i)
    {
        if (r < array[i])
            return PyInt_FromLong(i);
    }
    return PyErr_Format(PyExc_TypeError, "internal error."), NULL;
}

static PyMethodDef module_methods[] = 
{
    {"dieroll", (PyCFunction)dieroll, METH_VARARGS, "random index, beased on weights" },
    {NULL}  /* Sentinel */
};

PyMODINIT_FUNC initdieroll(void) 
{
    PyObject *module = Py_InitModule3("dieroll", module_methods, "dieroll");
    if (module == NULL)
        return;
}
于 2009-06-27T21:12:33.557 回答