2016-03-07 66 views
1

我想在Spark SQL作业中使用DSL而不是纯SQL,但是我无法获得我的UDF工作。udf在Spark SQL中DSL

sqlContext.udf.register("subdate",(dateTime: Long)=>dateTime.toString.dropRight(6)) 

这不起作用

rdd1.toDF.join(rdd2.toDF).where("subdate(rdd1(date_time)) === subdate(rdd2(dateTime))") 

我还想添加其他参加就像这个工作纯粹SQL条件

val results=sqlContext.sql("select * from rdd1 join rdd2 on rdd1.id=rdd2.idand subdate(rdd1.date_time)=subdate(rdd2.dateTime)") 

感谢您的帮助

回答

2

SQL您传递给where方法的表达式不正确,至少有几个原因:

  • ===是一个Column方法不是有效的SQL等式。您应该使用单个等号=
  • 括号表示法(table(column))不是引用SQL中的列的有效方法。在这种情况下,它将被识别为函数调用。 SQL使用点符号(table.column
  • 即使它既不rdd1也不rdd2是有效的表的别名

因为它看起来像列名是明确的,你可以简单地使用下面的代码:

df1.join(df2).where("subdate(date_time) = subdate(dateTime)") 

如果不是这种情况,使用点语法不会首先提供别名。例如参见Usage of spark DataFrame "as" method

此外,注册UDF通常在您始终使用原始SQL时很有意义。如果你想使用DataFrame API,最好是直接使用UDF:

import org.apache.spark.sql.functions.udf 

val subdate = udf((dateTime: Long) => dateTime.toString.dropRight(6)) 

val df1 = rdd1.toDF 
val df2 = rdd2.toDF 

df1.join(df2, subdate($"date_time") === subdate($"dateTime")) 

,或者如果列名是模棱两可:

df1.join(df2, subdate(df1("date_time")) === subdate(df2("date_time"))) 

最后,对于简单的功能这样最好是组成内置表达式比创建UDF。

+0

非常感谢。通过编写内置表达式,你的意思是什么?从sql.Column包中使用“substr”类似的函数? – vgkowski

+0

或多或少。这里有一些微妙的东西(并不是每个函数都是用表达式来实现的),但是不要纠缠于此。如果这有帮助,请不要感谢 - 只接受和/或upvote :) – zero323