6

这是我前段时间在面试时被问到的问题,我找不到答案。

给定一些样本 S1, S2, ... Sn 和它们的概率分布(或权重,不管它叫什么)P1, P2, .. Pn,设计随机选择样本的算法考虑到它的概率。我提出的解决方案如下:

  1. 构建权重 Ci 的累积数组,例如

    C0 = 0; Ci = C[i-1] + Pi。

同时计算T=P1+P2+...Pn。需要 O(n) 时间

  1. 生成均匀随机数 R = T*random[0..1]
  2. 使用二分查找算法,返回最少 i 个这样的 Ci >= R。结果是 Si。它需要 O(logN) 时间。

现在实际的问题是:假设我想更改初始权重 Pj 之一。如何在比 O(n) 时间内更好地做到这一点?其他数据结构是可以接受的,但随机采样算法不应该比 O(logN) 差。

4

4 回答 4

5

解决此问题的一种方法是重新考虑如何构建包含累积总数的二叉搜索树。与其构建二叉搜索树,不如考虑将每个节点解释如下:

  • 每个节点存储一系列专用于节点本身的值。
  • 左子树中的节点表示从该范围左侧的概率分布中采样。
  • 右子树中的节点表示从该范围右侧的概率分布中采样。

例如,假设我们对于事件 A、B、C、D、E、F 和 G 的权重分别为 3、2、2、2、2、1 和 1。我们构建这个包含 A、B、C 的二叉树, D、E、F 和 G:

               D
             /   \
           B       F
          / \     / \
         A   C   E   G

现在,我们用概率注释树。由于 A、C、E 和 G 都是叶子,我们给它们每个概率质量一个:

               D
             /   \
           B       F
          / \     / \
         A   C   E   G
         1   1   1   1

现在,看看 B 的树。B 被选中的权重为 2,A 被选中的权重为 3,C 被选中的概率为 2。如果我们将它们归一化到范围 [0, 1),那么 A 占概率的 3/7,B 和 C 各占 2/7s。因此,我们有 B 的节点表示 [0, 3/7) 范围内的任何内容都进入左子树,范围 [3/7, 5/7) 范围内的任何内容都映射到 B,范围 [5 /7, 1) 映射到右子树:

                   D
                 /   \
           B              F
 [0, 3/7) / \  [5/7, 1)  / \
         A   C          E   G
         1   1          1   1

类似地,让我们处理 F。E 被选中的权重为 2,而 F 和 G 的被选中概率权重为 1。因此这里E的子树占概率质量的1/2,节点F占1/4,G的子树占1/4。这意味着我们可以将概率分配为

                       D
                     /   \
           B                        F
 [0, 3/7) / \  [5/7, 1)   [0, 1/2) / \  [3/4, 1)
         A   C                    E   G
         1   1                    1   1

最后,让我们看看根。左子树的组合权重为 3 + 2 + 2 = 7。右子树的组合权重为 2 + 1 + 1 = 4。D 本身的权重为 2。因此左子树的概率为 7/13被选中,D 有 2/13 的概率被选中,右子树有 4/13 的概率被选中。因此,我们可以将概率最终确定为

                       D
           [0, 7/13) /   \ [9/13, 1)
           B                        F
 [0, 3/7) / \  [5/7, 1)   [0, 1/2) / \  [3/4, 1)
         A   C                    E   G
         1   1                    1   1

要生成随机值,您将重复以下操作:

  • 从根开始:
    • 在 [0, 1) 范围内选择一个均匀随机值。
    • 如果它在左子树的范围内,则下降到它。
    • 如果它在右子树的范围内,则下降到它。
    • 否则,返回当前节点对应的值。

概率本身可以在构建树时递归确定:

  • 任何叶节点的左右概率均为 0。
  • 如果一个内部节点本身的权重为 W,其左树的总权重为 W L,其右树的总权重为 W R,那么左边的概率为 (W L ) / (W + W L + W R ),右边的概率为概率是 (W R ) / (W + W L + W R )。

这种重新表述有用的原因是,它为我们提供了一种在 O(log n) 时间内更新每个概率更新概率的方法。特别是,让我们考虑一下,如果我们更新某个特定节点的权重,哪些不变量会发生变化。为简单起见,我们现在假设节点是叶子。当我们更新叶节点的权重时,叶节点的概率仍然是正确的,但对于它正上方的节点却是不正确的,因为该节点的一个子树的权重发生了变化。因此,我们可以(在 O(1) 时间内)通过使用与上述相同的公式重新计算父节点的概率。但是随后该节点的父节点不再具有正确的值,因为它的子树权重之一发生了变化,因此我们也可以在那里重新计算概率。这个过程一直重复到树的根部,我们对每个级别进行 O(1) 计算以纠正分配给每个边的权重。假设树是平衡的,因此我们必须做 O(log n) 的总工作来更新一个概率。如果节点不是叶节点,则逻辑相同;我们只是从树的某个地方开始。

简而言之,这给出了

  • O(n) 时间来构建树(使用自下而上的方法),
  • O(log n) 时间来生成一个随机值,并且
  • O(log n) 时间来更新任何一个值。

