2010-08-06 80 views
6

我正在尝试使用Java中的并行化算法。我从合并排序开始,并在question中发布我的尝试。我的修改尝试在下面的代码中,我现在尝试并行快速排序。Java:通过多线程并行化快速排序

在我的多线程实现或解决此问题的方法中是否存在任何菜鸟错误?如果不是,我认为在双核心上的顺序算法和并行算法之间的速度增加不应超过32%(见底部的时序)?

这里是多线程算法:

public class ThreadedQuick extends Thread 
    { 
     final int MAX_THREADS = Runtime.getRuntime().availableProcessors(); 

     CountDownLatch doneSignal; 
     static int num_threads = 1; 

     int[] my_array; 
     int start, end; 

     public ThreadedQuick(CountDownLatch doneSignal, int[] array, int start, int end) { 
      this.my_array = array; 
      this.start = start; 
      this.end = end; 
      this.doneSignal = doneSignal; 
     } 

     public static void reset() { 
      num_threads = 1; 
     } 

     public void run() { 
      quicksort(my_array, start, end); 
      doneSignal.countDown(); 
      num_threads--; 
     } 

     public void quicksort(int[] array, int start, int end) { 
      int len = end-start+1; 

      if (len <= 1) 
       return; 

      int pivot_index = medianOfThree(array, start, end); 
      int pivotValue = array[pivot_index]; 

      swap(array, pivot_index, end); 

      int storeIndex = start; 
      for (int i = start; i < end; i++) { 
       if (array[i] <= pivotValue) { 
        swap(array, i, storeIndex); 
        storeIndex++; 
       } 
      } 

      swap(array, storeIndex, end); 

      if (num_threads < MAX_THREADS) { 
       num_threads++; 

       CountDownLatch completionSignal = new CountDownLatch(1); 

       new ThreadedQuick(completionSignal, array, start, storeIndex - 1).start(); 
       quicksort(array, storeIndex + 1, end); 

       try { 
        completionSignal.await(1000, TimeUnit.SECONDS); 
       } catch(Exception ex) { 
        ex.printStackTrace(); 
       } 
      } else { 
       quicksort(array, start, storeIndex - 1); 
       quicksort(array, storeIndex + 1, end); 
      } 
     } 
    } 

这是我如何开始它关闭:

ThreadedQuick.reset(); 
CountDownLatch completionSignal = new CountDownLatch(1); 
new ThreadedQuick(completionSignal, array, 0, array.length-1).start(); 
try { 
    completionSignal.await(1000, TimeUnit.SECONDS); 
} catch(Exception ex){ 
    ex.printStackTrace(); 
} 

我测试了这对Arrays.sort和类似的连续快速排序算法。下面是在Intel决斗核戴尔笔记本电脑的计时结果,在秒:

元素:500,000, 顺序:0.068592, 螺纹:0.046871, Arrays.sort:0.079677

元素:1,000,000, 顺序:0.14416, 螺纹:0.095492, Arrays.sort:0.167155

元素:2000000, 顺序:0.301666, 螺纹:0.205719, Arrays.sort:0.350982

元素:4000000, 顺序:0.623291, 螺纹:0.424119, Arrays.sort:0.712698

元素:8000000, 顺序:1.279374, 螺纹:0.859363, Arrays.sort:1.487671

上面的每个数字是100次测试的平均时间,抛出了3个最低和3个最高的情况。我使用Random.nextInt(Integer.MAX_VALUE)为每个测试生成一个数组,每10次测试使用相同的种子初始化一次。每个测试包括使用System.nanoTime对给定算法进行计时。平均之后,我四舍五入到小数点后六位。显然,我确实检查了每种是否工作。如你所见,在每一组测试中,序列和线程的情况之间的速度增加约32%。正如我上面所问,我不应该期望更多吗?

+1

你的问题是什么? – 2011-08-12 11:21:23

+0

你在说什么语言?你想知道如何在Java或C#排序# – 2011-08-12 11:26:07

+0

所以问题是如何在Java中实现这种语法和优化线程与CPU的工作。 – 2011-08-12 11:28:51

回答

10

将numThreads设置为静态可能会导致问题,因此很可能最终MAX_THREADS会在某个点运行。

很可能你之所以没有获得完整性能的原因是你的快速排序不能完全平行。请注意,第一次调用quicksort会在开始真正并行运行之前在初始线程中通过整个数组。在为了分离线程而进行养殖时,还有一种以上下文切换和模式转换形式并行化算法的开销。

看看Fork/Join框架,这个问题可能会非常整齐地适合那里。

实施中的一些要点。实现Runnable而不是扩展Thread。只有在创建Thread类的新版本时才应使用扩展线程。当你只是想做一些并行运行的工作时,你最好用Runnable。在实现Runnable的同时,您还可以扩展另一个类,从而为您提供更多的面向对象设计的灵活性。使用限制在系统中可用线程数的线程池。也不要使用numThreads来决定是否分离新线程。你可以预先计算。使用最小分区大小,这是整个阵列大小除以可用处理器数量的大小。喜欢的东西:

