75

没有任何权重(等概率)的选择在这里得到了很好的描述。

我想知道是否有办法将这种方法转换为加权方法。

我也对其他方法感兴趣。

更新:无需更换的采样

4

14 回答 14

71

如果抽样是有替换的,你可以使用这个算法(在这里用 Python 实现):

import random

items = [(10, "low"),
         (100, "mid"),
         (890, "large")]

def weighted_sample(items, n):
    total = float(sum(w for w, v in items))
    i = 0
    w, v = items[0]
    while n:
        x = total * (1 - random.random() ** (1.0 / n))
        total -= x
        while x > w:
            x -= w
            i += 1
            w, v = items[i]
        w -= x
        yield v
        n -= 1

这是 O( n + m ),其中m是项目数。

为什么这行得通?它基于以下算法:

def n_random_numbers_decreasing(v, n):
    """Like reversed(sorted(v * random() for i in range(n))),
    but faster because we avoid sorting."""
    while n:
        v *= random.random() ** (1.0 / n)
        yield v
        n -= 1

该功能weighted_sample就是这种算法与items列表的遍历相融合,以挑选出由这些随机数选择的项目。

这反过来又有效,因为n 个随机数 0.. v都恰好小于z的概率是P = ( z/v ) n。求解z,得到z = vP 1/ n。用一个随机数代替P选择具有正确分布的最大数;我们可以重复这个过程来选择所有其他数字。

如果采样没有放回,你可以把所有的项放到一个二叉堆中,每个节点缓存该子堆中所有项的权重总和。构建堆是 O( m )。从堆中选择一个随机项,尊重权重,是 O(log m )。删除该项目并更新缓存的总数也是 O(log m )。因此,您可以在 O( m + n log m ) 时间内选择n 个项目。

(注意:这里的“权重”是指每次选择一个元素时,其余可能性的选择概率与其权重成正比。这并不意味着元素出现在输出中的可能性与其权重成正比。)

这是一个实现,大量评论:

import random

class Node:
    # Each node in the heap has a weight, value, and total weight.
    # The total weight, self.tw, is self.w plus the weight of any children.
    __slots__ = ['w', 'v', 'tw']
    def __init__(self, w, v, tw):
        self.w, self.v, self.tw = w, v, tw

def rws_heap(items):
    # h is the heap. It's like a binary tree that lives in an array.
    # It has a Node for each pair in `items`. h[1] is the root. Each
    # other Node h[i] has a parent at h[i>>1]. Each node has up to 2
    # children, h[i<<1] and h[(i<<1)+1].  To get this nice simple
    # arithmetic, we have to leave h[0] vacant.
    h = [None]                          # leave h[0] vacant
    for w, v in items:
        h.append(Node(w, v, w))
    for i in range(len(h) - 1, 1, -1):  # total up the tws
        h[i>>1].tw += h[i].tw           # add h[i]'s total to its parent
    return h

def rws_heap_pop(h):
    gas = h[1].tw * random.random()     # start with a random amount of gas

    i = 1                     # start driving at the root
    while gas >= h[i].w:      # while we have enough gas to get past node i:
        gas -= h[i].w         #   drive past node i
        i <<= 1               #   move to first child
        if gas >= h[i].tw:    #   if we have enough gas:
            gas -= h[i].tw    #     drive past first child and descendants
            i += 1            #     move to second child
    w = h[i].w                # out of gas! h[i] is the selected node.
    v = h[i].v

    h[i].w = 0                # make sure this node isn't chosen again
    while i:                  # fix up total weights
        h[i].tw -= w
        i >>= 1
    return v

def random_weighted_sample_no_replacement(items, n):
    heap = rws_heap(items)              # just make a heap...
    for i in range(n):
        yield rws_heap_pop(heap)        # and pop n items off it.
于 2010-01-27T18:58:01.600 回答
44

如果抽样是有放回的,使用轮盘赌选择技术(常用于遗传算法):

  1. 对权重进行排序
  2. 计算累积权重
  3. 选择一个随机数[0,1]*totalWeight
  4. 找到这个数字落入的区间
  5. 选择具有相应区间的元素
  6. 重复k次数

替代文字

如果采样没有替换,您可以通过在每次迭代后从列表中删除所选元素来调整上述技术,然后重新归一化权重,使其总和为 1(有效的概率分布函数)

于 2010-01-28T02:24:34.820 回答
31

我知道这是一个非常古老的问题,但我认为如果你应用一点数学知识,可以在 O(n) 时间内做到这一点!

