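Create a Spark DataFrame from a custom data format

I have a text file that uses the string REC as the record delimiter and a line break as the column delimiter. Every value has its column name attached to it, with a comma as the delimiter. Below is a sample of the data format:

    REC
    Id,19048
    Term,milk
    Rank,1
    REC
    Id,19049
    Term,corn
    Rank,5

REC is used as the record delimiter. Now I want to create a Spark DataFrame with the column names Id, Term and Rank. Please help me with this.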
Here is working code:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkConf, SparkContext}

    object RecordSeparator extends App {
      val conf = new SparkConf().setAppName("test").setMaster("local[1]")
      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._ // needed for toDF()

      // Make Hadoop split the input on "REC" instead of on newlines
      val hconf = new Configuration
      hconf.set("textinputformat.record.delimiter", "REC")

      val data = sc.newAPIHadoopFile("data.txt",
          classOf[TextInputFormat], classOf[LongWritable],
          classOf[Text], hconf)
        .map(x => x._2.toString.trim)
        .filter(x => x != "")
        .map(x => getRecord(x))
        .map(x => x.split(","))
        .map(x => record(x(0), x(1), x(2)))

      val df = data.toDF()
      df.printSchema()
      df.show(false)

      // "Id,19048\nTerm,milk\nRank,1" -> "19048,milk,1"
      def getRecord(in: String): String = {
        val ar = in.split("\n").mkString(",").split(",")
        val data = Array(ar(1), ar(3), ar(5))
        data.mkString(",")
      }
    }

    case class record(Id: String, Term: String, Rank: String)

Output:

    root
     |-- Id: string (nullable = true)
     |-- Term: string (nullable = true)
     |-- Rank: string (nullable = true)

    +-----+----+----+
    |Id   |Term|Rank|
    +-----+----+----+
    |19048|milk|1   |
    |19049|corn|5   |
    +-----+----+----+
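
The `getRecord` helper picks values by position (`ar(1)`, `ar(3)`, `ar(5)`), so it relies on every record listing Id, Term and Rank in exactly that order. A minimal sketch of a name-based alternative, in plain Scala with no Spark dependency, might look like the following. `RecordParser`, `ParsedRecord`, `toFields` and `parseChunk` are hypothetical names introduced only for illustration, and the sketch assumes each line holds a single `name,value` pair.

```scala
object RecordParser {
  // Hypothetical holder for one parsed record; all fields are kept as strings.
  final case class ParsedRecord(id: String, term: String, rank: String)

  // Turns the text between two REC markers into a name -> value map.
  def toFields(chunk: String): Map[String, String] =
    chunk.split("\n").iterator
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(_.split(",", 2)) // split on the first comma only
      .collect { case Array(name, value) => name.trim -> value.trim }
      .toMap

  // Returns None when any of the three expected fields is missing.
  def parseChunk(chunk: String): Option[ParsedRecord] = {
    val fields = toFields(chunk)
    for {
      id   <- fields.get("Id")
      term <- fields.get("Term")
      rank <- fields.get("Rank")
    } yield ParsedRecord(id, term, rank)
  }
}
```

Under these assumptions, a chunk whose fields arrive in a different order still parses, and an incomplete chunk yields `None` rather than an index error.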
If your file is on the "normal" file system (not HDFS), you can write a file parser yourself, then use sc.parallelize to create an RDD and from that a DataFrame:

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkConf, SparkContext}
    import scala.collection.mutable

    object Demo extends App {
      val conf = new SparkConf().setMaster("local[1]").setAppName("Demo")
      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      case class Record(
        var id: Option[Int] = None,
        var term: Option[String] = None,
        var rank: Option[Int] = None)

      val filename = "data.dat"
      val records = readFile(filename)
      val df = sc.parallelize(records).toDF
      df.printSchema()
      df.show()

      // Reads the file line by line on the driver and groups lines into records
      def readFile(filename: String): Seq[Record] = {
        import scala.io.Source
        val records = mutable.ArrayBuffer.empty[Record]
        var currentRecord: Record = null
        for (line <- Source.fromFile(filename).getLines) {
          val tokens = line.split(',')
          currentRecord = tokens match {
            case Array("REC") => Record() // start a new record
            case Array("Id", id) =>
              currentRecord.id = Some(id.toInt); currentRecord
            case Array("Term", term) =>
              currentRecord.term = Some(term); currentRecord
            case Array("Rank", rank) =>
              // Rank is the last field, so the record is complete here
              currentRecord.rank = Some(rank.toInt); records += currentRecord
              null
            case _ => currentRecord // skip blank or unrecognized lines
          }
        }
        records.toSeq
      }
    }

This gives:

    root
     |-- id: integer (nullable = true)
     |-- term: string (nullable = true)
     |-- rank: integer (nullable = true)

    +-----+----+----+
    |   id|term|rank|
    +-----+----+----+
    |19048|milk|   1|
    |19049|corn|   5|
    +-----+----+----+
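
The loop in `readFile` keeps its state in a mutable `currentRecord` that is reset to `null` after each Rank line. As a rough sketch of the same grouping without mutable state, the lines can be folded into an immutable collection. `LineGrouping`, `Rec` and `group` are hypothetical names, not part of the answer above, and the sketch assumes that values which fail to parse as integers should simply become `None`.

```scala
import scala.util.Try

object LineGrouping {
  // Hypothetical record type mirroring the answer's Record, but immutable.
  final case class Rec(id: Option[Int] = None,
                       term: Option[String] = None,
                       rank: Option[Int] = None)

  // Folds over the lines; a "REC" line starts a new record, and every other
  // line updates the most recently started one.
  def group(lines: Iterator[String]): Vector[Rec] =
    lines.map(_.trim).filter(_.nonEmpty).foldLeft(Vector.empty[Rec]) { (acc, line) =>
      line.split(",", 2) match {
        case Array("REC") => acc :+ Rec()
        case Array(name, value) if acc.nonEmpty =>
          val cur = acc.last
          val updated = name match {
            case "Id"   => cur.copy(id = Try(value.toInt).toOption)
            case "Term" => cur.copy(term = Some(value))
            case "Rank" => cur.copy(rank = Try(value.toInt).toOption)
            case _      => cur // unknown field names are ignored
          }
          acc.init :+ updated
        case _ => acc // lines before the first REC or without a comma are skipped
      }
    }
}
```

One difference worth noting: this variant emits a record as soon as its `REC` line is seen, so a record without a Rank line is still returned, whereas the original loop only appends a record when it reaches Rank.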
Indent your code. – gsamaras