2014-09-26 57 views
1

我有一个例程是为了加载和解析文件中的数据。可能需要同时从两个位置检索来自同一文件的数据,即在后台缓存过程和用户​​请求期间。并发处理数据。我需要注意什么?

具体来说,我使用的是C++ 11线程和互斥体库。我们用Visual C++ 11(aka 2012)进行编译,所以受到它缺少的限制。

我幼稚的做法又是这样的:

map<wstring,weak_ptr<DataStruct>> data_cache; 
mutex data_cache_mutex; 

shared_ptr<DataStruct> ParseDataFile(wstring file_path) { 
    auto data_ptr = make_shared<DataStruct>(); 

    /* Parses and processes the data, may take a while */ 

    return data_ptr; 
} 

shared_ptr<DataStruct> CreateStructFromData(wstring file_path) { 
    lock_guard<mutex> lock(data_cache_mutex); 

    auto cache_iter = data_cache.find(file_path); 
    if (cache_iter != end(data_cache)) { 
     auto data_ptr = cache_iter->second.lock(); 
     if (data_ptr) 
      return data_ptr; 
     // reference died, remove it 
     data_cache.erase(cache_iter); 
    } 

    auto data_ptr = ParseDataFile(file_path); 

    if (data_ptr) 
     data_cache.emplace(make_pair(file_path, data_ptr)); 

    return data_ptr; 
} 

我的目标是双重的:

  • 允许多个线程加载单独的文件同时
  • 确保文件是只处理一旦

我目前的做法的问题是,它不允许同时解析多个文件。如果我明白会发生什么,他们每个人都会触发锁定并最终以线性方式处理,一次一个线程。它可能会从运行变为运行线程先通过锁的顺序,但最终结果是相同的。

一个解决方案,我认为是要建立一个第二个地图:

map<wstring,mutex> data_parsing_mutex; 

shared_ptr<DataStruct> ParseDataFile(wstring file_path) { 
    lock_guard<mutex> lock(data_parsing_mutex[file_path]); 
    /* etc. */ 
    data_parsing_mutex.erase(file_path); 
} 

但现在我不得不与data_parsing_mutex是如何被更新有关。所以我想我需要另一个互斥体?

map<wstring,mutex> data_parsing_mutex; 
mutex data_parsing_mutex_mutex; 

shared_ptr<DataStruct> ParseDataFile(wstring file_path) { 
    unique_lock<mutex> super_lock(data_parsing_mutex_mutex); 
    lock_guard<mutex> lock(data_parsing_mutex[file_path]); 
    super_lock.unlock(); 

    /* etc. */ 

    super_lock.lock(); 
    data_parsing_mutex.erase(file_path); 
} 

事实上,看着这一点,它不会避免一定双处理一个文件,如果它没有被后台进程,当用户请求,除非我再次检查高速缓存完成。

但是到现在为止我的嗅觉已经在说There must be a better way了。在那儿?期货,承诺或原子会在这里帮助我吗?

+0

据我所知,问题不在'ParseDataFile()'本身?如果从另一个互斥体(在'CreateStructFromData()')中调用它,添加互斥体的意义是什么 – 2014-09-27 00:15:59

+0

创建了第二个互斥体并将其与传递给'ParseDataFile'的每个单独文件关联。外部互斥量投影缓存,而内部互斥量则旨在防止两个线程从同一个文件执行相同的工作。 – 2014-09-27 01:00:26

+1

@MikeE你尝试过使用[异步](http://en.cppreference.com/w/cpp/thread/async)而不是显式并行吗? – Jason 2014-09-27 04:10:55

回答

1

从你所描述的,这听起来像你试图做一个形式的使用线程池,以及参考计数缓存DataStruct的延迟初始化。 std::async应该能够提供很多这样的事情所需的调度和同步。

使用std::async,代码看起来像这样...

map<wstring,weak_ptr<DataStruct>> cache; 
map<wstring,shared_future<shared_ptr<DataStruct>>> pending; 

mutex cache_mutex, pending_mutex; 

shared_ptr<DataStruct> ParseDataFromFile(wstring file) { 
    auto data_ptr = make_shared<DataStruct>(); 

    /* Parses and processes the data, may take a while */ 

    return data_ptr; 

} 


shared_ptr<DataStruct> CreateStructFromData(wstring file) { 
    shared_future<weak_ptr<DataStruct>> pf; 
    shared_ptr<DataStruct> ce; 

    { 
     lock_guard(cache_mutex); 

     auto ci = cache.find(file); 

     if (!(ci == cache.end() || ci->second.expired())) 
      return ci->second.lock(); 
    } 

    { 
     lock_guard(pending_mutex); 

     auto fi = pending.find(file); 

     if (fi == pending.end() || fi.second.get().expired()) { 

      pf = async(ParseDataFromFile, file).share(); 
      pending.insert(fi, make_pair(file, pf)); 

     } else { 

      pf = pi->second; 

     } 
    } 

    pf.wait(); 
    ce = pf.get(); 

    { 
     lock_guard(cache_mutex); 

     auto ci = cache.find(file); 

     if (ci == cache.end() || ci->second.expired()) 
      cache.insert(ci, make_pair(file, ce)); 
    } 

    { 
     lock_guard(pending_mutex); 

     auto pi = pending.find(file); 

     if (pi != pending.end()) 
      pending.erase(pi); 
    } 

    return ce; 

} 

这可能可以优化一点,但一般的想法应该是一样的。

0

在典型的计算机上,试图同时加载文件没有意义,因为磁盘访问将成为瓶颈。相反,最好有一个线程加载文件(或使用异步I/O),并将解析分散到一个线程池中。然后将结果存储在共享容器中。

关于防止双工,你应该考虑这是否真的有必要。如果您只是通过不成熟的优化来做到这一点,那么您可能会通过专注于使程序响应而非高效来让用户更快乐。也就是说,确保用户快速获得他们所要求的内容,即使这意味着要做双重工作。

OTOH,如果存在两次解析文件的技术原因,可以跟踪每个文件(加载,解析,解析)在共享容器中的状态。

+0

在这种情况下,它是必要的。该过程所做的大部分工作是在读取数据后搅动数据。瓶颈不是I/O,几乎立即结束。我正在实现这一点,因为切换到另一组数据的延迟是一个问题,它肯定是可以在后台完成的事情。 – 2014-09-28 00:01:36

相关问题