2016-04-28 91 views
0

Zeppelin: multiple SparkContexts issue

I am trying to run the following simple code in Zeppelin:

import org.apache.spark.mllib.linalg.Vectors 
import org.apache.spark.{Logging, SparkConf, SparkContext} 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.dstream.DStream 

System.clearProperty("spark.driver.port") 
System.clearProperty("spark.hostPort") 

def maxWaitTimeMillis: Int = 20000 
def actuallyWait: Boolean = false 

val conf = new SparkConf().setMaster("local[2]").setAppName("Streaming test") 
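// note: Zeppelin already runs a SparkContext in this JVM, so the line 
// below is what triggers the SPARK-2243 error shown further down 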
var sc = new SparkContext(conf) 

def batchDuration: Duration = Seconds(1) 
val ssc = new StreamingContext(sc, batchDuration) 

This is the Zeppelin output:

import org.apache.spark.mllib.linalg.Vectors 
import org.apache.spark.{Logging, SparkConf, SparkContext} 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.dstream.DStream 
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD 
import org.apache.spark.mllib.regression.LabeledPoint 
calculateRMSE: (output: org.apache.spark.streaming.dstream.DStream[(Double, Double)], n: org.apache.spark.streaming.dstream.DStream[Long])Double 
res50: String = null 
res51: String = null 
maxWaitTimeMillis: Int 
actuallyWait: Boolean 
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@... 
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at: 
org.apache.spark.SparkContext.<init>(SparkContext.scala:82) 
org.apache.zeppelin.spark.SparkInterpreter.createSparkContext(SparkInterpreter.java:356) 
org.apache.zeppelin.spark.SparkInterpreter.getSparkContext(SparkInterpreter.java:150) 
org.apache.zeppelin.spark.SparkInterpreter.open(SparkInterpreter.java:525) 
org.apache.zeppelin.interpreter.ClassloaderInterpreter.open(ClassloaderInterpreter.java:74) 
org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:68) 
org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:92) 
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:345) 
org.apache.zeppelin.scheduler.Job.run(Job.java:176) 
org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:139) 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
java.util.concurrent.FutureTask.run(FutureTask.java:266) 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
java.lang.Thread.run(Thread.java:745) 
    at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:2257) 
    at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:2239) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2239) 
    at org.apache.spark.SparkContext$.markPartiallyConstructed(SparkContext.scala:2312) 
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:91) 

Why does it say I have multiple SparkContexts running? If I don't add the line 'var sc = new SparkContext(conf)', then 'sc' is null, so it wouldn't get created.

+0

The SparkContext should be created automatically by Zeppelin under the name 'sc'. I know you said in your post that it is null, but it shouldn't be... – mgaido

+0

@mark91: OK, you're right. I double-checked the code and 'sc' is indeed created. The problem now is setting the checkpoint directory. – Klue

+0

Is that your problem? Have you tried doing 'ssc.checkpoint("/my_wonderful_checkpoint_dir")'? – mgaido

Answer

1

You cannot use multiple SparkContexts in Zeppelin. This is one of its limitations, since Zeppelin is effectively creating a webhook to the SparkContext.
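If you really need to construct the context in code (for example, outside Zeppelin), a safer pattern, assuming Spark 1.4 or later, is 'SparkContext.getOrCreate', which returns the already-running context instead of raising the SPARK-2243 error above:

import org.apache.spark.{SparkConf, SparkContext} 

// getOrCreate (Spark 1.4+) hands back the SparkContext that is already 
// running in this JVM instead of trying to construct a second one. 
val conf = new SparkConf().setMaster("local[2]").setAppName("Streaming test") 
val sc = SparkContext.getOrCreate(conf) 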

If you want to set up your SparkConf in Zeppelin, the easiest way is to set those properties in the Interpreter menu and then restart the interpreter so they are picked up by the SparkContext.
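For example, to mirror the SparkConf above, you would set 'master' to 'local[2]' and 'spark.app.name' to 'Streaming test' in the spark interpreter's properties (property names here are those of Zeppelin's spark interpreter; check your version's settings).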

Now you can go back to your notebook and test your code:

import org.apache.spark.mllib.linalg.Vectors 
import org.apache.spark.{Logging, SparkConf, SparkContext} 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.dstream.DStream 

def maxWaitTimeMillis: Int = 20000 
def actuallyWait: Boolean = false 

def batchDuration: Duration = Seconds(1) 
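// reuse the 'sc' that Zeppelin pre-creates instead of building a new one 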
val ssc = new StreamingContext(sc, batchDuration) 

More on this here.

+0

Thanks. It says I should also set the checkpoint directory. It can be done as follows: 'ssc.checkpoint(SparkCheckpointDir)', but how can 'SparkCheckpointDir' be defined? – Klue

+0

You can also set the checkpoint directory in the Interpreter menu; you will see the spark property names and values there. – eliasah

+0

Do you know of any tutorials that show how to do this? Also, I would appreciate it if you could explain how to set the checkpoint programmatically. Many thanks. – Klue
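For the programmatic route asked about above, here is a minimal sketch, assuming Zeppelin's pre-created 'sc' and a placeholder path ('/tmp/spark-checkpoints' is a hypothetical example; on a real cluster use a fault-tolerant store such as HDFS):

import org.apache.spark.streaming.{Seconds, StreamingContext} 

val ssc = new StreamingContext(sc, Seconds(1)) 
// The directory below is a placeholder; any path writable by the driver 
// works in local mode, but use HDFS (or similar) on a cluster. 
ssc.checkpoint("/tmp/spark-checkpoints") 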