2016-09-26 280 views
4

我的要求是在一天内流传数百万条记录,并且它对外部配置参数具有巨大的依赖性。例如,用户可以随时在Web应用程序中更改所需的设置,并且在更改完成之后,流必须随新应用程序配置参数一起发生。这些是应用程序级别的配置,我们也有一些动态排除参数,每个数据必须通过并过滤。Flink:如何处理flink中的外部应用程序配置更改

我看到flink没有在所有任务管理器和子任务之间共享的全局状态。有一个集中的缓存是一个选项,但对于每个参数,我都必须从缓存中读取它,这会增加延迟。请提供更好的方法来处理这些情况以及其他应用程序如何处理它。谢谢。

回答

3

更新正在运行的流式应用程序的配置是常见要求。在Flink的DataStream API中,这可以使用处理两个输入流的所谓的CoFlatMapFunction完成。其中一个流可以是数据流,另一个是控制流。

以下示例显示如何动态调整过滤超出特定长度的字符串的用户函数。

val data: DataStream[String] = ??? 
val control: DataStream[Int] = ??? 

val filtered: DataStream[String] = data 
    // broadcast all control messages to the following CoFlatMap subtasks 
    .connect(control.broadcast) 
    // process data and control messages 
    .flatMap(new DynLengthFilter) 


class DynLengthFilter extends CoFlatMapFunction[String, Int, String] with Checkpointed[Int] { 

    var length = 0 

    // filter strings by length 
    override def flatMap1(value: String, out: Collector[String]): Unit = { 
    if (value.length < length) { 
     out.collect(value) 
    } 
    } 

    // receive new filter length 
    override def flatMap2(value: Int, out: Collector[String]): Unit = { 
    length = value 
    } 

    override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Int = length 

    override def restoreState(state: Int): Unit = { 
    length = state 
    } 
} 

DynLengthFilter用户函数执行Checkpointed接口用于过滤器的长度。如果发生故障,该信息会自动恢复。