2010-03-27 227 views
1

解释有点复杂,但我们现在就去。基本上,问题是“如何以有效的方式将问题分解为子问题”。这里的“高效”意味着,破碎的子问题尽可能大。基本上,如果我根本不需要解决问题,那将是理想的。但是,因为工人只能在特定的问题上工作,所以我需要分手。但我想找到尽可能粗糙的方法。分配工作的高效算法?

下面是一些伪代码..

我们有这样的问题(对不起这是在Java中,如果你不明白,我会很高兴来解释)。

class Problem { 
    final Set<Integer> allSectionIds = { 1,2,4,6,7,8,10 }; 
    final Data data = //Some data 
} 

而且一个子问题是:

class SubProblem { 
    final Set<Integer> targetedSectionIds; 
    final Data data; 

    SubProblem(Set<Integer> targetedSectionsIds, Data data){ 
     this.targetedSectionIds = targetedSectionIds; 
     this.data = data; 
    } 
} 

工作将这个样子,然后。

class Work implements Runnable { 
    final Set<Section> subSections; 
    final Data data; 
    final Result result; 

    Work(Set<Section> subSections, Data data) { 
     this.sections = SubSections; 
     this.data = data; 
    } 

    @Override 
    public void run(){ 
     for(Section section : subSections){ 
      result.addUp(compute(data, section)); 
     } 
    } 
} 

现在我们有“工人”的情况下,有自己的状态sections I have

class Worker implements ExecutorService { 
    final Map<Integer,Section> sectionsIHave; 
    { 
     sectionsIHave = {1:section1, 5:section5, 8:section8 }; 
    } 

    final ExecutorService executor = //some executor. 

    @Override 
    public void execute(SubProblem problem){ 
     Set<Section> sectionsNeeded = fetchSections(problem.targetedSectionIds); 
     super.execute(new Work(sectionsNeeded, problem.data); 
    } 

} 

phew。

所以,我们有很多Problem s和Workers不断要求更多。我的任务是将​​分解为SubProblem并提供给他们。然而,难点在于我必须稍后收集SubProblems的所有结果,并将它们合并(减少)为整个ProblemResult。然而,这是昂贵的,所以我想给工人尽可能大的“块”(尽可能多的targetedSections)。

它不一定是完美的(数学上尽可能高效或者什么的)。我的意思是,我想不可能有完美的解决方案,因为你无法预测每次计算需要多长时间,等等。但是有没有一个很好的启发式解决方案?或者,也许我可以在设计之前阅读一些资源?

任何意见是高度赞赏!

编辑: 我们也控制部分分配,所以控制这是另一种选择。基本上,对此的唯一限制是工人只能有固定数量的部分。

+0

我真的不知道它是否适用,因为我没有足够的了解它,但叉/加入似乎是这样做的算法。 http://www.ibm.com/developerworks/java/library/j-jtp11137.html – nicerobot 2010-03-27 21:50:45

+0

谢谢。我试图让我的头在附近。但问题是,即使我使用这个框架,我仍然必须提供分割任务的逻辑等等。所以我仍然会遇到这个问题。 – 2010-03-27 22:42:08

+0

分布式计算当然是一项不平凡的任务,而且一个活动高性能计算(HPC)研究领域。一本体面的大学文本是McGraw-Hill的Michael Quinn的“用MPI和OpenMP编写C语言的并行编程”ISBM 0-07-282256-2 – 2010-03-28 03:59:05

回答

1

好吧,看起来你的网络服务有一个分片模型,我们做类似的事情,我们使用“entityId”(sectionId)的反向索引到“client”(worker),它将连接到特定网络服务将处理该特定实体。最简单的方法(见下文)是对工作人员使用id的反向映射。如果内存是一个限制,另一种可能是使用一个函数(例如,sectionId%的服务数)。

为了给服务尽可能多的工作,有一个简单的批处理算法,将批量填充到某些用户指定的最大值。这将允许根据远程服务能够使用它们的速度大致确定大小的工作量。

public class Worker implements Runnable { 

    private final Map<Integer, Section> sections; 
    private final BlockingQueue<SubProblem> problemQ = new ArrayBlockingQueue<SubProblem>(4096); 
    private final int batchSize; 

    public Worker(final Map<Integer, Section> sectionsIHave, final int batchSize) { 
     this.sections = sectionsIHave; 
     this.batchSize = batchSize; 
    } 

    public Set<Integer> getSectionIds() { 
     return sections.keySet(); 
    } 

    public void execute(final SubProblem command) throws InterruptedException { 

     if (sections.containsKey(command.getSectionId())) { 
      problemQ.put(command); 
     } else { 
      throw new IllegalArgumentException("Invalid section id for worker: " + command.getSectionId()); 
     } 

    } 

    @Override 
    public void run() { 
     final List<SubProblem> batch = new ArrayList<SubProblem>(batchSize); 
     while (!Thread.interrupted()) { 
      batch.clear(); 

      try { 
       batch.add(problemQ.take()); 
       for (int i = 1; i < batchSize; i++) { 
        final SubProblem problem = problemQ.poll(); 
        if (problem != null) { 
         batch.add(problem); 
        } else { 
         break; 
        } 

        process(batch); 
       } 
      } catch (final InterruptedException e) { 
       Thread.currentThread().interrupt(); 
      } 
     } 
    } 

    private void process(final List<SubProblem> batch) { 
     // Submit to remote process. 
    } 

    private static Map<Integer, Worker> indexWorkers(final List<Worker> workers) { 
     final Map<Integer, Worker> temp = new HashMap<Integer, Worker>(); 
     for (final Worker worker : workers) { 
      for (final Integer sectionId : worker.getSectionIds()) { 
       temp.put(sectionId, worker); 
      } 
     } 
     return Collections.unmodifiableMap(temp); 
    } 

    public static void main(final String[] args) throws InterruptedException { 
    // Load workers, where worker is bound to single remote service 
     final List<Worker> workers = getWorkers(); 
     final Map<Integer, Worker> workerReverseIndex = indexWorkers(workers); 
     final List<SubProblem> subProblems = getSubProblems(); 
     for (final SubProblem problem : subProblems) { 
      final Worker w = workerReverseIndex.get(problem.getSectionId()); 
      w.execute(problem); 
     } 
    } 
} 
+0

谢谢!但是你知道,问题是我们的'线程'有状态(实际上它们是不同的物理机器)。如果我只是在中间切割部分,执行左或右的线程可能没有那个“部分”... – 2010-03-28 17:13:40