1

假设我有大量整数(比如 50,000,000 个)。

我想编写一个函数,它返回集合中不超过作为参数传递给函数的值的最大整数。例如,如果值是:

 Values = [ 10, 20, 30, 40, 50, 60]

然后find(Values, 25)应该返回 20。

该函数将在一秒钟内被调用多次,并且集合很大。假设蛮力搜索的性能太慢,什么是有效的方法呢?整数很少会改变,因此它们可以存储在可以提供最快访问的数据结构中。

我看过 gb_trees 但我认为您无法获得“插入点”然后获得上一个条目。

我意识到我可以通过构建自己的树结构或二进制切分排序数组来从头开始做到这一点,但是有一些我忽略的内置方法吗?

4

5 回答 5

4

要在大型未排序列表中找到最接近的值,我建议您使用分而治之策略 - 并并行处理列表的不同部分。但是列表的足够小部分可能会被顺序处理

这是给你的代码:

-module( finder ).
-export( [ nearest/2 ] ).

-define( THRESHOLD, 1000 ).

%%
%% sequential finding of nearest value
%%
%% if nearest value doesn't exists - return null
%%
nearest( Val, List ) when length(List) =< ?THRESHOLD ->
        lists:foldl(
                fun
                ( X, null ) when X < Val ->
                        X;
                ( _X, null ) ->
                        null;
                ( X, Nearest ) when X < Val, X > Nearest ->
                        X;
                ( _X, Nearest ) ->
                        Nearest
                end,
                null,
                List );
%%
%% split large lists and process each part in parallel
%%
nearest( Val, List ) ->
        { Left, Right } = lists:split( length(List) div 2, List ),
        Ref1 = spawn_nearest( Val, Left ),
        Ref2 = spawn_nearest( Val, Right ),
        Nearest1 = receive_nearest( Ref1 ),
        Nearest2 = receive_nearest( Ref2 ),
        %%
        %% compare nearest values from each part
        %%
        case { Nearest1, Nearest2 } of
                { null, null } ->
                        null;
                { null, Nearest2 } ->
                        Nearest2;
                { Nearest1, null } ->
                        Nearest1;
                { Nearest1, Nearest2 } when Nearest2 > Nearest1 ->
                        Nearest2;
                { Nearest1, Nearest2 } when Nearest2 =< Nearest1 ->
                        Nearest1
        end.

spawn_nearest( Val, List ) ->
        Ref = make_ref(),
        SelfPid = self(),
        spawn(
                fun() ->
                        SelfPid ! { Ref, nearest( Val, List ) }
                end ),
        Ref.

receive_nearest( Ref ) ->
        receive
                { Ref, Nearest } -> Nearest
        end.

在此处输入图像描述

在 shell 中测试:

1> c(finder).
{ok,finder}
2> 
2> List = [ random:uniform(1000) || _X <- lists:seq(1,100000) ].
[444,724,946,502,312,598,916,667,478,597,143,210,698,160,
 559,215,458,422,6,563,476,401,310,59,579,990,331,184,203|...]
3> 
3> finder:nearest( 500, List ).
499
4>
4> finder:nearest( -100, lists:seq(1,100000) ).
null
5> 
5> finder:nearest( 40000, lists:seq(1,100000) ).
39999
6> 
6> finder:nearest( 4000000, lists:seq(1,100000) ).
100000

性能:(单节点)

7> 
7> timer:tc( finder, nearest, [ 40000, lists:seq(1,10000) ] ). 
{3434,10000}
8> 
8> timer:tc( finder, nearest, [ 40000, lists:seq(1,100000) ] ).
{21736,39999}
9>
9> timer:tc( finder, nearest, [ 40000, lists:seq(1,1000000) ] ).
{314399,39999}

与普通迭代相比:

1> 
1> timer:tc( lists, foldl, [ fun(_X, Acc) -> Acc end, null, lists:seq(1,10000) ] ).
{14994,null}
2> 
2> timer:tc( lists, foldl, [ fun(_X, Acc) -> Acc end, null, lists:seq(1,100000) ] ).
{141951,null}
3>
3> timer:tc( lists, foldl, [ fun(_X, Acc) -> Acc end, null, lists:seq(1,1000000) ] ).
{1374426,null}

所以,你可能会看到,在具有1000000 个元素的列表上,函数finder:nearest比使用.lists:foldl

THRESHOLD您可能会在您的情况下找到最佳值。

如果在不同节点上生成进程,您也可以提高性能。

于 2012-09-06T23:23:46.463 回答
3

