3
我有一个非常大的CSV文件。超过7500万行。将Large .csv处理成Redis
我必须每隔一小时将这个.csv文件“发布”到一个Redis群集中。 (每小时)
脚本:
import csv
import redis
import random
from redis import StrictRedis
import multiprocessing as mp
import itertools
import time
def worker(chunk):
return len(chunk)
def keyfunc(row):
return row[0]
def main():
client = redis.StrictRedis(host='XXXXXXXXX.XX.XXX.X.XXX', port=6379, db=0)
client1 = redis.StrictRedis(host='XXXXXXXXX.XX.XXX.X.XXX', port=6379, db=0)
client2 = redis.StrictRedis(host='XXXXXXXXX.XX.XXX.X.XXX', port=6379, db=0)
list1 =(client, client1, client2)
pool = mp.Pool()
largefile = 'Example.csv'
num_chunks = 10
results = []
with open(largefile) as f:
reader = csv.reader(f)
chunks = itertools.groupby(reader, keyfunc)
while True:
# make a list of num_chunks chunks
groups = [list(chunk) for key, chunk in
itertools.islice(chunks, num_chunks)]
if groups:
result = pool.map(worker, groups)
results.extend(result)
else:
break
key1 = 'AAM_CDF_Traits'
doc = chunk
random.choice(list1).publish(key1, pool)
pool.close()
pool.join()
print(results)
if __name__ == '__main__':
main()
问题:
这是一个需要解决这个问题的正确方法?我还可以通过其他方式解决这个问题。
为什么我有这个错误?
回溯(最近通话最后一个):
文件 “./AAM_Redis4.sh”,第47行,在
main()
文件 “./AAM_Redis4.sh”,33行,在主
itertools.islice(chunks, num_chunks)]
类型错误:“元组”对象不callabl
我可以诚实地说,这种方法把我的处理时间从1-2小时缩短到5-7分钟。对记忆影响最小。 – user2748540