2015-08-15 75 views
4

我是带有spark的数据框的新手,有时候很奇怪。假设我有一个包含纬度和经度坐标的日志的数据框。带有Spark DataFrame的地理过滤器

LogsDataFrame.printSchema : 
root 
|-- lat: double (nullable = false) 
|-- lon: double (nullable = false) 
|-- imp: string (nullable = false) 
|-- log_date: string (nullable = true) 
|-- pubuid: string (nullable = true) 

在另一方面我有一个简单方法

within(lat : Double, long : Double, radius : Double) : Boolean 

,告诉如果LAT和经度是在预先定义的位置的一定的半径。

现在,我该如何过滤不满意的日志。我试图

logsDataFrame.filter(within(logsDF("lat"), logsDF("lon"), RADIUS)

不过,这并不推断双,而是它给回柱类型。 我如何得到这个工作? 火花网站中的文档有点简单,我相信我错过了一些东西。

谢谢你的帮助。

回答

6

一般而言,您至少需要两件事才能使其发挥作用。首先,你必须创建一个UDF包装within

import org.apache.spark.sql.functions.{udf, lit} 

val withinUDF = udf(within _) 

接下来,当UDF被调用时,半径应标记为文字:

df.where(withinUDF($"lat", $"long", lit(RADIUS))) 

因为不是每个类型都可以通过这种方式和创建包装和呼吁​​3210是相当繁琐,你可能更喜欢柯里:

def within(radius: Double) = udf((lat: Double, long: Double) => ???) 

df.where(within(RADIUS)($"lat", $"long")) 
+0

这是真棒,很好。我错过了文字部分。我肯定会用柯里重写代码。刚开始写Scala。谢谢。 – Eriksen