2017-05-24 124 views
1

我正在尝试开发一种管道,其中首先读取和处理数据,操作一次,以不同方式操作并显示数据。我有一个设计,其中数据IO馈送到第一个操纵器读取的缓冲区中。随后,第一个操纵器写入另一个缓冲区,该缓冲区在第二个操纵器可能的情况下被读取。最后,将第二个操纵器的输出写入显示缓冲区,然后由可视化工具读取并使用OpenGL进行显示。在流水线执行中采用并行处理

在我看来,这是一个相当直接的并行问题,其中每个任务都有自己的线程并通过数据缓冲区进行通信。然而,我在线程程序中遇到的所有教程似乎都表明,多线程是一些中间件(如OpenMP)决定如何划分工作负载。

我是新开发多线程应用程序,所以这可能是一个愚蠢的问题,但是我所描述的是可行的,可以用OpenMP等中间件来完成吗?我意识到显而易见的答案是“尝试它”,并且我想,但是这些教程没有阐明如何尝试它。

+0

什么是“缓冲块”的含义实验?函数调用块,而不是数据结构。 –

+0

@KerrekSB:你说得对。我的意思是读取电话会阻塞,直到有缓冲区中的数据 – marcman

+0

我想你应该看看[消费者/制造商问题](https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem ) – cbuchart

回答

1

OpenMP更适合易于跨多个内核(SIMD)的算法。其他情况是可能的,但在你的情况下,我认为直接使用线程会更好,并且更容易编码和维护。

我将我的答案分为两部分:一般解决方案没有 OpenMP,以及一些使用OpenMP的特定更改。

正如在评论中提到的,你面对生产者/消费者问题,但是两次:一个线程正在填充一个缓冲区(产生一个项目),然后必须通过第二个线程读取(并修改) )。你的问题的特殊性在于,这第二个线程也是一个生产者(要绘制的图像),第三个线程是负责使用它的人(可视化器)。如你所知,P/C问题是通过使用一个缓冲区(可能是一个循环缓冲区或一个生产项目队列)来解决的,其中缓冲区的每个元素都被标记为生成或消耗,并且线程具有独占性访问时添加或从中获取项目。


让我们在下面的示例程序中使用队列方法处理您的问题。

  • 生产物品将存储在队列中。队列的前面包含最早的元素,这些元素必须先被消费。
  • 有两个队列:一个用于第一个操纵器生成的数据(并被第二个操纵器使用),另一个用于第二个操纵器生成的数据(并且将由另一个线程显示)。
  • 生产阶段很简单:获得对相应队列的独占访问权,并在最后插入元素。
  • 消费类似,但必须等待队列至少有一个元素(非空)。
  • 我添加了一些来模拟其他操作。
  • 停止条件仅用于说明目的。

注:我假设你有机会获得一个C++编译器11为简单起见。使用其他API的实现相对相似。

#include <iostream> 
#include <thread> 
#include <mutex> 
#include <atomic> 
#include <chrono> 
#include <list> 

using namespace std::chrono_literals; 

std::mutex g_data_produced_by_m1_mutex; 
std::list<int> g_data_produced_by_m1; 

std::mutex g_data_produced_by_m2_mutex; 
std::list<int> g_data_produced_by_m2; 

std::atomic<bool> stop = false; 

void manipulator1_kernel() 
{ 
    while (!stop) { 
    // Producer 1: generate data 
    { 
     std::lock_guard<std::mutex> lock(g_data_produced_by_m1_mutex); 
     g_data_produced_by_m1.push_back(rand()); 
    } 
    std::this_thread::sleep_for(100ms); 
    } 
} 

void manipulator2_kernel() 
{ 
    int data; 

    while (!stop) { 
    // Consumer 1 
    while (!stop) { // wait until there is an item to be consumed 
     { 
     std::lock_guard<std::mutex> lock(g_data_produced_by_m1_mutex); 
     if (!g_data_produced_by_m1.empty()) { // is there data to be consumed? 
      data = g_data_produced_by_m1.front(); // consume 
      g_data_produced_by_m1.pop_front(); 
      break; 
     } 
     } 
     std::this_thread::sleep_for(100ms); 
    } 

    // Producer 2: modify and send to the visualizer 
    { 
     std::lock_guard<std::mutex> lock(g_data_produced_by_m2_mutex); 
     g_data_produced_by_m2.push_back(5 * data); 
    } 

    std::this_thread::sleep_for(100ms); 
    } 
} 

void visualizer_kernel() 
{ 
    int data; 

    while (!stop) { 
    // Consumer 2 
    while (!stop) { // wait until there is an item to be visualized 
     { 
     std::lock_guard<std::mutex> lock(g_data_produced_by_m2_mutex); 
     if (!g_data_produced_by_m2.empty()) { 
      data = g_data_produced_by_m2.front(); 
      g_data_produced_by_m2.pop_front(); 
      break; 
     } 
     } 
     std::this_thread::sleep_for(100ms); 
    } 

    std::cout << data << std::endl; // render to display 
    std::this_thread::sleep_for(100ms); 

    if (data % 8 == 0) stop = true; // some stop condition for the example 
    } 
} 