这是另一个使用 ets 的代码示例。我相信会在大约恒定的时间内进行查找:

1> ets:new(tab,[named_table, ordered_set, public]).
2> lists:foreach(fun(N) -> ets:insert(tab,{N,[]}) end, lists:seq(1,50000000)).
3> timer:tc(fun() -> ets:prev(tab, 500000) end).
{21,499999}
4> timer:tc(fun() -> ets:prev(tab, 41230000) end).
{26,41229999}

周围的代码当然会比这多一点,但它相当整洁

于 2012-09-10T08:36:14.947 回答
1

因此,如果输入未排序,您可以通过执行以下操作获得线性版本:

closest(Target, [Hd | Tl ]) ->
        closest(Target, Tl, Hd).

closest(_Target, [], Best) -> Best;
closest(Target, [ Target | _ ], _) -> Target;
closest(Target, [ N | Rest ], Best) ->
    CurEps = erlang:abs(Target - Best),
    NewEps = erlang:abs(Target -  N),
    if NewEps < CurEps ->
            closest(Target, Rest, N);
       true ->
            closest(Target, Rest, Best)
    end.

如果对输入进行排序,您应该能够做得更好。

我在这里发明了自己的“最接近”指标,因为我允许最接近的值高于目标值——如果你愿意,你可以将其更改为“最接近但不大于”。

于 2012-09-06T21:37:50.350 回答
1

在我看来,如果你有大量不经常变化的数据,你应该考虑组织它。我写了一个基于有序列表的简单的,包括插入和删除功能。它为插入和搜索提供了良好的结果。

-module(finder).

-export([test/1,find/2,insert/2,remove/2,new/0]).

-compile(export_all).

new() -> [].

insert(V,L) -> 
    {R,P} = locate(V,L,undefined,-1),
    insert(V,R,P,L).

find(V,L) -> 
    locate(V,L,undefined,-1).

remove(V,L) ->  
    {R,P} = locate(V,L,undefined,-1),
    remove(V,R,P,L).

test(Max) -> 
    {A,B,C} = erlang:now(),
    random:seed(A,B,C),
    L = lists:seq(0,100*Max,100),
    S = random:uniform(100000000),
    I = random:uniform(100000000),
    io:format("start insert at ~p~n",[erlang:now()]),
    L1 = insert(I,L),
    io:format("start find at ~p~n",[erlang:now()]),
    R = find(S,L1),
    io:format("end at ~p~n result is ~p~n",[erlang:now(),R]).

remove(_,_,-1,L) -> L;
remove(V,V,P,L) ->
    {L1,[V|L2]} = lists:split(P,L),
    L1 ++ L2;
remove(_,_,_,L) ->L.

insert(V,V,_,L) -> L;
insert(V,_,-1,L) -> [V|L];
insert(V,_,P,L) ->
    {L1,L2} = lists:split(P+1,L),
    L1 ++ [V] ++ L2.

locate(_,[],R,P) -> {R,P};
locate (V,L,R,P) -> 
    %% io:format("locate, value = ~p, liste = ~p, current result = ~p, current pos = ~p~n",[V,L,R,P]),
    {L1,[M|L2]} = lists:split(Le1 = (length(L) div 2), L),
    locate(V,R,P,Le1+1,L1,M,L2).

locate(V,_,P,Le,_,V,_) -> {V,P+Le};
locate(V,_,P,Le,_,M,L2) when V > M -> locate(V,L2,M,P+Le);
locate(V,R,P,_,L1,_,_) -> locate(V,L1,R,P).

给出以下结果

(exec@WXFRB1824L)6> finder:test(10000000)。

从 {1347,28177,618000} 开始插入

从 {1347,28178,322000} 开始查找

结束于 {1347,28178,728000}

结果是 {72983500,729836}

在 10 000 000 个元素的列表中插入一个新值需要 704 毫秒,在同一个列表中找到最接近的值需要 406 毫秒。

于 2012-09-07T22:32:59.827 回答
0

我试图获得关于我上面提出的算法性能的更准确信息,阅读了 Stemm 非常有趣的解决方案,我决定使用 tc:timer/3 函数。大骗局:o)。在我的笔记本电脑上,我的时间准确性非常差。所以我决定留下我的 corei5(2 核 * 2 线程)+ 2Gb DDR3 + windows XP 32bit 来使用我的家用电脑:Phantom(6 核)+ 8Gb + Linux 64bit。

