2015-03-13 879 views
2

我有一个大文件,我必须通过块阅读它。每次当我读块时,我都要做一些耗时的操作,所以我认为多线程读取可能会有所帮助,每个线程都会逐个读取一块并执行操作。这是我在C++编写代码11如何多线程读取C++ 11中的文件?

#include<iostream> 
#include<fstream> 
#include <condition_variable> 
#include <mutex> 
#include <thread> 
using namespace std; 
const int CHAR_PER_FILE = 1e8; 
const int NUM_THREAD = 2; 
int order = -1; 
bool is_reading = false; 
mutex mtx; 
condition_variable file_not_reading; 
void partition(ifstream& is) 
{ 
    while (is.peek() != EOF) 
    { 
     unique_lock<mutex> lock(mtx); 
     while (is_reading) 
      file_not_reading.wait(lock); 

     is_reading = true; 
     char *c = new char[CHAR_PER_FILE]; 

     is.read(c, CHAR_PER_FILE); 
     order++; 

     is_reading = false; 

     file_not_reading.notify_all(); 
     lock.unlock(); 

     char oc[3]; 
     sprintf(oc, "%d", order); 
     this_thread::sleep_for(chrono::milliseconds(2000));//some operations that take long time 
     ofstream os(oc, ios::binary); 
     os.write(c, CHAR_PER_FILE); 
     delete[] c; 
     os.close(); 
    } 
} 

int main() 
{ 
    ifstream is("bigfile.txt",ios::binary); 
    thread threads[NUM_THREAD]; 
    for (int i = 0; i < NUM_THREAD; i++) 
     threads[i] = thread(partition, ref(is)); 

    for (int i = 0; i < NUM_THREAD; i++) 
     threads[i].join(); 

    is.close(); 
    system("pause"); 
    return 0; 
} 