指数分布有两个非常有用的特性。

  1. 给定来自具有不同速率参数的不同指数分布的 n 个样本,给定样本最小的概率等于其速率参数除以所有速率参数的总和。

  2. 它是“无记忆的”。因此,如果您已经知道最小值,那么任何剩余元素是第二个到最小值的概率与如果真正的最小值被删除(并且从未生成),则该元素将是新元素的概率相同分钟。这似乎很明显,但我认为由于一些条件概率问题,其他分布可能并非如此。

使用事实1,我们知道选择单个元素可以通过生成这些速率参数等于权重的指数分布样本,然后选择具有最小值的那个来完成。

使用事实 2,我们知道我们不必重新生成指数样本。相反,只需为每个元素生成一个,并获取样本最少的 k 个元素。

可以在 O(n) 中找到最低的 k。使用Quickselect算法找到第 k 个元素,然后简单地再次遍历所有元素并输出所有低于第 k 个的元素。

一个有用的说明:如果您无法立即访问库以生成指数分布样本,则可以通过以下方式轻松完成:-ln(rand())/weight

于 2015-05-13T23:35:07.377 回答
3

我已经在 Ruby 中做到了这一点

https://github.com/fl00r/pickup

require 'pickup'
pond = {
  "selmon"  => 1,
  "carp" => 4,
  "crucian"  => 3,
  "herring" => 6,
  "sturgeon" => 8,
  "gudgeon" => 10,
  "minnow" => 20
}
pickup = Pickup.new(pond, uniq: true)
pickup.pick(3)
#=> [ "gudgeon", "herring", "minnow" ]
pickup.pick
#=> "herring"
pickup.pick
#=> "gudgeon"
pickup.pick
#=> "sturgeon"
于 2012-06-01T18:07:28.697 回答
1

如果要生成带有替换的大型随机整数数组,可以使用分段线性插值。例如,使用 NumPy/SciPy:

import numpy
import scipy.interpolate

def weighted_randint(weights, size=None):
    """Given an n-element vector of weights, randomly sample
    integers up to n with probabilities proportional to weights"""
    n = weights.size
    # normalize so that the weights sum to unity
    weights = weights / numpy.linalg.norm(weights, 1)
    # cumulative sum of weights
    cumulative_weights = weights.cumsum()
    # piecewise-linear interpolating function whose domain is
    # the unit interval and whose range is the integers up to n
    f = scipy.interpolate.interp1d(
            numpy.hstack((0.0, weights)),
            numpy.arange(n + 1), kind='linear')
    return f(numpy.random.random(size=size)).astype(int)

如果您想在不更换的情况下进行采样,这将无效。

于 2011-05-10T00:15:01.393 回答
1

这是geodns的 Go 实现:

package foo

import (
    "log"
    "math/rand"
)

type server struct {
    Weight int
    data   interface{}
}

func foo(servers []server) {
    // servers list is already sorted by the Weight attribute

    // number of items to pick
    max := 4

    result := make([]server, max)

    sum := 0
    for _, r := range servers {
        sum += r.Weight
    }

    for si := 0; si < max; si++ {
        n := rand.Intn(sum + 1)
        s := 0

        for i := range servers {
            s += int(servers[i].Weight)
            if s >= n {
                log.Println("Picked record", i, servers[i])
                sum -= servers[i].Weight
                result[si] = servers[i]

                // remove the server from the list
                servers = append(servers[:i], servers[i+1:]...)
                break
            }
        }
    }

    return result
}
于 2012-08-26T07:55:43.493 回答
1

如果您想从加权集合中选择 x 个元素而不进行替换,以便以与其权重成比例的概率选择元素:

import random

def weighted_choose_subset(weighted_set, count):
    """Return a random sample of count elements from a weighted set.

    weighted_set should be a sequence of tuples of the form 
    (item, weight), for example:  [('a', 1), ('b', 2), ('c', 3)]

    Each element from weighted_set shows up at most once in the
    result, and the relative likelihood of two particular elements
    showing up is equal to the ratio of their weights.

    This works as follows:

    1.) Line up the items along the number line from [0, the sum
    of all weights) such that each item occupies a segment of
    length equal to its weight.

    2.) Randomly pick a number "start" in the range [0, total
    weight / count).

    3.) Find all the points "start + n/count" (for all integers n
    such that the point is within our segments) and yield the set
    containing the items marked by those points.

    Note that this implementation may not return each possible
    subset.  For example, with the input ([('a': 1), ('b': 1),
    ('c': 1), ('d': 1)], 2), it may only produce the sets ['a',
    'c'] and ['b', 'd'], but it will do so such that the weights
    are respected.

    This implementation only works for nonnegative integral
    weights.  The highest weight in the input set must be less
    than the total weight divided by the count; otherwise it would
    be impossible to respect the weights while never returning
    that element more than once per invocation.
    """
    if count == 0:
        return []

    total_weight = 0
    max_weight = 0
    borders = []
    for item, weight in weighted_set:
        if weight < 0:
            raise RuntimeError("All weights must be positive integers")
        # Scale up weights so dividing total_weight / count doesn't truncate:
        weight *= count
        total_weight += weight
        borders.append(total_weight)
        max_weight = max(max_weight, weight)

    step = int(total_weight / count)

    if max_weight > step:
        raise RuntimeError(
            "Each weight must be less than total weight / count")

    next_stop = random.randint(0, step - 1)

    results = []
    current = 0
    for i in range(count):
        while borders[current] <= next_stop:
            current += 1
        results.append(weighted_set[current][0])
        next_stop += step

    return results
