2016-03-07 51 views
-1

我创建了sparkUDF。当我在火花外壳上运行它时,它运行得非常好。但是,当我注册并使用我的sparkSQL查询时,它会给出NullPointerException。Scala UDF在Spark shell上运行良好,但在sparkSQL中使用它时会产生NPE


阶>test_proc( “1605”, “(@ SUPP在(-1,118)”)

16/03/07 10时35分04秒INFO TaskSetManager:成品任务0.0在阶段21.0(TID 220)在62 ms上cdts1hdpdn01d.rxcorp.com(1/1)

16/03/07 10:35:04信息YarnScheduler:从池中删除TaskSet 21.0,其任务已完成

16/03/07 10:35:04 INFO DAGSch eduler:ResultStage 21(第一位置:45)在0.062小号16/03/07 10时35分04秒INFO DAGScheduler完成:作业完成16:首先在45,把2.406408小号

res14:INT = 1

斯卡拉>


但当我注册,并在我的sparkSQL查询中使用它,它给NPE。

阶>sqlContext.udf.register( “store_proc”,test_proc _)

阶>hiveContext.sql(“选择store_proc( '1605',“(@supp在( - 1118)')“)first.getInt(0)

16/03/07 10时37分58秒INFO ParseDriver:解析命令:SELECT store_proc( '1605',“(@supp在(-1,118 )')16/03/07 10:37:58 INFO ParseDriver:Parse完成16/03/07 10:37:58信息SparkContext:开始工作:第一次:24

16/03/07 10:37:58信息DAGScheduler:得到了17个(第一个在24),有1个输出分区16/03/07 10:37:58 INFO DAGScheduler:最后阶段:ResultStage 22(first在:24)16/03/07 10时37分58秒INFO DAGScheduler:)名单(

16/03/07 10时37分58秒INFO DAGScheduler:最后阶段的家长缺少父母:列表()

16/03/07 10时37分58秒INFO DAGScheduler:提交ResultStage 22(MapPartitionsRDD [86]在第一位置:24),它没有丢失父母

16/03/07 10时37分58秒INFO MemoryStore:ensureFreeSpace(10520)使用curMem = 1472899调用,maxMem = 2222739947

16/03/07 10点37分58秒INFO MemoryStore的:块broadcast_30存储在存储器(估计大小10.3 KB,免费2.1 GB)

16/03/07 10点37分58秒INFO MemoryStore的值:ensureFreeSpace(4774)称为与curMem = 1483419,MAXMEM = 2222739947

16/03/07 10点37分58秒INFO MemoryStore的:块broadcast_30_piece0存储在内存中的字节(估计大小4.7 KB,免费2.1 GB)

16/03/07 10:37:58信息BlockManagerInfo:在162.44.214.87:47564的内存中添加了broadcast_30_piece0(大小:4.7 KB,免费:2。1 GB)

16/03/07 10时37分58秒INFO SparkContext:创建从广播广播30在DAGScheduler.scala:861

16/03/07 10时37分58秒INFO DAGScheduler:提交1 (在第一MapPartitionsRDD [86]在:24)从ResultStage 22缺少任务

16/03/07 10时37分58秒INFO YarnScheduler:添加任务设置22.0 1个任务

16/03/07 10 :37:58 INFO TaskSetManager:在阶段22.0(TID 221,cdts1hdpdn02d.rxcorp.com,分区0,PROCESS_LOCAL,2155字节)中启动任务0.0

16/03/07 10:37:58 INFO BlockManagerInfo:在cdts1hdpdn02d.rxcorp.com:33678(大小:4.7 KB,免费:6.7 GB)内存中添加broadcast_30_piece0 16/03/07 10:37:58 WARN TaskSetManager:在阶段22.0失落任务0.0(TID 221,cdts1hdpdn02d.rxcorp.com):显示java.lang.NullPointerException

在org.apache.spark.sql.hive.HiveContext.parseSql(HiveContext.scala:291 )at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725)at $ line20。$ read $ iwC $ iwC $ iwC $ iwC $ iwC $ iwC $ iwC $ iwC.test_proc(:41)


这是s充足我的 'test_proc' 的:

DEF test_proc(X:字符串,Y:字符串):INT = {

VAL hiveContext =新org.apache.spark.sql.hive.HiveContext(SC )

VAL Z:INT = hiveContext.sql( “选择7”)first.getInt(0)

返回ž }

+2

链接似乎需要注册,所以你最好在这里提供细节。这是一件好事,无论如何:) –

+0

嘿,哥们,当然。我已更新。 –

回答

2

基于从独立的输出调用它看起来像。正在执行某种Spark动作,并且这不能在UDF内工作,因为Spark不支持分布式数据结构上的嵌套操作。如果test_proc使用SQLContext这将导致NPP,因为Spark上下文仅存在于驱动程序中。

如果出现这种情况,您将使用本地(最有可能播出的)变量或joins重构您的代码以达到预期的效果。

+0

嗨,亲爱的,谢谢你的回复。这是我的test_proc的一个例子,可能你可以帮助我更多地了解: def test_proc(x:Int,y:Int):Int = { val hiveContext = new org.apache.spark.sql.hive .HiveContext(SC); val z:Int = hiveContext.sql(“select 7”)。first.getString(0); return z} –

+0

我补充说明。 – zero323

+0

非常感谢您的回复。 基本上我试图为我的仓库写一个商店proc。该存储过程包含多个变量来存储多个查询的输出,并进行一些操作并返回一个数字作为输出。我使用这个输出到一个事实表中的文件管理器行。 所以我不能在我的查询中使用这个UDF加入。一个解决方案,我觉得我应该把我的sparkcontext传递给我的UDF,它可能会工作。 您是否知道这样的用例,我们可以将sparkcontext作为参数传递给UDF? –

相关问题