2010-07-07 89 views
27

是否有可能为执行程序执行的任务设置优先级?我在JCIP中发现了一些关于这是可能的陈述,但我找不到任何示例,并且我找不到任何与文档相关的内容。Java执行程序:如何设置任务优先级?

从JCIP:

执行策略指定 “什么,在哪里,何时以及如何” 任务 执行的,其中包括:

  • ...
  • 在哪订单应执行任务(FIFO,LIFO,优先级订单)?
  • ...

UPD:我意识到,我问不正是我要问。我真正想要的是:

如何使用/仿真设置线程优先级(即什么是thread.setPriority())与执行者框架?

回答

48

目前的the Executor interface唯一具体的实现是the ThreadPoolExecutorthe ScheduledThreadpoolExecutor

而不是使用公共设施/工厂类Executors的,你应该创建使用构造一个实例。

您可以将BlockingQueue传递给ThreadPoolExecutor的构造函数。

BlockingQueue的一个实现the PriorityBlockingQueue允许您将一个Comparator传递给构造函数,这样您可以决定执行的顺序。

+3

+1的PriorityBlockingQueue是要走的路。您可以实现比较器或将任务自己设置为Comparable。 – 2010-07-08 00:47:24

+2

本文是一个很好的参考资料:http://binkley.blogspot.fr/2009/04/jumping-work-queue-in-executor.html – Snicolas 2013-06-04 08:43:52

+0

我的解决方案按优先顺序排列任务,但保留相同优先级的提交顺序: http://stackoverflow.com/a/42831172/1386911 – 2017-03-16 10:26:45

0

请注意,setPriority(..)一般不会在Linux下工作。请参阅以下链接的全部细节:

+2

评论有意见;答案是答案。评论不是答案。答案不是评论。如果它不回答被问到的问题,那实际上是一个评论。 – 2012-09-30 19:02:37

+0

+1 @尼克 - 哈哈,喜欢它!为什么用一个词,当你可以使用一个冗长,单调的评论。好点(好脸色)。 – TedTrippin 2013-04-19 14:01:24

2

您可以指定在ThreadPoolExecutor构造函数(或Executors工厂法)ThreadFactory。这使您可以为执行程序提供给定线程优先级的线程。

要获得不同作业的不同线程优先级,您需要将它们发送给具有不同线程工厂的执行程序。

30

这里的想法是在执行器中使用PriorityBlockingQueue。为此:

  • 创建比较器,比较我们的期货。
  • 为未来创建代理以保持优先级。
  • 覆盖'newTaskFor'以便在我们的代理中包装每个未来。

首先,你需要在你的未来保持优先级:

class PriorityFuture<T> implements RunnableFuture<T> { 

    private RunnableFuture<T> src; 
    private int priority; 

    public PriorityFuture(RunnableFuture<T> other, int priority) { 
     this.src = other; 
     this.priority = priority; 
    } 

    public int getPriority() { 
     return priority; 
    } 

    public boolean cancel(boolean mayInterruptIfRunning) { 
     return src.cancel(mayInterruptIfRunning); 
    } 

    public boolean isCancelled() { 
     return src.isCancelled(); 
    } 

    public boolean isDone() { 
     return src.isDone(); 
    } 

    public T get() throws InterruptedException, ExecutionException { 
     return src.get(); 
    } 

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 
     return src.get(); 
    } 

    public void run() { 
     src.run(); 
    } 
} 

接下来,您需要定义比较,将正确的优先期货排序:

class PriorityFutureComparator implements Comparator<Runnable> { 
    public int compare(Runnable o1, Runnable o2) { 
     if (o1 == null && o2 == null) 
      return 0; 
     else if (o1 == null) 
      return -1; 
     else if (o2 == null) 
      return 1; 
     else { 
      int p1 = ((PriorityFuture<?>) o1).getPriority(); 
      int p2 = ((PriorityFuture<?>) o2).getPriority(); 

      return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1); 
     } 
    } 
} 

接下来让我们假设我们有一个冗长的工作是这样的:

class LenthyJob implements Callable<Long> { 
    private int priority; 

    public LenthyJob(int priority) { 
     this.priority = priority; 
    } 

    public Long call() throws Exception { 
     System.out.println("Executing: " + priority); 
     long num = 1000000; 
     for (int i = 0; i < 1000000; i++) { 
      num *= Math.random() * 1000; 
      num /= Math.random() * 1000; 
      if (num == 0) 
       num = 1000000; 
     } 
     return num; 
    } 

    public int getPriority() { 
     return priority; 
    } 
} 

然后,为了执行优先这些工作的代码如下:

