2017-07-14 58 views
0

这是我的问题,我有一个地图Map[Array[String],String],我想将它传递给UDF。如何在火花中将地图传入UDF

这里是我的UDF:

def lookup(lookupMap:Map[Array[String],String]) = 
    udf((input:Array[String]) => lookupMap.lift(input)) 

这里是我的地图变量:

val srdd = df.rdd.map { row => (
    Array(row.getString(1),row.getString(5),row.getString(8)).map(_.toString), 
    row.getString(7) 
)} 

这里是我如何调用该函数:

val combinedDF = dftemp.withColumn("a",lookup(lookupMap))(Array($"b",$"c","d")) 

我第一次得到了一个错误关于不可变数组,所以我将数组更改为不可变类型,然后发生了有关类型不匹配的错误。我google了一下,显然我不能直接将非列类型传递给UDF。有人可以帮忙吗?荣誉。


更新:所以我所做的一切转换成包裹阵列。下面是我做的:

val srdd = df.rdd.map{row => (WrappedArray.make[String](Array(row.getString(1),row.getString(5),row.getString(8))),row.getString(7))} 

val lookupMap = srdd.collectAsMap() 


def lookup(lookupMap:Map[collection.mutable.WrappedArray[String],String]) = udf((input:collection.mutable.WrappedArray[String]) => lookupMap.lift(input)) 


val combinedDF = dftemp.withColumn("a",lookup(lookupMap))(Array($"b",$"c",$"d")) 

现在我有这样的错误:

required: Map[scala.collection.mutable.WrappedArray[String],String] -ksh: Map[scala.collection.mutable.WrappedArray[String],String]: not found [No such file or directory]

我试图做这样的事情:

val m = collection.immutable.Map(1->"one",2->"Two") 
val n = collection.mutable.Map(m.toSeq: _*) 

但后来我刚回来到列类型的错误。

回答

1

首先,您必须通过一个Column作为UDF的参数;既然你希望这个参数是一个数组,你应该使用org.apache.spark.sql.functions中的array函数,该函数从一系列其他列中创建一个数组Column。因此,UDF调用将是:现在

lookup(lookupMap)(array($"b",$"c",$"d")) 

,因为阵列反序列化为mutable.WrappedArray,为了使地图查找接替你最好确保这是你的UDF中使用的类型:

def lookup(lookupMap: Map[mutable.WrappedArray[String],String]) = 
    udf((input: mutable.WrappedArray[String]) => lookupMap.lift(input)) 

所以共:

import spark.implicits._ 
import org.apache.spark.sql.functions._ 

// Create an RDD[(mutable.WrappedArray[String], String)]: 
val srdd = df.rdd.map { row: Row => (
    mutable.WrappedArray.make[String](Array(row.getString(1), row.getString(5), row.getString(8))), 
    row.getString(7) 
)} 

// collect it into a map (I assume this is what you're doing with srdd...) 
val lookupMap: Map[mutable.WrappedArray[String], String] = srdd.collectAsMap() 

def lookup(lookupMap: Map[mutable.WrappedArray[String],String]) = 
    udf((input: mutable.WrappedArray[String]) => lookupMap.lift(input)) 

val combinedDF = dftemp.withColumn("a",lookup(lookupMap)(array($"b",$"c",$"d"))) 
+0

如何创建一个包裹数组?我做了val srdd = df.rdd.map {row =>(WrappedArray(row.getString(1),row.getString(5),row.getString(8)),row。getString(7))}但它告诉我包裹数组不参数 – Anna

+0

您可以使用'mutable.WrappedArray.make [String](Array(...))' - 查看更新的答案 –

+0

我收到另一个错误,错误:类型不匹配; found:scala.collection.Map [scala.collection.mutable.WrappedArray [String],String] required:scala.collection.immutable.Map [scala.collection.mutable.WrappedArray [String],String] 请参阅我的更新 – Anna

0

安娜您的srdd/lookupmap代码类型的org.apache.spark.rdd.RDD [(阵列[字符串],字符串)]

val srdd = df.rdd.map { row => (
Array(row.getString(1),row.getString(5),row.getString(8)).map(_.toString), 
    row.getString(7) 
)} 

凡在查找方法,你期待一个Map作为参数

def lookup(lookupMap:Map[Array[String],String]) = 
udf((input:Array[String]) => lookupMap.lift(input)) 

这就是为什么你越来越类型不匹配错误的原因。

首先将RDD [元组]中的srdd作为RDD [Map],然后尝试将RDD转换为Map来解决此错误。

val srdd = df.rdd.map { row => Map(
Array(row.getString(1),row.getString(5),row.getString(8)).map(_.toString) -> 
    row.getString(7) 
)} 
+0

嗨,谢谢,但我做到了这一点:srdd.collectAsMap(),所以我认为它已经是一个地图类型? – Anna

+0

是的,我现在可以在你的帖子中编辑之后将它转换为地图,但是我根据你的旧帖子添加了我的评论,该帖子没有srdd作为地图。 –