我正在尝试编写一个 C++11/14 程序,其中固定数量的线程(比如 4 个)持续从线程安全队列中取出工作,直到队列中没有剩余工作为止。使用固定数量的线程和线程安全队列的 C++ 多线程
线程安全的队列实现:
// Unbounded FIFO queue whose operations are each guarded by one internal mutex.
//
// push() signals a condition variable so that a thread blocked in one of the
// wait_and_pop() overloads wakes up when an element becomes available.
// Elements are moved (not copied) into and out of the queue, so T only has to
// be move-constructible / move-assignable for push() and the pop overloads;
// the copy constructor of the queue still requires a copyable T.
template<typename T>
class threadsafe_queue
{
private:
    mutable std::mutex mut;             // mutable so that const members (empty, copy source) can lock it
    std::queue<T> data_queue;           // the stored elements, only touched while holding mut
    std::condition_variable data_cond;  // notified by push(), waited on by wait_and_pop()
public:
    threadsafe_queue() {}

    // Copies the elements of other while holding other's mutex.
    threadsafe_queue(threadsafe_queue const &other)
    {
        std::lock_guard<std::mutex> lk(other.mut);
        data_queue = other.data_queue;
    }

    // Appends new_value and wakes one waiting consumer, if any.
    void push(T new_value)
    {
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(std::move(new_value));
        data_cond.notify_one();
    }

