2011-03-29 90 views
5

我们有很大的文件,1-1.5GB的顺序(主要是日志文件)与原始数据很容易解析为csv,随后应该生成图形一组图形图像。从巨大文件中读取和绘制数据

目前,我们使用bash脚本将原始数据转换为csv文件,只需要绘制数字,然后将其提供给gnuplot脚本。但是这个过程非常缓慢。我试图通过用单个awk命令替换一些管道cut s,tr s等来加速bash脚本,虽然这提高了速度,但整个过程仍然非常缓慢。

因此,我开始相信这个过程有更好的工具。我目前正在用python + numpy或R重写这个过程。我的一位朋友建议使用JVM,如果我要这样做,我会使用clojure,但不知道JVM如何执行。

我在处理这类问题方面没有太多经验,所以关于如何进行的任何建议都会很棒。谢谢。另外,我将要存储(到磁盘)生成的中间数据,即csv,所以我不必重新生成它,我应该选择我想要一个不同的图表。

编辑2:的原始数据文件具有每一条线,其字段由分隔符(|)是分开的一个记录。并非所有字段都是数字。我在输出csv中需要的每个字段都是通过在输入记录上应用某个公式获得的,这些公式可能会使用输入数据中的多个字段。输出csv每行有3-4个字段,我需要在(可能是)条形图中绘制1-2,1-3,1-4字段的图表。我希望能给出更好的图片。

编辑3:我修改了@ adirau的脚本,它似乎工作得很好。我已经走得够远了,我正在读取数据,发送到处理器线程池(伪处理,将线程名称附加到数据),并通过另一个收集器线程将它聚合到输出文件中。

PS:我不确定这个问题的标记,请随时纠正它。

+0

对于那些大小为R的文件可能会变得棘手,因为它相当耗费内存。 R的图形可能性超过了Python的图形可能性(参见http://addictedtor.free.fr/graphiques/)。确保你检查R中的多线程(包裹降雪)。但最重要的是,你熟悉的代码。如果您对R不太熟悉,则很难对其进行优化。 – 2011-03-29 09:15:42

+0

是的,这也是另外一点,我在R中几乎没有任何经验,和numpy和matplotlib一样,但我对python很满意。这也会影响我的选择。 – 2011-03-29 09:26:50

回答

4

python的声音是一个不错的选择,因为它有一个很好的线程API(实现是有问题的),matplotlib和pylab。我错过了一些更多的规格,但也许这可能是一个很好的起点:matplotlib: async plotting with threads。 我会去单线程处理大容量磁盘I/O读取和同步队列到线程池进行数据处理(如果您有固定的记录长度,事前可能会通过预计算读取偏移量并将偏移量仅传递到线程池);使用diskio线程,我会映射数据源文件,读取预定义的num个字节并再读一次,最终将最后一个字节抓到当前数据源行输入的末尾;应该选择接近平均线路输入长度的数字;接下来是通过队列进行游泳池馈送以及线程池中发生的数据处理/绘图;我在这里没有一个好的照片(你准确地绘制了什么),但我希望这有助于。

编辑:有文件。readlines([sizehint])一次抓取多行;以及它可能不那么迅速的Cuz文档都在说使用它的ReadLine()在内部

编辑:快速骨架代码

import threading 
from collections import deque 
import sys 
import mmap 


class processor(Thread): 
    """ 
     processor gets a batch of data at time from the diskio thread 
    """ 
    def __init__(self,q): 
     Thread.__init__(self,name="plotter") 
     self._queue = q 
    def run(self): 
     #get batched data 
     while True: 
      #we wait for a batch 
      dataloop = self.feed(self._queue.get()) 
      try: 
       while True: 
        self.plot(dataloop.next()) 
      except StopIteration: 
       pass 
      #sanitizer exceptions following, maybe 

    def parseline(self,line): 
     """ return a data struct ready for plotting """ 
     raise NotImplementedError 

    def feed(self,databuf): 
     #we yield one-at-time datastruct ready-to-go for plotting 
     for line in databuf: 
      yield self.parseline(line) 

    def plot(self,data): 
     """integrate 
     https://www.esclab.tw/wiki/index.php/Matplotlib#Asynchronous_plotting_with_threads 
     maybe 
     """ 
class sharedq(object): 
    """i dont recall where i got this implementation from 
    you may write a better one""" 
    def __init__(self,maxsize=8192): 
     self.queue = deque() 
     self.barrier = threading.RLock() 
     self.read_c = threading.Condition(self.barrier) 
     self.write_c = threading.Condition(self.barrier) 
     self.msz = maxsize 
    def put(self,item): 
     self.barrier.acquire() 
     while len(self.queue) >= self.msz: 
      self.write_c.wait() 
     self.queue.append(item) 
     self.read_c.notify() 
     self.barrier.release() 
    def get(self): 
     self.barrier.acquire() 
     while not self.queue: 
      self.read_c.wait() 
     item = self.queue.popleft() 
     self.write_c.notify() 
     self.barrier.release() 
     return item 



q = sharedq() 
#sizehint for readine lines 
numbytes=1024 
for i in xrange(8): 
    p = processor(q) 
    p.start() 
for fn in sys.argv[1:] 
    with open(fn, "r+b") as f: 
     #you may want a better sizehint here 
     map = mmap.mmap(f.fileno(), 0) 
     #insert a loop here, i forgot 
     q.put(map.readlines(numbytes)) 

#some cleanup code may be desirable 
+0

感谢您的想法adirau,我的意图是使用python,以便我可以使用从队列中读取数据的池线程。至于更好的图片,我用更多的信息编辑了这个问题,希望能更好地了解我所要达到的目标。 – 2011-03-29 07:36:36

+0

非常感谢代码adirou,因为我从来没有使用过'deque'和'mmap',所以我花了一段时间。你能指出更多关于这些的信息吗,'queue.Queue'和'deque'之间的区别是什么?为什么不简单地打开文件并依次读取它的行? – 2011-03-29 10:23:32

+0

collections.deque应该提供快速追加和popleft原子操作,不需要锁定(从队列文档中剪下);你可以在python文档中找到deque和mmap文档;我选择批量阅读作为某种快速优化;阅读和排队行似乎是一个坏主意(更多的readline调用,更多的排队操作),所以我认为它更好更快地批量阅读和批量处理; – user237419 2011-03-29 10:32:33

1

我认为Python + numpy的将是最有效的方式,对于速度和易于实施。 Numpy是高度优化的,所以性能很好,python会缓解算法实现部分。

这个组合应该适合您的情况,为您优化内存中的文件加载,尝试找到处理数据块之间的中间点,该数据块不是太大但足够大以最小化读写因为这是什么会减慢程序的速度

如果你觉得这需要更多的加速(我真诚怀疑),你可以使用Cython来加速缓慢的部分。

+0

我没有完全理解你的第二段,如果我做了'.read(2000).splitlines()',它会比每行执行一个'.readline()'好得多。 – 2011-03-29 07:46:04

+0

我会建议如此,因为它会尽量减少读取和写入的周期,再次你必须根据你的配置找到最佳的大小** readline()**的另一件事是,它可能会导致你错误,因为它读了一些时间这些行的顺序与读取文件的顺序不同,特别是在将read-in与file-iteration混合时 – P2bM 2011-03-29 07:58:40