2012-09-06 55 views
1

假设我有大量的整数(比如说有50,000,000个)。如何在Erlang的大列表中找到“最接近”的值

我想写一个函数,它返回集合中最大的整数,它不超过作为函数参数传递的值。例如。如果值分别为:

Values = [ 10, 20, 30, 40, 50, 60] 

然后find(Values, 25)应该返回20

该函数将被调用很多次第二和收集大。假设蛮力搜索的执行速度太慢,那么执行它的有效方法是什么?这些整数很少会改变,所以它们可以存储在一个数据结构中,从而实现最快的访问。

我看过gb_trees,但我不认为你可以获得“插入点”,然后获取以前的条目。

我意识到我可以通过构建我自己的树结构或二元斩波排序的数组从头开始这样做,但是有没有一些内置的方法可以做到这一点,我忽略了?

+0

如果值是*不*排序,这将如果样本数据是*不*排序,则更清楚。 (排序的方法绝对是最简单的,并且具有很好的时间复杂性,尤其是在多次调用时分期付款。) – 2012-09-06 17:45:56

回答

2

这是另一个使用ets的代码示例。我相信,查找会在大约恒定的时间进行:

1> ets:new(tab,[named_table, ordered_set, public]). 
2> lists:foreach(fun(N) -> ets:insert(tab,{N,[]}) end, lists:seq(1,50000000)). 
3> timer:tc(fun() -> ets:prev(tab, 500000) end). 
{21,499999} 
4> timer:tc(fun() -> ets:prev(tab, 41230000) end). 
{26,41229999} 

代码周围会比这当然有点多但比较整齐

+0

这很有趣,我认为ets:prev/2要求钥匙是现有的钥匙,我会试试 - 谢谢! –

+0

太简单了!正如保罗所说,我记得钥匙你应该添加一个小的值来得到正确的答案ets:prev(tab,500000.1):o) – Pascal

+0

@PaulCager不需要键不存在** IFF **表的类型是* ordered_set *。对于* set *和* bag *键必须存在。 – rvirding

1

因此,如果输入未排序,你可以通过执行获得线性的版本:

closest(Target, [Hd | Tl ]) -> 
     closest(Target, Tl, Hd). 

closest(_Target, [], Best) -> Best; 
closest(Target, [ Target | _ ], _) -> Target; 
closest(Target, [ N | Rest ], Best) -> 
    CurEps = erlang:abs(Target - Best), 
    NewEps = erlang:abs(Target - N), 
    if NewEps < CurEps -> 
      closest(Target, Rest, N); 
     true -> 
      closest(Target, Rest, Best) 
    end. 

你应该能够在输入排序做的更好。

我在这里发明了我自己的“最接近”指标,因为我允许最接近的值高于目标值 - 如果您喜欢,您可以将其改为“最接近但不大于”。

4

要查找大量无序列表,我建议你使用分而治之的策略最近的价值 - 并行处理列表中的不同部分。但足够的列表的小部分可能会按顺序处理

这里是你的代码:

-module(finder). 
-export([ nearest/2 ]). 

-define(THRESHOLD, 1000). 

%% 
%% sequential finding of nearest value 
%% 
%% if nearest value doesn't exists - return null 
%% 
nearest(Val, List) when length(List) =< ?THRESHOLD -> 
     lists:foldl(
       fun 
       (X, null) when X < Val -> 
         X; 
       (_X, null) -> 
         null; 
       (X, Nearest) when X < Val, X > Nearest -> 
         X; 
       (_X, Nearest) -> 
         Nearest 
       end, 
       null, 
       List); 
%% 
%% split large lists and process each part in parallel 
%% 
nearest(Val, List) -> 
     { Left, Right } = lists:split(length(List) div 2, List), 
     Ref1 = spawn_nearest(Val, Left), 
     Ref2 = spawn_nearest(Val, Right), 
     Nearest1 = receive_nearest(Ref1), 
     Nearest2 = receive_nearest(Ref2), 
     %% 
     %% compare nearest values from each part 
     %% 
     case { Nearest1, Nearest2 } of 
       { null, null } -> 
         null; 
       { null, Nearest2 } -> 
         Nearest2; 
       { Nearest1, null } -> 
         Nearest1; 
       { Nearest1, Nearest2 } when Nearest2 > Nearest1 -> 
         Nearest2; 
       { Nearest1, Nearest2 } when Nearest2 =< Nearest1 -> 
         Nearest1 
     end. 

spawn_nearest(Val, List) -> 
     Ref = make_ref(), 
     SelfPid = self(), 
     spawn(
       fun() -> 
         SelfPid ! { Ref, nearest(Val, List) } 
       end), 
     Ref. 

receive_nearest(Ref) -> 
     receive 
       { Ref, Nearest } -> Nearest 
     end. 

enter image description here

测试壳:

