2017-07-20 109 views
1

我想在Flink中的每个节点上共享一个HashMap,并允许节点更新该HashMap。我有这样的代码至今:如何将HashMap附加到Flink中的配置对象?

object ParallelStreams { 
    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    //Is there a way to attach a HashMap to this config variable? 
    val config = new Configuration() 
    config.setClass("HashMap", Class[CustomGlobal]) 
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 

    class CustomGlobal extends ExecutionConfig.GlobalJobParameters { 
     override def toMap: util.Map[String, String] = { 
     new HashMap[String, String]() 
     } 
    } 

    class MyCoMap extends RichCoMapFunction[String, String, String] { 
     var users: HashMap[String, String] = null 
     //How do I get access the HashMap I attach to the global config here? 
     override def open(parameters: Configuration): Unit = { 
     super.open(parameters) 
     val globalParams = getRuntimeContext.getExecutionConfig.getGlobalJobParameters 
     val globalConf = globalParams[Configuration] 
     val hashMap = globalConf.getClass 

     } 
     //Other functions to override here 
    } 
} 

我在想,如果你可以将自定义对象在这里val config = new Configuration()创建config变量? (请参阅上面代码中的注释)。

我注意到你只能附加原始值。我创建了一个自定义类,它扩展了ExecutionConfig.GlobalJobParameters,并通过执行config.setClass("HashMap", Class[CustomGlobal])附加了该类,但是我不确定是否应该这样做?

回答

1

向运算符分配参数的常用方法是将它们作为函数类中的常规成员变量。在计划构建过程中创建和分配的函数对象被序列化并发送给所有工作人员。所以你不必通过配置传递参数。

这将如下所示

class MyMapper(map: HashMap) extends MapFunction[String, String] { 
// class definition 
} 


val inStream: DataStream[String] = ??? 

val myHashMap: HashMap = ??? 
val myMapper: MyMapper = new MyMapper(myHashMap) 
val mappedStream: DataStream[String] = inStream.map(myMapper) 

myMapper对象序列化(使用Java序列化)和运执行。所以map的类型必须实现Java Serializable接口。

编辑:我错过了你希望地图可以从所有并行任务中更新的部分。这对Flink来说是不可能的。您必须完全复制地图并全部更新(通过广播)或使用外部系统(键值存储)。

+0

另一种方法是使用侧面输入。请参阅https://stackoverflow.com/a/45219889/3026310了解一些指针。 –

相关问题