Spark 2.0 parallel JobProgressListener failure

I have a scenario where I need to fire many SQL queries in parallel in a for loop and collect the resulting lists into a ListBuffer. However, I get a lot of errors while running the loop and the results are incomplete. To illustrate, I made a very simple, reproducible example:

import scala.collection.mutable.ListBuffer
val dummy = List("a","b").toDF.createOrReplaceTempView("df")
spark.catalog.cacheTable("df")
val dig = (0 to 9).par
var counter: Int = 0
var results = ListBuffer[List[org.apache.spark.sql.Row]]()

for (i1 <- dig) {
  for (i2 <- dig) {
    for (i3 <- dig) {
      println("||==="+i1+"=="+i2+"=="+i3+"===="+(i1*100+i2*10+i3*1)+"===="+counter+"=======||")
      counter += 1
      results += spark.sql("select 'trial','"+i1+"','"+i2+"','"+i3+"','"+(i1*100+i2*10+i3*1)+"','"+counter+"',* from df ").collect().toList
    }
  }
}
results(0).take(2).foreach(println) 
results.size 
results.flatten.size 

The above code simply counts from 0 to 999 and, for each count, appends a list of 2 rows to the ListBuffer.

Here are the results of running the code, with the "serial" counter value shown alongside:

||===9==8==3====983====969=======|| 
||===9==8==5====985====969=======|| 
||===9==8==1====981====969=======|| 
||===9==8==2====982====969=======|| 
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 784 
||===9==8==7====987====974=======|| 
||===5==8==9====589====975=======|| 
||===9==8==4====984====976=======|| 
||===9==8==6====986====976=======|| 
||===9==8==9====989====977=======|| 
||===9==8==8====988====977=======|| 
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 773 
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 790 
||===5==9==0====590====980=======|| 
||===5==9==2====592====980=======|| 
||===5==9==5====595====980=======|| 
||===5==9==1====591====980=======|| 
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 795 
||===5==9==3====593====984=======|| 
||===5==9==7====597====985=======|| 
||===5==9==8====598====985=======|| 
||===5==9==6====596====987=======|| 
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 798 
||===5==9==9====599====988=======|| 
||===5==9==4====594====989=======|| 
||===9==9==0====990====990=======|| 
||===9==9==5====995====991=======|| 
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 784 
||===9==9==2====992====992=======|| 
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 789 
||===9==9==3====993====993=======|| 
||===9==9==1====991====994=======|| 
||===9==9==4====994====995=======|| 
||===9==9==7====997====996=======|| 
||===9==9==8====998====997=======|| 
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 790 
||===9==9==6====996====998=======|| 
||===9==9==9====999====999=======|| 
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 805 
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 798 

scala> results(0).take(2).foreach(println) 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 802 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 805 
[trial,0,0,0,0,16,a] 
[trial,0,0,0,0,16,b] 

scala> results.size 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 839 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 840 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 839 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 842 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 855 
res3: Int = 1000 

scala> results.flatten.size 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 860 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 854 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 860 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 868 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 874 
res4: Int = 2000 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 882 

scala> 

[Stage 589:=(28 + 0)/28][Stage 590:>(27 + 1)/28][Stage 591:>(20 + 7)/28]16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 888 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 895 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 898 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 898 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 905 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 906 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 907 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 902 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 905 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 913 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 915 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 916 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 913 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 920 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 942 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 946 
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 942 
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 946 
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 948 
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 956 
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 952 
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 965 
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 965 
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 966 
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 976 
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 976 
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 990 
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 999 


scala> 

and these are only some of the WARNs I get.

As you can see, the counter sometimes "stutters".

**This is where the trouble starts**: there are lots of warnings, but results.size = 1000 and results.flatten.size = 2000, as expected.
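Incidentally, the "stuttering" counter is a symptom of the same class of problem: `counter += 1` is a non-atomic read-modify-write, so concurrent increments from the parallel collection's threads can interleave and repeat values. A minimal sketch of the difference (using plain threads instead of Spark or `.par` so it runs anywhere; all names are illustrative) contrasts a plain `var` with `java.util.concurrent.atomic.AtomicInteger`:

```scala
import java.util.concurrent.atomic.AtomicInteger

var plainCounter = 0                     // += is a read-modify-write: racy
val atomicCounter = new AtomicInteger(0) // incrementAndGet is one atomic op

// Ten threads each increment both counters 1000 times.
val threads = (0 until 10).map { _ =>
  new Thread(() => {
    for (_ <- 0 until 1000) {
      plainCounter += 1              // increments may be lost under contention
      atomicCounter.incrementAndGet()
    }
  })
}
threads.foreach(_.start())
threads.foreach(_.join())

// atomicCounter.get is always 10000; plainCounter may come up short
println(s"plain: $plainCounter, atomic: ${atomicCounter.get}")
```

This does not by itself explain the lost results, but it shows why the printed counter values repeat in the log above (e.g. 969 appearing four times).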

But trying to count to 10000 the same way produces even more warnings:

import scala.collection.mutable.ListBuffer
val dummy = List("a","b").toDF.createOrReplaceTempView("df")
spark.catalog.cacheTable("df")
val dig = (0 to 9).par
var counter: Int = 0
var results = ListBuffer[List[org.apache.spark.sql.Row]]()

for (i1 <- dig) {
  for (i2 <- dig) {
    for (i3 <- dig) {
      for (i4 <- dig) {
        println("||==="+i1+"=="+i2+"=="+i3+"=="+i4+"===="+(i1*1000+i2*100+i3*10+i4*1)+"===="+counter+"=======||")
        counter += 1
        results += spark.sql("select 'trial','"+i1+"','"+i2+"','"+i3+"', '"+i4+"','"+(i1*1000+i2*100+i3*10+i4*1)+"','"+counter+"',* from df ").collect().toList
      }
    }
  }
}
results(0).take(2).foreach(println) 
results.size 
results.flatten.size 

Output:

16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8797 
||===0==9==4==3====943====9998=======|| 
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8799 
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8801 
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8802 
||===0==9==4==4====944====9999=======|| 
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8803 
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8804 
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8805 
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8806 

and the results:

scala> results(0).take(2).foreach(println) 
[trial,3,0,0,0,3000,7,a] 
[trial,3,0,0,0,3000,7,b] 

scala> results.size 
res3: Int = 9999 

scala> results.flatten.size 
res4: Int = 19998 

One value is missing.

I invite you to try the following code, counting to 100000:

import scala.collection.mutable.ListBuffer
val dummy = List("a","b").toDF.createOrReplaceTempView("df")
spark.catalog.cacheTable("df")
val dig = (0 to 9).par
var counter: Int = 0
var results = ListBuffer[List[org.apache.spark.sql.Row]]()

for (i0 <- dig) {
  for (i1 <- dig) {
    for (i2 <- dig) {
      for (i3 <- dig) {
        for (i4 <- dig) {
          println("============="+i0+"=="+i1+"=="+i2+"=="+i3+"=="+i4+"===="+(i0*10000+i1*1000+i2*100+i3*10+i4*1)+"===="+counter+"=========")
          counter += 1
          results += spark.sql("select 'trial','"+i0+"','"+i1+"','"+i2+"','"+i3+"', '"+i4+"','"+(i0*10000+i1*1000+i2*100+i3*10+i4*1)+"','"+counter+"',* from df ").collect().toList
        }
      }
    }
  }
}

Not only do I get tons of JobProgressListener warnings during the run, but the results are also incomplete and non-deterministic:

scala> results(0).take(2).foreach(println) 
[trial,8,5,0,0,0,85000,13,a] 
[trial,8,5,0,0,0,85000,13,b] 

scala> results.size 
res3: Int = 99999 

scala> results.flatten.size 
res4: Int = 192908 

In my real-life case I frequently get a "spark.sql.execution.id is already set" exception at random points during the run.

How can I solve this?

I have already tried:

spark.conf.set("spark.extraListeners","org.apache.spark.scheduler.StatsReportListener,org.apache.spark.scheduler.EventLoggingListener") 

and reading:

Spark 1.6: java.lang.IllegalArgumentException: spark.sql.execution.id is already set

Apache Spark: network errors between executors

http://docs.scala-lang.org/overviews/parallel-collections/overview.html about side-effecting operations, but there seem to be too many possible directions.

The most relevant to this issue, IMHO, seems to be bug https://issues.apache.org/jira/browse/SPARK-10548, which should have been resolved in Spark 1.6.

Can anyone offer some hints on resolving this situation? My real-world case is similar in complexity to the 100000 count, and it fails at random points of execution.

I deploy a Google Cloud Dataproc cluster with:

gcloud dataproc clusters create clusTest --zone us-central1-b --master-machine-type n1-highmem-16 --num-workers 2 --worker-machine-type n1-highmem-8 --num-worker-local-ssds 2 --num-preemptible-workers 8 --scopes 'https://www.googleapis.com/auth/cloud-platform' --project xyz-analytics

a screenshot of the Spark UI during the run

What version of Spark are you on? –

spark 2.0 on google gcs –

java.lang.IllegalArgumentException: spark.sql.execution.id is already set
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:81)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182)
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2187)
    at ... –

Answer


> the results are incomplete and non-deterministic

The non-deterministic part should give you the hint. You are running into a race condition while adding results to your ListBuffer: it is not thread-safe for parallel updates, so if you run it long enough you will eventually lose some results.

I tried it locally and could reproduce the incomplete-results problem. Simply adding a synchronized block around the append to the buffer makes the results complete. You could also use a different data structure that does the job for you, so you don't need an explicit synchronized block, e.g. java.util.concurrent.ConcurrentLinkedQueue or similar.
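As a sketch of the ConcurrentLinkedQueue alternative (plain threads and strings stand in for Spark and Row lists so it is runnable standalone; all names are illustrative), each thread appends with `offer`, which is safe to call concurrently without any explicit lock:

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._

// Thread-safe queue: offer() may be called concurrently without locking.
val results = new ConcurrentLinkedQueue[List[String]]()

// Ten threads mimic the parallel loop; in the real code each append
// would be spark.sql(...).collect().toList.
val threads = (0 until 10).map { t =>
  new Thread(() => {
    for (i <- 0 until 100) results.offer(List(s"$t-$i-a", s"$t-$i-b"))
  })
}
threads.foreach(_.start())
threads.foreach(_.join())

// Convert back to an ordinary Scala collection once all threads finish.
val collected = results.asScala.toList
println(collected.size)         // 1000 appends, none lost
println(collected.flatten.size) // 2000 rows
```

Note that the queue preserves no particular overall order across threads, which matches the original code: the parallel loop already appends results in a non-deterministic order.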

So the following solves the problem:

for (i1 <- dig) {
  for (i2 <- dig) {
    for (i3 <- dig) {
      for (i4 <- dig) {
        counter += 1
        val result = spark.sql("select 'trial','"+i1+"','"+i2+"','"+i3+"', '"+i4+"','"+(i1*1000+i2*100+i3*10+i4*1)+"','"+counter+"',* from df ").collect().toList
        synchronized {
          results += result
        }
      }
    }
  }
}

As for the "spark.sql.execution.id is already set" exception: I could not reproduce it with the example given above. (However, I ran the code above on a local Spark installation.) Is it reproducible on a local installation?