1> c(finder). 
{ok,finder} 
2> 
2> List = [ random:uniform(1000) || _X <- lists:seq(1,100000) ]. 
[444,724,946,502,312,598,916,667,478,597,143,210,698,160, 
559,215,458,422,6,563,476,401,310,59,579,990,331,184,203|...] 
3> 
3> finder:nearest(500, List). 
499 
4> 
4> finder:nearest(-100, lists:seq(1,100000)). 
null 
5> 
5> finder:nearest(40000, lists:seq(1,100000)). 
39999 
6> 
6> finder:nearest(4000000, lists:seq(1,100000)). 
100000 

性能:(单节点)

7> 
7> timer:tc(finder, nearest, [ 40000, lists:seq(1,10000) ]). 
{3434,10000} 
8> 
8> timer:tc(finder, nearest, [ 40000, lists:seq(1,100000) ]). 
{21736,39999} 
9> 
9> timer:tc(finder, nearest, [ 40000, lists:seq(1,1000000) ]). 
{314399,39999} 

对战平原迭代:

1> 
1> timer:tc(lists, foldl, [ fun(_X, Acc) -> Acc end, null, lists:seq(1,10000) ]). 
{14994,null} 
2> 
2> timer:tc(lists, foldl, [ fun(_X, Acc) -> Acc end, null, lists:seq(1,100000) ]). 
{141951,null} 
3> 
3> timer:tc(lists, foldl, [ fun(_X, Acc) -> Acc end, null, lists:seq(1,1000000) ]). 
{1374426,null} 

所以,哟可以看到,与百万元素列表,功能finder:nearest不是通过列表中以纯迭代与lists:foldl

对于您的情况,您可能会发现THRESHOLD的最佳值。

此外,如果在不同节点上生成进程,您可以提高性能。

+0

+1。伟大的,完整的回应。非常好。 –

1

在我看来,如果你有大量的数据不经常改变,你应该考虑组织它。 我写了一个简单的基于有序列表,包括插入删除功能。它为插入和搜索提供了很好的结果。

-module(finder). 

-export([test/1,find/2,insert/2,remove/2,new/0]). 

-compile(export_all). 

new() -> []. 

insert(V,L) -> 
    {R,P} = locate(V,L,undefined,-1), 
    insert(V,R,P,L). 

find(V,L) -> 
    locate(V,L,undefined,-1). 

remove(V,L) -> 
    {R,P} = locate(V,L,undefined,-1), 
    remove(V,R,P,L). 

test(Max) -> 
    {A,B,C} = erlang:now(), 
    random:seed(A,B,C), 
    L = lists:seq(0,100*Max,100), 
    S = random:uniform(100000000), 
    I = random:uniform(100000000), 
    io:format("start insert at ~p~n",[erlang:now()]), 
    L1 = insert(I,L), 
    io:format("start find at ~p~n",[erlang:now()]), 
    R = find(S,L1), 
    io:format("end at ~p~n result is ~p~n",[erlang:now(),R]). 

remove(_,_,-1,L) -> L; 
remove(V,V,P,L) -> 
    {L1,[V|L2]} = lists:split(P,L), 
    L1 ++ L2; 
remove(_,_,_,L) ->L. 

insert(V,V,_,L) -> L; 
insert(V,_,-1,L) -> [V|L]; 
insert(V,_,P,L) -> 
    {L1,L2} = lists:split(P+1,L), 
    L1 ++ [V] ++ L2. 

locate(_,[],R,P) -> {R,P}; 
locate (V,L,R,P) -> 
    %% io:format("locate, value = ~p, liste = ~p, current result = ~p, current pos = ~p~n",[V,L,R,P]), 
    {L1,[M|L2]} = lists:split(Le1 = (length(L) div 2), L), 
    locate(V,R,P,Le1+1,L1,M,L2). 

locate(V,_,P,Le,_,V,_) -> {V,P+Le}; 
locate(V,_,P,Le,_,M,L2) when V > M -> locate(V,L2,M,P+Le); 
locate(V,R,P,_,L1,_,_) -> locate(V,L1,R,P). 

其中得到下列结果

(EXEC @ WXFRB1824L)6>取景器:试验(10000000)。在{} 1347,28177,618000

开始

开始插入找到{} 1347,28178,322000

末在{} 1347,28178,728000

结果是{72983500, 729836}

即704ms在10万个元素列表中插入一个新值并在406ms中插入一个新值以在同一列表中找到最接近的值。

+0

但是,在我的笔记本电脑上,当我尝试生成50 000 000个整数列表时(eheap_alloc:无法分配298930300个字节的内存),卡住了。所以它不是正确的存储使用:o(。 – Pascal

+0

谢谢。这是我想到的那种 - 如何构造数据以便于查找。我正在考虑固定数组和二进制搜索 –

0

我试图得到一个关于我上面提出的算法性能的更准确的信息,阅读Stemm非常有趣的解决方案,我决定使用tc:timer/3函数。大欺骗:o)。在我的笔记本电脑上,我的时间精确度很差。所以我决定离开我的corei5(2核心* 2线程)+ 2Gb DDR3 + Windows XP 32位以使用我的家用PC:Phantom(6核心)+ 8Gb + Linux 64bit。