int main() 
{ 
    std::thread manipulator1(manipulator1_kernel); 
    std::thread manipulator2(manipulator2_kernel); 
    std::thread visualizer(visualizer_kernel); 

    visualizer.join(); 
    manipulator2.join(); 
    manipulator1.join(); 

    return 0; 
} 

如果你仍然想使用OpenMP的,也许你可以找到最接近的是tasks(因为OpenMP的3.0,我认为)。我没有用他们很担心,但是上面的程序可以写成这样:

int main() 
{ 
    #pragma omp parallel 
    { 
    #pragma omp task 
    manipulator1_kernel(); 
    #pragma omp task 
    manipulator2_kernel(); 
    #pragma omp task 
    visualizer_kernel(); 

    #pragma omp taskwait 
    }  

    return 0; 
} 

的代码的其余部分可改为使用OpenMP的功能太多,但我觉得这回答了你的问题。

这种方法的主要问题是,您必须为OpenMP parallel中的任务创建一个代码块,从而使您的应用程序逻辑和结构的其余部分变得复杂。

1

要解决此特定问题英特尔®线程构建模块库包含特殊结构。 Intel® TBB是跨平台的库,它有助于多线程编程。 我们可以在四个不同的任务提供者处查看应用程序中涉及的实体。一种类型的任务是输入任务 - 那些提供输入数据的任务,另一种类型的任务由第一个操作例程提供,等等。

因此,用户需要做的唯一事情就是为这些任务提供正文。库中有几个API用于指定要处理的物体以及如何并行处理。其他一切(这里我指的是线程创建,任务执行之间的同步,工作平衡等)由库完成。

我想到的最简单的解决方案是使用parallel_pipeline函数。这里是原型:

#include "tbb/pipeline.h" 
using namespace tbb; 

int main() { 
    parallel_pipeline(/*specify max number of bodies executed in parallel, e.g.*/16, 
     make_filter<void, input_data_type>(
      filter::serial_in_order, // read data sequentially 
      [](flow_control& fc) -> input_data_type { 
       if (/*check some stop condition: EOF, etc.*/) { 
        fc.stop(); 
        return input_data_type(); // return dummy value 
       } 
       auto input_data = read_data(); 
       return input_data; 
      } 
     ) & 
     make_filter<input_data_type, manipulator1_output_type>(
      filter::parallel, // process data in parallel by the first manipulator 
      [](input_data_type elem) -> manipulator1_output_type { 
       auto processed_elem = manipulator1::process(elem); 
       return processed_elem; 
      } 
     ) & 
     make_filter<manipulator1_output_type, manipulator2_output_type>(
      filter::parallel, // process data in parallel by the second manipulator 
      [](manipulator1_output_type elem) -> manipulator2_output_type { 
       auto processed_elem = manipulator2::process(elem); 
       return processed_elem; 
      } 
     ) & 
     make_filter<manipulator2_output_type, void>(
      filter::serial_in_order, // visualize frame by frame 
      [](manipulator2_output_type elem) { 
       visualize(elem); 
      } 
     ) 
    ); 
    return 0; 
} 

前提是必需的功能(的read_data,可视化)被实现。这里input_data_type,manipulator1_output_type等是在流水线阶段之间传递的类型,并且操纵器的process函数对传递的参数进行必要的计算。

顺便说一句,为避免使用锁和其他同步原语工作,可以使用库中的concurrent_bounded_queue,并将输入数据放入此队列中,可能由不同的线程(例如专用于IO操作)组成,如concurrent_bounded_queue_instance.push(elem),以及然后通过input_data_type elem; concurrent_bounded_queue_instance.pop(elem)阅读。请注意,在这里弹出一个项目是一个阻塞操作。 concurrent_queue提供了非阻塞try_pop的替代方案。

另一种可能性是使用tbb::flow_graph及其节点来组织相同的流水线方案。看看描述dependencydata流程图的两个示例。您可能需要使用sequencer_node来正确排序项目执行(如有必要)。

这是值得阅读标记标记SO看看其他人如何使用这个库。

0

您是否实现了单线程版本?异形?

他们是关键步骤,W/O他们,你可以让你的高度并行设计的最佳实现只是认识到瓶颈是你的缓冲器和/或线程同步和/或假共享和I/O /或缓存未命中或类似问题。

我想先尝试一个简单的线程池,其任务顺序执行所有步骤。然后分析它的工作原理后,什么是CPU消耗等我会用更复杂的工具总是比较它们的表现与第一简版