Scala - Creating an IndexedDatasetSpark object

I want to run the Spark RowSimilarity recommender on data fetched from MongoDB. For this I wrote the code below, which reads the input from Mongo and converts it into an RDD of objects. That RDD then needs to be turned into an IndexedDatasetSpark and passed to SimilarityAnalysis.rowSimilarityIDS:
import org.apache.hadoop.conf.Configuration
import org.apache.mahout.math.cf.SimilarityAnalysis
import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.bson.BSONObject
import com.mongodb.hadoop.MongoInputFormat

object SparkExample extends App {
  val mongoConfig = new Configuration()
  mongoConfig.set("mongo.input.uri", "mongodb://my_mongo_ip:27017/db.collection")

  val sparkConf = new SparkConf()
  val sc = new SparkContext("local", "SparkExample", sparkConf)

  // Read the collection as (ObjectId, BSON document) pairs
  val documents: RDD[(Object, BSONObject)] = sc.newAPIHadoopRDD(
    mongoConfig,
    classOf[MongoInputFormat],
    classOf[Object],
    classOf[BSONObject]
  )

  // Build (product_id, "attr-1 attr-2 ...") pairs: strip the JSON array brackets
  // and quotes, lower-case and hyphenate each attribute value, then join with spaces
  val new_doc: RDD[(String, String)] = documents.map(
    doc1 => (
      doc1._2.get("product_id").toString,
      doc1._2.get("product_attribute_value").toString
        .replace("[ \"", "")
        .replace("\"]", "")
        .split("\" , \"")
        .map(value => value.toLowerCase.replace(" ", "-"))
        .mkString(" ")
    )
  )

  var myIDs = IndexedDatasetSpark(new_doc)(sc)
  SimilarityAnalysis.rowSimilarityIDS(myIDs).dfsWrite("hdfs://myhadoop:9000/myfile", readWriteSchema)
}
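Note that readWriteSchema is used in dfsWrite above but never defined in this snippet. What I have in mind is something like the sketch below, assuming Mahout's org.apache.mahout.math.indexeddataset.Schema accepts key/value pairs; the key names are my assumption and not verified:

import org.apache.mahout.math.indexeddataset.Schema

// Hypothetical definition of readWriteSchema: the delimiter keys below are
// assumptions mirroring what I expect the text-delimited writer to use
val readWriteSchema = new Schema(
  "rowKeyDelim" -> "\t",
  "columnIdStrengthDelim" -> ":",
  "elementDelim" -> " ",
  "omitScore" -> false
)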
I am unable to create an IndexedDatasetSpark that can be passed to SimilarityAnalysis.rowSimilarityIDS. Please help me figure this out.
EDIT1:
I managed to create the IndexedDatasetSpark object and the code now compiles. I had to pass (sc) as an implicit parameter to IndexedDatasetSpark because the code was failing with:
Error: could not find implicit value for parameter sc: org.apache.spark.SparkContext
Now, when I run it, it gives the following error:
Error: could not find implicit value for parameter sc: org.apache.mahout.math.drm.DistributedContext
I cannot figure out how to get a DistributedContext.
Is this the right way to create the RDD and convert it into an IDS so it can be processed by rowSimilarityIDS?
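For reference, the direction I am considering for the missing implicit is sketched below. It assumes Mahout's sparkbindings package object provides mahoutSparkContext returning a SparkDistributedContext (a DistributedContext wrapping a SparkContext); I have not confirmed the exact names or signature:

import org.apache.mahout.math.drm.DistributedContext
import org.apache.mahout.sparkbindings._
import org.apache.spark.SparkContext

object ContextSketch extends App {
  // Assumed helper from the sparkbindings package object: builds a SparkContext
  // configured for Mahout and wraps it as a SparkDistributedContext, which is
  // the DistributedContext that rowSimilarityIDS looks for implicitly
  val sdc: SparkDistributedContext = mahoutSparkContext(
    masterUrl = "local",
    appName = "SparkExample")

  implicit val mc: DistributedContext = sdc

  // The underlying SparkContext should still be usable for newAPIHadoopRDD
  val sc: SparkContext = sdc.sc
}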
More context: I started out from this question: Run Mahout RowSimilarity recommender on MongoDB data
My build.sbt:
name := "scala-mongo"
version := "1.0"
scalaVersion := "2.10.6"
libraryDependencies += "org.mongodb" %% "casbah" % "3.1.1"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"
libraryDependencies += "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.4.2"
libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-client" % "2.6.0" exclude("javax.servlet", "servlet-api") exclude("com.sun.jmx", "jmxri") exclude("com.sun.jdmk", "jmxtools") exclude("javax.jms", "jms") exclude("org.slf4j", "slf4j-log4j12") exclude("hsqldb", "hsqldb"),
  "org.scalatest" % "scalatest_2.10" % "1.9.2" % "test"
)
libraryDependencies += "org.apache.mahout" % "mahout-math-scala_2.10" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-spark_2.10" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-math" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-hdfs" % "0.11.2"
resolvers += "typesafe repo" at "http://repo.typesafe.com/typesafe/releases/"
resolvers += Resolver.mavenLocal
EDIT2: I removed dfsWrite for the time being to let the code execute, and ran into the following error:
java.io.NotSerializableException: org.apache.mahout.math.DenseVector
Serialization stack:
- object not serializable (class: org.apache.mahout.math.DenseVector, value: {3:1.0,8:1.0,10:1.0})
- field (class: scala.Some, name: x, type: class java.lang.Object)
- object (class scala.Some, Some({3:1.0,8:1.0,10:1.0}))
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Is there some serialization step that I might have skipped?
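If it helps, this is the serializer configuration I am thinking of adding. It is only a sketch: it assumes Mahout ships a Kryo registrator at org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator, which I have not verified:

import org.apache.spark.{SparkConf, SparkContext}

object KryoConfSketch extends App {
  // Assumed fix: switch Spark to Kryo and point it at Mahout's registrator so
  // math classes such as DenseVector can be serialized on the executors
  val sparkConf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator",
      "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")

  val sc = new SparkContext("local", "SparkExample", sparkConf)
}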
Did you forget to show the error? – pferrel
@pferrel:我用最后一个错误编辑了问题。请让我知道我是否遵循Scala/Spark/Mahout中正确的做法。 – user3295878
@pferrel:删除dfsWrite并让rowSimilarity运行后,我遇到了一个新问题。已经更新了这个问题。 – user3295878