2014-04-29 22 views
3

我有一个非常大的CSV文件。超过7500万行。将Large .csv处理成Redis

我必须每隔一小时将这个.csv文件“发布”到一个Redis群集中。 (每小时)

脚本:

import csv 
import redis 
import random 
from redis import StrictRedis 
import multiprocessing as mp 
import itertools 
import time 

def worker(chunk): 
    return len(chunk) 

def keyfunc(row): 
    return row[0] 

def main(): 
    client = redis.StrictRedis(host='XXXXXXXXX.XX.XXX.X.XXX', port=6379, db=0) 
    client1 = redis.StrictRedis(host='XXXXXXXXX.XX.XXX.X.XXX', port=6379, db=0) 
    client2 = redis.StrictRedis(host='XXXXXXXXX.XX.XXX.X.XXX', port=6379, db=0) 
    list1 =(client, client1, client2) 
    pool = mp.Pool() 
    largefile = 'Example.csv' 
    num_chunks = 10 
    results = [] 
    with open(largefile) as f: 
     reader = csv.reader(f) 
     chunks = itertools.groupby(reader, keyfunc) 
     while True: 
      # make a list of num_chunks chunks 
      groups = [list(chunk) for key, chunk in 
         itertools.islice(chunks, num_chunks)] 
      if groups: 
       result = pool.map(worker, groups) 
       results.extend(result) 
      else: 
       break 

    key1 = 'AAM_CDF_Traits' 
    doc = chunk 
    random.choice(list1).publish(key1, pool) 
    pool.close() 
    pool.join() 
    print(results) 

if __name__ == '__main__': 

    main() 

问题:

这是一个需要解决这个问题的正确方法?我还可以通过其他方式解决这个问题。

为什么我有这个错误?

回溯(最近通话最后一个):

文件 “./AAM_Redis4.sh”,第47行,在

main() 

文件 “./AAM_Redis4.sh”,33行,在主

itertools.islice(chunks, num_chunks)] 

类型错误:“元组”对象不callabl

+0

我可以诚实地说,这种方法把我的处理时间从1-2小时缩短到5-7分钟。对记忆影响最小。 – user2748540

回答

0

,因为你使用的内置我你可能会得到一个错误n功能列表作为变量名:

list =(client, client1, client2) 

后来,当你调用函数列表,它被称为是一个变量,这可能会导致一个问题:

groups = [list(chunk) for key, chunk in 
+0

我完全错过了。谢谢。 – user2748540