2016-12-15 62 views
2
  • 我想根据时间戳创建消息的批量消息为 。
  • 此外,我想在固定时间批量这些消息 窗口(1分钟)。
  • 只有在窗口经过后,才会将批次 推送到下游。

对于这项工作,处理器API似乎或多或少配件(一拉KStream batch process windows):如何重新处理批次的卡夫卡流

public void process(String key, SensorData sensorData) { 
    //epochTime is rounded to our prefered time window (1 minute) 
    long epochTime = Sensordata.epochTime; 

    ArrayList<SensorData> data = messageStore.get(epochTime); 
    if (data == null) { 
     data = new ArrayList<SensorData>(); 
    } 

    data.add(sensorData); 
    messageStore.put(id, data); 
    this.context.commit(); 
} 

@Override 
public void init(ProcessorContext context) { 
    this.context = context; 
    this.context.schedule(60000); // equal to 1 minute 
} 

@Override 
public void punctuate(long streamTime) { 
    KeyValueIterator<String, ArrayList<SensorData>> it = messageStore.all(); 
    while (it.hasNext()) { 
     KeyValue<String, ArrayList<SensorData>> entry = it.next(); 
     this.context.forward(entry.key, entry.value); 
    } 
    //reset messageStore 
} 

然而,这种方法有一个主要缺点有:我们不使用卡夫卡流窗口。

  • 不考虑无序消息。
  • 当实时操作时,标点符号计划应等于所需的批处理时间窗口。如果我们将其设置得较短,那么批处理将被转发并且下游计算将快速开始。如果设置为long,并且在批处理窗口未完成时触发标点符号,则会出现同样的问题。
  • 此外,重播历史数据,同时保持标点时间表(1分钟)只会在1分钟后触发第一次计算。如果是这样,那将炸毁州立店并且感觉错误。

考虑到这些问题,我应该使用Kafka Streams窗口。但是,这只有在卡夫卡流DSL ...

任何这方面的toughts将是真棒。

回答

1

你可以混合和匹配DSL和使用process()transform(),或者transformValues() DSL内处理器API(有一些关于这个其他的SO问题已经如此,我不细说进一步)。因此,您可以将常规窗口结构与自定义(下游)运算符结合使用,以保持结果(和重复数据删除)。某些重复数据删除操作已经自动发生在您的窗口操作员内部(例如Kafka 0.10.1;请参阅http://docs.confluent.io/current/streams/developer-guide.html#memory-management),但是如果您只想得到一个结果,缓存将不会为您执行此操作。

关于标点符号:它是基于进度(即流时间)而不是基于挂钟时间触发的 - 所以如果您重新处理旧数据,如果将被称为与您中相同的次数原始运行(如果在处理旧数据时考虑挂钟时间,则相互之间快一点)。如果你想得到更多的细节,还有一些关于这个问题的问题。

但是,我一般考虑:为什么你只需要一个结果呢?如果您进行流处理,则可能需要构建下游消费者应用程序,以便能够处理更新结果。这是Kafka的固有设计:使用更新日志。

+0

我正在处理这个问题:在Windows上使用DSL,并让底层计算消费者在其批次尚未满时退出 – Raf