2008-11-08 65 views
8

当使用shutil.copy()复制大文件,你没有操作如何进展的迹象..如何使用进度条在Python中复制文件?

我已经把一些作品 - 它使用一个简单的进度条类(简单的返回一个简单的ASCII进度条,如一个字符串)以及一个open().read().write()的循环进行实际的复制。它使用sys.stdout.write("\r%s\r" % (the_progress_bar))来显示进度条,这有点冒险,但它有效。

你可以看到(在上下文中)on github here

是否有任何内置模块,将做到这更好的代码?这个代码有什么改进吗?

+0

它复制电视节目文件到特定路径([fixed path]/$ showname/$ season_number] /),文件a回调一般为300-700MB的尺寸 – dbr 2008-11-11 01:59:38

回答

14

两件事情:

  • 我会让默认块大小比512。很多大我将开始与16384,也许更。
  • 对于模块化,它可能是更好的有copy_with_prog功能不输出进度条本身,而是调用回调函数,所以调用者可以决定如何显示进度。

也许是这样的:

def copy_with_prog(src, dest, callback = None): 
    while True: 
     # copy loop stuff 
     if callback: 
      callback(pos, total) 

prog = ProgressBar(...) 
copy_with_prog(src, dest, lambda pos, total: prog.update(pos, total)) 
+0

+1。 – 2008-11-08 11:31:49

+2

+1,但它不anwser问题... – 2008-11-09 08:31:20

5

矫枉过正?也许。但在几乎任何系统上,Linux,Mac和Windows上快速安装wxWidgets,都可以实现真正的交易,在gui设置中可以暂停和取消按钮。目前Macs带有wxWidgets,它是Linux上的常用软件包。

单个文件是非常快的(它会立即结束,并期待坏了),所以你可以考虑创建蜱每块沿每一次文件,而不是一次一个文件集的工作。请享用!

-Jim卡罗尔

""" 
Threaded Jobs. 

Any class that does a long running process can inherit 
from ThreadedJob. This enables running as a background 
thread, progress notification, pause and cancel. The 
time remaining is also calculated by the ThreadedJob class. 
""" 
import wx.lib.newevent 
import thread 
import exceptions 
import time 

(RunEvent, EVT_RUN) = wx.lib.newevent.NewEvent() 
(CancelEvent, EVT_CANCEL) = wx.lib.newevent.NewEvent() 
(DoneEvent, EVT_DONE) = wx.lib.newevent.NewEvent() 
(ProgressStartEvent, EVT_PROGRESS_START) = wx.lib.newevent.NewEvent() 
(ProgressEvent, EVT_PROGRESS) = wx.lib.newevent.NewEvent() 

class InterruptedException(exceptions.Exception): 
    def __init__(self, args = None): 
     self.args = args 
    # 
# 

class ThreadedJob: 
    def __init__(self): 
     # tell them ten seconds at first 
     self.secondsRemaining = 10.0 
     self.lastTick = 0 

     # not running yet 
     self.isPaused = False 
     self.isRunning = False 
     self.keepGoing = True 

    def Start(self): 
     self.keepGoing = self.isRunning = True 
     thread.start_new_thread(self.Run,()) 

     self.isPaused = False 
    # 

    def Stop(self): 
     self.keepGoing = False 
    # 

    def WaitUntilStopped(self): 
     while self.isRunning: 
      time.sleep(0.1) 
      wx.SafeYield() 
     # 
    # 

    def IsRunning(self): 
     return self.isRunning 
    # 

    def Run(self): 
     # this is overridden by the 
     # concrete ThreadedJob 
     print "Run was not overloaded" 
     self.JobFinished() 

     pass 
    # 

    def Pause(self): 
     self.isPaused = True 
     pass 
    # 

    def Continue(self): 
     self.isPaused = False 
     pass 
    # 

    def PossibleStoppingPoint(self): 
     if not self.keepGoing: 
      raise InterruptedException("process interrupted.") 
     wx.SafeYield() 

     # allow cancel while paused 
     while self.isPaused: 
      if not self.keepGoing: 
       raise InterruptedException("process interrupted.") 

      # don't hog the CPU 
      time.sleep(0.1) 
     # 
    # 

    def SetProgressMessageWindow(self, win): 
     self.win = win 
    # 

    def JobBeginning(self, totalTicks): 

     self.lastIterationTime = time.time() 
     self.totalTicks = totalTicks 

     if hasattr(self, "win") and self.win: 
      wx.PostEvent(self.win, ProgressStartEvent(total=totalTicks)) 
     # 
    # 

    def JobProgress(self, currentTick): 
     dt = time.time() - self.lastIterationTime 
     self.lastIterationTime = time.time() 
     dtick = currentTick - self.lastTick 
     self.lastTick = currentTick 

     alpha = 0.92 
     if currentTick > 1: 
      self.secondsPerTick = dt * (1.0 - alpha) + (self.secondsPerTick * alpha) 
     else: 
      self.secondsPerTick = dt 
     # 

     if dtick > 0: 
      self.secondsPerTick /= dtick 

     self.secondsRemaining = self.secondsPerTick * (self.totalTicks - 1 - currentTick) + 1 

     if hasattr(self, "win") and self.win: 
      wx.PostEvent(self.win, ProgressEvent(count=currentTick)) 
     # 
    # 

    def SecondsRemaining(self): 
     return self.secondsRemaining 
    # 

    def TimeRemaining(self): 

     if 1: #self.secondsRemaining > 3: 
      minutes = self.secondsRemaining // 60 
      seconds = int(self.secondsRemaining % 60.0) 
      return "%i:%02i" % (minutes, seconds) 
     else: 
      return "a few" 
    # 

    def JobFinished(self): 
     if hasattr(self, "win") and self.win: 
      wx.PostEvent(self.win, DoneEvent()) 
     # 

     # flag we're done before we post the all done message 
     self.isRunning = False 
    # 
