我有Spark Notebook,Spark,Accumulo 1.6和Hadoop都在运行的Vagrant图像。从笔记本电脑,我可以手动创建一个扫描仪,并从我创建使用的Accumulo一个例子表拉力测试数据:如何在Spark-notebook中从Accumulo 1.6创建Spark RDD?
val instanceNameS = "accumulo"
val zooServersS = "localhost:2181"
val instance: Instance = new ZooKeeperInstance(instanceNameS, zooServersS)
val connector: Connector = instance.getConnector("root", new PasswordToken("password"))
val auths = new Authorizations("exampleVis")
val scanner = connector.createScanner("batchtest1", auths)
scanner.setRange(new Range("row_0000000000", "row_0000000010"))
for(entry: Entry[Key, Value] <- scanner) {
println(entry.getKey + " is " + entry.getValue)
}
会给前十行表中的数据。
当我尝试正是如此创建RDD:
val rdd2 =
sparkContext.newAPIHadoopRDD (
new Configuration(),
classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
classOf[org.apache.accumulo.core.data.Key],
classOf[org.apache.accumulo.core.data.Value]
)
我得到一个RDD还给我,我不能做与因以下错误:
java.io.IOException: Input info has not been set. at org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630) at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343) at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538) at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1367) at org.apache.spark.rdd.RDD.count(RDD.scala:927)
这完全因为我没有指定任何参数来连接哪个表,认证是什么等等这一事实是有意义的。
所以我的问题是:我该怎么做需要从这里开始将前十行表数据存入我的RDD中?
更新一个 还不行,但我确实发现了一些东西。原来,有两个几乎相同的包,
org.apache.accumulo.core.client.mapreduce
&
org.apache.accumulo.core.client.mapred
都具有几乎相同的成员,除了一个事实,即一些方法签名是不同的。不知道为什么两者都存在,因为没有我可以看到的贬低通知。我试图以不愉快的方式实现Sietse的回答。下面是我做什么,并且响应:
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration
val jobConf = new JobConf(new Configuration)
import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.conf.Configuration jobConf: org.apache.hadoop.mapred.JobConf = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml
Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml
AbstractInputFormat.setConnectorInfo(jobConf,
"root",
new PasswordToken("password")
AbstractInputFormat.setScanAuthorizations(jobConf, auths)
AbstractInputFormat.setZooKeeperInstance(jobConf, new ClientConfiguration)
val rdd2 =
sparkContext.hadoopRDD (
jobConf,
classOf[org.apache.accumulo.core.client.mapred.AccumuloInputFormat],
classOf[org.apache.accumulo.core.data.Key],
classOf[org.apache.accumulo.core.data.Value],
1
)
rdd2: org.apache.spark.rdd.RDD[(org.apache.accumulo.core.data.Key, org.apache.accumulo.core.data.Value)] = HadoopRDD[1] at hadoopRDD at :62
rdd2.first
java.io.IOException: Input info has not been set. at org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630) at org.apache.accumulo.core.client.mapred.AbstractInputFormat.validateOptions(AbstractInputFormat.java:308) at org.apache.accumulo.core.client.mapred.AbstractInputFormat.getSplits(AbstractInputFormat.java:505) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at org.apache.spark.rdd.RDD.take(RDD.scala:1077) at org.apache.spark.rdd.RDD.first(RDD.scala:1110) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:64) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:69) at...
*编辑2 *
重:霍顿的答案 - 仍然没有喜悦:
AbstractInputFormat.setConnectorInfo(jobConf,
"root",
new PasswordToken("password")
AbstractInputFormat.setScanAuthorizations(jobConf, auths)
AbstractInputFormat.setZooKeeperInstance(jobConf, new ClientConfiguration)
InputFormatBase.setInputTableName(jobConf, "batchtest1")
val rddX = sparkContext.newAPIHadoopRDD(
jobConf,
classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
classOf[org.apache.accumulo.core.data.Key],
classOf[org.apache.accumulo.core.data.Value]
)
rddX: org.apache.spark.rdd.RDD[(org.apache.accumulo.core.data.Key, org.apache.accumulo.core.data.Value)] = NewHadoopRDD[0] at newAPIHadoopRDD at :58
Out[15]: NewHadoopRDD[0] at newAPIHadoopRDD at :58
rddX.first
java.io.IOException: Input info has not been set. at org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630) at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343) at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538) at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at org.apache.spark.rdd.RDD.take(RDD.scala:1077) at org.apache.spark.rdd.RDD.first(RDD.scala:1110) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:61) at
编辑3 - 进步!
我能弄清楚为什么'input INFO not set'错误发生。眼尖的你之间无疑会看到下面的代码缺少结束“(”
AbstractInputFormat.setConnectorInfo(jobConf, "root", new PasswordToken("password")
为我做这个火花的笔记本电脑,我一直点击执行按钮和移动上,因为我没有看到错误。我忘记的是,当你离开关闭'''时,笔记本将做火花外壳将会做的事情 - 它会永久等待你添加它。所以错误是'setConnectorInfo'方法永远不会被执行的结果。
不幸的是,我仍然无法将accumulo表数据推送到可用于我的RDD中。当我执行
rddX.count
我回来
res15: Long = 10000
这是正确的反应 - 有10000行中我指着表中的数据。然而,当我试图抓住正是如此数据的第一个元素:
rddX.first
我得到以下错误:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.accumulo.core.data.Key
在何处何去何从有什么想法?
编辑4-成功!
接受的答案+评论是那里的90% - 除了accumulo键/值需要被转换成可序列化的事实。通过调用两者的.toString()方法,我得到了这个工作。我会尽快发布一些完整的工作代码,以防其他人遇到同样的问题。
大卫,你只是想知道一件快事(因为我还不知道accumulo ^^)。你已经在火星壳里试过这种东西了吗?所以,我会知道这是一个火花笔记本问题还是不是:-D。如果它是一个累积的东西,我可以用@lossyrob看到在Geotrellis中使用Accumulo和Spark – 2015-03-25 12:49:36
@andypetrella我还没有在spark-shell中试过这个,因为 - 我认为 - spark-notebook只是传递我的命令来激发和回到我身上从火花中回来的东西(你会比我更清楚)。我会说,当我尝试按照accumulo文档9.1.2节中的说明操作时,我得到了“Job job = new Job(getConf(java.lang.Object)”中的“java.lang.IllegalStateException:Job in state DEFINE instead of RUNNING” ))“或'我不知道什么getConf()”是消息,这取决于我如何设置。 – 2015-03-25 17:14:53
我在这里看到http://pastebin.com/ti7Qz19m这个人是按照accumulo文件 - 但我不能从它得到任何牵引力。 – 2015-03-25 17:17:12