现在 tc:timer 按预期工作,我能够操作 100 000 000 个整数的列表。我能够看到我在每一步调用 length 函数都浪费了很多时间,所以我稍微重构了代码以避免它:

-module(finder).

-export([test/2,find/2,insert/2,remove/2,new/0]).

%% interface

new() -> {0,[]}.

insert(V,{S,L}) -> 
    {R,P} = locate(V,L,S,undefined,-1),
    insert(V,R,P,L,S).

find(V,{S,L}) -> 
    locate(V,L,S,undefined,-1).

remove(V,{S,L}) ->  
    {R,P} = locate(V,L,S,undefined,-1),
    remove(V,R,P,L,S).

remove(_,_,-1,L,S) -> {S,L};
remove(V,V,P,L,S) ->
    {L1,[V|L2]} = lists:split(P,L),
    {S-1,L1 ++ L2};
remove(_,_,_,L,S) ->{S,L}.

%% local

insert(V,V,_,L,S) -> {S,L};
insert(V,_,-1,L,S) -> {S+1,[V|L]};
insert(V,_,P,L,S) ->
    {L1,L2} = lists:split(P+1,L),
    {S+1,L1 ++ [V] ++ L2}.

locate(_,[],_,R,P) -> {R,P};
locate (V,L,S,R,P) -> 
    S1 = S div 2,
    S2 = S - S1 -1,
    {L1,[M|L2]} = lists:split(S1, L),
    locate(V,R,P,S1+1,L1,S1,M,L2,S2).

locate(V,_,P,Le,_,_,V,_,_) -> {V,P+Le};
locate(V,_,P,Le,_,_,M,L2,S2) when V > M -> locate(V,L2,S2,M,P+Le);
locate(V,R,P,_,L1,S1,_,_,_) -> locate(V,L1,S1,R,P).

%% test

test(Max,Iter) -> 
    {A,B,C} = erlang:now(),
    random:seed(A,B,C),
    L = {Max+1,lists:seq(0,100*Max,100)},
    Ins = test_insert(L,Iter,[]),
    io:format("insert:~n~s~n",[stat(Ins,Iter)]),
    Fin = test_find(L,Iter,[]),
    io:format("find:~n ~s~n",[stat(Fin,Iter)]).

test_insert(_L,0,Res) -> Res;
test_insert(L,I,Res) ->
    V = random:uniform(1000000000),
    {T,_} = timer:tc(finder,insert,[V,L]),
    test_insert(L,I-1,[T|Res]).

test_find(_L,0,Res) -> Res;
test_find(L,I,Res) ->
    V = random:uniform(1000000000),
    {T,_} = timer:tc(finder,find,[V,L]),
    test_find(L,I-1,[T|Res]).

stat(L,N) ->
    Aver = lists:sum(L)/N,
    {Min,Max,Var} = lists:foldl(fun (X,{Mi,Ma,Va}) -> {min(X,Mi),max(X,Ma),Va+(X-Aver)*(X-Aver)} end, {999999999999999999999999999,0,0}, L),
    Sig = math:sqrt(Var/N),
    io_lib:format("   average: ~p,~n   minimum: ~p,~n   maximum: ~p,~n   sigma   : ~p.~n",[Aver,Min,Max,Sig]).

以下是一些结果。

1> 查找器:测试(1000,10)。插入:

平均:266.7,

最低:216,

最大:324,

西格玛:36.98121144581393。

寻找:

average: 136.1,

最低:105,

最大:162,

西格玛:15.378231367748375。

2> 查找器:测试(100000,10)。

插入:

平均:10096.5,

最低:9541,

最大值:12222,

西格玛:762.5642595873478。

寻找:

average: 5077.4,

最低:4666,

最大值:6937,

西格玛:627.126494417195。

3> 查找器:测试(1000000,10)。

插入:

平均:109871.1,

最低:94747,

最大值:139916,

西格玛:13852.211285206417。

发现:平均:40428.0,

最低:31297,

最大值:56965,

西格玛:7797.425562325042。

4> 查找器:测试(100000000,10)。

插入:

平均:8067547.8,

最低:6265625,

最大值:16590349,

西格玛:3199868.809140206。

寻找:

average: 8484876.4,

最低:5158504,

最大值:15950944,

西格玛:4044848.707872872。

在 100 000 000 列表上,它很慢,并且多进程解决方案无法帮助解决这种二分法算法......这是该解决方案的一个弱点,但如果您有多个进程并行请求找到最接近的值,它无论如何都可以使用多核。

帕斯卡。

于 2012-09-10T04:45:42.917 回答