于 2014-11-20T21:13:50.740 回答
0

在您链接到的问题中,凯尔的解决方案可以进行简单的概括。扫描列表并将总权重相加。那么选择一个元素的概率应该是:

1 - (1 - (#needed/(剩余重量)))/(n 时的重量)。访问一个节点后,从总数中减去它的权重。此外,如果您需要 n 并留下 n,则必须明确停止。

您可以检查所有重量为 1 的东西,这简化了 kyle 的解决方案。

编辑:(不得不重新考虑两倍可能意味着什么)

于 2010-01-26T16:51:22.420 回答
0

这个完全是 O(n) 并且没有过多的内存使用。我相信这是一个聪明而有效的解决方案,易于移植到任何语言。前两行只是在 Drupal 中填充样本数据。

function getNrandomGuysWithWeight($numitems){
  $q = db_query('SELECT id, weight FROM theTableWithTheData');
  $q = $q->fetchAll();

  $accum = 0;
  foreach($q as $r){
    $accum += $r->weight;
    $r->weight = $accum;
  }

  $out = array();

  while(count($out) < $numitems && count($q)){
    $n = rand(0,$accum);
    $lessaccum = NULL;
    $prevaccum = 0;
    $idxrm = 0;
    foreach($q as $i=>$r){
      if(($lessaccum == NULL) && ($n <= $r->weight)){
        $out[] = $r->id;
        $lessaccum = $r->weight- $prevaccum;
        $accum -= $lessaccum;
        $idxrm = $i;
      }else if($lessaccum){
        $r->weight -= $lessaccum;
      }
      $prevaccum = $r->weight;
    }
    unset($q[$idxrm]);
  }
  return $out;
}
于 2013-08-14T22:28:23.383 回答
0

我在这里放了一个简单的选择 1 个项目的解决方案,您可以轻松地将其扩展为 k 个项目(Java 风格):

double random = Math.random();
double sum = 0;
for (int i = 0; i < items.length; i++) {
    val = items[i];
    sum += val.getValue();
    if (sum > random) {
        selected = val;
        break;
    }
}
于 2014-01-29T16:13:13.653 回答
0

我在这里实现了一个类似于 Jason Orendorff 在 Rust 中的想法的算法。我的版本还支持批量操作:及时从数据结构中插入和删除(当您想删除由其 id 给出的一堆项目时,而不是通过加权选择路径),O(m + log n)其中 m 是要删除的项目数,n 是存储的项目数。

于 2016-05-15T17:10:18.977 回答
0

没有递归替换的采样 - c# 中优雅且非常简短的解决方案

//有多少种方式我们可以从60个学生中选择4个,这样每次我们选择不同的4个

class Program
{
    static void Main(string[] args)
    {
        int group = 60;
        int studentsToChoose = 4;

        Console.WriteLine(FindNumberOfStudents(studentsToChoose, group));
    }

    private static int FindNumberOfStudents(int studentsToChoose, int group)
    {
        if (studentsToChoose == group || studentsToChoose == 0)
            return 1;

        return FindNumberOfStudents(studentsToChoose, group - 1) + FindNumberOfStudents(studentsToChoose - 1, group - 1);

    }
}
于 2018-01-18T19:21:40.503 回答
0

我只是花了几个小时试图在没有替换的情况下支持底层采样算法,这个主题比我最初想象的要复杂。真令人兴奋!为了未来读者的利益(祝您有美好的一天!)我在这里记录了我的见解,包括一个现成的函数,它尊重下面给定的包含概率。可以在此处找到各种方法的快速数学概述:Tillé:具有相等或不等概率的采样算法。例如,Jason 的方法可以在第 46 页找到。他的方法需要注意的是,权重与包含概率不成比例,如文档中所述。实际上,-th 包含概率可以递归计算如下:

def inclusion_probability(i, weights, k):
    """
        Computes the inclusion probability of the i-th element
        in a randomly sampled k-tuple using Jason's algorithm
        (see https://stackoverflow.com/a/2149533/7729124)
    """
    if k <= 0: return 0
    cum_p = 0
    for j, weight in enumerate(weights):
        # compute the probability of j being selected considering the weights
        p = weight / sum(weights)

        if i == j:
            # if this is the target element, we don't have to go deeper,
            # since we know that i is included
            cum_p += p
        else:
            # if this is not the target element, than we compute the conditional
            # inclusion probability of i under the constraint that j is included
            cond_i = i if i < j else i-1
            cond_weights = weights[:j] + weights[j+1:]
            cond_p = inclusion_probability(cond_i, cond_weights, k-1)
            cum_p += p * cond_p
    return cum_p

我们可以通过比较来检查上面函数的有效性

In : for i in range(3): print(i, inclusion_probability(i, [1,2,3], 2))
0 0.41666666666666663
1 0.7333333333333333
2 0.85

In : import collections, itertools
In : sample_tester = lambda f: collections.Counter(itertools.chain(*(f() for _ in range(10000))))
In : sample_tester(lambda: random_weighted_sample_no_replacement([(1,'a'),(2,'b'),(3,'c')],2))
Out: Counter({'a': 4198, 'b': 7268, 'c': 8534})

指定包含概率的一种方法(也在上面的文档中提出)是计算它们的权重。手头问题的整个复杂性源于这样一个事实,即人们不能直接这样做,因为基本上必须反转递归公式,象征性地我声称这是不可能的。从数值上讲,它可以使用所有类型的方法来完成,例如牛顿法。然而,使用普通 Python 反转雅可比行列式的复杂性很快就会变得难以忍受,我真的建议numpy.random.choice在这种情况下进行研究。

幸运的是,有一种使用纯 Python 的方法可能对您的目的具有足够的性能,也可能不足以满足您的目的,如果没有那么多不同的权重,它会很好地工作。您可以在第 75 和 76 页找到该算法。它通过将采样过程分成具有相同包含概率的部分来工作,即我们可以random.sample再次使用!我不打算在这里解释原理,因为第 69 页很好地介绍了基础知识。下面是代码,希望有足够多的注释:

def sample_no_replacement_exact(items, k, best_effort=False, random_=None, ε=1e-9):
    """
        Returns a random sample of k elements from items, where items is a list of
        tuples (weight, element). The inclusion probability of an element in the
        final sample is given by
           k * weight / sum(weights).

        Note that the function raises if a inclusion probability cannot be
        satisfied, e.g the following call is obviously illegal:
           sample_no_replacement_exact([(1,'a'),(2,'b')],2)
        Since selecting two elements means selecting both all the time,
        'b' cannot be selected twice as often as 'a'. In general it can be hard to
        spot if the weights are illegal and the function does *not* always raise
        an exception in that case. To remedy the situation you can pass
        best_effort=True which redistributes the inclusion probability mass
        if necessary. Note that the inclusion probabilities will change
        if deemed necessary.

        The algorithm is based on the splitting procedure on page 75/76 in:
        http://www.eustat.eus/productosServicios/52.1_Unequal_prob_sampling.pdf
        Additional information can be found here:
        https://stackoverflow.com/questions/2140787/

        :param items: list of tuples of type weight,element
        :param k: length of resulting sample
        :param best_effort: fix inclusion probabilities if necessary,
                            (optional, defaults to False)
        :param random_: random module to use (optional, defaults to the
                        standard random module)
        :param ε: fuzziness parameter when testing for zero in the context
                  of floating point arithmetic (optional, defaults to 1e-9)
        :return: random sample set of size k
        :exception: throws ValueError in case of bad parameters,
                    throws AssertionError in case of algorithmic impossibilities
    """
    # random_ defaults to the random submodule
    if not random_:
        random_ = random

    # special case empty return set
    if k <= 0:
        return set()

    if k > len(items):
        raise ValueError("resulting tuple length exceeds number of elements (k > n)")

    # sort items by weight
    items = sorted(items, key=lambda item: item[0])

    # extract the weights and elements
    weights, elements = list(zip(*items))

    # compute the inclusion probabilities (short: π) of the elements
    scaling_factor = k / sum(weights)
    π = [scaling_factor * weight for weight in weights]

    # in case of best_effort: if a inclusion probability exceeds 1,
    # try to rebalance the probabilities such that:
    # a) no probability exceeds 1,
    # b) the probabilities still sum to k, and
    # c) the probability masses flow from top to bottom:
    #    [0.2, 0.3, 1.5] -> [0.2, 0.8, 1]
    # (remember that π is sorted)
    if best_effort and π[-1] > 1 + ε:
        # probability mass we still we have to distribute
        debt = 0.
        for i in reversed(range(len(π))):
            if π[i] > 1.:
                # an 'offender', take away excess
                debt += π[i] - 1.
                π[i] = 1.
            else:
                # case π[i] < 1, i.e. 'save' element
                # maximum we can transfer from debt to π[i] and still not
                # exceed 1 is computed by the minimum of:
                # a) 1 - π[i], and
                # b) debt
                max_transfer = min(debt, 1. - π[i])
                debt -= max_transfer
                π[i] += max_transfer
        assert debt < ε, "best effort rebalancing failed (impossible)"

    # make sure we are talking about probabilities
    if any(not (0 - ε <= π_i <= 1 + ε) for π_i in π):
        raise ValueError("inclusion probabilities not satisfiable: {}" \
                         .format(list(zip(π, elements))))

    # special case equal probabilities
    # (up to fuzziness parameter, remember that π is sorted)
    if π[-1] < π[0] + ε:
        return set(random_.sample(elements, k))

    # compute the two possible lambda values, see formula 7 on page 75
    # (remember that π is sorted)
    λ1 = π[0] * len(π) / k
    λ2 = (1 - π[-1]) * len(π) / (len(π) - k)
    λ = min(λ1, λ2)

    # there are two cases now, see also page 69
    # CASE 1
    # with probability λ we are in the equal probability case
    # where all elements have the same inclusion probability
    if random_.random() < λ:
        return set(random_.sample(elements, k))

    # CASE 2:
    # with probability 1-λ we are in the case of a new sample without
    # replacement problem which is strictly simpler,
    # it has the following new probabilities (see page 75, π^{(2)}):
    new_π = [
        (π_i - λ * k / len(π))
        /
        (1 - λ)
        for π_i in π
    ]
    new_items = list(zip(new_π, elements))

    # the first few probabilities might be 0, remove them
    # NOTE: we make sure that floating point issues do not arise
    #       by using the fuzziness parameter
    while new_items and new_items[0][0] < ε:
        new_items = new_items[1:]

    # the last few probabilities might be 1, remove them and mark them as selected
    # NOTE: we make sure that floating point issues do not arise
    #       by using the fuzziness parameter
    selected_elements = set()
    while new_items and new_items[-1][0] > 1 - ε:
        selected_elements.add(new_items[-1][1])
        new_items = new_items[:-1]

    # the algorithm reduces the length of the sample problem,
    # it is guaranteed that:
    # if λ = λ1: the first item has probability 0
    # if λ = λ2: the last item has probability 1
    assert len(new_items) < len(items), "problem was not simplified (impossible)"

    # recursive call with the simpler sample problem
    # NOTE: we have to make sure that the selected elements are included
    return sample_no_replacement_exact(
        new_items,
        k - len(selected_elements),
        best_effort=best_effort,
        random_=random_,
        ε=ε
    ) | selected_elements

例子:

In : sample_no_replacement_exact([(1,'a'),(2,'b'),(3,'c')],2)
Out: {'b', 'c'}

In : import collections, itertools
In : sample_tester = lambda f: collections.Counter(itertools.chain(*(f() for _ in range(10000))))
In : sample_tester(lambda: sample_no_replacement_exact([(1,'a'),(2,'b'),(3,'c'),(4,'d')],2))
Out: Counter({'a': 2048, 'b': 4051, 'c': 5979, 'd': 7922})

权重总和为 10,因此包含概率计算为:a→ 20%、b→ 40%、c→ 60%、d→ 80%。(总和:200% = k。)它有效!

对于有效使用此功能,请注意一句话,很难发现权重的非法输入。一个明显的非法例子是

In: sample_no_replacement_exact([(1,'a'),(2,'b')],2)
ValueError: inclusion probabilities not satisfiable: [(0.6666666666666666, 'a'), (1.3333333333333333, 'b')]

b不能出现两次,a因为两者都必须始终被选中。还有更微妙的例子。为了避免生产中的异常,只需使用 best_effort=True,它会重新平衡包含概率质量,以便我们始终拥有有效的分布。显然,这可能会改变包含概率。

于 2019-01-11T09:17:40.897 回答
-2

我使用了关联图(权重,对象)。例如:

{
(10,"low"),
(100,"mid"),
(10000,"large")
}

total=10110

查看一个介于 0 和 'total' 之间的随机数并遍历键,直到该数字适合给定范围。

于 2010-01-26T16:36:57.787 回答