
PyMongo supports batch inserts from a generator with sDB.insert(iter_something(converted)). Bulk write operations execute the writes in batches, which reduces the number of network round trips and increases write throughput. I want to use PyMongo's bulk write operations together with multiprocessing and generators.

The code below seems to work, but I don't know whether PyMongo can still iterate the generator while working with multiprocessing, collecting up to 1000 documents or 16 MB of data, and then pause the generator while it inserts that batch into MongoDB.

#!/usr/bin/env python 
from __future__ import absolute_import, division, print_function 
from itertools import groupby 
from pymongo import MongoClient 
from multiprocessing import Process, JoinableQueue 
import csv 

# > use test 
# switched to db test 
# > db.createCollection("abc") 
# { "ok" : 1 } 
# > db.abc.find() 


parts = [["Test", "A", "B01", 828288, 1, 7, 'C', 5], 
    ["Test", "A", "B01", 828288, 1, 7, 'T', 6], 
    ["Test", "A", "B01", 171878, 3, 7, 'C', 5], 
    ["Test", "A", "B01", 171878, 3, 7, 'T', 6], 
    ["Test", "A", "B01", 871963, 3, 9, 'A', 5], 
    ["Test", "A", "B01", 871963, 3, 9, 'G', 6], 
    ["Test", "A", "B01", 1932523, 1, 10, 'T', 4], 
    ["Test", "A", "B01", 1932523, 1, 10, 'A', 5], 
    ["Test", "A", "B01", 1932523, 1, 10, 'X', 6], 
    ["Test", "A", "B01", 667214, 1, 14, 'T', 4], 
    ["Test", "A", "B01", 667214, 1, 14, 'G', 5], 
    ["Test", "A", "B01", 667214, 1, 14, 'G', 6]] 


def iter_something(rows):
    key_names = ['type', 'name', 'sub_name', 'pos', 's_type', 'x_type']
    chr_key_names = ['letter', 'no']
    for keys, group in groupby(rows, lambda row: row[:6]):
        result = dict(zip(key_names, keys))
        result['chr'] = [dict(zip(chr_key_names, row[6:])) for row in group]
        yield result

class Loading(Process):

    def __init__(self, task_queue):
        Process.__init__(self)
        self.task_queue = task_queue
        db = MongoClient().test
        self.sDB = db["abc"]

    def run(self):
        while True:
            doc = self.task_queue.get()
            if doc is None:  # None means shutdown
                self.task_queue.task_done()
                break
            else:
                self.sDB.insert(doc)

def main():
    num_cores = 2

    tasks = JoinableQueue()

    threads = [Loading(tasks) for i in range(num_cores)]

    for i, w in enumerate(threads):
        w.start()
        print('Thread ' + str(i+1) + ' has started!')

    converters = [str, str, str, int, int, int, str, int]
    with open("/home/mic/tmp/test.txt") as f:
        reader = csv.reader(f, skipinitialspace=True)
        converted = ([conv(col) for conv, col in zip(converters, row)] for row in reader)
        # sDB.insert(iter_something(converted))

        # Enqueue jobs
        for i in iter_something(converted):
            tasks.put(i)

    # Add None to kill each thread
    for i in range(num_cores):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()


if __name__ == '__main__': 
    main() 

In each thread, do 'db = MongoClient().test' and 'self.sDB = db["abc"]' overwrite the database each time? – user977828 2014-10-28 07:17:08

Answers


In this case you are not taking advantage of bulk inserts. Each call to 'self.sDB.insert(doc)' sends the document to MongoDB immediately and waits for the server's reply. You could try this:

from pymongo.errors import InvalidOperation  # needed for the except clause below

def run(self):
    def gen():
        while True:
            doc = self.task_queue.get()
            if doc is None:  # None means shutdown
                self.task_queue.task_done()
                break
            else:
                yield doc

    try:
        self.sDB.insert(gen())
    except InvalidOperation as e:
        # Perhaps "Empty bulk write", this process received no documents.
        print(e)

Use mongosniff to verify that you are sending large batches to the server rather than one document at a time. Depending on the number of documents and the number of processes, some processes may receive no documents at all. PyMongo raises InvalidOperation if you try to insert from an empty iterator, so I wrapped the insert in try/except.
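
For illustration, here is a minimal sketch of the empty-iterator case (assuming a local mongod on the default port and the same pre-3.0 insert() API used above):

from pymongo import MongoClient
from pymongo.errors import InvalidOperation

coll = MongoClient().test["abc"]
try:
    # An iterator that yields nothing, like a worker that received no documents
    coll.insert(iter([]))
except InvalidOperation as e:
    print(e)  # e.g. "cannot do an empty bulk write"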

By the way, you don't need to call createCollection with MongoDB: the first insert into a collection creates it automatically. createCollection is only necessary when you want special options, such as a capped collection.
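
For example, a capped collection is one case where you would create the collection explicitly (a rough sketch; the collection name and size here are made up):

from pymongo import MongoClient

db = MongoClient().test
# capped=True is a special option, so the collection must be created explicitly;
# a plain collection would be created implicitly by the first insert.
db.create_collection("abc_capped", capped=True, size=1048576)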


Thanks, but now I get 'InvalidOperation: cannot do an empty bulk write'. Somehow putting None on the task queue doesn't work. – user977828 2014-10-29 00:06:49


I will edit my answer to handle the InvalidOperation exception. – 2014-10-29 13:23:21