2017-09-01 110 views
1

所以,基本是:遍历文件在pySpark目录自动数据帧和SQL表创建

  • 我在星火2 +
  • 我跑这一切在Jupyter笔记本
  • 我的目标是遍历目录中的许多文件,并具有spark(1)创建数据帧和(2)将这些数据帧转换为sparkSQL表。基本上,我希望能够随时打开笔记本电脑,并拥有一个干净的方式来始终加载可用的所有内容。

下面是我进口:

from pyspark.sql.functions import * 
from pyspark.sql import SQLContext 
sqlContext = SQLContext(sc) 

fileDirectory = 'data/' 

下面是实际的代码:

for fname in os.listdir(fileDirectory): 
    sqlContext.read.format("csv").\ 
      option("header", "true").\ 
      option("inferSchema", "true").\ 
      load(fname) 

    df_app = app_dat_df 
    df_app.createOrReplaceTempView(fname) 

但我发现了以下错误消息:

AnalysisException: u'Unable to infer schema for CSV. It must be specified manually.;' 

会似乎没有发现问题的方式,我传递的文件(伟大),但它不会让我推断模式。当我手动检查每个文件时,这从来都不是问题。

有人可以给我一些指示我可以改善他们/让它运行?

很多,非常感谢!

+0

确定的路径是正确的?你想访问本地文件系统并且你的工作目录是'data /'? 'fname'只是文件的名称,不是它的完整路径。如果问题来自一个文件,您应该在您的循环中添加一个打印以查看哪一个是问题 – MaFF

+0

好点。我忘了提到这一点,但是,是的,路径和所有这些都是正确的。如果我按照文件运行以下代码,它可以正常工作: 'df_name = sqlContext.read.format(“csv”)。option(“header”,“true”)。option(“inferSchema”,“true “)\ .load(” 数据/ file_name.csv “)' 'DF = df_name' 'df.createOrReplaceTempView(” df_name“)' – Berzerkeley

+1

所以你的工作目录是不是数据是数据的父目录。在你的代码中,你直接访问'transaction_dat.csv'。试试'fileDirectory + fname'而不是 – MaFF

回答

0

由于inferSchema出现错误,因此应手动指定csv数据的模式。

另外@Marie已经提到你需要稍微修改你的加载语法。

from pyspark.sql.types import * 

customSchema = StructType([ 
    StructField("string_col", StringType(), True), 
    StructField("integer_col", IntegerType(), True), 
    StructField("double_col", DoubleType(), True)]) 

fileDirectory = 'data/' 
for fname in os.listdir(fileDirectory): 
    df_app = sqlContext.read.format("csv").\ 
     option("header", "true"). \ 
     schema(customSchema). \ 
     load(fileDirectory + fname) 

希望这会有所帮助!


不要忘了让我们知道是否能解决你的问题:)

+0

@Berzerkeley如果它解决了您的问题,您应该将答案标记为正确答案,因为如果将来遇到类似问题,这肯定会有所帮助。谢谢! – Prem