希望这可以帮助!

于 2012-01-20T20:38:09.770 回答
4

将搜索结构存储为平衡二叉树,而不是数组。树的每个节点都应该存储它包含的元素的总权重。根据 的值R,搜索过程要么返回当前节点,要么搜索左子树或右子树。

当元素的权重发生变化时,搜索结构的更新就是调整从元素到树根的路径上的权重。

由于树是平衡的,因此搜索和权重更新操作都是 O(log N)。

于 2012-01-20T20:27:29.013 回答
1

对于那些想要一些代码的人,这里有一个 python 实现:

import numpy


class DynamicProbDistribution(object):
  """ Given a set of weighted items, randomly samples an item with probability
  proportional to its weight. This class also supports fast modification of the
  distribution, so that changing an item's weight requires O(log N) time. 
  Sampling requires O(log N) time. """

  def __init__(self, weights):
    self.num_weights = len(weights)
    self.weights = numpy.empty((1+len(weights),), 'float32')
    self.weights[0] = 0 # Not necessary but easier to read after printing
    self.weights[1:] = weights
    self.weight_tree = numpy.zeros((1+len(weights),), 'float32')
    self.populate_weight_tree()

  def populate_weight_tree(self):
    """ The value of every node in the weight tree is equal to the sum of all 
    weights in the subtree rooted at that node. """
    i = self.num_weights
    while i > 0:
      weight_sum = self.weights[i]
      twoi = 2*i
      if twoi < self.num_weights:
        weight_sum += self.weight_tree[twoi] + self.weight_tree[twoi+1]
      elif twoi == self.num_weights:
        weight_sum += self.weights[twoi]
      self.weight_tree[i] = weight_sum
      i -= 1

  def set_weight(self, item_idx, weight):
    """ Changes the weight of the given item. """
    i = item_idx + 1
    self.weights[i] = weight
    while i > 0:
      weight_sum = self.weights[i]
      twoi = 2*i
      if twoi < self.num_weights:
        weight_sum += self.weight_tree[twoi] + self.weight_tree[twoi+1]
      elif twoi == self.num_weights:
        weight_sum += self.weights[twoi]
      self.weight_tree[i] = weight_sum
      i /= 2 # Only need to modify the parents of this node

  def sample(self):
    """ Returns an item index sampled from the distribution. """
    i = 1
    while True:
      twoi = 2*i

      if twoi < self.num_weights:
        # Two children
        val = numpy.random.random() * self.weight_tree[i]
        if val < self.weights[i]:
          # all indices are offset by 1 for fast traversal of the
          # internal binary tree
          return i-1
        elif val < self.weights[i] + self.weight_tree[twoi]:
          i = twoi # descend into the subtree
        else:
          i = twoi + 1

      elif twoi == self.num_weights:
        # One child
        val = numpy.random.random() * self.weight_tree[i]
        if val < self.weights[i]:
          return i-1
        else:
          i = twoi

      else:
        # No children
        return i-1


def validate_distribution_results(dpd, weights, samples_per_item=1000):
  import time

  bins = numpy.zeros((len(weights),), 'float32')
  num_samples = samples_per_item * numpy.sum(weights)

  start = time.time()
  for i in xrange(num_samples):
    bins[dpd.sample()] += 1
  duration = time.time() - start

  bins *= numpy.sum(weights)
  bins /= num_samples

  print "Time to make %s samples: %s" % (num_samples, duration)

  # These should be very close to each other
  print "\nWeights:\n", weights
  print "\nBins:\n", bins

  sdev_tolerance = 10 # very unlikely to be exceeded
  tolerance = float(sdev_tolerance) / numpy.sqrt(samples_per_item)
  print "\nTolerance:\n", tolerance

  error = numpy.abs(weights - bins)
  print "\nError:\n", error

  assert (error < tolerance).all()


#@test
def test_DynamicProbDistribution():
  # First test that the initial distribution generates valid samples.

  weights = [2,5,4, 8,3,6, 6,1,3, 4,7,9]
  dpd = DynamicProbDistribution(weights)

  validate_distribution_results(dpd, weights)

  # Now test that we can change the weights and still sample from the 
  # distribution.

  print "\nChanging weights..."
  dpd.set_weight(4, 10)
  weights[4] = 10
  dpd.set_weight(9, 2)
  weights[9] = 2
  dpd.set_weight(5, 4)
  weights[5] = 4
  dpd.set_weight(11, 3)
  weights[11] = 3

  validate_distribution_results(dpd, weights)

  print "\nTest passed"


if __name__ == '__main__':
  test_DynamicProbDistribution()
于 2012-01-23T00:05:36.927 回答
0

我已经实现了一个与 Ken 的代码相关的版本,但在最坏的情况下 O(log n) 操作与红/黑树平衡。这可以作为 weightedDict.py 获得:https ://github.com/google/weighted-dict

(我会将此添加为对肯的回答的评论,但没有这样做的声誉!)

于 2018-06-29T14:10:33.673 回答