2017-07-17 96 views
0

我无法选择适当的窗口功能/分配器。任务如下。首先,我从具有request_id和一些数据的Source获取数据,并对外部数据库执行异步请求。Apache Flink异步请求和窗口

// Here String is for request_id, Data is for treated data 
DataStream Tuple2<String, Data> stream = ... 

// async I/O queries 
DataStream<Tuple2<String, String>> resultStream = 
AsyncDataStream.unorderedWait(
    stream, 
    new AsyncDatabaseRequest(), 
    1000, 
    TimeUnit.MILLISECONDS, 
    100 
); 

现在我想通过request_id收集所有数据并进行一些计算。

DataStream Tuple2<String, Integer> = result 
    .map(val -> new Tuple2<String, Integer>(val.f0, val.f1.data_int)) 
    .keyBy(0) 
    .window(...) 
    .sum(1); 

问题是窗口函数。我需要每个窗口包含具有相同request_id的所有数据点,但异步查询的时间可能从毫秒到分钟不等。另一方面,我需要低延迟,所以我不能使用ProcessingTimeSessionWindows.withGap(Time.minutes(10))。我需要在从异步函数获取最后一个数据后立即执行计算。

对我来说最好的方法是使用异步函数的窗口水印,当然每个查询都会完成,以及它如何指向它。这是可能的,这样的任务的最佳做法是什么?

回答

0

那么,我找到了解决方案,它似乎很容易。 我只使用EventTime。在我的源功能我产生事件时间戳以及水印如下:

Long ts = System.currentTimeMillis(); 
ctx.collectWithTimestamp(data, ts); 
ctx.emitWatermark(new Watermark(ts + 1)); 

在河流流量我用EVENTTIME功能:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
DataStream<...> dataStream = ...; 

DataStream<...> newStream = dataStream 
    .keyBy(0) 
    .timeWindow(Time.milliseconds(1)) 
    .reduce(new Reducer()); 

这样,我避免超时,结果立即准备。