2014-03-05 40 views
7

我想叉加入优化

我要上叉的优化工作是什么/连接算法。通过优化,我的意思是只计算最佳线程数,或者如果你想 - 计算SEQUENTIAL_THRESHOLD(见下面的代码)。

// PSEUDOCODE 
Result solve(Problem problem) { 
    if (problem.size < SEQUENTIAL_THRESHOLD) 
     return solveSequentially(problem); 
    else { 
     Result left, right; 
     INVOKE-IN-PARALLEL { 
      left = solve(extractLeftHalf(problem)); 
      right = solve(extractRightHalf(problem)); 
     } 
     return combine(left, right); 
    } 
} 

如何想象

例如,我要计算大阵列的产品。然后我刚评估所有部件,并获得最佳的线程量:

SEQUENTIAL_THRESHOLD = PC * IS/MC(只是例子)

PC - 处理器核的数量; IS - 常数,表示具有一个处理器内核的最佳阵列大小以及对数据的最简单操作(例如读数); MC - 乘以经营成本;

假设MC = 15; PC = 4和IS = 10000; SEQUENTIAL_THRESHOLD = 2667。如果subtask-array大于2667,我会分叉它。

广泛的问题

  1. 是否有可能使SEQUENTIAL_THRESHOLD公式中这样的方式?
  2. 是否可以为更复杂的计算完成相同的操作:不仅适用于数组/集合和排序操作?

窄的问题:

不要存在关于SEQUENTIAL_THRESHOLD用于阵列/收藏/整理计算了一些调查?他们如何实现这一目标?

更新2014 3月7日:

  1. 如果没有办法写为阈值计算单式,我可以写将执行在PC上预定义测试的UTIL,而且比得到最佳阈?这也是不可能的或不是?
  2. Java 8 Streams API可以做什么?它可以帮助我吗? Java 8 Streams API是否消除了Fork/Join中的需求?
+1

“为串行和并行执行之间选择理想的阈值是协调并行任务的成本的函数。如果协调成本是零,更细粒度的任务数量较多倾向于提供更好的并行;下层的协调成本,我们可以在我们需要切换到顺序方法之前进行细化。“ - 从网站引用的伪代码来自:http://www.ibm.com/developerworks/library/j-jtp11137/;) – Marco13

+0

换句话说,不要使用CPU内核的数量作为*可用内核*可随时更改。阈值应该足够大,以便与问题大小相比,分裂的开销并不重要。越多的子任务'Executor'/scheduler适应实际系统负载情况的自由度越大。 – Holger

+1

这是map-reduce任务的典型问题。您应该查看Java 8的Streams API。它将处理接近最佳的执行。只要您没有使用HPC,您就不应过分担心达到最佳效果。如果计算发生在那里,即使花费更长时间,您也希望桌面保持负责。 – allprog

回答

1

这是一个非常有趣的调查问题。我已经编写了这个简单的代码来测试顺序阈值的最优值。尽管我无法得出任何具体的结论,但最有可能的原因是我在仅有2个处理器的旧笔记本电脑上运行它。多次运行后唯一一致的观察结果是,所用时间迅速下降,直到达到100的连续阈值。尝试运行此代码并让我知道您找到了什么。同样在底部,我附加了一个用于绘制结果的Python脚本,以便我们可以直观地看到趋势。

import java.io.FileWriter; 
import java.util.concurrent.ForkJoinPool; 
import java.util.concurrent.RecursiveAction; 

public class Testing { 

static int SEQ_THRESHOLD; 

public static void main(String[] args) throws Exception { 
    int size = 100000; 
    int[] v1 = new int[size]; 
    int[] v2 = new int[size]; 
    int[] v3 = new int[size]; 
    for (int i = 0; i < size; i++) { 
     v1[i] = i; // Arbitrary initialization 
     v2[i] = 2 * i; // Arbitrary initialization 
    } 
    FileWriter fileWriter = new FileWriter("OutTime.dat"); 

    // Increment SEQ_THRESHOLD and save time taken by the code to run in a file 
    for (SEQ_THRESHOLD = 10; SEQ_THRESHOLD < size; SEQ_THRESHOLD += 50) { 
     double avgTime = 0.0; 
     int samples = 5; 
     for (int i = 0; i < samples; i++) { 
      long startTime = System.nanoTime(); 
      ForkJoinPool fjp = new ForkJoinPool(); 
      fjp.invoke(new VectorAddition(0, size, v1, v2, v3)); 
      long endTime = System.nanoTime(); 
      double secsTaken = (endTime - startTime)/1.0e9; 
      avgTime += secsTaken; 
     } 
     fileWriter.write(SEQ_THRESHOLD + " " + (avgTime/samples) + "\n"); 
    } 

    fileWriter.close(); 
} 
} 

