
Spark 2.1 UDF not getting registered when run from a Spark jar

I am using Spark 2.1 on a CDH cluster, with Scala version 2.11.8.

I created a UDF inside one of my methods.

When I execute the method in the Spark shell REPL, it works perfectly fine. However, when the method is called from a Spark jar via the spark-submit command, the same UDF fails to register.
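
For reference, the jar is submitted roughly like this; the main class and jar name are taken from the build.sbt below, while the master setting is an assumption:

spark-submit --class com.testPackage.sq.newsparktest.Test --master yarn newtest.jar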

Here is the content of build.sbt:

name := "newtest" 
version := "0.0.2" 

scalaVersion := "2.10.5" 

sbtPlugin := true 

val sparkVersion = "2.1.0" 

mainClass in (Compile, run) := Some("com.testPackage.sq.newsparktest.Test") 

assemblyJarName in assembly := "newtest.jar" 

libraryDependencies ++= Seq( 
    "org.apache.spark" %% "spark-core" % sparkVersion % "provided", 
    "org.apache.spark" %% "spark-sql" % sparkVersion % "provided", 
    "com.databricks" %% "spark-avro" % "3.2.0", 
    "org.apache.spark" %% "spark-hive" % "1.5.0" % "provided", 
    "com.amazonaws" % "aws-java-sdk" % "1.0.002" 
    ) 

libraryDependencies += 
    "log4j" % "log4j" % "1.2.15" excludeAll(
     ExclusionRule(organization = "com.sun.jdmk"), 
     ExclusionRule(organization = "com.sun.jmx"), 
     ExclusionRule(organization = "javax.jms") 
    ) 

resolvers += "SparkPackages" at "https://dl.bintray.com/spark-packages/maven/" 
resolvers += Resolver.url("bintray-sbt-plugins", url("http://dl.bintray.com/sbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns) 

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}

plugins.sbt is as follows:

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.0.1") 
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3") 

Below is the code snippet that causes the problem:

import java.sql.Timestamp

val timestamp_diff = (endTime: Timestamp, startTime: Timestamp) => {
  endTime.getTime() - startTime.getTime()
}
logger.info("Function Created")   // --------> This works fine

spark.udf.register("timestamp_diff", timestamp_diff)
// -----> The line above causes the error below
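
For context, once the registration succeeds (as it does in the REPL), the UDF can be invoked from Spark SQL along these lines; the table and column names here are hypothetical:

// Hypothetical table and columns, for illustration only
spark.sql("SELECT id, timestamp_diff(end_time, start_time) AS duration_ms FROM events").show()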

Here is the error from the spark-submit run:

Exception in thread "main" java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaMirrors$JavaMirror; 
      at com.testPackage.sq.newsparktest$.main(newsparktest.scala:49) 
      at com.testPackage.sq.newsparktest.main(newsparktest.scala) 
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
      at java.lang.reflect.Method.invoke(Method.java:606) 
      at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738) 
      at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) 
      at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) 
      at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) 
      at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 

Note: as mentioned above, this happens only when executing via the Spark jar, not from the REPL.

Answer


This looks like a Scala version issue: the command that fails from the jar works fine when executed directly in the REPL. From the Spark 2.1 docs:

Spark runs on Java 7+, Python 2.6+/3.4+ and R 3.1+. For the Scala API, Spark 2.1.0 uses Scala 2.11. You will need to use a compatible Scala version (2.11.x).

You therefore need to use a newer Scala version:

scalaVersion := "2.11.11" 

Note that Scala 2.12.x is not yet supported.
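
Putting it together, a minimal sketch of the Spark-related parts of a corrected build.sbt (only these lines shown; aligning spark-hive with sparkVersion and dropping sbtPlugin := true are assumptions, since the project builds an application jar rather than an sbt plugin):

scalaVersion := "2.11.11"  // matches the Scala 2.11.x line that Spark 2.1.0 is built against

val sparkVersion = "2.1.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql"  % sparkVersion % "provided",
  // Assumption: spark-hive pinned to the same Spark version instead of 1.5.0
  "org.apache.spark" %% "spark-hive" % sparkVersion % "provided",
  "com.databricks"   %% "spark-avro" % "3.2.0"
)

With %%, sbt resolves the _2.11 artifacts, so the assembly jar no longer mixes Scala 2.10 classes with the cluster's Scala 2.11 runtime.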