2017-03-08 53 views
0

下载当我尝试下载从FTP文件顺序它完美并行文件从FTP

import ftplib 
import os 
import logging 

class pyFTPDownload(object): 
    def __init__(self, 
       remote_host=None, 
       port=None, 
       username=None, 
       passwd=None, 
       input_folder=None, 
       output_folder=None, 
       ftp_conn_id=None, 
       timeout=10 
       ): 
     super(pyFTPDownload, self).__init__() 
     self.remote_host = remote_host 
     self.port = port 
     self.username = username 
     self.passwd = passwd 
     self.input_folder = input_folder 
     self.output_folder = output_folder 
     self.ftp_conn_id = ftp_conn_id 
     self.timeout = timeout 
     self.client = None 

    def get_conn(self): 
     if not self.client: 
      logging.info('creating ftp client for conn_id: {0}'.format(self.ftp_conn_id)) 

      if not self.username: 
       raise Exception("Missing required param: username") 
      if not self.passwd: 
       raise Exception("Missing required param: passwd") 
      if not self.remote_host: 
       raise Exception("Missing required param: remote_host") 
      if not self.ftp_conn_id: 
       self.ftp_conn_id = str(self.username) + '@' + str(self.remote_host) + ":" + (
        str(self.port) if self.port else "") 

      try: 
       client = ftplib.FTP() 
       client.connect(self.remote_host, (self.port if not self.port else None)) 
       client.login(self.username, self.passwd) 
       self.client = client 
      except ftplib.all_errors as remote_host_error: 
       logging.error("Auth failed while connecting to host: {0}, error: {1}" 
           .format(self.remote_host, remote_host_error)) 
      except Exception as error: 
       logging.error("Error connecting to host: {0}, error: {1}" 
           .format(self.remote_host, error)) 
     return self.client 


    def get_file(self, input_folder, output_folder=None,file_ext=None,thread_nbr=1): 

     #os.chdir(output_folder) 
     ftp = self.get_conn() 
     #print ftp.dir(input_folder) 
     logging.debug(input_folder) 
     if not os.path.exists(input_folder): 
      try: 
       os.makedirs(output_folder, 0755) 
       logging.debug("{0} is created".format(output_folder)) 
      except OSError, e: 
       logging.error("ERROR: {0}".format(e)) 

     logging.info(output_folder) 
     ftp.cwd(input_folder) 
     for filename in ftp.nlst(file_ext): # Loop - looking for matching files 
      try: 
       logging.debug("filename {0}".format(filename)) 
       local_file=os.path.join(output_folder,filename) 
       logging.debug("local_file {0}".format(local_file)) 
       with open(local_file, 'wb') as fhandle: 
        logging.debug('Getting ' + filename) # for confort sake, shows the file that's being retrieved 
        ftp.retrbinary('RETR ' + filename, fhandle.write) 
        fhandle.close() 
      except Exception as e: 
       logging.error("could not download file:{0}, terminated with error {1}".format(filename,e)) 

但是,当我试图做同样的事情在并行,我得到

[Errno 9] Bad file descriptor 

,或者当我尝试分解以下两行

xftp.connect(self.remote_host,self.port) 
xftp.login(self.username,self.passwd) 

我得到这个错误:一个浮点数是必需的。但是,如果没有一个堆栈跟踪,这样我可以调试它

我完整的代码如下:

# coding=utf-8 
from itertools import izip, repeat 
import ftplib 
import os 
import multiprocessing 
from pathos.multiprocessing import ProcessingPool as Pool 
import logging 
import traceback 


class pyFTPDownload(object): 
    def __init__(self, 
       remote_host, 
       port, 
       username, 
       passwd, 
       ftp_conn_id=None 
       ): 
     super(pyFTPDownload, self).__init__() 
     self.remote_host = remote_host 
     self.port = port 
     self.username = username 
     self.passwd = passwd 
     self.ftp_conn_id = ftp_conn_id 
     self.client = None 
     if not self.client: 
      logging.info('creating ftp client for conn_id: {0}'.format(self.ftp_conn_id)) 

      if not self.username: 
       raise Exception("Missing required param: username") 
      if not self.passwd: 
       raise Exception("Missing required param: passwd") 
      if not self.remote_host: 
       raise Exception("Missing required param: remote_host") 
      if not self.ftp_conn_id: 
       self.ftp_conn_id = str(self.username) + '@' + str(self.remote_host) + ":" + (
        str(self.port) if self.port else "") 

      try: 
       client = ftplib.FTP() 
       client.connect(self.remote_host, (self.port if self.port else None)) 
       client.login(self.username, self.passwd) 
       self.client = client 
      except ftplib.all_errors as remote_host_error: 
       logging.error("Auth failed while connecting to host: {0}, error: {1}" 
           .format(self.remote_host, remote_host_error)) 
      except Exception as error: 
       logging.error("Error connecting to host: {0}, error: {1}" 
           .format(self.remote_host, error)) 


    def get_conn(self): 
     return self.client 


