2012-02-13 120 views
3

我有一个巨大的CSV文件,我想在Amazon EMR(python)上使用Hadoop MapReduce进行处理。在Python中使用Hadoop来处理一个大的csv文件

该文件有7个领域,但是,我只在看日期数量场。

"date" "receiptId" "productId" "quantity" "price" "posId" "cashierId" 

首先,我mapper.py

import sys 

def main(argv): 
    line = sys.stdin.readline() 
    try: 
     while line: 
      list = line.split('\t') 

      #If date meets criteria, add quantity to express key 
       if int(list[0][11:13])>=17 and int(list[0][11:13])<=19: 
        print '%s\t%s' % ("Express", int(list[3])) 
      #Else, add quantity to non-express key 
       else: 
        print '%s\t%s' % ("Non-express", int(list[3])) 

      line = sys.stdin.readline() 
except "end of file": 
     return None 
if __name__ == "__main__": 
     main(sys.argv) 

对于减速,我将使用流命令:集料。

问:

  1. 是我的代码吗?我在Amazon EMR中运行它,但是我得到了一个空输出。

  2. 所以我最终的结果应该是:表达,XXX和非表达,YYY。在返回结果之前,我可以让它做分割操作吗?只是XXX/YYY的结果。我应该在哪里放这个代码?减速器??

  3. 此外,这是一个巨大的CSV文件,所以将映射分解成几个分区?或者我需要显式调用FileSplit?如果是这样,我该怎么做?

+0

为什么不使用python内置的csv解析器? – 2012-02-14 23:18:15

回答

3

在这里回答我自己的问题!

  1. The code is wrong。如果您使用聚合库进行缩减,则您的输出不会遵循常用的键值对。它需要一个“前缀”。

    if int(list[0][11:13])>=17 and int(list[0][11:13])<=19: 
        #This is the correct way of printing for aggregate library 
        #Print all as a string. 
        print "LongValueSum:" + "Express" + "\t" + list[3] 
    

    其他的 “前缀” 可以是:DoubleValueSum,LongValueMax,LongValueMin,StringValueMax,StringValueMin,UniqValueCount,ValueHistogram。欲了解更多信息,请看这里http://hadoop.apache.org/common/docs/r0.15.2/api/org/apache/hadoop/mapred/lib/aggregate/package-summary.html

  2. 是的,如果您想要做的不仅仅是基本总和,最小值,最大值或者计数,您需要编写自己的reducer。

  3. 我还没有答案。

+0

感谢您使用前缀“LongValueSum:”指针。 – Suman 2012-07-25 16:52:29

+0

@Deyang嗨,我是新来的hadoop -python。我也有类似的工作要做,但我在hadoop目录中有多个csv文件,我已经编写了在本地机器上正常运行的脚本。当我在群集上运行它时,它会给出一个错误,因为“Streaming Command Failed”。你能建议如何从hdfs目录中读取所有的csv文件。 – MegaBytes 2015-04-22 06:17:36