2017-08-10 98 views
0

我有一个自定义的udf并在spark中注册。如果我尝试访问该UDF,则会抛出error.Unable访问。火花中的UDF使用情况

我试过这样。

spark.udf.register("rssi_weightage", FilterMap.rssi_weightage) 
val filterop = input_data.groupBy($"tagShortID", $"Timestamp", $"ListenerShortID", $"rootOrgID", $"subOrgID").agg(first(rssi_weightage($"RSSI").as("RSSI_Weight"))) 

显示错误在第一(rssi_weightage($ “RSSI”)// rssi_weightage没有发现错误

任何帮助将不胜感激。

+0

我的解决方案能解决您的问题吗?如果是,请接受答案 –

回答

2

这不是你如何使用UDF,实际UDF是spark.udf.register返回值那么你可以做:

val udf_rssii_weightage = spark.udf.register("rssi_weightage", FilterMap.rssi_weightage) 

val filterop = input_data.groupBy($"tagShortID", $"Timestamp", $"ListenerShortID", $"rootOrgID", $"subOrgID").agg(first(udf_rssi_weightage($"RSSI")).as("RSSI_Weight")) 

但在你的情况,你不需要注册UDF,只是用org.apache.spark.sql.functions.udf转换一个单组LAR函数的UDF:

val udf_rssii_weightage = udf(FilterMap.rssi_weightage) 
+0

感谢您的解决方案... –

+0

好一个:) @Raphael,值得赞赏 –

1

我想你有你定义的UDF功能, 下一个快照在公告UDF稍微不同的方法方式的问题 - 它直接定义函数: 进口org.apache.spark.sql.functions._

val data = sqlContext.read.json(sc.parallelize(Seq("{'foo' : 'Bar'}", "{'foo': 'Baz'}"))) 

val example = Seq("Bar", "Bazzz") 
val urbf = udf { foo: String => if (example.contains(example)) 1 else 0 } 

data.select($"foo", urbf($"foo")).show 

+--------+-------------+ 
| foo |UDF(foo)  | 
+--------+-------------+ 
| Bar |   1| 
| Bazzz |   0| 
+--------+-------------+ 
+0

感谢您的解决方案... –