2016-08-12 50 views
0

我想写使用Scala的火花框架内联函数,将一个字符串输入,执行SQL语句并返回我一个字符串值执行火花SQL查询试图从UDF

val testfunc: (String=>String)= (arg1:String) => 
{val k = sqlContext.sql("""select c_code from r_c_tbl where x_nm = "something" """)        
k.head().getString(0) 
} 

我注册这个阶函数作为UDF

val testFunc_test = udf(testFunc) 

我有一个数据帧在蜂巢表

val df = sqlContext.table("some_table") 

然后我在withColumn中调用udf并尝试将其保存在新的数据框中。

val new_df = df.withColumn("test", testFunc_test($"col1")) 

但每次我试图做到这一点我得到一个错误

16/08/10 21:17:08 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1,  10.0.1.5): java.lang.NullPointerException 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:41) 
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086) 
    at org.apache.spark.sql.DataFrame.foreach(DataFrame.scala:1434) 

我是比较新的火花和Scala。但我不知道为什么这个代码不应该运行。任何见解或工作将受到高度赞赏。

请注意,我没有粘贴整个错误堆栈。请让我知道是否需要。

回答

1

您的UDF中不能使用sqlContext - UDF必须是可序列化的才能发送给执行程序,并且上下文(可以认为是到群集的连接)不能被序列化并发送到节点 - 只有驱动程序应用程序(其中UDF是定义为,但不是执行)可以使用sqlContext

看起来像你的用例(从表Y中的每个记录的表X执行选择)最好通过使用join来完成。