我想在Flink中的每个节点上共享一个HashMap
,并允许节点更新该HashMap。我有这样的代码至今:如何将HashMap附加到Flink中的配置对象?
object ParallelStreams {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//Is there a way to attach a HashMap to this config variable?
val config = new Configuration()
config.setClass("HashMap", Class[CustomGlobal])
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
class CustomGlobal extends ExecutionConfig.GlobalJobParameters {
override def toMap: util.Map[String, String] = {
new HashMap[String, String]()
}
}
class MyCoMap extends RichCoMapFunction[String, String, String] {
var users: HashMap[String, String] = null
//How do I get access the HashMap I attach to the global config here?
override def open(parameters: Configuration): Unit = {
super.open(parameters)
val globalParams = getRuntimeContext.getExecutionConfig.getGlobalJobParameters
val globalConf = globalParams[Configuration]
val hashMap = globalConf.getClass
}
//Other functions to override here
}
}
我在想,如果你可以将自定义对象在这里val config = new Configuration()
创建config
变量? (请参阅上面代码中的注释)。
我注意到你只能附加原始值。我创建了一个自定义类,它扩展了ExecutionConfig.GlobalJobParameters
,并通过执行config.setClass("HashMap", Class[CustomGlobal])
附加了该类,但是我不确定是否应该这样做?
另一种方法是使用侧面输入。请参阅https://stackoverflow.com/a/45219889/3026310了解一些指针。 –