    // Blocks until the queue is non-empty, then moves the front element into value.
    void wait_and_pop(T &value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]{ return !data_queue.empty(); });
        value = std::move(data_queue.front());
        data_queue.pop();
    }

    // Blocks until the queue is non-empty, then returns the front element
    // wrapped in a shared_ptr.
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]{ return !data_queue.empty(); });
        std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        return res;
    }

    // Non-blocking pop: returns false (leaving value untouched) when the queue
    // is empty, otherwise moves the front element into value and returns true.
    bool try_pop(T &value)
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return false;
        value = std::move(data_queue.front());
        data_queue.pop();
        return true;
    }

    // Non-blocking pop: returns an empty shared_ptr when the queue is empty,
    // otherwise the front element wrapped in a shared_ptr.
    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return std::shared_ptr<T>();
        std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        return res;
    }

    // Returns whether the queue held no elements at the moment of the call.
    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
};
每个线程运行的函数:
// Function run by each worker thread. Its body is elided ("/.../") in this
// excerpt, so only the signature is visible here: a connection string, the
// name of a zip archive, and the shared error-log writer.
void insertintobidask(std::string connstring, std::string ziparchivename, OFStreamWriter &errlog) { /.../ }
主函数:这些线程应当不断从工作队列中取出工作,直到队列中没有剩余工作为止:
// Entry point of the queue-based variant.
//
// Opens the error log, walks the data directory recursively and pushes the
// file name of every matching zip archive onto the work queue. The consumer
// threads are still commented out below, so the queue is only filled here.
int main()
{
    std::ofstream errlog; // the original declaration was missing its ';'
    errlog.open("/home/vorlket/Desktop/Project/Code/Test/errlog.txt", std::ofstream::out);
    OFStreamWriter ofsw(&errlog);
    threadsafe_queue<std::string> wqueue;
    boost::filesystem::path fx_dir("/home/vorlket/Desktop/Project/Code/Test/Data");
    // The dot before "zip" is escaped so that only a literal ".zip" suffix
    // matches; unescaped it would accept any character in that position.
    std::regex pattern_fx("HISTDATA_COM_ASCII_.*\\.zip");
    for (boost::filesystem::recursive_directory_iterator iter(fx_dir), end; iter != end; ++iter)
    {
        std::string name = iter->path().filename().string();
        if (std::regex_match(name, pattern_fx))
        {
            wqueue.push(name);
        }
    }
    /* Each thread below would run once, how do I modify it to make it continuously take a work off the queue and run until there is no work left in the queue?
    std::thread consumer1 (insertintobidask, "hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw);
    std::thread consumer2 (insertintobidask, "hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw);
    std::thread consumer3 (insertintobidask, "hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw);
    std::thread consumer4 (insertintobidask, "hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw);
    consumer1.join();
    consumer2.join();
    consumer3.join();
    consumer4.join();
    */
    errlog.close();
    return 0;
}
我尝试了另一种方法,它基于下面 Nim 的回答,并且可以正常工作。
/* g++ -std=gnu++11 fxetl.cxx -o fxetl -lboost_system -lboost_filesystem -lzip -lpqxx -lpq -pthread */
#include <boost/filesystem.hpp>
#include <regex>
#include <iostream>
#include <fstream>
#include <string>
#include <pqxx/pqxx>
#include <zip.h>
#include <thread>
#include <boost/asio.hpp>
#include "threadsafe_oerrlog.h"
// Reads one CSV file out of a zip archive and inserts each record into
// fx.bidask through the given transaction.
//
// txn            - transaction used to execute one INSERT per record.
// ziparchivename - archive file name; the six characters starting at offset 27
//                  (named "fileyearmonth" here) select the archive and the CSV
//                  inside it. substr() throws std::out_of_range if the name is
//                  shorter than 27 characters.
// errlog         - receives the path of an archive or CSV entry that could
//                  not be opened, or a note about a truncated record.
//
// The archive and entry paths are rebuilt from hard-coded AUDUSD prefixes, so
// only the year/month part of ziparchivename is actually used.
void insertintobidask(pqxx::nontransaction &txn, std::string ziparchivename, OFStreamWriter &errlog)
{
    std::string fileyearmonth = ziparchivename.substr(27, 6);
    std::string ziparchivepath = "/home/vorlket/Desktop/Project/Code/Test/Data/HISTDATA_COM_ASCII_AUDUSD_T" + fileyearmonth + ".zip";
    std::string zipfilepath = "DAT_ASCII_AUDUSD_T_" + fileyearmonth + ".csv";
    int err = 0;
    char buffer[39]; // each line takes up 39 bytes
    struct zip *ziparchive = zip_open(ziparchivepath.c_str(), 0, &err);
    if (!ziparchive)
    {
        errlog << ziparchivepath;
        return; // no archive handle was obtained, so there is nothing to close
    }
    struct zip_file *zipfile = zip_fopen(ziparchive, zipfilepath.c_str(), 0);
    if (zipfile)
    {
        long long r = 0;
        while ((r = zip_fread(zipfile, buffer, sizeof(buffer))) > 0)
        {
            if (static_cast<std::size_t>(r) < sizeof(buffer))
            {
                // A short read cannot hold a full record; slicing it below
                // would throw, so report it and stop reading this file.
                std::string msg = "truncated record in " + zipfilepath;
                errlog << msg;
                break;
            }
            // buffer is not NUL-terminated, so the length must be passed
            // explicitly instead of relying on the const char* constructor.
            std::string str(buffer, sizeof(buffer));
            // Fixed-width fields: [0,18) timestamp text, [19,27) and [28,36)
            // the two numeric columns (presumably bid and ask - confirm
            // against the fx.bidask schema).
            txn.exec("INSERT INTO fx.bidask VALUES('AUDUSD', to_timestamp(" +txn.quote(str.substr(0, 18)) + ", 'YYYYMMDD HH24MISSMS'), " + txn.quote(str.substr(19, 8)) + ", " + txn.quote(str.substr(28, 8)) + ")");
        }
        zip_fclose(zipfile);
        std::cout << fileyearmonth << std::endl;
    }
    else
    {
        errlog << zipfilepath;
    }
    zip_close(ziparchive);
}
// Entry point of the Boost.Asio variant.
//
// Opens four database connections (one nontransaction each), creates one
// io_service per connection, and distributes the matching zip archives found
// under the data directory round-robin over the four services. Each service
// is then run on its own thread and all four threads are joined.
int main()
{
    pqxx::connection conn1("hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J");
    pqxx::nontransaction txn1(conn1);
    pqxx::connection conn2("hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J");
    pqxx::nontransaction txn2(conn2);
    pqxx::connection conn3("hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J");
    pqxx::nontransaction txn3(conn3);
    pqxx::connection conn4("hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J");
    pqxx::nontransaction txn4(conn4);
    std::ofstream errlog("/home/vorlket/Desktop/Project/Code/Test/errlog.txt");
    OFStreamWriter ofsw(&errlog);
    boost::asio::io_service service1; // queue
    boost::asio::io_service service2;
    boost::asio::io_service service3;
    boost::asio::io_service service4;
    boost::filesystem::path fx_dir("/home/vorlket/Desktop/Project/Code/Test/Data");
    // The dot before "zip" is escaped so that only a literal ".zip" suffix
    // matches; unescaped it would accept any character in that position.
    std::regex pattern_fx("HISTDATA_COM_ASCII_.*\\.zip");
    int serviceid = 0; // index (0..3) of the service that receives the next archive
    for (boost::filesystem::recursive_directory_iterator iter(fx_dir), end; iter != end; ++iter)
    {
        std::string name = iter->path().filename().string();
        if (std::regex_match(name, pattern_fx))
        {
            // Each handler captures its transaction and the log writer by
            // reference (both outlive the joined threads) and the file name
            // by value (name is a loop-local).
            switch (serviceid)
            {
            case 0 :
                service1.post([&txn1, name, &ofsw]() { insertintobidask(txn1, name, ofsw); });
                break;
            case 1 :
                service2.post([&txn2, name, &ofsw]() { insertintobidask(txn2, name, ofsw); });
                break;
            case 2 :
                service3.post([&txn3, name, &ofsw]() { insertintobidask(txn3, name, ofsw); });
                break;
            case 3 :
                service4.post([&txn4, name, &ofsw]() { insertintobidask(txn4, name, ofsw); });
                break;
            }
            // Cycle through all four services. The original reduced modulo 3,
            // which made case 3 unreachable and left service4 without work.
            serviceid = (serviceid + 1) % 4;
        }
    }
    std::thread t1([&service1]() { service1.run(); });
    std::thread t2([&service2]() { service2.run(); });
    std::thread t3([&service3]() { service3.run(); });
    std::thread t4([&service4]() { service4.run(); });
    t1.join();
    t2.join();
    t3.join();
    t4.join();
}
不知道哪种方法更快,但我想这取决于工作负载和所使用的平台。值得两种都试一试,看看哪个更快。对于哪种方法会更快,欢迎提出任何意见,不胜感激。
类似这样的问题在这里不受欢迎,并且经常会招致大量反对票。请与我们分享您目前已经实现/尝试过的内容,并针对代码无法工作之处提出具体问题。话虽如此,欢迎来到 SO! – Arunmu
看看[线程支持库](http://en.cppreference.com/w/cpp/thread)。你会在那里找到你需要的大部分东西。 – Aconcagua
已编辑这个问题,分享了我所尝试过的内容。 – vorlket