2015-04-17 79 views
0

我试图从1.2版本使用新的连接功能,但我得到repl的repartitionByCassandraReplica函数的错误。cassandra火花连接器错误与repartitionByCassandraReplica函数

我试图复制网站的例子,一对夫妇的元素创造了一个卡桑德拉表(shopping_history): https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.mde

import com.datastax.spark.connector.rdd._ 
import com.datastax.spark.connector.cql.CassandraConnector 
import com.datastax.spark.connector._ 
import com.datastax.driver.core._ 

case class CustomerID(cust_id: Int) 
val idsOfInterest = sc.parallelize(1 to 1000).map(CustomerID(_)) 
val repartitioned = idsOfInterest.repartitionByCassandraReplica("cim_dev", "shopping_history", 10) 
repartitioned.first() 

我得到这个错误:

15/04/13 18:35:43 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, dev2-cim.aid.fr): java.lang.ClassNotFoundException: $line31.$read$$iwC$$iwC$CustomerID 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:372) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:360) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at java.lang.Class.forName0(Native Method) 
    at java.lang.Class.forName(Class.java:344) 
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59) 
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) 
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) 
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) 
    at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133) 
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) 
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) 
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:1098) 
    at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:1098) 
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353) 
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
    at org.apache.spark.scheduler.Task.run(Task.scala:56) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

我使用spark 1.2.0和connector 1.2.0 RC 3. idsOfInterest上使用的joinWithCassandraTable函数可以工作。

我也很好奇joinWithCassandraTable/cassandraTable与In子句/ foreachPartition(withSessionDo)语法之间的区别。

它们是否都将数据请求到作为协调器的本地节点? joinWithCassandraTable与repartitionByCassandraReplica结合如同异步查询一样高效,仅向本地节点请求数据?如果不应用repartitionByCassandraReplica会发生什么情况?

我已经问过这个问题上卡桑德拉连接器的谷歌小组论坛: https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/b615ANGSySc

感谢

+0

我不知道你的类加载器的问题,不知道你是如何运行这个代码,你可以给我们提交你的提交命令或启动命令吗? – RussS

+0

@RussS,我的启动命令是spark-shell :),其中spark.executor.extraClassPath/spark.driver.extraClassPath在spark-default.conf中设置到cassandra连接器jar中。奇怪的是,找不到的类是在shell中创建的... – Alex

+0

使用完整的程序集?另外尝试--jar有时会在某些版本的spark上出现一些classloader怪异。 – RussS

回答

2

我会在这里回答你的第二个问题,如果我能与第一部分随动根据更多信息来了解情况。

我也好奇betwween的差异: joinWithCassandraTable/cassandraTable用在从句/ foreachPartition(withSessionDo)语法。

带有in子句的cassandraTable将创建一个单独的spark分区。所以对于非常小的子句可能没问题,但子句必须从驱动程序序列化到火花应用程序。这对于大的子句可能是非常糟糕的,一般来说,如果我们不需要,我们不希望将数据从火花驱动程序来回发送给执行者。

joinWithCassandraTableforeachPartition(withSessionDo)非常相似。主要区别在于joinWithCassandraTable调用使用Connector转换和阅读代码,这将使得从Cassandra行中获取Scala对象变得更加容易。在这两种情况下,您的数据都将保持RDD格式,并且不会被序列化回驱动程序。他们还将使用先前RDD的分区器(或最后一个公开preferredLocation方法的RDD),以便他们能够使用repartitionByCassandraTable进行工作。

如果未应用repartitionByCassandraTable,则会在可能或可能不是您要请求的信息的协调者的节点上请求数据。这将在您的查询中添加额外的网络跃点,但这可能不是一个非常大的性能损失。在加入之前,您想重新分配的时间实际上取决于数据的总量和重新分配操作中火花洗牌的成本。

+0

感谢您的回答RussS。我仍然对“可能会或可能不是协调人”部分感到好奇。到目前为止,我使用带IN子句的foreachPartition(withSessionDo)语法来查询关于一批customerID(Part.Key)的一些时间序列信息,并且遇到了非常大的查询中的一些问题。我想知道是否每个执行者都在查询本地cassandra作为一个协调器,从而产生大量的网络流量和CPU负载。这就是为什么我在joinWithCassandraTable中插入内容。那么,执行者什么时候需要协调员? 1.1和1.2之间的行为有何不同? – Alex

+0

in子句还执行连接不会执行的服务器端multiget。至于协调员,如果只有一个多维数据集查询,则每个协调器只能获得一个协调器。 – RussS