我有一个数组,里面装满了整数。我的工作是为数组的任何部分快速找到多数元素,我需要这样做... log n time,不是线性的,但事先我可以花一些时间来准备数组。
例如:
1 5 2 7 7 7 8 4 6
和查询:
[4, 7]
返回7
[4, 8]
返回7
[1, 2]
返回0
(无多数元素),依此类推...
我需要对每个查询都有一个答案,如果可能的话,它需要快速执行。
为了准备,我可以使用O(n log n) 时间
O(log n) 查询和 O(n log n) 预处理/空间可以通过查找和使用具有以下属性的多数区间来实现:
换句话说,多数区间的唯一目的是为任何查询区间提供 O(log n) 多数元素候选者。
该算法使用以下数据结构:
map<Value, vector<Position>>
或者unordered_map
可以在这里使用来提高性能(但我们需要提取所有键并对它们进行排序,以便结构#3 以正确的顺序填充)。vector<Interval>
vector<small_map<Value, Data>>
)。其中Data
包含来自结构#1 的适当向量的两个索引,指向具有给定值的元素的下一个/上一个位置。更新:感谢@justhalf,最好将具有给定值Data
的元素的累积频率存储起来。可以实现为对的排序向量 - 预处理将附加已按排序顺序的元素,查询将仅用于线性搜索。small_map
small_map
预处理:
询问:
s3[stop][value].prev - s3[start][value].next + 1
。如果大于查询间隔的一半,则返回value。如果使用累积频率而不是下一个/上一个索引,s3[stop+1][value].freq - s3[start][value].freq
则改为计算。该算法的主要部分是从位置列表中获取多数区间:
number_of_matching_values_to_the_left - number_of_nonmatching_values_to_the_left
: 。for (auto x: positions) if (x < prefix.back()) prefix.push_back(x);
.reverse(positions); for (auto x: positions) if (x > suffix.back()) suffix.push_back(x);
.该算法保证了多数区间的属性 1 .. 3 。至于属性#4,我能想象到覆盖具有最大多数间隔数的元素的唯一方法是这样的11111111222233455666677777777
:这里元素4
被2 * log n
间隔覆盖,所以这个属性似乎是满足的。请在本文末尾查看此属性的更多正式证明。
例子:
对于输入数组“0 1 2 0 0 1 1 0”,将生成以下位置列表:
value positions
0 0 3 4 7
1 1 5 6
2 2
价值头寸0
将获得以下属性:
weights: 0:1 3:0 4:1 7:0
prefix: 0:1 3:0 (strictly decreasing)
suffix: 4:1 7:0 (strictly increasing when scanning backwards)
intervals: 0->4 3->7 4->0 7->3
merged intervals: 0-7
价值头寸1
将获得以下属性:
weights: 1:0 5:-2 6:-1
prefix: 1:0 5:-2
suffix: 1:0 6:-1
intervals: 1->none 5->6+1 6->5-1 1->none
merged intervals: 4-7
查询数据结构:
positions value next prev
0 0 0 x
1..2 0 1 0
3 0 1 1
4 0 2 2
4 1 1 x
5 0 3 2
...
查询[0,4]:
prev[4][0]-next[0][0]+1=2-0+1=3
query size=5
3>2.5, returned result 0
查询[2,5]:
prev[5][0]-next[2][0]+1=2-1+1=2
query size=4
2=2, returned result "none"
请注意,没有尝试检查元素“1”,因为它的多数区间不包括这些区间中的任何一个。
财产证明#4:
大多数区间的构造方式是严格超过其所有元素的 1/3 具有相应的值。any*(m-1) value*m any*m
对于子数组,例如 ,这个比率最接近 1/3 01234444456789
。
为了使这个证明更明显,我们可以将每个区间表示为 2D 中的一个点:每个可能的起点由水平轴表示,每个可能的结束点由垂直轴表示(见下图)。
所有有效区间都位于对角线上或之上。白色矩形表示覆盖某个数组元素的所有区间(在其右下角表示为单位大小区间)。
让我们用大小为 1、2、4、8、16... 的正方形覆盖这个白色矩形,它们共享同一个右下角。这将白色区域划分为类似于黄色区域的 O(log n) 区域(以及大小为 1 的单个正方形,其中包含该算法忽略的大小为 1 的单个间隔)。
让我们计算一下黄色区域可以放置多少个多数区间。一个区间(离对角最近)占据了离对角最远区间的元素的1/4(这个最大的区间包含了黄色区域中任意区间的所有元素)。这意味着最小间隔包含严格超过 1/12可用于整个黄色区域的值。因此,如果我们尝试将 12 个区间放置到黄色区域,我们没有足够的元素用于不同的值。所以黄色区域不能包含超过 11 个多数区间。并且白色矩形不能包含超过11 * log n
多数区间。证明完成。
11 * log n
是高估。正如我之前所说,很难想象2 * log n
多数区间会覆盖某些元素。甚至这个值也远大于覆盖多数区间的平均数。
C++11 实现。在ideone或此处查看:
#include <iostream>
#include <vector>
#include <map>
#include <algorithm>
#include <functional>
#include <random>
constexpr int SrcSize = 1000000;
constexpr int NQueries = 100000;
using src_vec_t = std::vector<int>;
using index_vec_t = std::vector<int>;
using weight_vec_t = std::vector<int>;
using pair_vec_t = std::vector<std::pair<int, int>>;
using index_map_t = std::map<int, index_vec_t>;
using interval_t = std::pair<int, int>;
using interval_vec_t = std::vector<interval_t>;
using small_map_t = std::vector<std::pair<int, int>>;
using query_vec_t = std::vector<small_map_t>;
constexpr int None = -1;
constexpr int Junk = -2;
src_vec_t generate_e()
{ // good query length = 3
src_vec_t src;
std::random_device rd;
std::default_random_engine eng{rd()};
auto exp = std::bind(std::exponential_distribution<>{0.4}, eng);
for (int i = 0; i < SrcSize; ++i)
{
int x = exp();
src.push_back(x);
//std::cout << x << ' ';
}
return src;
}
src_vec_t generate_ep()
{ // good query length = 500
src_vec_t src;
std::random_device rd;
std::default_random_engine eng{rd()};
auto exp = std::bind(std::exponential_distribution<>{0.4}, eng);
auto poisson = std::bind(std::poisson_distribution<int>{100}, eng);
while (int(src.size()) < SrcSize)
{
int x = exp();
int n = poisson();
for (int i = 0; i < n; ++i)
{
src.push_back(x);
//std::cout << x << ' ';
}
}
return src;
}
src_vec_t generate()
{
//return generate_e();
return generate_ep();
}
int trivial(const src_vec_t& src, interval_t qi)
{
int count = 0;
int majorityElement = 0; // will be assigned before use for valid args
for (int i = qi.first; i <= qi.second; ++i)
{
if (count == 0)
majorityElement = src[i];
if (src[i] == majorityElement)
++count;
else
--count;
}
count = 0;
for (int i = qi.first; i <= qi.second; ++i)
{
if (src[i] == majorityElement)
count++;
}
if (2 * count > qi.second + 1 - qi.first)
return majorityElement;
else
return None;
}
index_map_t sort_ind(const src_vec_t& src)
{
int ind = 0;
index_map_t im;
for (auto x: src)
im[x].push_back(ind++);
return im;
}
weight_vec_t get_weights(const index_vec_t& indexes)
{
weight_vec_t weights;
for (int i = 0; i != int(indexes.size()); ++i)
weights.push_back(2 * i - indexes[i]);
return weights;
}
pair_vec_t get_prefix(const index_vec_t& indexes, const weight_vec_t& weights)
{
pair_vec_t prefix;
for (int i = 0; i != int(indexes.size()); ++i)
if (prefix.empty() || weights[i] < prefix.back().second)
prefix.emplace_back(indexes[i], weights[i]);
return prefix;
}
pair_vec_t get_suffix(const index_vec_t& indexes, const weight_vec_t& weights)
{
pair_vec_t suffix;
for (int i = indexes.size() - 1; i >= 0; --i)
if (suffix.empty() || weights[i] > suffix.back().second)
suffix.emplace_back(indexes[i], weights[i]);
std::reverse(suffix.begin(), suffix.end());
return suffix;
}
interval_vec_t get_intervals(const pair_vec_t& prefix, const pair_vec_t& suffix)
{
interval_vec_t intervals;
int prev_suffix_index = 0; // will be assigned before use for correct args
int prev_suffix_weight = 0; // same assumptions
for (int ind_pref = 0, ind_suff = 0; ind_pref != int(prefix.size());)
{
auto i_pref = prefix[ind_pref].first;
auto w_pref = prefix[ind_pref].second;
if (ind_suff != int(suffix.size()))
{
auto i_suff = suffix[ind_suff].first;
auto w_suff = suffix[ind_suff].second;
if (w_pref <= w_suff)
{
auto beg = std::max(0, i_pref + w_pref - w_suff);
if (i_pref < i_suff)
intervals.emplace_back(beg, i_suff + 1);
if (w_pref == w_suff)
++ind_pref;
++ind_suff;
prev_suffix_index = i_suff;
prev_suffix_weight = w_suff;
continue;
}
}
// ind_suff out of bounds or w_pref > w_suff:
auto end = prev_suffix_index + prev_suffix_weight - w_pref + 1;
// end may be out-of-bounds; that's OK if overflow is not possible
intervals.emplace_back(i_pref, end);
++ind_pref;
}
return intervals;
}
interval_vec_t merge(const interval_vec_t& from)
{
using endpoints_t = std::vector<std::pair<int, bool>>;
endpoints_t ep(2 * from.size());
std::transform(from.begin(), from.end(), ep.begin(),
[](interval_t x){ return std::make_pair(x.first, true); });
std::transform(from.begin(), from.end(), ep.begin() + from.size(),
[](interval_t x){ return std::make_pair(x.second, false); });
std::sort(ep.begin(), ep.end());
interval_vec_t to;
int start; // will be assigned before use for correct args
int overlaps = 0;
for (auto& x: ep)
{
if (x.second) // begin
{
if (overlaps++ == 0)
start = x.first;
}
else // end
{
if (--overlaps == 0)
to.emplace_back(start, x.first);
}
}
return to;
}
interval_vec_t get_intervals(const index_vec_t& indexes)
{
auto weights = get_weights(indexes);
auto prefix = get_prefix(indexes, weights);
auto suffix = get_suffix(indexes, weights);
auto intervals = get_intervals(prefix, suffix);
return merge(intervals);
}
void update_qv(
query_vec_t& qv,
int value,
const interval_vec_t& intervals,
const index_vec_t& iv)
{
int iv_ind = 0;
int qv_ind = 0;
int accum = 0;
for (auto& interval: intervals)
{
int i_begin = interval.first;
int i_end = std::min<int>(interval.second, qv.size() - 1);
while (iv[iv_ind] < i_begin)
{
++accum;
++iv_ind;
}
qv_ind = std::max(qv_ind, i_begin);
while (qv_ind <= i_end)
{
qv[qv_ind].emplace_back(value, accum);
if (iv[iv_ind] == qv_ind)
{
++accum;
++iv_ind;
}
++qv_ind;
}
}
}
void print_preprocess_stat(const index_map_t& im, const query_vec_t& qv)
{
double sum_coverage = 0.;
int max_coverage = 0;
for (auto& x: qv)
{
sum_coverage += x.size();
max_coverage = std::max<int>(max_coverage, x.size());
}
std::cout << " size = " << qv.size() - 1 << '\n';
std::cout << " values = " << im.size() << '\n';
std::cout << " max coverage = " << max_coverage << '\n';
std::cout << " avg coverage = " << sum_coverage / qv.size() << '\n';
}
query_vec_t preprocess(const src_vec_t& src)
{
query_vec_t qv(src.size() + 1);
auto im = sort_ind(src);
for (auto& val: im)
{
auto intervals = get_intervals(val.second);
update_qv(qv, val.first, intervals, val.second);
}
print_preprocess_stat(im, qv);
return qv;
}
int do_query(const src_vec_t& src, const query_vec_t& qv, interval_t qi)
{
if (qi.first == qi.second)
return src[qi.first];
auto b = qv[qi.first].begin();
auto e = qv[qi.second + 1].begin();
while (b != qv[qi.first].end() && e != qv[qi.second + 1].end())
{
if (b->first < e->first)
{
++b;
}
else if (e->first < b->first)
{
++e;
}
else // if (e->first == b->first)
{
// hope this doesn't overflow
if (2 * (e->second - b->second) > qi.second + 1 - qi.first)
return b->first;
++b;
++e;
}
}
return None;
}
int main()
{
std::random_device rd;
std::default_random_engine eng{rd()};
auto poisson = std::bind(std::poisson_distribution<int>{500}, eng);
int majority = 0;
int nonzero = 0;
int failed = 0;
auto src = generate();
auto qv = preprocess(src);
for (int i = 0; i < NQueries; ++i)
{
int size = poisson();
auto ud = std::uniform_int_distribution<int>(0, src.size() - size - 1);
int start = ud(eng);
int stop = start + size;
auto res1 = do_query(src, qv, {start, stop});
auto res2 = trivial(src, {start, stop});
//std::cout << size << ": " << res1 << ' ' << res2 << '\n';
if (res2 != res1)
++failed;
if (res2 != None)
{
++majority;
if (res2 != 0)
++nonzero;
}
}
std::cout << "majority elements = " << 100. * majority / NQueries << "%\n";
std::cout << " nonzero elements = " << 100. * nonzero / NQueries << "%\n";
std::cout << " queries = " << NQueries << '\n';
std::cout << " failed = " << failed << '\n';
return 0;
}
相关工作:
正如该问题的其他答案所指出的,还有其他工作已经解决了这个问题:S. Durocher、M. He、I Munro、PK Nicholson、M. Skala 的“恒定时间和线性空间中的范围多数”。
本文提出的算法对查询时间有更好的渐近复杂度:O(1)
代替O(log n)
和空间:O(n)
代替O(n log n)
.
更好的空间复杂度允许该算法处理更大的数据集(与此答案中提出的算法相比)。预处理数据所需的内存更少,数据访问模式更常规,很可能允许该算法更快地预处理数据。但是查询时间并不是那么容易......
假设我们从论文中输入了最有利于算法的数据:n=1000000000(很难想象在 2013 年有超过 10..30 GB 内存的系统)。
此答案中提出的算法需要为每个查询处理多达 120 个(或 2 个查询边界 * 2 * log n)元素。但它执行非常简单的操作,类似于线性搜索。并且它顺序访问两个连续的内存区域,因此它是缓存友好的。
论文中的算法需要为每个查询执行多达 20 次操作(或 2 个查询边界 * 5 个候选者 * 2 个小波树级别)。这减少了 6 倍。但是每个操作都比较复杂。每个对位计数器的简洁表示的查询都包含一个线性搜索(这意味着 20 个线性搜索而不是一个)。最糟糕的是,每个这样的操作都应该访问几个独立的内存区域(除非查询大小和四倍大小非常小),因此查询对缓存不友好。这意味着每个查询(虽然是一个恒定时间操作)都非常慢,可能比这里提出的算法慢。如果我们减小输入数组大小,则增加此处提出的算法更快的机会。
文中算法的实际缺点是小波树和简洁的位计数器实现。从头开始实施它们可能非常耗时。使用预先存在的实现并不总是很方便。
诀窍
在寻找多数元素时,您可能会丢弃没有多数元素的区间。请参阅查找数组中的多数元素。这使您可以非常简单地解决这个问题。
准备
在准备时,递归地将数组分成两半并将这些数组间隔存储在二叉树中。对于每个节点,计算数组区间中每个元素的出现次数。您需要一个提供 O(1) 插入和读取的数据结构。我建议使用 unsorted_multiset,它的平均行为根据需要(但最坏情况下的插入是线性的)。还要检查间隔是否有多数元素,如果有,则将其存储。
运行
在运行时,当被要求计算一个范围的多数元素时,潜入树中以计算精确覆盖给定范围的间隔集。使用技巧来组合这些间隔。
如果我们有数组区间7 5 5 7 7 7
,有多数元素7
,我们可以拆分并丢弃,5 5 7 7
因为它没有多数元素。实际上,五人组已经吞噬了七人组中的两个。剩下的是一个数组7 7
,或者2x7
。将此数字称为多数元素2
的多数数7
:
数组区间的多数元素的多数计数是多数元素的出现计数减去所有其他元素的组合出现。
使用以下规则组合区间以找到潜在的多数元素:
- 丢弃没有多数元素的区间
- 组合具有相同多数元素的两个数组很容易,只需将元素的多数数相加即可。
2x7
并3x7
成为5x7
- 当组合具有不同多数元素的两个数组时,较高的多数数获胜。从较高的多数数中减去较低的多数数,以找到最终的多数数。
3x7
并2x3
成为1x7
.- 如果它们的多数元素不同但具有相同的多数数,则忽略这两个数组。
3x7
并3x5
相互抵消。
当所有间隔都被丢弃或组合时,您要么一无所有,在这种情况下,没有多数元素。或者您有一个包含潜在多数元素的组合区间。查找并在所有数组间隔中添加此元素的出现计数(也包括先前丢弃的那些),以检查它是否真的是多数元素。
例子
对于数组1,1,1,2,2,3,3,2,2,2,3,2,2
,您将获得树(括号中列出的多数数 x 多数元素)
1,1,1,2,2,3,3,2,2,2,3,2,2
(1x2)
/ \
1,1,1,2,2,3,3 2,2,2,3,2,2
(4x2)
/ \ / \
1,1,1,2 2,3,3 2,2,2 3,2,2
(2x1) (1x3) (3x2) (1x2)
/ \ / \ / \ / \
1,1 1,2 2,3 3 2,2 2 3,2 2
(1x1) (1x3) (2x2) (1x2) (1x2)
/ \ / \ / \ / \ / \
1 1 1 2 2 3 2 2 3 2
(1x1) (1x1)(1x1)(1x2)(1x2)(1x3) (1x2)(1x2) (1x3) (1x2)
范围 [5,10](1-indexed)被一组区间 2,3,3 (1x3), 2,2,2 (3x2) 覆盖。他们有不同的多数元素。减去他们的多数数,你就剩下 2x2。所以 2 是潜在的多数元素。在数组中查找并求和 2 的实际出现计数:1+3 = 4 out of 6。2 是多数元素。
范围 [1,10] 被区间 1,1,1,2,2,3,3(无多数元素)和 2,2,2 (3x2) 的集合所覆盖。忽略第一个区间,因为它没有多数元素,所以 2 是潜在的多数元素。将所有间隔中 2 的出现次数相加:2+3 = 10 中的 5。没有多数元素。
实际上,它可以在恒定时间和线性空间中完成(!)
见https://cs.stackexchange.com/questions/16671/range-majority-queries-most-freqent-element-in-range和S. Durocher, M. He, I Munro, PK Nicholson, M. Skala, Range大多数在恒定时间和线性空间,信息和计算 222 (2013) 169–179,爱思唯尔。
它们的准备时间是 O(n log n),所需的空间是 O(n),查询是 O(1)。这是一篇理论论文,我并不声称了解所有内容,但它似乎远非不可能实现。他们正在使用小波树。
有关小波树的实现,请参见https://github.com/fclaude/libcds
如果你有无限的内存,你可以和有限的数据范围(如短整数)即使在O(N) 时间内也能做到。
总共 O(1) + O(N) 次操作。
如果您使用 map 而不是数组 X,您也可以使用 O(N) 内存来限制自己。但是您需要在第 1 阶段的每次迭代中找到元素。因此您将需要 O(N*log(N)) 时间总共。
您可以使用 MAX Heap,将数字频率作为保持 Max Heap 属性的决定因素,我的意思是,例如用于以下输入数组
1 5 2 7 7 7 8 4 6 5
Heap would have all distinct elements with their frequency associated with them
Element = 1 Frequency = 1,
Element = 5 Frequency = 2,
Element = 2 Frequency = 1,
Element = 7 Frequency = 3,
Element = 8 Frequency = 1,
Element = 4 Frequency = 1,
Element = 6 Frequency = 1
作为它的最大堆,频率为 3 的元素 7 将在根级别,只需检查输入范围是否包含该元素,如果是则这是答案,如果否,则根据输入范围转到左子树或右子树并执行相同的检查。
O(N) 在创建堆时只需要一次,但一旦创建,搜索将是有效的。
编辑:对不起,我正在解决一个不同的问题。
对数组进行排序并构建对的有序列表(值,number_of_occurrences) - 它的O(N log N).
开头是
1 5 2 7 7 7 8 4 6
这将是
(1,1) (2,1) (4,1) (5,1) (6,1) (7,3) (8,1)
best_value_or_none
在这个数组之上,用对 ( , )构建一棵二叉树max_occurrences
。它看起来像:
(1,1) (2,1) (4,1) (5,1) (6,1) (7,3) (8,1)
\ / \ / \ / |
(0,1) (0,1) (7,3) (8,1)
\ / \ /
(0,1) (7,3)
\ /
(7,3)
这个结构肯定有一个花哨的名字,但我不记得了:)
从这里,它是O(log N)
获取任何间隔的模式。任何区间都可以拆分为O(log N)
预先计算好的区间;例如:
[4, 7] = [4, 5] + [6, 7]
f([4,5]) = (0,1)
f([6,7]) = (7,3)
结果是 (7,3)。