我正在Scala中开发一个Spark应用程序,并想知道将其并行化并在hadoop集群上运行的最佳方法。我的代码会从hdfs文件中读取每一行,解析它并生成多个记录(对于每行),我将其作为一个case类存储。我已经在getElem()方法中编写了完整的逻辑,并按预期工作。在斯卡拉设计并行化Spark应用程序的最佳方法
现在,我想计算所有输入记录的逻辑并将响应存储到hdfs位置。
请让我知道我该如何处理spark和整合为输入生成的所有相应输出记录并写入HDFS。
object testing extends Serializable {
var recordArray=Array[Record]();
def main(args:Array[String])
{
val conf = new SparkConf().setAppName("jsonParsing").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext= new SQLContext(sc)
val input=sc.textFile("hdfs://loc/data.txt")
// input.collect().foreach(println)
input.map(data=>getElem(parse(data,false),sc,sqlContext))
}
//method definition
def getElem(json:JValue)={
// Parses the json and creates array of datasets for each input record and stores the data in case class
val x= Record("xxxx","xxxx","xxxx","xxxx","xxxx","xxxx","xxxx","xxxx","xxxx","xxxx")
}
case class Record(summary_key: String, key: String,array_name_position:Int,Parent_Level_1:String,Parent_level_2:String,Parent_Level_3:String,Parent_level_4:String,Parent_level_5:String,
param_name_position:Integer,Array_name:String,paramname:String,paramvalue:String)
}
你测试了你的代码吗?我怀疑它是一个完整的工作代码。 –
@RameshMaharjan是的,逻辑工作在斯卡拉 –
具体是什么问题?既然你似乎已经想出了如何正确解析记录(你声称getElem正在工作,你喜欢),那么你唯一的问题是保存结果?您想以什么格式保存数据? – puhlen