2016-06-15 82 views
2

我正在尝试编写一个 C++11/14 程序,其中固定数量的线程(比如 4 个)持续从线程安全队列中取出任务,直到队列中没有任务为止。(标题:使用固定数量的线程和线程安全队列的 C++ 多线程)

线程安全的队列实现:

/// FIFO queue whose every operation takes an internal mutex before touching
/// the underlying std::queue. Consumers can either block until an element is
/// available (wait_and_pop) or poll without blocking (try_pop).
template<typename T>
class threadsafe_queue
{
public:
    threadsafe_queue() {}

    /// Copies the elements of `other` while holding `other`'s mutex.
    threadsafe_queue(threadsafe_queue const &other)
    {
        std::lock_guard<std::mutex> guard(other.mut);
        data_queue = other.data_queue;
    }

    /// Appends `new_value` and wakes one waiting consumer (if any).
    void push(T new_value)
    {
        std::lock_guard<std::mutex> guard(mut);
        data_queue.push(new_value);
        data_cond.notify_one();
    }

    /// Blocks until the queue is non-empty, then moves the oldest element
    /// out of the queue by copying it into `value`.
    void wait_and_pop(T &value)
    {
        std::unique_lock<std::mutex> guard(mut);
        // Loop guards against spurious wake-ups.
        while (data_queue.empty())
        {
            data_cond.wait(guard);
        }
        value = data_queue.front();
        data_queue.pop();
    }

    /// Blocks until the queue is non-empty, then returns a shared_ptr holding
    /// a copy of the oldest element.
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> guard(mut);
        while (data_queue.empty())
        {
            data_cond.wait(guard);
        }
        auto item = std::make_shared<T>(data_queue.front());
        data_queue.pop();
        return item;
    }

    /// Non-blocking pop: returns false (leaving `value` untouched) when the
    /// queue is empty, otherwise copies the oldest element into `value`.
    bool try_pop(T &value)
    {
        std::lock_guard<std::mutex> guard(mut);
        const bool has_item = !data_queue.empty();
        if (has_item)
        {
            value = data_queue.front();
            data_queue.pop();
        }
        return has_item;
    }

    /// Non-blocking pop: returns a null shared_ptr when the queue is empty,
    /// otherwise a shared_ptr holding a copy of the oldest element.
    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> guard(mut);
        std::shared_ptr<T> item;
        if (!data_queue.empty())
        {
            item = std::make_shared<T>(data_queue.front());
            data_queue.pop();
        }
        return item;
    }

    /// Reports whether the queue held no elements at the moment of the call.
    bool empty() const
    {
        std::lock_guard<std::mutex> guard(mut);
        return data_queue.empty();
    }

private:
    mutable std::mutex mut;              // mutable so const members (empty, copy source) can lock
    std::queue<T> data_queue;
    std::condition_variable data_cond;   // signalled by push()
};

每个线程运行的函数:

// Worker routine that each consumer thread is meant to run.
// The body is elided in the original post: `/.../` is a placeholder, not valid C++.
void insertintobidask(std::string connstring, std::string ziparchivename, OFStreamWriter &errlog) { /.../ } 

主函数:这些线程应当不断从工作队列中取出任务,直到队列中没有任务为止:

/// Entry point of the first attempt: opens the error log and fills a
/// thread-safe work queue with the names of all matching zip archives found
/// under the data directory. The consumer threads are still only sketched in
/// the commented-out section below.
int main()
{
    // Fix: this declaration was missing its terminating semicolon.
    std::ofstream errlog;
    errlog.open("/home/vorlket/Desktop/Project/Code/Test/errlog.txt", std::ofstream::out);
    OFStreamWriter ofsw(&errlog);

    threadsafe_queue<std::string> wqueue;
    boost::filesystem::path fx_dir("/home/vorlket/Desktop/Project/Code/Test/Data");
    // Fix: the dot before "zip" is escaped so it matches only a literal '.'
    // instead of any character.
    std::regex pattern_fx("HISTDATA_COM_ASCII_.*\\.zip");
    for (boost::filesystem::recursive_directory_iterator iter(fx_dir), end; iter != end; ++iter)
    {
        std::string name = iter->path().filename().string();
        if (std::regex_match(name, pattern_fx))
        {
            wqueue.push(name);
        }
    }

    // NOTE(review): as written, each thread in the sketch below would receive
    // the result of a single wait_and_pop() evaluated in main and run once.
    // To drain the queue, each thread would need to loop on wqueue.try_pop()
    // itself. The sketch also passes &ofsw (a pointer) where insertintobidask
    // takes a reference.
    /* Each thread below would run once, how do I modify it to make it continuously take a work off the queue and run until there is no work left in the queue? 
    std::thread consumer1 (insertintobidask, "hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw); 
    std::thread consumer2 (insertintobidask, "hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw); 
    std::thread consumer3 (insertintobidask, "hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw); 
    std::thread consumer4 (insertintobidask, "hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw); 

    consumer1.join(); 
    consumer2.join(); 
    consumer3.join(); 
    consumer4.join(); 
    */

    errlog.close();
    return 0;
}