public class TestPQ { 

    public static void main(String[] args) throws InterruptedException, ExecutionException { 
     int nThreads = 2; 
     int qInitialSize = 10; 

     ExecutorService exec = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, 
       new PriorityBlockingQueue<Runnable>(qInitialSize, new PriorityFutureComparator())) { 

      protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 
       RunnableFuture<T> newTaskFor = super.newTaskFor(callable); 
       return new PriorityFuture<T>(newTaskFor, ((LenthyJob) callable).getPriority()); 
      } 
     }; 

     for (int i = 0; i < 20; i++) { 
      int priority = (int) (Math.random() * 100); 
      System.out.println("Scheduling: " + priority); 
      LenthyJob job = new LenthyJob(priority); 
      exec.submit(job); 
     } 
    } 
} 

这是一个很大的代码,但是这是几乎可以完成此的唯一途径。

在我的机器的输出是这样的:

Scheduling: 39 
Scheduling: 90 
Scheduling: 88 
Executing: 39 
Scheduling: 75 
Executing: 90 
Scheduling: 15 
Scheduling: 2 
Scheduling: 5 
Scheduling: 24 
Scheduling: 82 
Scheduling: 81 
Scheduling: 3 
Scheduling: 23 
Scheduling: 7 
Scheduling: 40 
Scheduling: 77 
Scheduling: 49 
Scheduling: 34 
Scheduling: 22 
Scheduling: 97 
Scheduling: 33 
Executing: 2 
Executing: 3 
Executing: 5 
Executing: 7 
Executing: 15 
Executing: 22 
Executing: 23 
Executing: 24 
Executing: 33 
Executing: 34 
Executing: 40 
Executing: 49 
Executing: 75 
Executing: 77 
Executing: 81 
Executing: 82 
Executing: 88 
Executing: 97 
+0

虽然*接受答案*确实回答了这个问题,这个提供了一个工作解决方案。非常感谢。 – m02ph3u5 2015-10-06 16:13:48

+0

感谢您的回答。是否可以在ExecutorCompletionService中使用这种方法?我尝试传入ExecutorCompletionService构造函数中的ExecutorService对象,但结果无法传递给比较器中的PriorityFuture。 – Arash 2015-11-27 23:46:55

+0

我在我的机器上测试过。这是不正确的。在我的机器上,执行3之前执行了72,这显然是错误的。 – 2016-11-03 14:43:53

0

只是想我的贡献位加入讨论。我已经实现了这个ReorderingThreadPoolExecutor用于一个非常具体的目的,它能够在需要时显式地将执行者的BlockingQueue(在这种情况下是LinkedBlockingDeque)带到前面,而不必处理优先级(这可能导致死锁并且是无论如何,固定)。

我正在使用它来管理(在Android应用程序内)的情况下,我必须下载许多图像显示在长列表视图。每当用户快速向下滚动时,执行程序队列就会充满图像下载请求:通过移动队列顶部的最新队列,我在加载实际在屏幕上的图像方面取得了更好的性能,延迟了下载那些稍后可能需要的。请注意,我使用内部并发映射键(可以像图像URL字符串一样简单)将任务添加到执行程序,以便稍后可以检索它们以进行重新排序。

还有很多其他方法可以做到这一点,也许它过于复杂,但它工作正常,而且他的Android SDK中的Facebook在其自己的工作线程队列中也做了类似的事情。

随意看看代码,给我的建议,这是一个Android项目内,但剥离了几个日志和注解会使类纯Java 6

0

您可以实现自己的ThreadFactory和设置它内部的ThreadPoolExecutor是这样的:

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, numOfWorkerThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); 
threadPool.setThreadFactory(new OpJobThreadFactory(Thread.NORM_PRIORITY-2)); 

在我的OpJobThreadFactory如下所示:

public final static class OpJobThreadFactory implements ThreadFactory { 
    private int priority; 
    private boolean daemon; 
    private final String namePrefix; 
    private static final AtomicInteger poolNumber = new AtomicInteger(1); 
    private final AtomicInteger threadNumber = new AtomicInteger(1); 

    public OpJobThreadFactory(int priority) { 
     this(priority, true); 
    } 

    public OpJobThreadFactory(int priority, boolean daemon) { 
     this.priority = priority; 
     this.daemon = daemon; 
     namePrefix = "jobpool-" +poolNumber.getAndIncrement() + "-thread-"; 
    } 

    @Override 
    public Thread newThread(Runnable r) { 
     Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement()); 
     t.setDaemon(daemon); 
     t.setPriority(priority); 
     return t; 
    } 
}