2017-07-06 94 views
1

所以我有N个异步的,有时间戳的数据流。每个流都有一个固定的速率。我想处理所有的数据,但问题在于我必须按照尽可能接近数据的时间(它是一个实时流应用程序)处理数据。对N个数据流进行时间排序的算法

到目前为止,我的实现是创建一个K消息的固定窗口,我使用优先级队列按时间戳排序。然后,我按顺序处理整个队列,然后进入下一个窗口。这是可以的,但它并不理想,因为它会产生与缓冲区大小成正比的延迟,并且如果在缓冲区处理结束后到达消息,有时也会导致丢失消息。它看起来是这样的:

// Priority queue keeping track of the data in timestamp order. 
ThreadSafeProrityQueue<Data> q; 
// Fixed buffer size 
int K = 10; 
// The last successfully processed data timestamp 
time_t lastTimestamp = -1; 

// Called for each of the N data streams asyncronously 
void receiveAsyncData(const Data& dat) { 
    q.push(dat.timestamp, dat); 
    if (q.size() > K) { 
     processQueue(); 
    } 
} 

// Process all the data in the queue. 
void processQueue() { 
    while (!q.empty()) { 
     const auto& data = q.top(); 
     // If the data is too old, drop it. 
     if (data.timestamp < lastTimestamp) { 
      LOG("Dropping message. Too old."); 
      q.pop(); 
      continue; 
     } 
     // Otherwise, process it. 
     processData(data); 
     lastTimestamp = data.timestamp; 
     q.pop(); 
    } 
} 

关于数据的信息:他们一定会被自己的流中进行排序。他们的利率介于5至30赫兹之间。它们由图像和其他数据组成。

一些为什么这比它看起来更难的例子。假设我有两个流,A和B在1赫兹都在运行,我得到的数据按以下顺序:

(stream, time) 
(A, 2) 
(B, 1.5) 
(A, 3) 
(B, 2.5) 
(A, 4) 
(B, 3.5) 
(A, 5) 

查看如何,如果我处理的时候,我收到他们的订单数据,B会总是下降?这就是我想要避免的。现在在我的算法中,B每隔10帧就会丢弃一次,而我将以10帧的滞后时间处理数据。

+0

您的应用程序是多线程的吗? (如果没有,为什么不?) – rici

+0

是的,但接收端必须按顺序处理数据。我不会指定具体的应用程序,但您可以将其想象为像N个流式视频源一样以时间同步方式绘制到屏幕上的东西。 – mklingen

+0

因此,每个来源都能保证产生有序的流? – rici

回答

0

我会建议一个生产者/消费者结构。让每个流都将数据放入队列中,并有一个单独的线程读取队列。那就是:

// your asynchronous update: 
void receiveAsyncData(const Data& dat) { 
    q.push(dat.timestamp, dat); 
} 

// separate thread that processes the queue 
void processQueue() 
{ 
    while (!stopRequested) 
    { 
     data = q.pop(); 
     if (data.timestamp >= lastTimestamp) 
     { 
      processData(data); 
      lastTimestamp = data.timestamp; 
     } 
    } 
} 

这可以防止您在处理批次时在当前实施中看到的“滞后”。

processQueue功能在一个单独的,持久的线程上运行。 stopRequested是程序在关闭时设置的标志 - 强制线程退出。有些人会为此使用volatile标志。我更喜欢使用类似手动重置事件的东西。

为了使这项工作,你需要一个优先级队列实现,允许并发更新,或者你需要一个同步锁来包装你的队列中。特别是,当队列为空时,您要确保q.pop()等待下一个项目。或者,当队列为空时,您永远不会拨打q.pop()。我不知道你的ThreadSafePriorityQueue的具体细节,所以我不能确切地说你会怎么写。

时间戳检查仍然是必要的,因为稍后的项目可能会在先前的项目之前被处理。例如:

  1. 从数据流1收到的事件,但线程在可以添加到队列之前被换出。
  2. 从数据流2接收到的事件,并将其添加到队列中。
  3. 来自数据流2的事件由processQueue函数从队列中移除。
  4. 上述步骤1的线程获取另一个时间片并将项添加到队列中。

这是不寻常的,只是罕见。时差通常会在几微秒左右。

如果您经常无法获得更新,那么您可以引入人为延迟。例如,在更新后的问题中,您将显示消息以500毫秒的顺序发送。假设500毫秒是您想要支持的最大容差。也就是说,如果一条消息晚了500多毫秒,它就会被丢弃。

当您将事物添加到优先级队列时,您所做的是将500毫秒添加到时间戳。那就是:

q.push(AddMs(dat.timestamp, 500), dat); 

而且在处理事物的循环,你不离队的时间戳之前的东西。喜欢的东西:

while (true) 
{ 
    if (q.peek().timestamp <= currentTime) 
    { 
     data = q.pop(); 
     if (data.timestamp >= lastTimestamp) 
     { 
      processData(data); 
      lastTimestamp = data.timestamp; 
     } 
    } 
} 

这引入了所有项目的处理的500毫秒的延迟,但会阻止下降落在500毫秒的阈值内的“迟到”的更新。您必须平衡您对“实时”更新的渴望与防止更新丢失的愿望。

+0

对不起,但这正是我写的。这会一直放弃不同步的消息。将用一个为什么比看起来更难的例子来修改我的问题。 – mklingen

+0

@mklingen:查看我的更新。 –

+0

人工延迟是完全正确的方式,并解决我的问题。 – mklingen

0

总会有一个滞后,而这个滞后将取决于您愿意等待最慢的“固定速率”流的时间。

建议:

  1. 保持缓冲器
  2. 保持布尔标志的阵列与所述含义:“如果位置IX是真实的,在缓冲器中有至少一个样品源自流IX”
  3. 排序/过程,只要你拥有所有标志设置为true

不是完整的证明(每个缓冲器进行排序,而是从一个缓冲到另一个你可能有时间戳反转),但也许不够好?

利用“满意”标志的计数来触发处理(在步骤3)可以用于使滞后更小,但是存在更多的缓冲器间时间戳反转的风险。在极端情况下,只接受一个满意的标记就意味着“一收到它就立即推送一个框架,时间戳分类将被诅咒”。
我提到这个以支持我的观点,即延迟/时间戳倒数平衡是您的问题所固有的 - 除了绝对相等的帧速率,将会有完美的解决方案,其中一方不会被牺牲。由于“解决方案”将是一种平衡行为,因此任何解决方案都需要收集/使用额外的信息来帮助做出决定(例如“标志阵列”)。如果我的建议对你的案例听起来很愚蠢(很可能,你选择分享的细节不是太多),开始考虑哪些指标与你的目标“体验质量”水平相关,并使用额外的数据结构帮助收集/处理/使用这些指标。

+0

我认为这是最接近正确的答案。将尝试它,让你知道它是如何去。 – mklingen

相关问题