2016-09-08

Create a Spark DataFrame from a custom data format

I have a text file that uses the string REC as the record separator and the newline character as the column separator; each value is prefixed with its column name, with a comma as the delimiter. Here is the sample data format:

REC
Id,19048
Term,milk
Rank,1
REC
Id,19049
Term,corn
Rank,5

REC acts as the record separator. Now I want to create a Spark DataFrame with the column names Id, Term, and Rank. Please help me.

Answers

Here is the working code:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object RecordSeparator extends App {
  val conf = new SparkConf()
    .setAppName("test")
    .setMaster("local[1]")
    .setExecutorEnv("executor-cores", "2")
  val sc = new SparkContext(conf)

  // Make Hadoop split the input on "REC" instead of the default newline
  val hconf = new Configuration
  hconf.set("textinputformat.record.delimiter", "REC")

  val data = sc.newAPIHadoopFile("data.txt",
      classOf[TextInputFormat], classOf[LongWritable],
      classOf[Text], hconf)
    .map(x => x._2.toString.trim)
    .filter(x => x != "")
    .map(x => getRecord(x))
    .map(x => x.split(","))
    .map(x => record(x(0), x(1), x(2)))

  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._
  val df = data.toDF()
  df.printSchema()
  df.show(false)

  // Flatten one multi-line record ("Id,19048\nTerm,milk\nRank,1")
  // into the value string "19048,milk,1"
  def getRecord(in: String): String = {
    val ar = in.split("\n").mkString(",").split(",")
    val data = Array(ar(1), ar(3), ar(5))
    data.mkString(",")
  }
}

case class record(Id: String, Term: String, Rank: String)

Output:

root
 |-- Id: string (nullable = true)
 |-- Term: string (nullable = true)
 |-- Rank: string (nullable = true)

+-----+----+----+
|Id   |Term|Rank|
+-----+----+----+
|19048|milk|1   |
|19049|corn|5   |
+-----+----+----+
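As a sanity check, the record-flattening logic can be exercised on its own, without a Spark cluster. A minimal sketch using one record body from the question's sample data (the object name GetRecordCheck is illustrative; the leading REC is assumed to have been stripped already by the record delimiter):

```scala
object GetRecordCheck extends App {
  // Same logic as getRecord above: flatten the "name,value" lines
  // of one record and keep only the values (the odd positions).
  def getRecord(in: String): String = {
    val ar = in.split("\n").mkString(",").split(",")
    Array(ar(1), ar(3), ar(5)).mkString(",")
  }

  // One record body from the question's sample data
  val sample = "Id,19048\nTerm,milk\nRank,1"
  println(getRecord(sample)) // prints 19048,milk,1
}
```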

**Indent your code.** – gsamaras

Assuming your file sits on a "normal" file system (not HDFS), you have to write a file parser and then use sc.parallelize to create an RDD, and from that a DataFrame:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable

object Demo extends App {
  val conf = new SparkConf().setMaster("local[1]").setAppName("Demo")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  case class Record(
    var id: Option[Int] = None,
    var term: Option[String] = None,
    var rank: Option[Int] = None)

  val filename = "data.dat"

  val records = readFile(filename)
  val df = sc.parallelize(records).toDF
  df.printSchema()
  df.show()

  // Parse the file line by line: "REC" opens a new record, and
  // "Rank" (always the last field) completes it and emits it.
  def readFile(filename: String): Seq[Record] = {
    import scala.io.Source

    val records = mutable.ArrayBuffer.empty[Record]
    var currentRecord: Record = null

    for (line <- Source.fromFile(filename).getLines) {
      val tokens = line.split(',')

      currentRecord = tokens match {
        case Array("REC") => Record()
        case Array("Id", id) =>
          currentRecord.id = Some(id.toInt); currentRecord
        case Array("Term", term) =>
          currentRecord.term = Some(term); currentRecord
        case Array("Rank", rank) =>
          currentRecord.rank = Some(rank.toInt)
          records += currentRecord
          null
      }
    }
    records
  }
}

This gives:

root 
|-- id: integer (nullable = true) 
|-- term: string (nullable = true) 
|-- rank: integer (nullable = true) 

+-----+----+----+ 
| id|term|rank| 
+-----+----+----+ 
|19048|milk| 1| 
|19049|corn| 5| 
+-----+----+----+
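The parsing loop above can likewise be checked without Spark by feeding it an in-memory sequence of lines instead of a file. A minimal sketch with the same state-machine logic (the names Rec, parse, and ParserCheck are illustrative, not from the answer):

```scala
import scala.collection.mutable

// Illustrative stand-in for the answer's Record case class
case class Rec(var id: Option[Int] = None,
               var term: Option[String] = None,
               var rank: Option[Int] = None)

object ParserCheck extends App {
  // Same state machine as readFile: "REC" opens a record,
  // "Rank" (the last field) closes it and emits it.
  def parse(lines: Seq[String]): Seq[Rec] = {
    val out = mutable.ArrayBuffer.empty[Rec]
    var current: Rec = null
    for (line <- lines) {
      current = line.split(',') match {
        case Array("REC") => Rec()
        case Array("Id", id) =>
          current.id = Some(id.toInt); current
        case Array("Term", term) =>
          current.term = Some(term); current
        case Array("Rank", rank) =>
          current.rank = Some(rank.toInt); out += current; null
      }
    }
    out
  }

  // The sample records from the question
  val sample = Seq("REC", "Id,19048", "Term,milk", "Rank,1",
                   "REC", "Id,19049", "Term,corn", "Rank,5")
  parse(sample).foreach(println)
}
```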