线程是执行的序列。它们的行为大致类似于线性C++程序,嵌入在内存模型中,可以让它们进行通信并注意由其他执行线程引起的状态更改。
对线程的回调无法在没有来自线程的合作的情况下接管一系列执行。您想要通知的线程必须明确检查消息是否已到达并处理它。
有两种常见的方法来处理对消息的响应。
第一种是std::future
之类的方法。其中,调用者获取某种类型的令牌,该令牌代表将来可能或将要产生的答案。
第二个是再次使用消息。您向B发送消息,要求回复。 B向A发送一条包含响应的消息。与B接收消息的方式相同,A收到消息。该消息可能包含某种“返回目标”,以帮助将其链接到原始消息。
在基于消息的系统中,通常有一个“事件循环”。而不是一个大型的线性程序,你有一个线程可以反复返回到“事件循环”。在那里它检查消息的队列,如果没有消息等待一些消息。
任务有这样的制度下被分解成一口大小的块,这样你检查事件循环的频率足以响应。
执行此操作的一种方法是使用协程,这是一种没有拥有自己的执行程序的执行状态(如拥有两者的线程)。协程定期放弃优先级并“稍后保存它们的状态”。
未来的解决方案通常是最简单的,但它依赖于A周期性地检查响应。
首先,threaded_queue<T>
,它可以让任何数量的生产者和消费者传递的东西放进一个队列,吃起来关前:
template<class T>
struct threaded_queue {
using lock = std::unique_lock<std::mutex>;
void push_back(T t) {
{
lock l(m);
data.push_back(std::move(t));
}
cv.notify_one();
}
boost::optional<T> pop_front() {
lock l(m);
cv.wait(l, [this]{ return abort || !data.empty(); });
if (abort) return {};
auto r = std::move(data.back());
data.pop_back();
return std::move(r);
}
void terminate() {
{
lock l(m);
abort = true;
data.clear();
}
cv.notify_all();
}
~threaded_queue()
{
terminate();
}
private:
std::mutex m;
std::deque<T> data;
std::condition_variable cv;
bool abort = false;
};
现在,我们要的任务传递到这样的队列,并有通过任务的人得到结果。下面是一个使用上述的与包装的任务:
template<class...Args>
struct threaded_task_queue {
threaded_task_queue() = default;
threaded_task_queue(threaded_task_queue&&) = delete;
threaded_task_queue& operator=(threaded_task_queue&&) = delete;
~threaded_task_queue() = default;
template<class F, class R=std::result_of_t<F&(Args...)>>
std::future<R> queue_task(F task) {
std::packaged_task<R(Args...)> p(std::move(task));
auto r = p.get_future();
tasks.push_back(std::move(p));
return r;
}
void terminate() {
tasks.terminate();
}
std::function<void(Args...)> pop_task() {
auto task = tasks.pop_front();
if (!task) return {};
auto task_ptr = std::make_shared<std::packaged_task<R(Args...)>>(std::move(*task));
return [task_ptr](Args...args){
(*task_ptr)(std::forward<Args>(args)...);
};
}
private:
threaded_queue<std::packaged_task<void(Args...)>> tasks;
};
如果我这样做是正确的,它的工作原理是这样的:
A发送队列任务的拉姆达的形式到B。这个lambda需要一些固定的参数集合(由B提供),并返回一些值。
B弹出队列,并获取采用参数的std::function
。它调用它;它在B的上下文中返回void
。
A在排队任务时被给予future<R>
。它可以查询它是否完成。
你会注意到A不能被“通知”已完成。这需要不同的解决方案。但是如果A最终达到不等待B的结果就无法进步的程度,那么这个系统就可以工作。另一方面,如果A积累了大量这样的消息并且有时需要等待来自许多这样的B的输入,直到它们中的任何一个返回数据(或者用户做了某事),则需要比一个std::future<R>
。一般模式 - 具有代表未来计算的令牌 - 是可靠的。但是你需要扩展它以适应未来计算和消息循环等多种来源。
未经测试的代码。
一为threaded_task_queue
方法,当你发送的消息是:
template<class Signature>
struct message_queue;
template<class R, class...Args>
struct message_queue<R(Args...) :
threaded_task_queue< std::function<R(Args...)> >
{
std::future<R> queue_message(Args...args) {
return this->queue_task(
[tup = std::make_tuple(std::forward<Args>(args)...)]
(std::function<R(Args...)> f) mutable
{
return std::apply(f, std::move(tup));
}
);
}
bool consume_message(std::function<R(Args...)> f)
{
auto task = pop_task();
if (!task) return false;
task(std::move(f));
return true;
}
};
其中对提供商来说,你提供Args...
,并在消费者一方,你消耗Args...
和返回R
,并在供应商方面,你一旦消费者完成,有一个future<R>
来获得结果。
这可能比我写的原始threaded_task_queue
更自然。
std::apply
是C++ 17,但是在C++ 11和C++ 14中有野性的实现。
你可能不希望线程互相访问。处理这种事情的通常方法是使用消息队列。一个线程放入消息,另一个线程收到消息。队列可以处理同步细节。现在,ZeroMQ似乎成为这种类型的流行图书馆。 – vincent
@Vincent我已经在'B'类中使用了一个队列。 'B'拥有队列,'A'通过'add_message(MessageType msg)'方法将消息添加到队列中。我丢失的地方是'A'希望得到答复,那么它怎么知道什么时候有...? –
你需要一个互斥或其他同步机制来防止你的线程互相破坏内存。您不能只从同一个线程读取/写入相同的内存(变量),而无需同步。 –