2012-08-16 154 views
14

我正在写一个应用程序,将行附加到来自多个线程的同一个文件。蟒蛇 - 附加到同一个文件从多个线程

我有一个问题,其中一些行被添加而没有新行。

任何解决方案?

class PathThread(threading.Thread): 
    def __init__(self, queue): 
     threading.Thread.__init__(self) 
     self.queue = queue 

    def printfiles(self, p): 
     for path, dirs, files in os.walk(p): 
      for f in files: 
       print(f, file=output) 

    def run(self): 
     while True: 
      path = self.queue.get() 
      self.printfiles(path) 
      self.queue.task_done() 


pathqueue = Queue.Queue() 
paths = getThisFromSomeWhere() 

output = codecs.open('file', 'a') 

# spawn threads 
for i in range(0, 5): 
    t = PathThread(pathqueue) 
    t.setDaemon(True) 
    t.start() 

# add paths to queue 
for path in paths: 
    pathqueue.put(path) 

# wait for queue to get empty 
pathqueue.join() 
+3

发布一些代码,这将有助于。 – 2012-08-16 09:10:00

+2

追加一个新行。 – Kuf 2012-08-16 09:11:19

+1

听起来像* impossibru *。 – plaes 2012-08-16 09:11:27

回答

22

解决方法是仅在一个线程中写入文件。

import Queue # or queue in Python 3 
import threading 

class PrintThread(threading.Thread): 
    def __init__(self, queue): 
     threading.Thread.__init__(self) 
     self.queue = queue 

    def printfiles(self, p): 
     for path, dirs, files in os.walk(p): 
      for f in files: 
       print(f, file=output) 

    def run(self): 
     while True: 
      result = self.queue.get() 
      self.printfiles(result) 
      self.queue.task_done() 

class ProcessThread(threading.Thread): 
    def __init__(self, in_queue, out_queue): 
     threading.Thread.__init__(self) 
     self.in_queue = in_queue 
     self.out_queue = out_queue 

    def run(self): 
     while True: 
      path = self.in_queue.get() 
      result = self.process(path) 
      self.out_queue.put(result) 
      self.in_queue.task_done() 

    def process(self, path): 
     # Do the processing job here 

pathqueue = Queue.Queue() 
resultqueue = Queue.Queue() 
paths = getThisFromSomeWhere() 

output = codecs.open('file', 'a') 

# spawn threads to process 
for i in range(0, 5): 
    t = ProcessThread(pathqueue, resultqueue) 
    t.setDaemon(True) 
    t.start() 

# spawn threads to print 
t = PrintThread(resultqueue) 
t.setDaemon(True) 
t.start() 

# add paths to queue 
for path in paths: 
    pathqueue.put(path) 

# wait for queue to get empty 
pathqueue.join() 
resultqueue.join() 
+0

,行 - 结果= self.process(路径) ? 你不配置过程()方法在那里.. – user1251654 2012-08-16 12:26:36

+0

你想定义过程方法来做你想做的。我只是修改代码来澄清这一点。 – Dikei 2012-08-16 12:34:02

+0

对,我的不好。谢谢。这个帮助很大。 – user1251654 2012-08-16 13:03:36

0

也许一些更多的换行符,他们不应该是? 您应该记住共享资源不应该一次被多个线程访问,否则不可预知的后果可能会发生。 (它在使用线程时使用'原子操作') 看一看这个页面有一点直觉。
Thread-Synchronization

1

的事实,你永远不会看到在一条线上的中间同一线路或新线路杂乱的文字,你实际上不需要到syncronize附加到文件的线索。问题是您使用print来写入单个文件句柄。我怀疑print实际上是在一次调用中对文件句柄进行2次操作,并且这些操作正在线程之间竞争。基本上print正在做这样的事情:

file_handle.write('whatever_text_you_pass_it') 
file_handle.write(os.linesep) 

而且由于不同线程对同一文件句柄同时这样做有时一个线程将在第一时间拿到写,然后其他线程将获得在其第一次写,然后你将连续获得两个回车。或者真的是这些的任何排列。

解决此问题的最简单方法是停止使用print并直接使用write。尝试这样的:

output.write(f + os.linesep) 

这对我仍然是危险的。我不知道你可以期待什么gaurantees与所有线程使用相同的文件句柄对象和竞争其内部缓冲区。个人身份证方面的步骤整个问题,并让每个线程获得自己的文件句柄。还要注意,这是可行的,因为写入缓冲区刷新的默认值是行缓冲的,所以当它对文件进行刷新时,它会以os.linesep结束。强制它使用行缓冲发送1作为open的第三个参数。你可以测试出来是这样的:

#!/usr/bin/env python 
import os 
import sys 
import threading 

def hello(file_name, message, count): 
    with open(file_name, 'a', 1) as f: 
    for i in range(0, count): 
     f.write(message + os.linesep) 

if __name__ == '__main__': 
    #start a file 
    with open('some.txt', 'w') as f: 
    f.write('this is the beginning' + os.linesep) 
    #make 10 threads write a million lines to the same file at the same time 
    threads = [] 
    for i in range(0, 10): 
    threads.append(threading.Thread(target=hello, args=('some.txt', 'hey im thread %d' % i, 1000000))) 
    threads[-1].start() 
    for t in threads: 
    t.join() 
    #check what the heck the file had 
    uniq_lines = set() 
    with open('some.txt', 'r') as f: 
    for l in f: 
     uniq_lines.add(l) 
    for u in uniq_lines: 
    sys.stdout.write(u) 

输出看起来是这样的:

hey im thread 6 
hey im thread 7 
hey im thread 9 
hey im thread 8 
hey im thread 3 
this is the beginning 
hey im thread 5 
hey im thread 4 
hey im thread 1 
hey im thread 0 
hey im thread 2