2012-07-11 116 views
1

我正在使用python boto和线程从S3快速下载许多文件。我在我的程序中使用了这几次,它很好用。但是,有一次它不起作用。在这一步中,我尝试在32核心机器上下载3,000个文件(Amazon EC2 cc2.8xlarge)。多线程S3下载不会终止

下面的代码实际上可以成功地下载每个文件(除了有时会出现一个httplib.IncompleteRead错误,它不会被重试所修复)。但是,32个线程中只有10个实际终止,程序只是挂起。不知道这是为什么。所有的文件已经下载完毕,所有的线程都退出了。当我下载较少的文件时,他们会采取其他步骤。我已经减少到使用单个线程下载所有这些文件(它工作但超级慢)。任何见解将不胜感激!

from boto.ec2.connection import EC2Connection 
from boto.s3.connection import S3Connection 
from boto.s3.key import Key 

from boto.exception import BotoClientError 
from socket import error as socket_error 
from httplib import IncompleteRead 

import multiprocessing 
from time import sleep 
import os 

import Queue 
import threading 

def download_to_dir(keys, dir): 
    """ 
    Given a list of S3 keys and a local directory filepath, 
    downloads the files corresponding to the keys to the local directory. 
    Returns a list of filenames. 
    """ 
    filenames = [None for k in keys] 

    class DownloadThread(threading.Thread): 

     def __init__(self, queue, dir): 
      # call to the parent constructor 
      threading.Thread.__init__(self) 
      # create a connection to S3 
      connection = S3Connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY) 
      self.conn = connection 
      self.dir = dir 
      self.__queue = queue 

     def run(self): 
      while True: 
       key_dict = self.__queue.get() 
       print self, key_dict 
       if key_dict is None: 
        print "DOWNLOAD THREAD FINISHED" 
        break 
       elif key_dict == 'DONE': #last job for last worker 
        print "DOWNLOADING DONE" 
        break 
       else: #still work to do! 
        index = key_dict.get('idx') 
        key = key_dict.get('key') 
        bucket_name = key.bucket.name 
        bucket = self.conn.get_bucket(bucket_name) 
        k = Key(bucket) #clone key to use new connection 
        k.key = key.key 

        filename = os.path.join(dir, k.key) 
        #make dirs if don't exist yet 
        try: 
         f_dirname = os.path.dirname(filename) 
         if not os.path.exists(f_dirname): 
          os.makedirs(f_dirname) 
        except OSError: #already written to 
         pass 

        #inspired by: http://code.google.com/p/s3funnel/source/browse/trunk/scripts/s3funnel?r=10 
        RETRIES = 5 #attempt at most 5 times 
        wait = 1 
        for i in xrange(RETRIES): 
         try: 
          k.get_contents_to_filename(filename) 
          break 
         except (IncompleteRead, socket_error, BotoClientError), e: 
          if i == RETRIES-1: #failed final attempt 
           raise Exception('FAILED TO DOWNLOAD %s, %s' % (k, e)) 
           break 
          wait *= 2 
          sleep(wait) 

        #put filename in right spot! 
        filenames[index] = filename 

    num_cores = multiprocessing.cpu_count() 

    q = Queue.Queue(0) 

    for i, k in enumerate(keys): 
     q.put({'idx': i, 'key':k}) 
    for i in range(num_cores-1): 
     q.put(None) # add end-of-queue markers 
    q.put('DONE') #to signal absolute end of job 

    #Spin up all the workers 
    workers = [DownloadThread(q, dir) for i in range(num_cores)] 
    for worker in workers: 
     worker.start() 

    #Block main thread until completion 
    for worker in workers: 
     worker.join() 

    return filenames 

回答

4

升级到AWS SDK版本1.4.4.0或更高版本,或者坚持到2个线程。较旧的版本具有最多2个同时连接的limit。这意味着如果你启动2个线程,你的代码就能正常工作;如果您启动3个或更多,您肯定会看到不完整的读取和用尽超时。

你会看到,虽然2线程可以大大提高你的吞吐量,但2以上并没有太大的改变,因为你的网卡总是无时无刻不忙。

+0

谢谢@Jirka Hanika - 改为两个线程似乎解决了这个问题。虽然我认为亚马逊机器如此狂野,实际上拥有大量的下载线程确实使效率更高。我试图找到AWS SDK> 1.4.4,但亚马逊上的最新下载是1.3.13 ... – Max 2012-07-12 13:59:03

+0

@Max - 对于大多数语言,1.4会在[这里]找到(https://github.com/amazonwebservices/ ),并希望Python会出现在[有一天](http://aws.typepad.com/aws/2012/01/big-news-regarding-python-boto-andawaws.html)。在此之前,你可能会走运,我不确定。 – 2012-07-12 15:02:38

0

S3Connection使用httplib.py,并且该库不是线程安全的,因此确保每个线程都拥有自己的连接至关重要。看起来你正在这样做。

Boto已经拥有了自己的重试机制,但是您在其上层叠了一个来处理某些其他错误。我想知道是否建议在except块中创建一个新的S3Connection对象。它似乎就像底层的http连接在那个时候处于一种不寻常的状态,最好从一个新的连接开始。

只是一个想法。