class loadData(pyFTPDownload): 
    def __init__(self, 
       remote_host, 
       port, 
       username, 
       passwd, 
       input_folder, 
       output_folder, 
       file_ext=None, 
       nbr_processes=None, 
       ftp_conn_id = None): 
     super(loadData, self).__init__(remote_host,port,username,passwd) 
     self.input_folder=input_folder 
     self.output_folder=output_folder 
     self.file_ext=file_ext 
     self.nbr_processes=nbr_processes 


     if not input_folder: 
      raise Exception("Missing required params: input_folder") 
     if not output_folder: 
      raise Exception("Missing required params: output_folder") 
     if not file_ext: 
      logging.warn("All the existing files in {0} will be considered".format(input_folder)) 
     if not nbr_processes: 
      logging.warn("The number of processes to be started will be set to {0}".format(Pool.ncpus)) 
      self.nbr_processes=multiprocessing.cpu_count() 


    def downloadfunc(self,a): 
     return self.downloadf(*a) 

    def downloadf(self, inputf, filename, outputf): 

     global xftp 
     global local_file 
     global fhandle 
     print filename 
     try: 
      xftp = self.get_conn() 
      xftp.connect(self.remote_host,self.port) 
      xftp.login(self.username,self.passwd) 
      print xftp 
     except ftplib.all_errors as remote_host_error: 
      logging.error("Auth failed while connecting to host: {0}, error: {1}" 
          .format(self.remote_host, remote_host_error)) 
     except Exception as error: 
      logging.error("Error connecting to host: {0}, error: {1}" 
          .format(self.remote_host, error)) 

     try: 
      logging.debug("filename {0}".format(filename)) 
      local_file = os.path.join(outputf, filename) 
      logging.debug("local_file {0}".format(local_file)) 
     except Exception as sd: 
      logging.error("Unkmown error: {}".format(sd)) 
     xftp.cwd(inputf) 
     try: 
      with open(local_file, 'wb') as fhandle: 
       logging.debug('Getting ' + filename) # for confort sake, shows the file that's being retrieved 
       xftp.retrbinary('RETR ' + filename, fhandle.write) 
       fhandle.close() 
     except Exception as k: 
      logging.error("Could not download {0} : {1}".format(local_file,k)) 
     finally: 
      xftp.quit() 
     print traceback 


    def get_file(self): 
     print "PREPARING FILE DOWNLOAD" 
     print self.output_folder 
     if not os.path.exists(self.output_folder): 
      try: 
       logging.debug("{} does not exists".format(self.output_folder)) 
       os.makedirs(self.output_folder,0755) 
       logging.debug("{0} is created".format(self.output_folder)) 
      except OSError, e: 
       logging.error("ERROR:{0} could not be created {1}, {2}".format(self.output_folder,e,OSError)) 
      except Exception as d: 
       logging.error(d) 
     ftpObj=self.get_conn() 
     ftpObj.cwd(self.input_folder) 
     files_to_dl=ftpObj.nlst(self.file_ext) 
     p = Pool(self.nbr_processes) 
     try: 
      p.map(self.downloadfunc, izip(repeat(self.input_folder),files_to_dl,repeat(self.output_folder))) 
      p.close() 
      p.join() 

     except Exception as f: 
      logging.error(f) 

我没有很多与蟒蛇的经验,所以这将是非常好的,你要检查我的代码。 我也有一些问题在这种情况下实现多处理的最佳方式是什么?

+0

我不得不问,你为什么这样做? FTP下载的限制几乎总是客户端和服务器之间的连接速度。如果您是“多进程”,那么您可能只会增加上下文切换开销并可能导致TCP吞吐量最大化。为什么您希望多路复用产生比连续传输更高的吞吐量? –

+0

@ JamesK.Lowden我想每天下载超过250.000个文件。你是否认为这不是多进程下载的用例? – sdikby

+0

它可能是25万,也可能是25。一旦管道满了,它已满。你为什么期望复用产生更高的吞吐量?您是否测量了吞吐量并将其与广告带宽进行了比较?您的连接在饱和之前支持多少次同时下载? –

回答

0

我发现我的代码存在问题。 在downloadf功能,正是在这部分代码

try: 
     xftp = self.get_conn() 
     xftp.connect(self.remote_host,self.port) 
     xftp.login(self.username,self.passwd) 
     print xftp 
    except ftplib.all_errors as remote_host_error: 
     logging.error("Auth failed while connecting to host: {0}, error: {1}" 
         .format(self.remote_host, remote_host_error)) 
    except Exception as error: 
     logging.error("Error connecting to host: {0}, error: {1}" 
         .format(self.remote_host, error)) 

的问题是,我用同样的FTP连接比如通过,xftp = self.get_conn()这就是为什么我得到的似乎并没有描述错误源代码问题,我介绍如何修改我的代码来解决这个问题。 解决的办法是实例化的每个进程一个新的FTP连接,所以不是上面的代码行我现在做folloying:xftp = ftplib.FTP()

也许有一种灵魂,这会带来更多的“Python化”解释这问题。我会格雷夫