我基于下面 Nim 的回答尝试了另一种方法,并且它可以正常工作。

/* g++ -std=gnu++11 fxetl.cxx -o fxetl -lboost_system -lboost_filesystem -lzip -lpqxx -lpq -pthread */ 

#include <boost/filesystem.hpp> 
#include <regex> 
#include <iostream> 
#include <fstream> 
#include <string> 
#include <pqxx/pqxx> 
#include <zip.h> 
#include <thread> 
#include <boost/asio.hpp> 
#include "threadsafe_oerrlog.h" 

/// Reads one AUDUSD tick-data CSV out of its zip archive and inserts every
/// record into the fx.bidask table.
///
/// @param txn            pqxx non-transaction on which the INSERT statements are executed
/// @param ziparchivename archive file name; the 6 characters starting at offset 27
///                       are used as the year-month part of the archive and CSV names
/// @param errlog         receives the archive path or the CSV entry name when
///                       either one cannot be opened
void insertintobidask(pqxx::nontransaction &txn, std::string ziparchivename, OFStreamWriter &errlog)
{
    std::string fileyearmonth = ziparchivename.substr(27, 6);
    std::string ziparchivepath = "/home/vorlket/Desktop/Project/Code/Test/Data/HISTDATA_COM_ASCII_AUDUSD_T" + fileyearmonth + ".zip";
    std::string zipfilepath = "DAT_ASCII_AUDUSD_T_" + fileyearmonth + ".csv";
    int err;
    char buffer[39]; // each line takes up 39 bytes

    struct zip *ziparchive = zip_open(ziparchivepath.c_str(), 0, &err);
    if (!ziparchive)
    {
        errlog << ziparchivepath;
        return; // nothing was opened, so there is nothing to close
    }

    struct zip_file *zipfile = zip_fopen(ziparchive, zipfilepath.c_str(), 0);
    if (zipfile)
    {
        // The last field used below ends at offset 28 + 8.
        const std::size_t min_record_bytes = 36;
        zip_int64_t r; // zip_fread returns a 64-bit count; do not narrow it to int
        while ((r = zip_fread(zipfile, buffer, sizeof(buffer))) > 0)
        {
            // buffer is NOT NUL-terminated, so build the string from exactly
            // the r bytes that were read instead of scanning for a '\0'.
            std::string str(buffer, static_cast<std::size_t>(r));
            if (str.size() < min_record_bytes)
            {
                // Truncated chunk: it cannot hold a complete record.
                break;
            }
            txn.exec("INSERT INTO fx.bidask VALUES('AUDUSD', to_timestamp(" +txn.quote(str.substr(0, 18)) + ", 'YYYYMMDD HH24MISSMS'), " + txn.quote(str.substr(19, 8)) + ", " + txn.quote(str.substr(28, 8)) + ")");
        }
        zip_fclose(zipfile);
        std::cout << fileyearmonth << std::endl;
    }
    else
    {
        errlog << zipfilepath;
    }

    zip_close(ziparchive);
}


