- 我想根据时间戳创建消息的批量消息为 。
- 此外,我想在固定时间批量这些消息 窗口(1分钟)。
- 只有在窗口经过后,才会将批次 推送到下游。
对于这项工作,处理器API似乎或多或少配件(一拉KStream batch process windows):如何重新处理批次的卡夫卡流
public void process(String key, SensorData sensorData) {
//epochTime is rounded to our prefered time window (1 minute)
long epochTime = Sensordata.epochTime;
ArrayList<SensorData> data = messageStore.get(epochTime);
if (data == null) {
data = new ArrayList<SensorData>();
}
data.add(sensorData);
messageStore.put(id, data);
this.context.commit();
}
@Override
public void init(ProcessorContext context) {
this.context = context;
this.context.schedule(60000); // equal to 1 minute
}
@Override
public void punctuate(long streamTime) {
KeyValueIterator<String, ArrayList<SensorData>> it = messageStore.all();
while (it.hasNext()) {
KeyValue<String, ArrayList<SensorData>> entry = it.next();
this.context.forward(entry.key, entry.value);
}
//reset messageStore
}
然而,这种方法有一个主要缺点有:我们不使用卡夫卡流窗口。
- 不考虑无序消息。
- 当实时操作时,标点符号计划应等于所需的批处理时间窗口。如果我们将其设置得较短,那么批处理将被转发并且下游计算将快速开始。如果设置为long,并且在批处理窗口未完成时触发标点符号,则会出现同样的问题。
- 此外,重播历史数据,同时保持标点时间表(1分钟)只会在1分钟后触发第一次计算。如果是这样,那将炸毁州立店并且感觉错误。
考虑到这些问题,我应该使用Kafka Streams窗口。但是,这只有在卡夫卡流DSL ...
任何这方面的toughts将是真棒。
我正在处理这个问题:在Windows上使用DSL,并让底层计算消费者在其批次尚未满时退出 – Raf