# 

class EggTimerJob(ThreadedJob): 
    """ A sample Job that demonstrates the mechanisms and features of the Threaded Job""" 
    def __init__(self, duration): 
     self.duration = duration 
     ThreadedJob.__init__(self) 
    # 

    def Run(self): 
     """ This can either be run directly for synchronous use of the job, 
     or started as a thread when ThreadedJob.Start() is called. 

     It is responsible for calling JobBeginning, JobProgress, and JobFinished. 
     And as often as possible, calling PossibleStoppingPoint() which will 
     sleep if the user pauses, and raise an exception if the user cancels. 
     """ 
     self.time0 = time.clock() 
     self.JobBeginning(self.duration) 

     try: 
      for count in range(0, self.duration): 
       time.sleep(1.0) 
       self.JobProgress(count) 
       self.PossibleStoppingPoint() 
      # 
     except InterruptedException: 
      # clean up if user stops the Job early 
      print "canceled prematurely!" 
     # 

     # always signal the end of the job 
     self.JobFinished() 
     # 
    # 

    def __str__(self): 
     """ The job progress dialog expects the job to describe its current state.""" 
     response = [] 
     if self.isPaused: 
      response.append("Paused Counting") 
     elif not self.isRunning: 
      response.append("Will Count the seconds") 
     else: 
      response.append("Counting") 
     # 
     return " ".join(response) 
    # 
# 

class FileCopyJob(ThreadedJob): 
    """ A common file copy Job. """ 

    def __init__(self, orig_filename, copy_filename, block_size=32*1024): 

     self.src = orig_filename 
     self.dest = copy_filename 
     self.block_size = block_size 
     ThreadedJob.__init__(self) 
    # 

    def Run(self): 
     """ This can either be run directly for synchronous use of the job, 
     or started as a thread when ThreadedJob.Start() is called. 

     It is responsible for calling JobBeginning, JobProgress, and JobFinished. 
     And as often as possible, calling PossibleStoppingPoint() which will 
     sleep if the user pauses, and raise an exception if the user cancels. 
     """ 
     self.time0 = time.clock() 

     try: 
      source = open(self.src, 'rb') 

      # how many blocks? 
      import os 
      (st_mode, st_ino, st_dev, st_nlink, st_uid, st_gid, st_size, st_atime, st_mtime, st_ctime) = os.stat(self.src) 
      num_blocks = st_size/self.block_size 
      current_block = 0 

      self.JobBeginning(num_blocks) 

      dest = open(self.dest, 'wb') 

      while 1: 
       copy_buffer = source.read(self.block_size) 
       if copy_buffer: 
        dest.write(copy_buffer) 
        current_block += 1 
        self.JobProgress(current_block) 
        self.PossibleStoppingPoint() 
       else: 
        break 

      source.close() 
      dest.close() 

     except InterruptedException: 
      # clean up if user stops the Job early 
      dest.close() 
      # unlink/delete the file that is partially copied 
      os.unlink(self.dest) 
      print "canceled, dest deleted!" 
     # 

     # always signal the end of the job 
     self.JobFinished() 
     # 
    # 

    def __str__(self): 
     """ The job progress dialog expects the job to describe its current state.""" 
     response = [] 
     if self.isPaused: 
      response.append("Paused Copy") 
     elif not self.isRunning: 
      response.append("Will Copy a file") 
     else: 
      response.append("Copying") 
     # 
     return " ".join(response) 
    # 
# 

