2016-07-28 197 views
1

我正在通过Spark使用以下内容读取csv文件。使用Spark读取CSV

rdd=sc.textFile("emails.csv").map(lambda line: line.split(",")) 

我需要创建一个Spark DataFrame。

我已经通过以下这个转换RDD引发DF:

dataframe=rdd.toDF() 

但我需要指定DF的模式,而RDD转换为DF。我试着这样做:(我只是有2列文件和消息)

from pyspark import Row 

email_schema=Row('file','message') 

email_rdd=rdd.map(lambda r: email_schema(*r)) 

dataframe=sqlContext.createDataFrame(email_rdd) 

不过,我得到的错误: java.lang.IllegalStateException:输入行没有预计到所需的值数架构。需要2个字段,同时提供1个值。

我也试着读我的CSV文件中使用这样的:

rdd=sc.textFile("emails.csv").map(lambda line: line.split(",")).map(lambda line: line(line[0],line[1])) 

我得到的错误:类型错误:“名单”对象不是可调用

我试着用熊猫来阅读我的CSV文件导入熊猫数据框架,然后将其转换为火花DataFrame,但我的文件太大了。

我还补充说:

bin/pyspark --packages com.databricks:spark-csv_2.10:1.0.3 

并使用下面的阅读我的文件:

df=sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('emails.csv') 

我收到错误: 产生java.io.IOException:(1 STARTLINE)EOF之前到达封装的令牌完成

我已经通过其他几个相关的线程,并尝试如上。任何人都可以请解释我哪里错了?

[使用Python 2.7,火花1.6.2 MacOSX上]

被修改:

第一3行如下所示。我需要提取电子邮件的内容。我该如何解决它?

allen-p/_sent_mail/1。 “Message-ID:< [email protected]> Date:Mon,14 May 2001 16:39:00 -0700(PDT) From:[email protected] To:[email protected] .COM 主题: 的MIME版本:1.0 的Content-Type:text/plain的;字符集= US-ASCII 内容传输编码:7位 X-来源:菲利普ķ艾伦 X-TO:蒂姆·贝尔登 X -cc: X-bcc: X-Folder:\ Phillip_Allen_Jan2002_1 \ Allen,Phillip K.发送邮件 X-Origin:Allen-P X-FileName:pallen(Non-Privileged)。PST

这里是我们的预测 “

艾伦-P/_sent_mail/10。” 消息ID:< [email protected]> 日期:星期五,2001年13点51分5月4日:00 -0700(PDT) From:[email protected] 收件人:[email protected] Subject:Re: Mime-Version:1.0 Content-Type:text/plain;字符集= US-ASCII 内容传输编码:7位 X-来源:菲利普ķ艾伦 X-TO:约翰·ĴLavorato X-CC: X-BCC: X-文件夹:\ Phillip_Allen_Jan2002_1 \阿伦,菲利普发送邮件 X-Origin:Allen-P X-FileName:pallen(Non-Privileged).pst

旅行开个商务会议让旅途变得愉快。特别是如果你必须准备一个演示文稿。我建议在这里举行商业计划会议,然后在没有任何正式商务会议的情况下出差。我甚至会试着去就一次旅行是否需要或者是必要的问题发表一些诚实的意见。

就商务会议而言,我认为尝试激发不同群体讨论什么是工作和什么不是工作会更有成效。主持人说话太多,而其他人则安静地等待轮到他们。如果以圆桌讨论的形式举行,会议可能会更好。

我的建议是去奥斯汀。打高尔夫球,租一艘滑雪船和喷气式滑雪板。飞行某处需要太多的时间 “

的Allen-P/_sent_mail/100”。邮件ID:< [email protected]> 日期:星期三,2000 3点00分00秒10月18日-0700(PDT) From:[email protected] To:[email protected] Subject:Re:test Mime-Version:1.0 Content-Type:text/plain;字符集= US-ASCII 内容传输编码:7位 X-来源:菲利普ķ艾伦 X-TO:莉亚凡Arsdall X-CC: X-BCC: X-文件夹:\ Phillip_Allen_Dec2000 \注文件夹\ '发送邮件 X-Origin:Allen-P X-FileName:pallen.nsf

测试成功。要走的路!“

+0

你可以打印第一五行from' emails.csv'(根据需要anonymyzing数据)的样本? – Alexander

+0

'line(line [0],line [1])'..'line()'的外部用法意味着你正在尝试调用一个list对象,因此我尝试了这个错误 –

回答

0

如果RDD将适合在内存中,然后:

rdd.toPandas().to_csv('emails.csv') 

如果没有,使用spark-csv为您的版本火花:

rdd.write.format('com.databricks.spark.csv').save('emails.csv') 

在你上面的例子:

rdd=....map(lambda line: line.split(",")).map(lambda line: line(line[0],line[1])) 
你不想要的:
rdd=....map(lambda line: line.split(",")).map(lambda line: (line[0], line[1])) 
+0

,但它是一个非常庞大的数据集。 – tg89

+0

如何附加com.databricks.spark.csv库以启动?我使用了我的文章中指定的格式。你知道我哪里错了吗?而且,我正在尝试阅读该文件。 – tg89

+0

谢谢@Alexander。我修好了它。但是现在在读取文件时,我得到了java.lang.IllegalStateException:输入行没有期望的模式所需的值的数量。需要2个字段,同时提供1个值。我编辑了我的帖子来显示我的第一个3行。第一列是粗体1,2,3的文件。第二列是包含所有消息内容的消息。你能告诉我怎样才能纠正这个并入2列? – tg89

0

如果你有一个巨大的文件,为什么不使用块一个熊猫数据帧,而不是加载所有的这一次,是这样的:

import pandas as pd 
df_pd = pd.read_csv('myfilename.csv',chunksize = 10000) 

for i,chunk in enumerate(df1): 
    if i==0: 
     df_spark = sqlContext.createDataFrame(chunk) 
    else: 
     df_spark = df_spark.unionAll(sqlContext.createDataFrame(chunk)) 

df_spark将是你所需要的火花数据帧。这是低效的,但它会起作用。对于其他一些实现方法,您可以参考这个question

另一个可能的方法是使用rdd的inferSchema方法,但是您需要在csv文件中有列名以使其起作用,请参阅this。 所以,你可以这样做:

srdd = inferSchema(rdd) 
email_rdd=rdd.map(lambda r: srdd(*r)) 

dataframe=sqlContext.createDataFrame(email_rdd)