2012-08-13 55 views
3

考虑下面的代码:并发处理从文件

std::vector<int> indices = /* Non overlapping ranges. */; 
std::istream& in = /*...*/; 

for(std::size_t i= 0; i< indices.size()-1; ++i) 
{ 
    in.seekg(indices[i]); 

    std::vector<int> data(indices[i+1] - indices[i]); 

    in.read(reinterpret_cast<char*>(data.data()), data.size()*sizeof(int)); 

    process_data(data); 
} 

我想使这个代码的并行和尽可能快的可能。

使用PPL将parallizing它的一个方法:

std::vector<int> indices = /* Non overlapping ranges. */; 
std::istream& in = /*...*/; 
std::vector<concurrency::task<void>> tasks;  

for(std::size_t i= 0; i< indices.size()-1; ++i) 
{ 
    in.seekg(indices[i]); 

    std::vector<int> data(indices[i+1] - indices[i]); 

    in.read(reinterpret_cast<char*>(data.data()), data.size()*sizeof(int)); 

    tasks.emplace_back(std::bind(&process_data, std::move(data))); 
} 
concurrency::when_all(tasks.begin(), tasks.end()).wait(); 

这种方法的问题是,我要处理在同一个线程中的数据(配合到CPU高速缓存),因为它被读入内存(数据在缓存中很热),但这并不是这种情况,它只是在浪费使用热数据的机会。

我有两个想法如何改善这个,但是,我还没有能够实现。

  1. 在单独的任务上开始下一次迭代。

    /* ??? */ 
    { 
        in.seekg(indices[i]); 
    
        std::vector<int> data(indices[i+1] - indices[i]); // data size will fit into CPU cache. 
    
        in.read(reinterpret_cast<char*>(data.data()), data.size()*sizeof(int)); 
    
        /* Start a task that begins the next iteration? */ 
    
        process_data(data); 
    } 
    
  2. 使用内存映射文件和映射文件的所需区域和,而不是寻求公正从正确的偏移量指针读取。使用parallel_for_each处理数据范围。但是,我不明白内存映射文件在读取内存和缓存时的性能意义。也许我甚至不必考虑缓存,因为文件只是DMA:d到系统内存,从来没有通过CPU?

任何建议或意见?

+0

为什么不尝试将文件拆分成更小的文件,然后在单独的线程中读取和处理它们?对于文件,您必须考虑CPU绑定和I/O绑定 – askmish 2012-08-16 14:30:43

+0

+1。看起来,如果将内存保存在缓存中,您将获得的任何好处都将因文件I/O的等待而变得不足。 – 2012-08-17 14:37:15

+0

@BenFulton:假设文件不在OS缓存中,并且/或者我没有执行此功能的多个并发执行,您是对的。 – ronag 2012-08-17 14:44:35

回答

0

这很可能是你追求错误的目标。如前所述,'热门数据'的任何优势都将因磁盘速度而变得不足。否则,有些重要的细节你没有告诉。
1)是否该文件是“大”
2)无论是单记录是“大”
3)处理是否是“慢”

如果文件是“大”,你最大的任务是确保文件按顺序读取。你的“指数”让我觉得不然。根据我自己的经验,最近的例子是6秒比20分钟,这取决于随机和顺序读取。没有开玩笑。

如果这个文件很小,并且你确信它完全被缓存了,你只需要一个同步队列将任务传递给你的线程,那么在同一个线程中处理就不会有问题。

另一种方式是将“索引”分成两半,每个线程一个。