2017-02-28 96 views
1

我一直在围绕C++中的回调函数进行封装。我试图实现以下内容:C++ lambda回调触发事件

我有两个对象,每个都有自己的线程。一个对象A具有指向第二个对象B的指针。见例如:

class A 
{ 
    public: 
    // ... 
    private: 
    std::unique_ptr<B> b; 
}; 

class B 
{ 
    public: 
    void add_message(MessageType msg); 
    // ... 
}; 

我想实现是有目的A使用指针添加一条消息,B,然后继续做其他的东西,但有当B有一个是被触发的回调或处理程序或东西回复该消息。 B对消息做了一些处理,并可能将其传递给其他对象以在其自己的线程上进行处理,但最终会得到答复。所以,我怎么能知道什么时候B有我的消息的答复,例如:

// In class A 
MessageType m(); 
b->add_message(m) 
// A's thread continues doing other stuff 
... 
// some notification that b has a reply? 

我知道我可能要使用std ::功能,我想用一个回调,但我不能让我通过查看大量示例来了解如何完成此操作。任何帮助表示赞赏,并注意我已经看了很多例子,但不能将其与我正在尝试实现或不理解的东西联系起来......

+1

你可能不希望线程互相访问。处理这种事情的通常方法是使用消息队列。一个线程放入消息,另一个线程收到消息。队列可以处理同步细节。现在,ZeroMQ似乎成为这种类型的流行图书馆。 – vincent

+0

@Vincent我已经在'B'类中使用了一个队列。 'B'拥有队列,'A'通过'add_message(MessageType msg)'方法将消息添加到队列中。我丢失的地方是'A'希望得到答复,那么它怎么知道什么时候有...? –

+0

你需要一个互斥或其他同步机制来防止你的线程互相破坏内存。您不能只从同一个线程读取/写入相同的内存(变量),而无需同步。 –

回答

3

线程是执行的序列。它们的行为大致类似于线性C++程序,嵌入在内存模型中,可以让它们进行通信并注意由其他执行线程引起的状态更改。

对线程的回调无法在没有来自线程的合作的情况下接管一系列执行。您想要通知的线程必须明确检查消息是否已到达并处理它。


有两种常见的方法来处理对消息的响应。

第一种是std::future之类的方法。其中,调用者获取某种类型的令牌,该令牌代表将来可能或将要产生的答案。

第二个是再次使用消息。您向B发送消息,要求回复。 B向A发送一条包含响应的消息。与B接收消息的方式相同,A收到消息。该消息可能包含某种“返回目标”,以帮助将其链接到原始消息。

在基于消息的系统中,通常有一个“事件循环”。而不是一个大型的线性程序,你有一个线程可以反复返回到“事件循环”。在那里它检查消息的队列,如果没有消息等待一些消息。

任务有这样的制度下被分解成一口大小的块,这样你检查事件循环的频率足以响应。

执行此操作的一种方法是使用协程,这是一种没有拥有自己的执行程序的执行状态(如拥有两者的线程)。协程定期放弃优先级并“稍后保存它们的状态”。


未来的解决方案通常是最简单的,但它依赖于A周期性地检查响应。

首先,threaded_queue<T>,它可以让任何数量的生产者和消费者传递的东西放进一个队列,吃起来关前:

template<class T> 
struct threaded_queue { 
    using lock = std::unique_lock<std::mutex>; 
    void push_back(T t) { 
    { 
     lock l(m); 
     data.push_back(std::move(t)); 
    } 
    cv.notify_one(); 
    } 
    boost::optional<T> pop_front() { 
    lock l(m); 
    cv.wait(l, [this]{ return abort || !data.empty(); }); 
    if (abort) return {}; 
    auto r = std::move(data.back()); 
    data.pop_back(); 
    return std::move(r); 
    } 
    void terminate() { 
    { 
     lock l(m); 
     abort = true; 
     data.clear(); 
    } 
    cv.notify_all(); 
    } 
    ~threaded_queue() 
    { 
    terminate(); 
    } 
private: 
    std::mutex m; 
    std::deque<T> data; 
    std::condition_variable cv; 
    bool abort = false; 
}; 

现在,我们要的任务传递到这样的队列,并有通过任务的人得到结果。下面是一个使用上述的与包装的任务:

template<class...Args> 
struct threaded_task_queue { 
    threaded_task_queue() = default; 
    threaded_task_queue(threaded_task_queue&&) = delete; 
    threaded_task_queue& operator=(threaded_task_queue&&) = delete; 
    ~threaded_task_queue() = default; 
    template<class F, class R=std::result_of_t<F&(Args...)>> 
    std::future<R> queue_task(F task) { 
    std::packaged_task<R(Args...)> p(std::move(task)); 
    auto r = p.get_future(); 
    tasks.push_back(std::move(p)); 
    return r; 
    } 
    void terminate() { 
    tasks.terminate(); 
    } 
    std::function<void(Args...)> pop_task() { 
    auto task = tasks.pop_front(); 
    if (!task) return {}; 
    auto task_ptr = std::make_shared<std::packaged_task<R(Args...)>>(std::move(*task)); 
    return [task_ptr](Args...args){ 
     (*task_ptr)(std::forward<Args>(args)...); 
    }; 
    } 
private: 
    threaded_queue<std::packaged_task<void(Args...)>> tasks; 
}; 

如果我这样做是正确的,它的工作原理是这样的:

  • A发送队列任务的拉姆达的形式到B。这个lambda需要一些固定的参数集合(由B提供),并返回一些值。

  • B弹出队列,并获取采用参数的std::function。它调用它;它在B的上下文中返回void

  • A在排队任务时被给予future<R>。它可以查询它是否完成。

你会注意到A不能被“通知”已完成。这需要不同的解决方案。但是如果A最终达到不等待B的结果就无法进步的程度,那么这个系统就可以工作。另一方面,如果A积累了大量这样的消息并且有时需要等待来自许多这样的B的输入,直到它们中的任何一个返回数据(或者用户做了某事),则需要比一个std::future<R>。一般模式 - 具有代表未来计算的令牌 - 是可靠的。但是你需要扩展它以适应未来计算和消息循环等多种来源。

未经测试的代码。

一为threaded_task_queue方法,当你发送的消息是:

template<class Signature> 
struct message_queue; 
template<class R, class...Args> 
struct message_queue<R(Args...) : 
    threaded_task_queue< std::function<R(Args...)> > 
{ 
    std::future<R> queue_message(Args...args) { 
    return this->queue_task(
     [tup = std::make_tuple(std::forward<Args>(args)...)] 
     (std::function<R(Args...)> f) mutable 
     { 
     return std::apply(f, std::move(tup)); 
     } 
    ); 
    } 
    bool consume_message(std::function<R(Args...)> f) 
    { 
    auto task = pop_task(); 
    if (!task) return false; 
    task(std::move(f)); 
    return true; 
    } 
}; 

其中对提供商来说,你提供Args...,并在消费者一方,你消耗Args...和返回R,并在供应商方面,你一旦消费者完成,有一个future<R>来获得结果。

这可能比我写的原始threaded_task_queue更自然。

std::apply是C++ 17,但是在C++ 11和C++ 14中有野性的实现。

+0

将函数'threaded_queue :: push_back'中的'cv.notify_one()'保留在另一个代码块之外的原因是什么? – Steephen

+1

@Steephen'notify_one'不需要你持有互斥体。所以我没有。在某些实现中,这可能会有所帮助,因为侦听线程不会被通知唤醒,而会立即阻塞互斥量。 – Yakk

+0

明白了。非常感谢您的澄清。 – Steephen