class VectorAddition extends RecursiveAction { 

int[] v1, v2, v3; 
int start, end; 

VectorAddition(int start, int end, int[] v1, int[] v2, int[] v3) { 
    this.start = start; 
    this.end = end; 
    this.v1 = v1; 
    this.v2 = v2; 
    this.v3 = v3; 
} 

int SEQ_THRESHOLD = Testing.SEQ_THRESHOLD; 

@Override 
protected void compute() { 
    if (end - start < SEQ_THRESHOLD) { 
     // Simple vector addition 
     for (int i = start; i < end; i++) { 
      v3[i] = v1[i] + v2[i]; 
     } 
    } else { 
     int mid = (start + end)/2; 
     invokeAll(new VectorAddition(start, mid, v1, v2, v3), 
       new VectorAddition(mid, end, v1, v2, v3)); 
    } 
} 
} 

,这里是密谋的结果Python脚本:

from pylab import * 

threshold = loadtxt("./OutTime.dat", delimiter=" ", usecols=(0,)) 
timeTaken = loadtxt("./OutTime.dat", delimiter=" ", usecols=(1,)) 

plot(threshold, timeTaken) 
show() 
+0

看看我的问题的更新版本请 –

3

你不能熬下来,以一个简单的公式有以下几个原因:

  • 每台PC都会有很大的不同参数不仅取决于核心,还取决于RAM时序或后台任务等其他因素。

  • Java本身在执行期间正在优化循环。所以瞬间的完美设置可能会在几秒钟后变得不理想。或者更糟糕的是,这种调整可能会阻止完美的优化。

我能看到的唯一方法是动态调整某种形式的AI或遗传算法的值。但是,这包括程序经常检查非最佳设置,以确定当前设置是否仍然最佳。因此,获得的速度实际上是否高于尝试其他设置所损失的速度,这是值得怀疑的。最终可能只是在初始学习阶段的一个解决方案,而进一步的执行然后将这些训练的值用作固定数字。因为这不仅花费时间,而且大大增加了代码复杂度,所以我认为这不是大多数程序的选项。通常,首先不要使用Fork-Join更为有利,因为还有许多其他并行化选项可能更适合该问题。

“遗传”算法的一个想法是测量每次运行的循环效率,然后有一个不断更新的背景散列图loop-parameters -> execution time,并且大多数运行选择最快的设置。

+0

是的,我可以做一些像'loop-parameters - > execution time'。但是,我应该为每个PC配置计算该表,这是不可能的。我应该如何链接所有与PC配置或者我应该考虑的参数? –

+0

是否有任何有16+内核的服务,我可以测试我的fork/join算法的性能? –

+0

你不能预先计算这些参数,你需要在实际的机器上进行。因此,要么有一个像校准阶段那样添加到您的代码来确定这些参数,或者 - 如果您只有一小部分机器 - 在实际执行之前手动执行一些测量。 – TwoThe

5

有绝对,肯定没办法计算合适的门槛,除非你是亲密的执行环境。我维持sourceforge.net叉子/加入的项目,这是我在大多数的内置函数使用的代码:

private int calcThreshold(int nbr_elements, int passed_threshold) { 

    // total threads in session 
    // total elements in array 
    int threads = getNbrThreads(); 
    int count = nbr_elements + 1; 

    // When only one thread, it doesn't pay to decompose the work, 
    // force the threshold over array length 
    if (threads == 1) return count;  

    /* 
    * Whatever it takes 
    * 
    */ 
    int threshold = passed_threshold; 

    // When caller suggests a value 
    if (threshold > 0) { 

     // just go with the caller's suggestion or do something with the suggestion 

    } else { 
     // do something usful such as using about 8 times as many tasks as threads or 
     // the default of 32k 
     int temp = count/(threads << 3); 
     threshold = (temp < 32768) ? 32768 : temp; 

    } // endif  

    // whatever 
    return threshold; 

} 

编辑3月9日:

你怎么可能有一个大致的效用不仅可以知道处理器速度,可用内存,处理器数量等(物理环境),还可以知道软件的用意?答案是你不能。这就是为什么你需要为每个环境开发一个例程。上面的方法是我使用的基本阵列我使用另一个对大多数基质处理(矢量):

// When very small, just spread every row 
if (count < 6) return 1; 

// When small, spread a little 
if (count < 30) return ((count/(threads << 2) == 0)? threads : (count/(threads << 2))); 

// this works well for now 
return ((count/(threads << 3) == 0)? threads : (count/(threads << 3))); 

至于Java8流:他们使用F/J框架罩下,你不能指定阈。

+0

看看我的问题的更新版本,请 –