
I am trying to create a JavaRDD that contains a series of other RDDs inside it — a Java Spark RDD inside another RDD?

RDDMachine.foreach(machine -> startDetectionNow()) — inside this, each machine starts a query against Elasticsearch and gets back another RDD. I collect all of it (1200 hits) and convert it to a list, and the machine then works with that list.

First of all: is it possible to do this? If not, how could I approach it differently?

Let me show what I tried to do:

    // Spark + elasticsearch-hadoop configuration
    SparkConf conf = new SparkConf().setAppName("Algo").setMaster("local");
    conf.set("es.index.auto.create", "true");
    conf.set("es.nodes", "IP_ES");
    conf.set("es.port", "9200");
    sparkContext = new JavaSparkContext(conf);

    MyAlgoConfig config_algo = new MyAlgoConfig(Detection.byPrevisionMerge);

    Machine m1 = new Machine("AL-27", "IP1", config_algo);
    Machine m2 = new Machine("AL-20", "IP2", config_algo);
    Machine m3 = new Machine("AL-24", "IP3", config_algo);
    Machine m4 = new Machine("AL-21", "IP4", config_algo);

    ArrayList<Machine> machines = new ArrayList<>();
    machines.add(m1);
    machines.add(m2);
    machines.add(m3);
    machines.add(m4);

    // Distribute the machines, then try to run the detection on each of them
    JavaRDD<Machine> machineRDD = sparkContext.parallelize(machines);

    machineRDD.foreach(machine -> machine.startDetectNow());

I want to start my algorithm on each machine, and each machine has to learn from data stored in Elasticsearch.


    public boolean startDetectNow() {

        // Big query to Elasticsearch (ELK) to fetch the data to learn from
        JavaRDD dataForLearn = Elastic.loadElasticsearch(
            Algo.sparkContext
            , "logstash-*/Collector"
            , Elastic.req_AvgOfCall(
                getIP()
                , "hour"
                , "2016-04-16T00:00:00"
                , "2016-06-10T00:00:00"));

        JavaRDD<Hit> RDD_hits = Elastic.mapToHit(dataForLearn);
        List<Hit> hits = Elastic.RddToListHits(RDD_hits);

        // ... the machine then learns from the collected hits

So I am trying to have each "machine" query all of its own data. My question is: is it possible to do this with Spark at all? Or in some other way? When I launch it with Spark, it seems to lock up as soon as the code reaches the second RDD.

And the error message is:

    16/08/17 00:17:13 INFO SparkContext: Starting job: collect at Elastic.java:94
    16/08/17 00:17:13 INFO DAGScheduler: Got job 1 (collect at Elastic.java:94) with 1 output partitions
    16/08/17 00:17:13 INFO DAGScheduler: Final stage: ResultStage 1 (collect at Elastic.java:94)
    16/08/17 00:17:13 INFO DAGScheduler: Parents of final stage: List()
    16/08/17 00:17:13 INFO DAGScheduler: Missing parents: List()
    16/08/17 00:17:13 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[4] at map at Elastic.java:106), which has no missing parents
    16/08/17 00:17:13 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.3 KB, free 7.7 KB)
    16/08/17 00:17:13 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.5 KB, free 10.2 KB)
    16/08/17 00:17:13 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:46356 (size: 2.5 KB, free: 511.1 MB)
    16/08/17 00:17:13 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
    16/08/17 00:17:13 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[4] at map at Elastic.java:106)
    16/08/17 00:17:13 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
    ^C16/08/17 00:17:22 INFO SparkContext: Invoking stop() from shutdown hook
    16/08/17 00:17:22 INFO SparkUI: Stopped Spark web UI at http://192.168.10.23:4040
    16/08/17 00:17:22 INFO DAGScheduler: ResultStage 0 (foreach at GuardConnect.java:60) failed in 10.292 s
    16/08/17 00:17:22 INFO DAGScheduler: Job 0 failed: foreach at GuardConnect.java:60, took 10.470974 s
    Exception in thread "main" org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:806)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:804)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
        at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:804)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1658)
        at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
        at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1581)
        at org.apache.spark.SparkContext$$anonfun$stop$9.apply$mcV$sp(SparkContext.scala:1740)
        at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1229)
        at org.apache.spark.SparkContext.stop(SparkContext.scala:1739)
        at org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:596)
        at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:267)
        ... (SparkShutdownHookManager / Hadoop shutdown hook frames)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
        at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:332)
        at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:46)
        at com.seigneurin.spark.GuardConnect.main(GuardConnect.java:60)
        ... (reflection and SparkSubmit frames)
    16/08/17 00:17:22 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(...)
    16/08/17 00:17:22 INFO DAGScheduler: ResultStage 1 (collect at Elastic.java:94) failed in 9.301 s
    16/08/17 00:17:22 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(0,1471385842813,JobFailed(org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down))
    16/08/17 00:17:22 INFO DAGScheduler: Job 1 failed: collect at Elastic.java:94, took 9.317650 s
    16/08/17 00:17:22 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    org.apache.spark.SparkException: Job 1 cancelled because SparkContext was shut down
        ... (same DAGScheduler / SparkContext shutdown frames as above)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
        at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:339)
        at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:46)
        at com.seigneurin.spark.Elastic.RddToListHits(Elastic.java:94)
        at com.seigneurin.spark.OXO.prepareDataAndLearn(OXO.java:126)
        at com.seigneurin.spark.OXO.startDetectNow(OXO.java:148)
        at com.seigneurin.spark.GuardConnect.lambda$main$1282d8df$1(GuardConnect.java:60)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:332)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    16/08/17 00:17:22 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(1,1471385842814,JobFailed(org.apache.spark.SparkException: Job 1 cancelled because SparkContext was shut down))
    16/08/17 00:17:22 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    16/08/17 00:17:22 INFO MemoryStore: MemoryStore cleared
    16/08/17 00:17:22 INFO BlockManager: BlockManager stopped
    16/08/17 00:17:22 INFO BlockManagerMaster: BlockManagerMaster stopped
    16/08/17 00:17:22 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    16/08/17 00:17:22 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
    16/08/17 00:17:22 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
    16/08/17 00:17:22 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0, ANY, 6751 bytes)
    16/08/17 00:17:22 ERROR Inbox: Ignoring error
    java.util.concurrent.RejectedExecutionException: Task ... rejected from java.util.concurrent.ThreadPoolExecutor ... [Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
        at org.apache.spark.executor.Executor.launchTask(Executor.scala:128)
        at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$reviveOffers$1.apply(LocalBackend.scala:86)
        at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$reviveOffers$1.apply(LocalBackend.scala:84)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalBackend.scala:84)
        at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalBackend.scala:69)
        at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
        at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
        at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
        at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    16/08/17 00:17:22 INFO SparkContext: Successfully stopped SparkContext
    16/08/17 00:17:22 INFO ShutdownHookManager: Shutdown hook called
    16/08/17 00:17:22 INFO ShutdownHookManager: Deleting directory /tmp/spark-8bf65e78-a916-4cc0-b4d1-1b0ec9a07157
    16/08/17 00:17:22 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
    16/08/17 00:17:22 INFO ShutdownHookManager: Deleting directory /tmp/spark-8bf65e78-a916-4cc0-b4d1-1b0ec9a07157/httpd-6d3aeb80-808c-4749-8f8b-ac9341f990a7

Thank you if you can give me some advice.


We need the inner exception to be able to help. All this says is that there is a problem inside your foreach. –


Hmm, maybe it is because I have a while(1) right after the RDD? I thought I could do this work with an RDD. I have added the full error message. –


An RDD of RDDs does not really make sense; however, there is a way of tricking the compiler into compiling it. – GameOfThrows

Answer


You cannot create an RDD inside another RDD, whatever the type of the RDDs. That is the first rule. This is because an RDD is an abstraction that points to your data; it is not the data itself.
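
One way around this is to keep the machines on the driver and loop over them there, so that each Elasticsearch load becomes an ordinary top-level Spark job instead of an RDD created inside a foreach. A minimal sketch, assuming the Machine and Elastic helpers behave as shown in the question (they are the asker's own classes, not a public API):

    import java.util.Arrays;
    import java.util.List;

    // ...

    // Keep the machines as a plain driver-side collection instead of parallelizing them.
    List<Machine> machines = Arrays.asList(m1, m2, m3, m4);

    // Loop on the driver: each startDetectNow() call can then create, transform and
    // collect its own Elasticsearch-backed RDD as a normal top-level job on the
    // single shared SparkContext, which is allowed.
    for (Machine machine : machines) {
        machine.startDetectNow();
    }

If the per-machine work itself has to be distributed, another option is to load the Elasticsearch data for all machines into a single RDD and group it by machine IP, but that means restructuring startDetectNow() so that it no longer touches the SparkContext from inside a task.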


OK, thank you for the advice ;) –