2017-06-14 117 views
1

我正在尝试将dataframe写入text文件。如果一个文件包含单列,那么我可以写入文本文件。如果文件包含多列,那么我面临一些错误在文本文件中写入/存储数据帧

文本数据源仅支持单列,并且您有2列 列。

object replace { 

    def main(args:Array[String]): Unit = { 

    Logger.getLogger("org").setLevel(Level.ERROR) 

    val spark = SparkSession.builder.master("local[1]").appName("Decimal Field Validation").getOrCreate() 

    var sourcefile = spark.read.option("header","true").text("C:/Users/phadpa01/Desktop/inputfiles/decimalvalues.txt") 

    val rowRDD = sourcefile.rdd.zipWithIndex().map(indexedRow => Row.fromSeq((indexedRow._2.toLong+1) +: indexedRow._1.toSeq)) //adding prgrefnbr    
         //add column for prgrefnbr in schema 
    val newstructure = StructType(Array(StructField("PRGREFNBR",LongType)).++(sourcefile.schema.fields)) 

    //create new dataframe containing prgrefnbr 

    sourcefile = spark.createDataFrame(rowRDD, newstructure) 
    val op= sourcefile.write.mode("overwrite").format("text").save("C:/Users/phadpa01/Desktop/op") 

    } 

} 

回答

2

找到可以转换数据帧到RDD和隐蔽该行字符串,并写最后行

val op= sourcefile.rdd.map(_.toString()).saveAsTextFile("C:/Users/phadpa01/Desktop/op") 

编辑

由于@philantrovert和@Pravinkumar指出,上述内容将附加[]在输出文件中,这是真的。该解决方案将是replace他们empty字符作为

val op= sourcefile.rdd.map(_.toString().replace("[","").replace("]", "")).saveAsTextFile("C:/Users/phadpa01/Desktop/op") 

甚至可以使用regex

+0

我认为这会在每行的两端添加'['和']''。 – philantrovert

+0

,但它为每条记录添加了“[]”每条记录.eg:[2,12.2,12.2] –

+0

是的,它可以替换为空。让我更新答案 –

0

可以保存为文本CSV文件(.format("csv")

其结果将是在CSV格式的文本文件中,每个列将一个逗号分隔。

val op = sourcefile.write.mode("overwrite").format("csv").save("C:/Users/phadpa01/Desktop/op") 

更多信息可以在spark programming guide

+0

我想要的文件扩展名应该由上述方案的文件扩展名是.txt是的.csv –

+0

你怎么想每行要打印?逗号分隔或其他东西? – stefanobaghino

+0

@PravinkumarHadpad - 你为什么在意输出文件扩展名是.txt还是.csv? – Yaron

0

我用databricks API来我的DF输出保存到文本文件。

myDF.write.format("com.databricks.spark.csv").option("header", "true").save("output.csv") 
2

我会建议使用csv或其他分隔格式。以下是最简洁/优雅方式为例来写星火2+ .tsv格式

val tsvWithHeaderOptions: Map[String, String] = Map(
    ("delimiter", "\t"), // Uses "\t" delimiter instead of default "," 
    ("header", "true")) // Writes a header record with column names 

df.coalesce(1)   // Writes to a single file 
    .write 
    .mode(SaveMode.Overwrite) 
    .options(tsvWithHeaderOptions) 
    .csv("output/path")