
I downloaded Spark 1.1.0 and built it with "sbt assembly". The example from the Spark-Cassandra project that I am trying to run fails to deploy the Spark job locally; the worker fails with an EndPointAssociationError.

import org.apache.spark.{SparkContext, SparkConf} 
import org.apache.spark.SparkContext._ 
import com.datastax.spark.connector._ 

object Test { 
  def main(args: Array[String]) { 
    val conf = new SparkConf(true) 
      .set("spark.cassandra.connection.host", "127.0.0.1") 

    val sc = new SparkContext("spark://127.0.0.1:7077", "test", conf) 

    val rdd = sc.cassandraTable("test", "kv") 
    println(rdd.count) 
    println(rdd.first) 
    println(rdd.map(_.getInt("value")).sum) 
  } 
} 

The Spark master is started by running ./start-master.sh from the sbin directory. The slave is then started on the same machine with the command:

./start-slave.sh 0 spark://127.0.0.1:7077

where the spark://... address is the one taken from localhost:8080 (the Spark dashboard).

This works fine; the dashboard sees the worker. Then I run the Scala program and get a ClassNotFoundException. That seems somewhat misleading, because the worker log reports this error:

14/10/29 12:23:05 ERROR EndpointWriter: AssociationError [akka.tcp://[email protected]:33137] -> [akka.tcp://[email protected]:37279]: Error [Association failed with [akka.tcp://# 
akka.remote.EndpointAssociationException: Association failed with [akka.tcp://[email protected]:37279] 
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: localhost/127.0.0.1:37279 
] 

When I start the example program, I get these warnings:

14/10/29 12:22:31 WARN util.Utils: Your hostname, bas-HP-EliteBook-8530w resolves to a loopback address: 127.0.0.1; using 192.168.122.1 instead (on interface virbr0) 
14/10/29 12:22:31 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address 

I don't know why those warnings are there. My hosts file looks like this:

127.0.0.1 localhost 
127.0.0.1 bas-HP-EliteBook-8530w 

# The following lines are desirable for IPv6 capable hosts 
::1  ip6-localhost ip6-loopback 
fe00::0 ip6-localnet 
ff00::0 ip6-mcastprefix 
ff02::1 ip6-allnodes 
ff02::2 ip6-allrouters 

The following options are enabled in conf/spark-env.sh:

SPARK_LOCAL_IP=127.0.0.1 
SPARK_MASTER_IP=127.0.0.1 

Full worker log:

Spark Command: java -cp ::/home/bas/Downloads/spark-1.1.0.backup/conf:/home/bas/Downloads/spark-1.1.0.backup/assembly/target/scala-2.10/spark-assembly-1.1.0-hadoop1.0.4.jar -XX:MaxPermSize# 
======================================== 

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
14/10/29 12:21:16 INFO Worker: Registered signal handlers for [TERM, HUP, INT] 
14/10/29 12:21:16 INFO SecurityManager: Changing view acls to: bas, 
14/10/29 12:21:16 INFO SecurityManager: Changing modify acls to: bas, 
14/10/29 12:21:16 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(bas,); users with modify permissions: Set(bas,) 
14/10/29 12:21:17 INFO Slf4jLogger: Slf4jLogger started 
14/10/29 12:21:17 INFO Remoting: Starting remoting 
14/10/29 12:21:17 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:33137] 
14/10/29 12:21:17 INFO Utils: Successfully started service 'sparkWorker' on port 33137. 
14/10/29 12:21:17 INFO Worker: Starting Spark worker localhost:33137 with 2 cores, 2.8 GB RAM 
14/10/29 12:21:17 INFO Worker: Spark home: /home/bas/Downloads/spark-1.1.0.backup 
14/10/29 12:21:17 INFO Utils: Successfully started service 'WorkerUI' on port 8081. 
14/10/29 12:21:17 INFO WorkerWebUI: Started WorkerWebUI at http://localhost:8081 
14/10/29 12:21:17 INFO Worker: Connecting to master spark://127.0.0.1:7077... 
14/10/29 12:21:18 INFO Worker: Successfully registered with master spark://127.0.0.1:7077 
14/10/29 12:22:34 INFO Worker: Asked to launch executor app-20141029122234-0000/0 for test 
14/10/29 12:22:35 INFO ExecutorRunner: Launch command: "java" "-cp" "::/home/bas/Downloads/spark-1.1.0.backup/conf:/home/bas/Downloads/spark-1.1.0.backup/assembly/target/scala-2.10/spark-a# 
14/10/29 12:23:05 INFO Worker: Asked to kill executor app-20141029122234-0000/0 
14/10/29 12:23:05 INFO ExecutorRunner: Runner thread for executor app-20141029122234-0000/0 interrupted 
14/10/29 12:23:05 INFO ExecutorRunner: Killing process! 
14/10/29 12:23:05 INFO Worker: Executor app-20141029122234-0000/0 finished with state KILLED exitStatus 1 
14/10/29 12:23:05 INFO LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkWorker/deadLetters] to Actor[akka://sparkWorker/sy# 
14/10/29 12:23:05 ERROR EndpointWriter: AssociationError [akka.tcp://[email protected]:33137] -> [akka.tcp://[email protected]:37279]: Error [Association failed with [akka.tcp://# 
akka.remote.EndpointAssociationException: Association failed with [akka.tcp://[email protected]:37279] 
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: localhost/127.0.0.1:37279 
] 
14/10/29 12:23:05 ERROR EndpointWriter: AssociationError [akka.tcp://[email protected]:33137] -> [akka.tcp://[email protected]:37279]: Error [Association failed with [akka.tcp://# 
akka.remote.EndpointAssociationException: Association failed with [akka.tcp://[email protected]:37279] 
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: localhost/127.0.0.1:37279 
] 
14/10/29 12:23:05 ERROR EndpointWriter: AssociationError [akka.tcp://[email protected]:33137] -> [akka.tcp://[email protected]:37279]: Error [Association failed with [akka.tcp://# 
akka.remote.EndpointAssociationException: Association failed with [akka.tcp://[email protected]:37279] 
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: localhost/127.0.0.1:37279 
] 

Full master log:

Spark Command: java -cp ::/home/bas/Downloads/spark-1.1.0.backup/conf:/home/bas/Downloads/spark-1.1.0.backup/assembly/target/scala-2.10/spark-assembly-1.1.0-hadoop1.0.4.jar -XX:MaxPermSize# 
======================================== 

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
14/10/29 12:20:52 INFO Master: Registered signal handlers for [TERM, HUP, INT] 
14/10/29 12:20:52 INFO SecurityManager: Changing view acls to: bas, 
14/10/29 12:20:52 INFO SecurityManager: Changing modify acls to: bas, 
14/10/29 12:20:52 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(bas,); users with modify permissions: Set(bas,) 
14/10/29 12:20:53 INFO Slf4jLogger: Slf4jLogger started 
14/10/29 12:20:53 INFO Remoting: Starting remoting 
14/10/29 12:20:53 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:7077] 
14/10/29 12:20:53 INFO Utils: Successfully started service 'sparkMaster' on port 7077. 
14/10/29 12:20:54 INFO Master: Starting Spark master at spark://127.0.0.1:7077 
14/10/29 12:20:54 INFO Utils: Successfully started service 'MasterUI' on port 8080. 
14/10/29 12:20:54 INFO MasterWebUI: Started MasterWebUI at http://localhost:8080 
14/10/29 12:20:54 INFO Master: I have been elected leader! New state: ALIVE 
14/10/29 12:21:18 INFO Master: Registering worker localhost:33137 with 2 cores, 2.8 GB RAM 
14/10/29 12:22:34 INFO Master: Registering app test 
14/10/29 12:22:34 INFO Master: Registered app test with ID app-20141029122234-0000 
14/10/29 12:22:34 INFO Master: Launching executor app-20141029122234-0000/0 on worker worker-20141029122117-localhost-33137 
14/10/29 12:23:05 INFO Master: akka.tcp://[email protected]:40211 got disassociated, removing it. 
14/10/29 12:23:05 INFO Master: Removing app app-20141029122234-0000 
14/10/29 12:23:05 INFO LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkMaster/deadLetters] to Actor[akka://sparkMaster/sy# 
14/10/29 12:23:05 INFO Master: akka.tcp://[email protected]:40211 got disassociated, removing it. 
14/10/29 12:23:05 ERROR EndpointWriter: AssociationError [akka.tcp://[email protected]:7077] -> [akka.tcp://[email protected]:40211]: Error [Association failed with [akka.tcp:/# 
akka.remote.EndpointAssociationException: Association failed with [akka.tcp://[email protected]:40211] 
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: /192.168.122.1:40211 
] 
14/10/29 12:23:05 INFO Master: akka.tcp://[email protected]:40211 got disassociated, removing it. 
14/10/29 12:23:05 INFO Master: akka.tcp://[email protected]:40211 got disassociated, removing it. 
14/10/29 12:23:05 ERROR EndpointWriter: AssociationError [akka.tcp://[email protected]:7077] -> [akka.tcp://[email protected]:40211]: Error [Association failed with [akka.tcp:/# 
akka.remote.EndpointAssociationException: Association failed with [akka.tcp://[email protected]:40211] 
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: /192.168.122.1:40211 
] 
14/10/29 12:23:05 ERROR EndpointWriter: AssociationError [akka.tcp://[email protected]:7077] -> [akka.tcp://[email protected]:40211]: Error [Association failed with [akka.tcp:/# 
akka.remote.EndpointAssociationException: Association failed with [akka.tcp://[email protected]:40211] 
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: /192.168.122.1:40211 
] 
14/10/29 12:23:05 INFO Master: akka.tcp://[email protected]:40211 got disassociated, removing it. 
14/10/29 12:23:05 WARN Master: Got status update for unknown executor app-20141029122234-0000/0 

IntelliJ stacktrace:

/lib/idea_rt.jar com.intellij.rt.execution.application.AppMain Test 
    14/10/29 12:22:31 WARN util.Utils: Your hostname, bas-HP-EliteBook-8530w resolves to a loopback address: 127.0.0.1; using 192.168.122.1 instead (on interface virbr0) 
    14/10/29 12:22:31 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address 
    14/10/29 12:22:31 INFO spark.SecurityManager: Changing view acls to: bas, 
    14/10/29 12:22:31 INFO spark.SecurityManager: Changing modify acls to: bas, 
    14/10/29 12:22:31 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(bas,); users with modify permissions: Set(bas,) 
    14/10/29 12:22:31 INFO slf4j.Slf4jLogger: Slf4jLogger started 
    14/10/29 12:22:31 INFO Remoting: Starting remoting 
    14/10/29 12:22:32 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:40211] 
    14/10/29 12:22:32 INFO Remoting: Remoting now listens on addresses: [akka.tcp://[email protected]:40211] 
    14/10/29 12:22:32 INFO util.Utils: Successfully started service 'sparkDriver' on port 40211. 
    14/10/29 12:22:32 INFO spark.SparkEnv: Registering MapOutputTracker 
    14/10/29 12:22:32 INFO spark.SparkEnv: Registering BlockManagerMaster 
    14/10/29 12:22:32 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20141029122232-f3bf 
    14/10/29 12:22:32 INFO util.Utils: Successfully started service 'Connection manager for block manager' on port 34325. 
    14/10/29 12:22:32 INFO network.ConnectionManager: Bound socket to port 34325 with id = ConnectionManagerId(192.168.122.1,34325) 
    14/10/29 12:22:32 INFO storage.MemoryStore: MemoryStore started with capacity 470.3 MB 
    14/10/29 12:22:32 INFO storage.BlockManagerMaster: Trying to register BlockManager 
    14/10/29 12:22:32 INFO storage.BlockManagerMasterActor: Registering block manager 192.168.122.1:34325 with 470.3 MB RAM 
    14/10/29 12:22:32 INFO storage.BlockManagerMaster: Registered BlockManager 
    14/10/29 12:22:32 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-eea6f1f3-8f69-4900-87c7-6da17f1f3d76 
    14/10/29 12:22:32 INFO spark.HttpServer: Starting HTTP Server 
    14/10/29 12:22:32 INFO server.Server: jetty-8.1.14.v20131031 
    14/10/29 12:22:32 INFO server.AbstractConnector: Started [email protected]:33688 
    14/10/29 12:22:32 INFO util.Utils: Successfully started service 'HTTP file server' on port 33688. 
    14/10/29 12:22:33 INFO server.Server: jetty-8.1.14.v20131031 
    14/10/29 12:22:33 INFO server.AbstractConnector: Started [email protected]:4040 
    14/10/29 12:22:33 INFO util.Utils: Successfully started service 'SparkUI' on port 4040. 
    14/10/29 12:22:33 INFO ui.SparkUI: Started SparkUI at http://192.168.122.1:4040 
    14/10/29 12:22:33 INFO client.AppClient$ClientActor: Connecting to master spark://127.0.0.1:7077... 
    14/10/29 12:22:33 INFO cluster.SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0 
    14/10/29 12:22:34 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20141029122234-0000 
    14/10/29 12:22:34 INFO client.AppClient$ClientActor: Executor added: app-20141029122234-0000/0 on worker-20141029122117-localhost-33137 (localhost:33137) with 2 cores 
    14/10/29 12:22:34 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20141029122234-0000/0 on hostPort localhost:33137 with 2 cores, 512.0 MB RAM 
    14/10/29 12:22:34 INFO client.AppClient$ClientActor: Executor updated: app-20141029122234-0000/0 is now RUNNING 
    14/10/29 12:22:34 INFO cql.CassandraConnector: Connected to Cassandra cluster: Test Cluster 
    14/10/29 12:22:34 INFO core.Cluster: New Cassandra host /127.0.0.1:9042 added 
    14/10/29 12:22:34 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1) 
    14/10/29 12:22:34 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1) 
    14/10/29 12:22:34 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1) 
    14/10/29 12:22:34 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1) 
    14/10/29 12:22:35 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster 
    14/10/29 12:22:35 INFO spark.SparkContext: Starting job: count at Test.scala:23 
    14/10/29 12:22:35 INFO scheduler.DAGScheduler: Got job 0 (count at Test.scala:23) with 1 output partitions (allowLocal=false) 
    14/10/29 12:22:35 INFO scheduler.DAGScheduler: Final stage: Stage 0(count at Test.scala:23) 
    14/10/29 12:22:35 INFO scheduler.DAGScheduler: Parents of final stage: List() 
    14/10/29 12:22:35 INFO scheduler.DAGScheduler: Missing parents: List() 
    14/10/29 12:22:35 INFO scheduler.DAGScheduler: Submitting Stage 0 (CassandraRDD[0] at RDD at CassandraRDD.scala:47), which has no missing parents 
    14/10/29 12:22:36 INFO storage.MemoryStore: ensureFreeSpace(4224) called with curMem=0, maxMem=493187235 
    14/10/29 12:22:36 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 4.1 KB, free 470.3 MB) 
    14/10/29 12:22:36 INFO storage.MemoryStore: ensureFreeSpace(2338) called with curMem=4224, maxMem=493187235 
    14/10/29 12:22:36 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.3 KB, free 470.3 MB) 
    14/10/29 12:22:36 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.122.1:34325 (size: 2.3 KB, free: 470.3 MB) 
    14/10/29 12:22:36 INFO storage.BlockManagerMaster: Updated info of block broadcast_0_piece0 
    14/10/29 12:22:36 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (CassandraRDD[0] at RDD at CassandraRDD.scala:47) 
    14/10/29 12:22:36 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 
    14/10/29 12:22:38 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://[email protected]:37279/user/Executor#2049453845] with ID 0 
    14/10/29 12:22:38 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, NODE_LOCAL, 23151 bytes) 
    14/10/29 12:22:38 INFO storage.BlockManagerMasterActor: Registering block manager localhost:37704 with 265.4 MB RAM 
    14/10/29 12:22:39 INFO network.ConnectionManager: Accepted connection from [192.168.122.1/192.168.122.1:36717] 
    14/10/29 12:22:39 INFO network.SendingConnection: Initiating connection to [localhost/127.0.0.1:37704] 
    14/10/29 12:22:39 INFO network.SendingConnection: Connected to [localhost/127.0.0.1:37704], 1 messages pending 
    14/10/29 12:22:39 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:37704 (size: 2.3 KB, free: 265.4 MB) 
    14/10/29 12:22:40 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassNotFoundException: com.datastax.spark.connector.rdd.partitioner.CassandraPartition 
      java.net.URLClassLoader$1.run(URLClassLoader.java:366) 
      java.net.URLClassLoader$1.run(URLClassLoader.java:355) 
      java.security.AccessController.doPrivileged(Native Method) 
      java.net.URLClassLoader.findClass(URLClassLoader.java:354) 
      java.lang.ClassLoader.loadClass(ClassLoader.java:425) 
      java.lang.ClassLoader.loadClass(ClassLoader.java:358) 
      java.lang.Class.forName0(Native Method) 
      java.lang.Class.forName(Class.java:270) 
      org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59) 
      java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) 
      java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) 
      java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) 
      java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
      java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
      java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
      java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
      java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
      java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) 
      org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) 
      org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) 
      org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159) 
      java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
      java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
      java.lang.Thread.run(Thread.java:745) 
    14/10/29 12:22:40 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 0.0 (TID 1, localhost, NODE_LOCAL, 23151 bytes) 
    14/10/29 12:22:40 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1) on executor localhost: java.lang.ClassNotFoundException (com.datastax.spark.connector.rdd.partitioner.CassandraPartition) [duplicate 1] 
    14/10/29 12:22:40 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 0.0 (TID 2, localhost, NODE_LOCAL, 23151 bytes) 
    14/10/29 12:22:40 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 0.0 (TID 2) on executor localhost: java.lang.ClassNotFoundException (com.datastax.spark.connector.rdd.partitioner.CassandraPartition) [duplicate 2] 
    14/10/29 12:22:40 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 0.0 (TID 3, localhost, NODE_LOCAL, 23151 bytes) 
    14/10/29 12:22:40 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 0.0 (TID 3) on executor localhost: java.lang.ClassNotFoundException (com.datastax.spark.connector.rdd.partitioner.CassandraPartition) [duplicate 3] 
    14/10/29 12:22:40 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job 
    14/10/29 12:22:40 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
    14/10/29 12:22:40 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0 
    14/10/29 12:22:40 INFO scheduler.DAGScheduler: Failed to run count at Test.scala:23 
    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, localhost): java.lang.ClassNotFoundException: com.datastax.spark.connector.rdd.partitioner.CassandraPartition 
      java.net.URLClassLoader$1.run(URLClassLoader.java:366) 
      java.net.URLClassLoader$1.run(URLClassLoader.java:355) 
      java.security.AccessController.doPrivileged(Native Method) 
      java.net.URLClassLoader.findClass(URLClassLoader.java:354) 
      java.lang.ClassLoader.loadClass(ClassLoader.java:425) 
      java.lang.ClassLoader.loadClass(ClassLoader.java:358) 
      java.lang.Class.forName0(Native Method) 
      java.lang.Class.forName(Class.java:270) 
      org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59) 
      java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) 
      java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) 
      java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) 
      java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
      java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
      java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
      java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
      java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
      java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) 
      org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) 
      org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) 
      org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159) 
      java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
      java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
      java.lang.Thread.run(Thread.java:745) 
    Driver stacktrace: 
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) 
     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
     at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
     at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

    Process finished with exit code 1 

build.sbt:

name := "sparktest" 

version := "1.0" 

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0-alpha3" withSources() withJavadoc() 

libraryDependencies += "org.apache.spark" % "spark-core" % "1.1.0" 

Scala compiler version = 2.10.4

Java version = 1.7.0_67

I tried to set SPARK_LOCAL_IP like this:

System.setProperty("SPARK_LOCAL_IP", "127.0.0.1") 
println(System.getenv("SPARK_LOCAL_IP")) 

and like this:

scala.util.Properties.envOrElse("SPARK_LOCAL_IP", "127.0.0.1") 
println(System.getenv("SPARK_LOCAL_IP")) 

But it just prints null.
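
(Presumably System.setProperty only defines a JVM system property and never changes the process environment, so System.getenv("SPARK_LOCAL_IP") stays null; the variable would have to be exported before the JVM starts, for example in the IntelliJ run configuration. Below is a minimal sketch of pinning the driver address from code instead, via the spark.driver.host setting; whether that is enough to avoid the virbr0 binding in this setup is an assumption, not something confirmed in the question.)

// Sketch only, not from the original post: advertise the driver on 127.0.0.1 
// instead of the 192.168.122.1 address picked from interface virbr0. 
val conf = new SparkConf(true) 
  .set("spark.cassandra.connection.host", "127.0.0.1") 
  .set("spark.driver.host", "127.0.0.1")   // address the executors connect back to 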

Thanks in advance for any suggestions.

EDIT: Changing the Spark configuration as follows solved the problem:

val conf = new SparkConf(true) 
     .set("spark.cassandra.connection.host", "127.0.0.1") 
     .set("spark.executor.extraClassPath", "/home/bas/Downloads/spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar") 

Note that this does not work with the sbt-built jar of the spark-cassandra-connector; I had to build it from source. In fact, I removed all the sbt dependencies and added the Spark dependency as a source build.

Answer


The reason you get the error is that Spark does not have the spark-cassandra-connector on the worker's classpath.

You can use a one-jar sbt plugin, like sbt-assembly, to create an assembled jar that includes all the dependencies, and use it to submit your job to Spark:

./bin/spark-submit \ 
    --class <main-class> \ 
    --master <master-url> \ 
    <assembled jar> \ 
    [application-arguments] 
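
For reference, a minimal sbt-assembly setup for this project might look like the sketch below; the plugin version and the "provided" scoping of spark-core are assumptions for a Spark 1.1 / sbt 0.13 project, not something taken from the question:

// project/plugins.sbt  (assumed plugin version) 
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2") 

// build.sbt 
import AssemblyKeys._        // sbt-assembly 0.11.x style 

assemblySettings 

// spark-core is provided by the cluster at runtime; the connector is bundled 
// into the fat jar so the executors can load CassandraPartition. 
libraryDependencies ++= Seq( 
  "org.apache.spark"   %% "spark-core"                % "1.1.0" % "provided", 
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0-alpha3" 
) 

Running sbt assembly then produces something like target/scala-2.10/sparktest-assembly-1.0.jar, which is the <assembled jar> to pass to spark-submit above.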

Alternatively, you can use Spark's runtime environment option spark.executor.extraClassPath and set the classpath globally so that the application can find the connector.
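
As a sketch of that second option applied to this program (the jar path below is a placeholder, not taken from the original post), the classpath can be set, and the jar optionally shipped to the executors, straight from the SparkConf:

// Hypothetical path; point it at the connector (or application assembly) jar. 
val connectorJar = "/path/to/spark-cassandra-connector-assembly.jar" 

val conf = new SparkConf(true) 
  .set("spark.cassandra.connection.host", "127.0.0.1") 
  // the path must exist on every worker node for extraClassPath to take effect 
  .set("spark.executor.extraClassPath", connectorJar) 
  // alternatively, ship the jar with the application so executors fetch it 
  .setJars(Seq(connectorJar)) 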


It works :-). I will edit the original question with the adjusted code. I had to build the spark-cassandra-connector library manually from source, because the jar file generated by sbt did not seem to work. Thanks! – Bas 2014-10-29 14:17:59
