2016-11-28 38 views
0

How to access a WrappedArray from a map in a DataFrame

I have a DataFrame like this:

+------+------------------------------------------------------------------------------+ 
|myKeys|myMaps                  | 
+------+------------------------------------------------------------------------------+ 
|b  |Map(b -> WrappedArray([1,o], [4,xxx]), a -> WrappedArray([1,o], [1,n], [1,n]))| 
|a  |Map(b -> WrappedArray([1,o], [4,n]), a -> WrappedArray([4,c], [1,n], [1,n])) | 
|a  |Map(b -> WrappedArray([4,o], [3,n]), a -> WrappedArray([4,o], [1,n], [1,n])) | 
|b  |Map(b -> WrappedArray([4,a], [3,n]), a -> WrappedArray([1,o], [1,n], [1,n])) | 
+------+------------------------------------------------------------------------------+ 

With this schema:

root 
|-- myKeys: string (nullable = false) 
|-- myMaps: map (nullable = true) 
| |-- key: string 
| |-- value: array (valueContainsNull = true) 
| | |-- element: struct (containsNull = true) 
| | | |-- _1: string (nullable = true) 
| | | |-- _2: string (nullable = true) 

Here is the code that creates it:

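import scala.collection.mutable
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{lit, udf}

// Note: testSchema is not shown in the question.
val x = sc.parallelize(Seq(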
     Array(("a", "1", "o"), ("a", "1", "n"), ("b", "1", "o"), ("a", "1", "n"), ("b", "4", "xxx")), 
     Array(("a", "1", "o"), ("a", "1", "n"), ("b", "1", "o"), ("a", "1", "n"), ("b", "4", "n")), 
     Array(("a", "1", "o"), ("a", "1", "n"), ("b", "4", "o"), ("a", "1", "n"), ("b", "3", "n")), 
     Array(("a", "1", "o"), ("a", "1", "n"), ("b", "4", "o"), ("a", "1", "n"), ("b", "3", "n")) 
    )).map(x => testSchema(x)).toDF("myArrays") 


val y = x.withColumn("myKeys", lit("b")) 

val getMap = udf((mouvements: mutable.WrappedArray[Row]) => { 
    val test = mouvements.toArray 
    .map(line => (line(0).toString, line(1).toString, line(2).toString)) 
    .groupBy(_._1) 
    .map{case (k,values) => k -> values.map(x => (x._2, x._3))} 
    test}) 


val df_with_map = y.select($"myKeys", getMap($"myArrays") as "myMaps") 
df_with_map show false 
df_with_map printSchema 

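Now I want to get the second element of the array entries whose first element equals 4, taken from the map entry whose key equals the row's key in myKeys (for example b). I should get a result like this: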

+---+ 
|val| 
+---+ 
|xxx| 
|c | 
|o | 
|a | 
+---+ 

I tried to do this with the following UDF:

val getMyValue = udf{(myKey: String, myMaps: Map[String, WrappedArray[Row]]) => 

    val first_val= "4" 
    val myArrays = myMaps.get(myKey) 
    val res = myArrays.get.toArray.filter{x => x.getString(0) == first_val} 
    res 
} 

val df_value = df_with_map.select(getMyValue($"myKey",$"myMaps") as "myValue") 
df_value show false 
df_value printSchema 

But it returns this error:

java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Row is not supported 

on this line:

val getMyValue = udf{(myKey: String, myMaps: Map[String, WrappedArray[Row]]) => 

Do you have any ideas?

Answers

4

Use:

val first_val = "4" 
val df = Seq(
    ("b", Map("b" -> Seq(("1", "o"), ("4", "xxx")))) 
).toDF("myKeys", "myMaps") 

root 
|-- myKeys: string (nullable = true) 
|-- myMaps: map (nullable = true) 
| |-- key: string 
| |-- value: array (valueContainsNull = true) 
| | |-- element: struct (containsNull = true) 
| | | |-- _1: string (nullable = true) 
| | | |-- _2: string (nullable = true) 

df.select($"myMaps".getItem("b")) 
    .as[Seq[(String, String)]] 
    .flatMap(xs => xs.filter(_._1 == first_val).map(_._2)) 

Edit:

df.as[(String, Map[String,Seq[(String, String)]])].flatMap { 
    case (key, map) => 
    map.getOrElse(key, Seq[(String, String)]()).filter(_._1 == first_val).map(_._2) 
} 
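
To make the per-row lookup above easier to follow, here is a minimal sketch of the same logic on plain Scala collections, without Spark. The names `LookupSketch` and `secondWhereFirstIs` are hypothetical and only used for illustration; the sample rows are copied from the first two rows of the table in the question.

```scala
// Plain-Scala sketch (no Spark) of the per-row lookup.
// `LookupSketch` and `secondWhereFirstIs` are hypothetical names, not library APIs.
object LookupSketch {
  type Pairs = Seq[(String, String)]

  // Return the second element of every pair stored under `key`
  // whose first element equals `firstVal`.
  // A missing key yields an empty result instead of an exception.
  def secondWhereFirstIs(key: String, map: Map[String, Pairs], firstVal: String): Seq[String] =
    map.getOrElse(key, Seq.empty[(String, String)])
      .filter(_._1 == firstVal)
      .map(_._2)

  def main(args: Array[String]): Unit = {
    // Each row is (myKeys, myMaps), as in the DataFrame.
    val rows = Seq(
      ("b", Map("b" -> Seq(("1", "o"), ("4", "xxx")), "a" -> Seq(("1", "o"), ("1", "n"), ("1", "n")))),
      ("a", Map("b" -> Seq(("1", "o"), ("4", "n")), "a" -> Seq(("4", "c"), ("1", "n"), ("1", "n"))))
    )
    // The key comes from the row itself, so it does not need to be known in advance.
    val result = rows.flatMap { case (key, map) => secondWhereFirstIs(key, map, "4") }
    println(result) // List(xxx, c)
  }
}
```

Using `getOrElse` with an empty sequence means rows whose key is absent from the map simply contribute no output rows.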
+0

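Nice, it works, but I think my example was too simple. My problem is that the key whose value I want is not known before the select, so I can't call getItem. I will edit the question to make it more detailed.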

+0

I updated my post to better explain my problem.

+0

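When I call your method on my DataFrame it returns: org.apache.spark.sql.AnalysisException: cannot resolve 'cast(myKeys as map<string,array<struct<_1:string,_2:string>>>)' due to data type mismatch: cannot cast StringType to MapType(StringType,ArrayType(StructType(StructField(_1,StringType,true),StructField(_2,StringType,true)),true),true); So it works for your example, but not for mine.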
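
If you prefer to stay with a UDF, one possible direction is sketched below. The exception in the question points at `Row` in the UDF's type signature, and the UDF there returns an array of `Row`, for which Spark cannot derive a result schema. Returning plain strings avoids that. This is only a sketch under assumptions: the object name `GetMyValueSketch`, the helper `pick`, the local `SparkSession`, and the reduced sample data are all hypothetical, and the map column is assumed to be non-null.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, explode, udf}

// Hypothetical sketch: a UDF that takes structs as Row but returns only strings.
object GetMyValueSketch {
  val firstVal = "4"

  // Lookup logic kept as a plain function so it can be checked on its own.
  // scala.collection.Map / scala.collection.Seq are used in the signature as a
  // conservative choice (assumption) so it accepts whatever Seq/Map Spark passes in.
  def pick(myKey: String,
           myMaps: scala.collection.Map[String, scala.collection.Seq[Row]]): Seq[String] =
    myMaps.getOrElse(myKey, Seq.empty[Row])
      .filter(_.getString(0) == firstVal) // keep structs whose _1 equals firstVal
      .map(_.getString(1))                // project to _2
      .toList

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    // Reduced sample data in the same shape as the question's DataFrame.
    val df = Seq(
      ("b", Map("b" -> Seq(("1", "o"), ("4", "xxx")), "a" -> Seq(("1", "o"), ("1", "n")))),
      ("a", Map("b" -> Seq(("1", "o"), ("4", "n")), "a" -> Seq(("4", "c"), ("1", "n"))))
    ).toDF("myKeys", "myMaps")

    // The return type is Seq[String], so Spark can derive array<string> for the result.
    val getMyValue = udf(pick _)

    // explode turns each matching string into its own output row.
    df.select(explode(getMyValue(col("myKeys"), col("myMaps"))).as("val")).show(false)

    spark.stop()
  }
}
```

Because the key is passed in as a column, it does not have to be known before the select. Note that the column here is named `myKeys`, whereas the question's call used `$"myKey"`.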