2016-07-05 64 views
3

我想将我的一些R代码移植到Julia; 基本上我已经重写了下述R代码朱莉娅:Julia pmap性能

library(parallel) 

eps_1<-rnorm(1000000) 
eps_2<-rnorm(1000000) 

large_matrix<-ifelse(cbind(eps_1,eps_2)>0,1,0) 
matrix_to_compare = expand.grid(c(0,1),c(0,1)) 
indices<-seq(1,1000000,4) 
large_matrix<-lapply(indices,function(i)(large_matrix[i:(i+3),])) 

function_compare<-function(x){ 
    which((rowSums(x==matrix_to_compare)==2) %in% TRUE) 
} 

> system.time(lapply(large_matrix,function_compare)) 
    user system elapsed 
38.812 0.024 38.828 
> system.time(mclapply(large_matrix,function_compare,mc.cores=11)) 
    user system elapsed 
63.128 1.648 6.108 

从一个核心去11.现在我试图做同样在朱莉娅的时候作为一个可以看到我越来越显著加速:

#Define cluster: 

addprocs(11); 

using Distributions; 
@everywhere using Iterators; 
d = Normal(); 

eps_1 = rand(d,1000000); 
eps_2 = rand(d,1000000); 


#Create a large matrix: 
large_matrix = hcat(eps_1,eps_2).>=0; 
indices = collect(1:4:1000000) 

#Split large matrix: 
large_matrix = [large_matrix[i:(i+3),:] for i in indices]; 

#Define the function to apply: 
@everywhere function function_split(x) 
    matrix_to_compare = transpose(reinterpret(Int,collect(product([0,1],[0,1])),(2,4))); 
    matrix_to_compare = matrix_to_compare.>0; 
    find(sum(x.==matrix_to_compare,2).==2) 
end 

@time map(function_split,large_matrix) 
@time pmap(function_split,large_matrix) 

    5.167820 seconds (22.00 M allocations: 2.899 GB, 12.83% gc time) 
    18.569198 seconds (40.34 M allocations: 2.082 GB, 5.71% gc time) 

正如人们可以注意到我没有得到任何加快与pmap。也许有人可以提出替代方案。

+1

'large_matrix'是'250000-元件阵列{任何,1}:'也许这是问题? – daycaster

+0

我真的不知道我是很新的朱莉娅 – Vitalijs

+0

在朱莉娅0.4.6我得到'结果如下addprocs(3)':'4.173674秒(22.97中号分配:2.943 GB,14.57%GC时间)'和 '0.795733秒(292.07 k分配:12.377 MB,0.83%gc时间)'。此外,'large_matrix'的类型是'Array {BitArray {2},1}'。 – tim

回答

1

我认为这里的一些问题是@parallel@pmap并不总是能够很好地处理来往工作人员的数据。因此,他们倾向于在所执行的内容不需要太多数据移动的情况下工作得最好。我也怀疑可能有些事情可以改善他们的表现,但我不确定细节。

对于情况下,你确实需要更多的数据走动,它可能是最好坚持使用那些直接调用工人功能,与当时的那些功能的那些工人的存储空间内访问对象的选择。下面给出一个例子,它使用多个工作者加速你的功能。它使用也许是最简单的选择,这是@everywhere,但@spawnremotecall()等也是值得考虑的,根据您的情况。

addprocs(11); 

using Distributions; 
@everywhere using Iterators; 
d = Normal(); 

eps_1 = rand(d,1000000); 
eps_2 = rand(d,1000000); 

#Create a large matrix: 
large_matrix = hcat(eps_1,eps_2).>=0; 
indices = collect(1:4:1000000); 

#Split large matrix: 
large_matrix = [large_matrix[i:(i+3),:] for i in indices]; 

large_matrix = convert(Array{BitArray}, large_matrix); 

function sendto(p::Int; args...) 
    for (nm, val) in args 
     @spawnat(p, eval(Main, Expr(:(=), nm, val))) 
    end 
end 

getfrom(p::Int, nm::Symbol; mod=Main) = fetch(@spawnat(p, getfield(mod, nm))) 

@everywhere function function_split(x::BitArray) 
    matrix_to_compare = transpose(reinterpret(Int,collect(product([0,1],[0,1])),(2,4))); 
    matrix_to_compare = matrix_to_compare.>0; 
    find(sum(x.==matrix_to_compare,2).==2) 
end 


function distribute_data(X::Array, WorkerName::Symbol) 
    size_per_worker = floor(Int,size(X,1)/nworkers()) 
    StartIdx = 1 
    EndIdx = size_per_worker 
    for (idx, pid) in enumerate(workers()) 
     if idx == nworkers() 
      EndIdx = size(X,1) 
     end 
     @spawnat(pid, eval(Main, Expr(:(=), WorkerName, X[StartIdx:EndIdx]))) 
     StartIdx = EndIdx + 1 
     EndIdx = EndIdx + size_per_worker - 1 
    end 
end 

distribute_data(large_matrix, :large_matrix) 


function parallel_split() 
    @everywhere begin 
     if myid() != 1 
      result = map(function_split,large_matrix); 
     end 
    end 
    results = cell(nworkers()) 
    for (idx, pid) in enumerate(workers()) 
     results[idx] = getfrom(pid, :result) 
    end 
    vcat(results...) 
end 

## results given after running once to compile 
@time a = map(function_split,large_matrix); ## 6.499737 seconds (22.00 M allocations: 2.899 GB, 13.99% gc time) 
@time b = parallel_split(); ## 1.097586 seconds (1.50 M allocations: 64.508 MB, 3.28% gc time) 

julia> a == b 
true 

注意:即使是这样,从多个进程中加速并不完美。但是,这是可以预料的,因为你的函数仍然会返回中等数量的数据,并且这些数据需要移动,需要时间。

P.S.看到这个帖子(Julia: How to copy data to another processor in Julia)或该包(https://github.com/ChrisRackauckas/ParallelDataTransfer.jl)更多关于我这里使用的sendtogetfrom功能。