但我的代码没有工作,它不仅创造,而不是`bigfilesize/CHAR_PER_FILE 4个文件,和线程似乎卡住了,我怎样才能使它发挥作用?

是否有任何c + + 11多线程读取文件的实现或例子?

谢谢。

+0

'sprintf(oc,“%d”,order);'不受保护意味着'order'可能已经增加,因为解锁和名字很可能不是唯一的。而且你没有对'is.read'进行错误检查,所以你可能会写垃圾。 – molbdnilo 2015-03-13 09:22:28

+0

这里有几个线程安全问题。共享的全局变量需要是“volatile”的。 'ifstream'不是线程安全的,但它被引用到互斥体之外 - 为每个线程打开一个单独的'ifstream'。 – 2015-03-13 09:39:59

+1

@AndyBrown使这些变量不稳定是没有意义的。这对线程问题没有帮助。请参阅http://herbsutter.com/2009/01/12/effective-concurrency-volatile-vs-volatile/进行深入解释。 – Jens 2015-03-13 10:32:08

回答

4

我的建议是:

  • 使用一个线程从文件中读取数据块。每次读取块时,都会将其发布到请求队列中。这是不值得阅读多线程,因为会有内部锁/阻塞阅读一个共同的资源。
  • 使用线程池。他们每个人都从队列中读取,检索一个块,执行昂贵的操作并返回等待一个新的请求。
  • 队列必须受到互斥锁保护。
  • 请勿使用比您拥有的处理单元(CPU/Cores/HyperThreads)数量多的线程。
  • 以上主要警告是它不能保证处理顺序。您可能需要将结果发布到可重新排序的中央位置(再次集中位置 - >必须受到互斥锁保护)。
+0

那么,我目前的方式就是你说的,但是我必须将这些块存储在回购站中,它需要更多内存,所以我认为读取文件并执行昂贵的操作可能是更好的方法。 – user1024 2015-03-13 09:04:27

0

该文件是否必须以“顺序”顺​​序读取,即组块是否必须按特殊顺序“操作”?否则,你可以例如创建4个线程并让每个线程读取文件的1/4(可以通过使用tellg并在例如向量或变量中保存位置来完成)。这样你就不必使用锁。

也许你可以告诉我们你读的数据是如何评估的。

+0

我已经考虑过你的方法,但是每个线程都必须打开文件并找到正确的阅读位置,这将会有点棘手。我不知道tellg的表现。 – user1024 2015-03-13 09:07:07

+0

我明白了。你有没有考虑过使用主动对象模型(基本上是线程安全队列+异步调用+命令模式)?实现起来可能有点棘手,但如果您需要在程序中经常使用多线程,那么可能值得一试。 – Asthea 2015-03-13 09:11:56

0

也许......

void partition(ifstream& is) 
{ 
    unique_lock<mutex> lock(mtx); 
    std::vector<char> c(CHAR_PER_FILE); 
    is.read(c.data(), CHAR_PER_FILE); 
    lock.unlock(); 

    if (is.fail() && !is.eof()) return; 
    size_t num_bytes_read = is.gcount(); 

    std::ostringstream oc; 
    oc << order; 
    this_thread::sleep_for(chrono::milliseconds(2000)); //take long time 
    if (std::ofstream os(oc, ios::binary)) 
     os.write(c.data(), CHAR_PER_FILE); 
} 

注:

  • 互斥已串行化的操作 - 无需使用条件变量。

  • 我添加了一个小错误输入和字节读取处理 - os.write()后,你应该检查过,添加一个else失败ofstream创作等

+0

感谢您的代码,但速度很慢。那是因为当A在读时,B被阻塞,A完成后B仍然被阻塞。我们是否必须手动通知B?顺便说一句,在你的实现中,一个线程只读取一次,如果我不知道文件大小,我该如何决定创建多少个线程? – user1024 2015-03-14 02:40:46

+0

@ user1024你已经用非常少的分析发起了这个,或者不分享你对我们做的任何分析。这可能是因为I/O只占整个时间的很小一部分,并且几乎没有任何东西可以通过与处理并行完成大部分时间。你应该看看只需要不经过处理就读取数据需要多长时间(两者都不是在缓存中,而是当缓存中没有缓存时),并且考虑这个整个想法是否被误导。至于你的问题,当解锁时,操作系统会唤醒锁上的下一个线程。 – 2015-03-14 05:16:57

+0

而且 - 请不要将它称为“我的实现” - 我只是修复了实现中的一些错误,希望能从中学到一些东西。如果我写我自己的实现,我愿意做的事情完全不同 - 有一个线程做所有的I/O(好像是同一个物理设备线程它只是力量的目的,可以很容易减慢I/O上)然后让该线程以任何块大小似乎有意义的方式将工作交给其他线程。这与jsantander推荐的类似。 – 2015-03-14 05:22:56

1

你可以使用基于任务的并行std :: async:

class result; // result of expensive operation 
result expensive_operation(std::vector<char> const& data) 
{ 
result r = // long computation 
return r; 
} 

std::vector<char>::size_type BLOCK_SIZE = 4096; 

std::vector<std::future<result>> partition(ifstream& in) 
{ 
    std::vector<std::future<result>> tasks; 

    while (!in.eof() && !in.fail()) 
    { 
     std::vector<char> c(BLOCK_SIZE); 
     is.read(c.data(), BLOCK_SIZE); 
     c.resize(in.gcount()); 
     tasks.push_back(std::async([](std::vector<char> data) 
     { 
      return expensive_operation(data); 
     }, 
     std::move(c))); 
    } 
    return tasks; 
} 

int main() 
{ 
    ifstream is("bigfile.txt",ios::binary); 
    auto results = partition(is); 

    // iterate over results and do something with it 
} 
+0

你的实现是好的,但我如何控制创建多少个线程? – user1024 2015-03-14 03:03:41

+0

@ user1024我想你不会。 std :: async的一个要点是让plattform决定何时产生一个新的线程。我认为在Windows上它使用PPL和线程池。如果你在Windows上,看一下PPL。它包含一个更好的具有并行算法的任务并行框架。在Linux上,我建议查看提供相同内容的英特尔TBB。两者都有parallel_for和parallel_reduce。 – Jens 2015-03-14 19:20:40