所以我有N个异步的,有时间戳的数据流。每个流都有一个固定的速率。我想处理所有的数据,但问题在于我必须按照尽可能接近数据的时间(它是一个实时流应用程序)处理数据。对N个数据流进行时间排序的算法
到目前为止,我的实现是创建一个K消息的固定窗口,我使用优先级队列按时间戳排序。然后,我按顺序处理整个队列,然后进入下一个窗口。这是可以的,但它并不理想,因为它会产生与缓冲区大小成正比的延迟,并且如果在缓冲区处理结束后到达消息,有时也会导致丢失消息。它看起来是这样的:
// Priority queue keeping track of the data in timestamp order.
ThreadSafeProrityQueue<Data> q;
// Fixed buffer size
int K = 10;
// The last successfully processed data timestamp
time_t lastTimestamp = -1;
// Called for each of the N data streams asyncronously
void receiveAsyncData(const Data& dat) {
q.push(dat.timestamp, dat);
if (q.size() > K) {
processQueue();
}
}
// Process all the data in the queue.
void processQueue() {
while (!q.empty()) {
const auto& data = q.top();
// If the data is too old, drop it.
if (data.timestamp < lastTimestamp) {
LOG("Dropping message. Too old.");
q.pop();
continue;
}
// Otherwise, process it.
processData(data);
lastTimestamp = data.timestamp;
q.pop();
}
}
关于数据的信息:他们一定会被自己的流中进行排序。他们的利率介于5至30赫兹之间。它们由图像和其他数据组成。
一些为什么这比它看起来更难的例子。假设我有两个流,A和B在1赫兹都在运行,我得到的数据按以下顺序:
(stream, time)
(A, 2)
(B, 1.5)
(A, 3)
(B, 2.5)
(A, 4)
(B, 3.5)
(A, 5)
查看如何,如果我处理的时候,我收到他们的订单数据,B会总是下降?这就是我想要避免的。现在在我的算法中,B每隔10帧就会丢弃一次,而我将以10帧的滞后时间处理数据。
您的应用程序是多线程的吗? (如果没有,为什么不?) – rici
是的,但接收端必须按顺序处理数据。我不会指定具体的应用程序,但您可以将其想象为像N个流式视频源一样以时间同步方式绘制到屏幕上的东西。 – mklingen
因此,每个来源都能保证产生有序的流? – rici