我正在使用spark-1.3.1(pyspark),并且使用SQL查询生成了一个表。我现在有一个对象是一个DataFrame。我想将这个DataFrame对象(我称它为“table”)导出到一个csv文件,以便我可以操作它并绘制列。如何将DataFrame“表”导出到csv文件?如何将pyspark中的表数据框导出为csv?
谢谢!
我正在使用spark-1.3.1(pyspark),并且使用SQL查询生成了一个表。我现在有一个对象是一个DataFrame。我想将这个DataFrame对象(我称它为“table”)导出到一个csv文件,以便我可以操作它并绘制列。如何将DataFrame“表”导出到csv文件?如何将pyspark中的表数据框导出为csv?
谢谢!
如果数据帧适合于驾驶员记忆,你想保存到本地文件系统,您可以用toPandas
方法转换Spark DataFrame当地Pandas DataFrame,然后简单地使用to_csv
:
df.toPandas().to_csv('mycsv.csv')
否则,你可以使用spark-csv:
星火1.3
df.save('mycsv.csv', 'com.databricks.spark.csv')
星火1.4+
df.write.format('com.databricks.spark.csv').save('mycsv.csv')
火花2.0+,你可以直接使用csv
数据来源:
df.write.csv('mycsv.csv')
如果您不能使用火花CSV,你可以做到以下几点:
df.rdd.map(lambda x: ",".join(map(str, x))).coalesce(1).saveAsTextFile("file.csv")
如果您需要处理字符串换行符或逗号不起作用。使用这个:
import csv
import cStringIO
def row2csv(row):
buffer = cStringIO.StringIO()
writer = csv.writer(buffer)
writer.writerow([str(s).encode("utf-8") for s in row])
buffer.seek(0)
return buffer.read().strip()
df.rdd.map(row2csv).coalesce(1).saveAsTextFile("file.csv")
这怎么样(在你不想要一个班轮)?
for row in df.collect():
d = row.asDict()
s = "%d\t%s\t%s\n" % (d["int_column"], d["string_column"], d["string_column"])
f.write(s)
f是一个打开的文件描述符。此外,分隔符是一个TAB字符,但很容易改变为任何你想要的。
对于Apache Spark 2+,为了将数据帧保存到单个csv文件中。使用以下命令:
query.repartition(1).write.csv("cc_out.csv", sep='|')
这里1
表示我只需要一个csv分区。您可以根据您的要求进行更改。
超级回答。对于第一个选项,如果我想写入管道分隔文件而不是逗号分隔的CSV,这可能吗? –
如果你有火花数据帧,你可以使用'df.write.csv('/ tmp/lookatme /')',并且会在'/ tmp/lookatme'中放置一组csv文件。使用spark要比序列化快得多在熊猫。唯一的缺点是你最终会得到一组csvs而不是一个,如果目标工具不知道如何连接它们,你需要自己做。 – Txangel
让csv脱离火花是一件大事。有关第一种解决方案的一些有趣之处在于'to_csv'工作时无需导入熊猫。 '.toPandas'是Spark的一部分,可能会隐式导入它.. – cardamom