2010-06-06 78 views
6

我有一个算法,将通过一个大型的数据集读取一些文本文件,并在这些行中搜索特定的术语。我已经用Java实现了它,但我不想发布代码,以便它看起来不在我正在寻找某人为我实现它,但这确实是我真的需要很多帮助!这不是我的项目计划,但数据集是巨大的,所以老师告诉我,我必须这样做。需要帮助实现这个算法与地图Hadoop MapReduce

编辑(我没有澄清我previos版本)的数据集我是Hadoop集群上,我应该做出的MapReduce实现

我读到关于MapReduce和thaught,我第一次做标准的实现,然后用mapreduce做起来会更容易/不那么容易。但没有发生,因为算法是相当愚蠢的,没有什么特别的,地图减少...我不能笼络它。

所以这里不久伪我的算法的代码

LIST termList (there is method that creates this list from lucene index) 
FOLDER topFolder 

INPUT topFolder 
IF it is folder and not empty 
    list files (there are 30 sub folders inside) 
    FOR EACH sub folder 
     GET file "CheckedFile.txt" 
     analyze(CheckedFile) 
    ENDFOR 
END IF 


Method ANALYZE(CheckedFile) 

read CheckedFile 
WHILE CheckedFile has next line 
    GET line 
    FOR(loops through termList) 
      GET third word from line 
      IF third word = term from list 
     append whole line to string buffer 
    ENDIF 
ENDFOR 
END WHILE 
OUTPUT string buffer to file 

而且,正如你所看到的,每次当“分析”之称,新文件被创建时,我明白了地图减轻困难写入许多输出?

我明白mapreduce的直觉,我的例子似乎完全适合mapreduce,但是当涉及到这样做时,显然我不知道足够多,而且我很饿!

请帮忙。

回答

3

您可以使用空的缩减器,并对作业进行分区,以便为每个文件运行一个映射器。每个映射器将在您的输出文件夹中创建它自己的输出文件。

+0

嗨! Thanx的答案!但我不确定我明白:/你能给我更多的信息吗?你可能有这样的例子吗? – Julia 2010-06-08 15:30:26

2

Map Reduce使用一些很好的Java 6并发功能,特别是Future,Callable和ExecutorService,很容易实现。

我创建了一个可赎回,将在路分析文件指定

public class FileAnalyser implements Callable<String> { 

    private Scanner scanner; 
    private List<String> termList; 

    public FileAnalyser(String filename, List<String> termList) throws FileNotFoundException { 
    this.termList = termList; 
    scanner = new Scanner(new File(filename)); 
    } 

    @Override 
    public String call() throws Exception { 
    StringBuilder buffer = new StringBuilder(); 
    while (scanner.hasNextLine()) { 
     String line = scanner.nextLine(); 
     String[] tokens = line.split(" "); 
     if ((tokens.length >= 3) && (inTermList(tokens[2]))) 
     buffer.append(line); 
    } 
    return buffer.toString(); 
    } 

    private boolean inTermList(String term) { 
    return termList.contains(term); 
    } 
} 

我们需要创建一个新的可调用找到的每个文件,并提交该执行人的服务。提交的结果是一个Future,我们稍后可以使用它来获取文件解析的结果。

public class Analayser { 

    private static final int THREAD_COUNT = 10; 

    public static void main(String[] args) { 

    //All callables will be submitted to this executor service 
    //Play around with THREAD_COUNT for optimum performance 
    ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); 

    //Store all futures in this list so we can refer to them easily 
    List<Future<String>> futureList = new ArrayList<Future<String>>(); 

    //Some random term list, I don't know what you're using. 
    List<String> termList = new ArrayList<String>(); 
    termList.add("terma"); 
    termList.add("termb"); 

    //For each file you find, create a new FileAnalyser callable and submit 
    //this to the executor service. Add the future to the list 
    //so we can check back on the result later 
    for each filename in all files { 
     try { 
     Callable<String> worker = new FileAnalyser(filename, termList); 
     Future<String> future = executor.submit(worker); 
     futureList.add(future); 
     } 
     catch (FileNotFoundException fnfe) { 
     //If the file doesn't exist at this point we can probably ignore, 
     //but I'll leave that for you to decide. 
     System.err.println("Unable to create future for " + filename); 
     fnfe.printStackTrace(System.err); 
     } 
    } 

    //You may want to wait at this point, until all threads have finished 
    //You could maybe loop through each future until allDone() holds true 
    //for each of them. 

    //Loop over all finished futures and do something with the result 
    //from each 
    for (Future<String> current : futureList) { 
     String result = current.get(); 
     //Do something with the result from this future 
    } 
    } 
} 

我的例子很不完整,远没有效率。我还没有考虑样本的大小,如果它真的很大,你可以不断循环在futureList,除去已完成的元素,类似于:

while (futureList.size() > 0) { 
     for (Future<String> current : futureList) { 
     if (current.isDone()) { 
      String result = current.get(); 
      //Do something with result 
      futureList.remove(current); 
      break; //We have modified the list during iteration, best break out of for-loop 
     } 
     } 
} 

或者你可以实现一个生产者 - 消费者类型设置,其中的制片人向执行者服务器提交可调用卡并产生未来,消费者将获得未来的结果,然后丢弃未来。

这可能需要生产者和消费者自行创建线索,并且需要同步添加/删除期货列表。

有任何问题,请提出。

+0

嗨!非常感谢您提出的解决方案!我很抱歉,我可能没有明确指出问题,尽管我尝试了。我的错误是,我刚刚在标题中提到了Hadoop,但我的数据集位于运行hadoop的群集上,所以我应该根据Hadoop MaPreduce框架来实现它...现在编辑我的帖子。我分析的数据集是6GB :/太多的并发来应付它? – Julia 2010-06-07 17:10:20

+0

哎呀,我是一个小菜在这里:D 为了让我自己稍微兑现,我在100个文件上运行我的代码,每个约61MB,总共约6GB。我不完全确定你的文件解析器是干什么的,所以把这些血腥的细节排除在外,只是扫描每一行并返回一个空字符串。我知道有点做作。 性能不是太糟糕,线程池大小为100,因此所有100个文件都被解析,而不会被执行程序服务排队。我的Atom处理器的总运行时间为17分钟。 对不起,我无法正确回答你的问题。我没有使用Hadoop的经验,但在阅读SquareCog的答案之后才有意义。 – 2010-06-08 05:57:10

+0

嗨!非常感谢你,你帮了很多忙,因为我无法应付hadoop先生和大脑的时间。我将有几个更类似的算法来实现,所以我必须以我能够做到的方式来尝试它。无法在任何地方获得hadoop帮助:/ 因此,我的代码已采用,并且在我的英特尔2Ghz上带有线程池42花了大约20分钟时间解析并将结果输出到新文件中,但只有200Mb数据(42个文件)。再次,我必须对解析器做一些修改,它必须做一些更严格的匹配,而不是纯粹的“包含”术语,所以当我运行它时,我让你知道结果:) – Julia 2010-06-09 21:15:54