/// Entry point of the second attempt: opens four database connections, deals
/// the matching zip archives round-robin onto four io_service queues, then
/// runs each queue on its own thread and waits for all of them to finish.
int main()
{
    pqxx::connection conn1("hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J");
    pqxx::nontransaction txn1(conn1);
    pqxx::connection conn2("hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J");
    pqxx::nontransaction txn2(conn2);
    pqxx::connection conn3("hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J");
    pqxx::nontransaction txn3(conn3);
    pqxx::connection conn4("hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J");
    pqxx::nontransaction txn4(conn4);

    std::ofstream errlog("/home/vorlket/Desktop/Project/Code/Test/errlog.txt");
    OFStreamWriter ofsw(&errlog);

    boost::asio::io_service service1; // queue
    boost::asio::io_service service2;
    boost::asio::io_service service3;
    boost::asio::io_service service4;

    boost::filesystem::path fx_dir("/home/vorlket/Desktop/Project/Code/Test/Data");
    // Fix: the dot before "zip" is escaped so it matches only a literal '.'.
    std::regex pattern_fx("HISTDATA_COM_ASCII_.*\\.zip");
    int serviceid = 0;
    for (boost::filesystem::recursive_directory_iterator iter(fx_dir), end; iter != end; ++iter)
    {
        std::string name = iter->path().filename().string();
        if (std::regex_match(name, pattern_fx))
        {
            // Fix: there are four queues, so wrap at 4. With the previous
            // modulus of 3, case 3 was unreachable and service4 never got work.
            serviceid %= 4;
            switch (serviceid)
            {
            case 0 :
                service1.post([&txn1, name, &ofsw]() { insertintobidask(txn1, name, ofsw); });
                break;
            case 1 :
                service2.post([&txn2, name, &ofsw]() { insertintobidask(txn2, name, ofsw); });
                break;
            case 2 :
                service3.post([&txn3, name, &ofsw]() { insertintobidask(txn3, name, ofsw); });
                break;
            case 3 :
                service4.post([&txn4, name, &ofsw]() { insertintobidask(txn4, name, ofsw); });
                break;
            }
            ++serviceid;
        }
    }

    // One thread per queue; each transaction is only ever used by the thread
    // that runs its own io_service.
    std::thread t1([&service1]() { service1.run(); });
    std::thread t2([&service2]() { service2.run(); });
    std::thread t3([&service3]() { service3.run(); });
    std::thread t4([&service4]() { service4.run(); });

    t1.join();
    t2.join();
    t3.join();
    t4.join();

}

不知道哪种方法更快,但我想这取决于工作负载和所使用的开发平台。值得都试一试,看看哪个更快。如果有人能就哪种方法更快给出意见,我将不胜感激。

+0

类似这样的问题在这里不受欢迎,并且经常招致大量的反对票。请与我们分享您目前已经实现/尝试过的内容,并针对代码无法工作的地方提出具体问题。话虽如此,欢迎来到 SO! – Arunmu

+0

看看[线程支持库](http://en.cppreference.com/w/cpp/thread)。你会在那里找到你需要的大部分东西。 – Aconcagua

+0

已编辑问题,分享了我尝试过的内容。 – vorlket

回答

2

除非这是为了学习,或者现有方案不够快,否则我会把这类繁琐的底层工作交给现有的机制来处理。对于这种类型的任务,我更喜欢使用 boost::asio::io_service。

代码如下:

// Additional header 
#include <boost/asio.hpp> 

/// Entry point of the answer's example: posts one task per matching zip
/// archive onto a single io_service, then lets four threads run that
/// io_service until no posted work remains.
int main()
{
    // Fix: this declaration was missing its terminating semicolon.
    std::ofstream errlog;
    errlog.open("/home/vorlket/Desktop/Project/Code/Test/errlog.txt", std::ofstream::out);
    OFStreamWriter ofsw(&errlog);

    boost::asio::io_service service; // queue
    boost::filesystem::path fx_dir("/home/vorlket/Desktop/Project/Code/Test/Data");
    // Fix: the dot before "zip" is escaped so it matches only a literal '.'.
    std::regex pattern_fx("HISTDATA_COM_ASCII_.*\\.zip");
    for (boost::filesystem::recursive_directory_iterator iter(fx_dir), end; iter != end; ++iter)
    {
        std::string name = iter->path().filename().string();
        if (std::regex_match(name, pattern_fx))
        {
            // `name` is captured by value so the task owns its own copy.
            service.post([name]() {
                // Do something with this file
            });
        }
    }
    // Now start-up n-threads to dispatch on the io_service
    std::thread t1([&service]() { service.run(); }); // this will dispatch on queue until there is nothing left to do...
    std::thread t2([&service]() { service.run(); });
    std::thread t3([&service]() { service.run(); });
    std::thread t4([&service]() { service.run(); });
    // ... (more threads can be started the same way; the stray ':' that stood
    // here was an ellipsis marker and did not compile)

    // Wait for them to complete
    t1.join();
    t2.join();
    t3.join();
    t4.join();
}
+0

Nim,我在lambda中插入了一个函数调用,但这些线程似乎没有做任何工作。我试过的代码在上面编辑的问题中。 – vorlket

+0

@vorlket,这些改动看起来不错,我看不出有什么特别的错误——也许加一些日志可以帮助我们看出问题出在哪里? – Nim

+0

insertintobidask 函数中存在相对/绝对路径的错误。现在可以正常工作了。感谢分享。 – vorlket