class JobProgress(wx.Dialog): 
    """ This dialog shows the progress of any ThreadedJob. 

    It can be shown Modally if the main application needs to suspend 
    operation, or it can be shown Modelessly for background progress 
    reporting. 

    app = wx.PySimpleApp() 
    job = EggTimerJob(duration = 10) 
    dlg = JobProgress(None, job) 
    job.SetProgressMessageWindow(dlg) 
    job.Start() 
    dlg.ShowModal() 


    """ 
    def __init__(self, parent, job): 
     self.job = job 

     wx.Dialog.__init__(self, parent, -1, "Progress", size=(350,200)) 

     # vertical box sizer 
     sizeAll = wx.BoxSizer(wx.VERTICAL) 

     # Job status text 
     self.JobStatusText = wx.StaticText(self, -1, "Starting...") 
     sizeAll.Add(self.JobStatusText, 0, wx.EXPAND|wx.ALL, 8) 

     # wxGague 
     self.ProgressBar = wx.Gauge(self, -1, 10, wx.DefaultPosition, (250, 15)) 
     sizeAll.Add(self.ProgressBar, 0, wx.EXPAND|wx.ALL, 8) 

     # horiz box sizer, and spacer to right-justify 
     sizeRemaining = wx.BoxSizer(wx.HORIZONTAL) 
     sizeRemaining.Add((2,2), 1, wx.EXPAND) 

     # time remaining read-only edit 
     # putting wide default text gets a reasonable initial layout. 
     self.remainingText = wx.StaticText(self, -1, "???:??") 
     sizeRemaining.Add(self.remainingText, 0, wx.LEFT|wx.RIGHT|wx.ALIGN_CENTER_VERTICAL, 8) 

     # static text: remaining 
     self.remainingLabel = wx.StaticText(self, -1, "remaining") 
     sizeRemaining.Add(self.remainingLabel, 0, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 8) 

     # add that row to the mix 
     sizeAll.Add(sizeRemaining, 1, wx.EXPAND) 

     # horiz box sizer & spacer 
     sizeButtons = wx.BoxSizer(wx.HORIZONTAL) 
     sizeButtons.Add((2,2), 1, wx.EXPAND|wx.ADJUST_MINSIZE) 

     # Pause Button 
     self.PauseButton = wx.Button(self, -1, "Pause") 
     sizeButtons.Add(self.PauseButton, 0, wx.ALL, 4) 
     self.Bind(wx.EVT_BUTTON, self.OnPauseButton, self.PauseButton) 

     # Cancel button 
     self.CancelButton = wx.Button(self, wx.ID_CANCEL, "Cancel") 
     sizeButtons.Add(self.CancelButton, 0, wx.ALL, 4) 
     self.Bind(wx.EVT_BUTTON, self.OnCancel, self.CancelButton) 

     # Add all the buttons on the bottom row to the dialog 
     sizeAll.Add(sizeButtons, 0, wx.EXPAND|wx.ALL, 4) 

     self.SetSizer(sizeAll) 
     #sizeAll.Fit(self) 
     sizeAll.SetSizeHints(self) 

     # jobs tell us how they are doing 
     self.Bind(EVT_PROGRESS_START, self.OnProgressStart) 
     self.Bind(EVT_PROGRESS, self.OnProgress) 
     self.Bind(EVT_DONE, self.OnDone) 

     self.Layout() 
    # 

    def OnPauseButton(self, event): 
     if self.job.isPaused: 
      self.job.Continue() 
      self.PauseButton.SetLabel("Pause") 
      self.Layout() 
     else: 
      self.job.Pause() 
      self.PauseButton.SetLabel("Resume") 
      self.Layout() 
     # 
    # 

    def OnCancel(self, event): 
     self.job.Stop() 
    # 

    def OnProgressStart(self, event): 
     self.ProgressBar.SetRange(event.total) 
     self.statusUpdateTime = time.clock() 
    # 

    def OnProgress(self, event): 
     # update the progress bar 
     self.ProgressBar.SetValue(event.count) 

     self.remainingText.SetLabel(self.job.TimeRemaining()) 

     # update the text a max of 20 times a second 
     if time.clock() - self.statusUpdateTime > 0.05: 
      self.JobStatusText.SetLabel(str(self.job)) 
      self.statusUpdateTime = time.clock() 
      self.Layout() 
     # 
    # 

    # when a job is done 
    def OnDone(self, event): 
     self.ProgressBar.SetValue(0) 
     self.JobStatusText.SetLabel("Finished") 
     self.Destroy() 
    # 
# 

if __name__ == "__main__": 
    app = wx.PySimpleApp() 
    #job = EggTimerJob(duration = 10) 
    job = FileCopyJob("VeryBigFile.mp4", "/tmp/test_junk.mp4", 1024*1024*10) 
    dlg = JobProgress(None, job) 
    job.SetProgressMessageWindow(dlg) 
    job.Start() 
    dlg.ShowModal() 
#