这里是如果问你试图分裂工作,我会做什么:
public class App {
public static class Statistics {
}
public static class StatisticsCalculator implements Callable<Statistics> {
private final List<String> lines;
public StatisticsCalculator(List<String> lines) {
this.lines = lines;
}
@Override
public Statistics call() throws Exception {
//do stuff with lines
return new Statistics();
}
}
public static void main(String[] args) {
final File file = new File("path/to/my/file");
final List<List<String>> partitionedWork = partitionWork(readLines(file), 10);
final List<Callable<Statistics>> callables = new LinkedList<>();
for (final List<String> work : partitionedWork) {
callables.add(new StatisticsCalculator(work));
}
final ExecutorService executorService = Executors.newFixedThreadPool(Math.min(partitionedWork.size(), 10));
final List<Future<Statistics>> futures;
try {
futures = executorService.invokeAll(callables);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
try {
for (final Future<Statistics> future : futures) {
final Statistics statistics = future.get();
//do whatever to aggregate the individual
}
} catch (InterruptedException | ExecutionException ex) {
throw new RuntimeException(ex);
}
executorService.shutdown();
try {
executorService.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
static List<String> readLines(final File file) {
//read lines
return new ArrayList<>();
}
static List<List<String>> partitionWork(final List<String> lines, final int blockSize) {
//divide up the incoming list into a number of chunks
final List<List<String>> partitionedWork = new LinkedList<>();
for (int i = lines.size(); i > 0; i -= blockSize) {
int start = i > blockSize ? i - blockSize : 0;
partitionedWork.add(lines.subList(start, i));
}
return partitionedWork;
}
}
我已创建一个Statistics
对象,持有所做的工作的结果。
有一个StatisticsCalculator
对象这是一个Callable<Statistics>
- 这是做计算。给它一个List<String>
,它处理这些行并创建Statistics
。
readLines
方法我给你实施。
最重要的方法在许多方面是partitionWork
方法,这个划分传入List<String>
这是所有文件中使用blockSize
线成List<List<String>>
。这基本上决定了每个线程应该有多少工作,调整参数非常重要。就好像每项工作只有一条线路,那么管理费用可能会超出优势,而如果每条线路的工作量只有一条,那么只有一条线路工作Thread
。
最后操作的肉是main
方法。这将调用read和partition方法。它产生了一个ExecutorService
,其线程数等于工作的位数,但最多为10个。您可以通过这种方式使其等于您拥有的核心数。
的main
方法然后提交所有Callable
S,一个用于每个组块的List
到executorService
。方法会阻止invokeAll
直到完成工作。
该方法现在循环遍历每个返回的List<Future>
并获取生成的每个对象的Statistics
对象;准备聚合。
之后不要忘记关闭executorService
,因为它会阻止您的申请表退出。
编辑
OP希望通过线上线读取所以这里是一个修订main
public static void main(String[] args) throws IOException {
final File file = new File("path/to/my/file");
final ExecutorService executorService = Executors.newFixedThreadPool(10);
final List<Future<Statistics>> futures = new LinkedList<>();
try (final BufferedReader reader = new BufferedReader(new FileReader(file))) {
List<String> tmp = new LinkedList<>();
String line = null;
while ((line = reader.readLine()) != null) {
tmp.add(line);
if (tmp.size() == 100) {
futures.add(executorService.submit(new StatisticsCalculator(tmp)));
tmp = new LinkedList<>();
}
}
if (!tmp.isEmpty()) {
futures.add(executorService.submit(new StatisticsCalculator(tmp)));
}
}
try {
for (final Future<Statistics> future : futures) {
final Statistics statistics = future.get();
//do whatever to aggregate the individual
}
} catch (InterruptedException | ExecutionException ex) {
throw new RuntimeException(ex);
}
executorService.shutdown();
try {
executorService.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
这由线流文件中的行和后线的给定数量触发新任务处理到执行者的线路。
您需要调用clear
上的Callable
的List<String>
当你用它做的Callable
实例是由Future
就是他们返回引用。如果在完成这些操作后清除List
,应该大大减少内存占用量。
进一步的增强很可能使用的建议here为ExecutorService
阻止,直到有一个备用的话题 - 这将guranatee有从来没有超过threads*blocksize
行内存在的时间,如果您清除List
■当Callable
与他们完成。
听起来像你在正确的轨道上。让我们看看它。 – 2013-03-15 12:04:41
你看看Fork/Join框架能为你做什么吗? – 2013-03-15 12:15:27