public class ThreadedQuick implements Runnable { 

    public static final int MAX_THREADS = Runtime.getRuntime().availableProcessors(); 
    static final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS); 

    final int[] my_array; 
    final int start, end; 

    private final int minParitionSize; 

    public ThreadedQuick(int minParitionSize, int[] array, int start, int end) { 
     this.minParitionSize = minParitionSize; 
     this.my_array = array; 
     this.start = start; 
     this.end = end; 
    } 

    public void run() { 
     quicksort(my_array, start, end); 
    } 

    public void quicksort(int[] array, int start, int end) { 
     int len = end - start + 1; 

     if (len <= 1) 
      return; 

     int pivot_index = medianOfThree(array, start, end); 
     int pivotValue = array[pivot_index]; 

     swap(array, pivot_index, end); 

     int storeIndex = start; 
     for (int i = start; i < end; i++) { 
      if (array[i] <= pivotValue) { 
       swap(array, i, storeIndex); 
       storeIndex++; 
      } 
     } 

     swap(array, storeIndex, end); 

     if (len > minParitionSize) { 

      ThreadedQuick quick = new ThreadedQuick(minParitionSize, array, start, storeIndex - 1); 
      Future<?> future = executor.submit(quick); 
      quicksort(array, storeIndex + 1, end); 

      try { 
       future.get(1000, TimeUnit.SECONDS); 
      } catch (Exception ex) { 
       ex.printStackTrace(); 
      } 
     } else { 
      quicksort(array, start, storeIndex - 1); 
      quicksort(array, storeIndex + 1, end); 
     } 
    }  
} 

您可以通过执行启动它:

ThreadedQuick quick = new ThreadedQuick(array/ThreadedQuick.MAX_THREADS, array, 0, array.length - 1); 
quick.run(); 

这将启动在同一个线程,以避免在启动时不必要的线程跳的那种。

警告:不确定上面的实现实际上会更快,因为我没有对它进行基准测试。

1

夫妇的意见,如果我理解你的代码的权利:

  1. 我没有看到一个锁周围,即使它可以通过多个线程访问的对象numthreads。也许你应该让它成为一个AtomicInteger。

  2. 使用线程池并安排任务,即对快速排序的单个调用,以获取线程池的优势。使用期货。

您目前的方法是按照您所做的方式划分事物,可能会留下一个线程较小的划分,而没有线程的划分会较小。也就是说,它不会优先考虑拥有自己线程的较大细分市场。

+0

实际上,对于原始的int,你可以使用volatile来确保你做内存,而不是通过缓存。 – 2010-08-06 16:45:11

3

这使用快速排序和合并排序的组合。

import java.util.Arrays; 
import java.util.Random; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 

public class ParallelSortMain { 
    public static void main(String... args) throws InterruptedException { 
     Random rand = new Random(); 
     final int[] values = new int[100*1024*1024]; 
     for (int i = 0; i < values.length; i++) 
      values[i] = rand.nextInt(); 

     int threads = Runtime.getRuntime().availableProcessors(); 
     ExecutorService es = Executors.newFixedThreadPool(threads); 
     int blockSize = (values.length + threads - 1)/threads; 
     for (int i = 0; i < values.length; i += blockSize) { 
      final int min = i; 
      final int max = Math.min(min + blockSize, values.length); 
      es.submit(new Runnable() { 
       @Override 
       public void run() { 
        Arrays.sort(values, min, max); 
       } 
      }); 
     } 
     es.shutdown(); 
     es.awaitTermination(10, TimeUnit.MINUTES); 
     for (int blockSize2 = blockSize; blockSize2 < values.length/2; blockSize2 *= 2) { 
      for (int i = 0; i < values.length; i += blockSize2) { 
       final int min = i; 
       final int mid = Math.min(min + blockSize2, values.length); 
       final int max = Math.min(min + blockSize2 * 2, values.length); 
       mergeSort(values, min, mid, max); 
      } 
     } 
    } 

    private static boolean mergeSort(int[] values, int left, int mid, int end) { 
     int[] results = new int[end - left]; 
     int l = left, r = mid, m = 0; 
     for (; l < left && r < mid; m++) { 
      int lv = values[l]; 
      int rv = values[r]; 
      if (lv < rv) { 
       results[m] = lv; 
       l++; 
      } else { 
       results[m] = rv; 
       r++; 
      } 
     } 
     while (l < mid) 
      results[m++] = values[l++]; 
     while (r < end) 
      results[m++] = values[r++]; 
     System.arraycopy(results, 0, values, left, results.length); 
     return false; 
    } 
}