2017-07-06 84 views
1

我有产生从一个巨大的CSV文件行功能懒洋洋地蟒蛇多线程巨大的CSV文件

def get_next_line(): 
    with open(sample_csv,'r') as f: 
     for line in f: 
      yield line 

def do_long_operation(row): 
    print('Do some operation that takes a long time') 

我需要使用螺纹,使得每个记录我从上面的功能得到我可以叫 do_long_operation

互联网上大部分地方有这样的例子,我不是很肯定,如果我在正确的道路上

import threading 
thread_list = [] 
for i in range(8): 
    t = threading.Thread(target=do_long_operation, args=(get_next_row from get_next_line)) 
    thread_list.append(t) 

for thread in thread_list: 
    thread.start() 

for thread in thread_list: 
    thread.join() 

我的问题是

a)我该如何开始只说有限数量的线程说8?

b)如何确保每个线程从get_next_line获得一行 ?

回答

2

你可以使用线程池的多处理和映射任务以工人池:

from multiprocessing.pool import ThreadPool as Pool 
# from multiprocessing import Pool 
from random import randint 
from time import sleep 


def process_line(l): 
    print l, "started" 
    sleep(randint(0,3)) 
    print l, "done" 


def get_next_line(): 
    with open("sample.csv",'r') as f: 
     for line in f: 
      yield line 

f = get_next_line() 

t = Pool(processes=8) 

for i in f: 
    t.map(process_line, (i,)) 

t.join() 
t.close() 

这将创建8名工人,并通过一个提交您的线给他们,一个。一旦进程“免费”,它将被分配一个新的任务。

也有注释掉的导入语句。如果您注释掉ThreadPool并从多处理中导入Pool,您将获得子进程而不是线程,这对您而言可能更有效。

哈努哈利

+0

哇典雅做:) – user3249433

+0

我会用池从多处理来代替。试一试,看看哪一个工作更快,但我确信它将是多处理解决方案,但是如果您需要共享大量内部变量或类似的东西,无论如何这是不好的编程,可能会阻止您使用多处理池。两个池的功能接口完全相同。 – Hannu