现在tc:计时器按预期工作,我可以操纵100 000 000个整数列表。我能看到,我失去了很多时间打电话在每一步的长度的功能,所以我重新分解代码一点,以避免它:

-module(finder). 

-export([test/2,find/2,insert/2,remove/2,new/0]). 

%% interface 

new() -> {0,[]}. 

insert(V,{S,L}) -> 
    {R,P} = locate(V,L,S,undefined,-1), 
    insert(V,R,P,L,S). 

find(V,{S,L}) -> 
    locate(V,L,S,undefined,-1). 

remove(V,{S,L}) -> 
    {R,P} = locate(V,L,S,undefined,-1), 
    remove(V,R,P,L,S). 

remove(_,_,-1,L,S) -> {S,L}; 
remove(V,V,P,L,S) -> 
    {L1,[V|L2]} = lists:split(P,L), 
    {S-1,L1 ++ L2}; 
remove(_,_,_,L,S) ->{S,L}. 

%% local 

insert(V,V,_,L,S) -> {S,L}; 
insert(V,_,-1,L,S) -> {S+1,[V|L]}; 
insert(V,_,P,L,S) -> 
    {L1,L2} = lists:split(P+1,L), 
    {S+1,L1 ++ [V] ++ L2}. 

locate(_,[],_,R,P) -> {R,P}; 
locate (V,L,S,R,P) -> 
    S1 = S div 2, 
    S2 = S - S1 -1, 
    {L1,[M|L2]} = lists:split(S1, L), 
    locate(V,R,P,S1+1,L1,S1,M,L2,S2). 

locate(V,_,P,Le,_,_,V,_,_) -> {V,P+Le}; 
locate(V,_,P,Le,_,_,M,L2,S2) when V > M -> locate(V,L2,S2,M,P+Le); 
locate(V,R,P,_,L1,S1,_,_,_) -> locate(V,L1,S1,R,P). 

%% test 

test(Max,Iter) -> 
    {A,B,C} = erlang:now(), 
    random:seed(A,B,C), 
    L = {Max+1,lists:seq(0,100*Max,100)}, 
    Ins = test_insert(L,Iter,[]), 
    io:format("insert:~n~s~n",[stat(Ins,Iter)]), 
    Fin = test_find(L,Iter,[]), 
    io:format("find:~n ~s~n",[stat(Fin,Iter)]). 

test_insert(_L,0,Res) -> Res; 
test_insert(L,I,Res) -> 
    V = random:uniform(1000000000), 
    {T,_} = timer:tc(finder,insert,[V,L]), 
    test_insert(L,I-1,[T|Res]). 

test_find(_L,0,Res) -> Res; 
test_find(L,I,Res) -> 
    V = random:uniform(1000000000), 
    {T,_} = timer:tc(finder,find,[V,L]), 
    test_find(L,I-1,[T|Res]). 

stat(L,N) -> 
    Aver = lists:sum(L)/N, 
    {Min,Max,Var} = lists:foldl(fun (X,{Mi,Ma,Va}) -> {min(X,Mi),max(X,Ma),Va+(X-Aver)*(X-Aver)} end, {999999999999999999999999999,0,0}, L), 
    Sig = math:sqrt(Var/N), 
    io_lib:format(" average: ~p,~n minimum: ~p,~n maximum: ~p,~n sigma : ~p.~n",[Aver,Min,Max,Sig]). 

这里有一些结果。

1> finder:test(1000,10)。 刀片:

平均:266.7,

最小:216,

最大:324,

西格玛:36.98121144581393。

发现:

average: 136.1, 

最小:105,

最大:162,

西格玛:15.378231367748375。

ok

2> finder:test(100000,10)。

插入:

平均值:10096。5,

最小:9541,

最大:12222,

西格玛:762.5642595873478。

发现:

average: 5077.4, 

最低:4666,

最大:6937,

西格玛:627.126494417195。

ok

3> finder:test(1000000,10)。

刀片:

平均:109871.1,

最小:94747,

最大:139916,

西格玛:13852.211285206417。

发现: 平均:40428.0,

最低:31297,

最大:56965,

西格玛:7797.425562325042。

ok

4> finder:test(100000000,10)。

刀片:

平均:8067547.8,

最小:6265625,

最大:16590349,

西格玛:3199868.809140206。

发现:

average: 8484876.4, 

最低:5158504,

最大:15950944,

西格玛:4044848.707872872。

确定

在100 000 000列表,它是缓慢的,并且,多进程的解决方案在这种二分法算法不能帮助...这是该解决方案的不足之处,但如果你有几个并行处理请求找到最接近的值,无论如何,它将能够使用多核。

帕斯卡。