2016-06-14 51 views
3

我想根据特性将一个巨大的csv文件细分为不同的分区来优化Spark应用程序的运行时间。Apache Spark:使用文件夹结构来减少分析的运行时间

E.g.我有一个包含客户ID(整数,a)的列,包含日期(月+年,例如01.2015,b)的列和包含产品ID(整数,c)的列(以及包含产品特定数据的更多列,不需要为分区)。

我想建立一个像/customer/a/date/b/product/c这样的文件夹结构。当用户想要了解2016年1月售出的来自客户X的产品信息时,他可以加载和分析保存在/customer/X/date/01.2016/*中的文件。

是否有可能通过通配符加载这样的文件夹结构?应该也可以加载特定时间范围内的所有客户或产品,例如, 01.2015至09.2015。是否可以使用通配符如/customer/*/date/*.2015/product/c?或者如何解决这样的问题呢?

我想对数据进行一次分区,稍后在分析中加​​载特定文件,以减少这些作业的运行时间(忽略分区的额外工作)。

解决方案:木地板的工作文件

我改变了我的星火应用程序保存我的数据有木文件,现在一切工作正常,并通过给文件夹结构,我可以预先选择数据。在这里我的代码片段:

JavaRDD<Article> goodRdd = ... 

SQLContext sqlContext = new SQLContext(sc); 

List<StructField> fields = new ArrayList<StructField>(); 
fields.add(DataTypes.createStructField("keyStore", DataTypes.IntegerType, false)); 
fields.add(DataTypes.createStructField("textArticle", DataTypes.StringType, false)); 

StructType schema = DataTypes.createStructType(fields); 

JavaRDD<Row> rowRDD = goodRdd.map(new Function<Article, Row>() { 
    public Row call(Article article) throws Exception { 
     return RowFactory.create(article.getKeyStore(), article.getTextArticle()); 
    } 
}); 

DataFrame storeDataFrame = sqlContext.createDataFrame(rowRDD, schema); 

// WRITE PARQUET FILES 
storeDataFrame.write().partitionBy(fields.get(0).name()).parquet("hdfs://hdfs-master:8020/user/test/parquet/"); 

// READ PARQUET FILES 
DataFrame read = sqlContext.read().option("basePath", "hdfs://hdfs-master:8020/user/test/parquet/").parquet("hdfs://hdfs-master:8020/user/test/parquet/keyStore=1/"); 

System.out.println("READ : " + read.count()); 

重要

不要用表试试只用一列!当您尝试拨打partitionBy方法时,您会得到例外!

+0

不能创建为HDFS路径蜂巢表?配置表格支持动态分区和静态分区。使用数据框,您可以根据需要查询数据。 –

+1

@RamPrasadG你不需要创建配置单元表。 Spark可以做到这一点。无论如何,也许我会回答这个问题,而不是;) –

+0

@GlennieHellesSindholt:这意味着,Spark可以解释像“/客户/ * /日期/ * /产品/ 123”路径? –

回答

9

因此,在Spark中,您可以按照您要查找的方式保存和读取分区数据。然而,而不是创建路径像你这样当你使用保存数据/customer/a/date/b/product/c星火将使用这种约定/customer=a/date=b/product=c

df.write.partitionBy("customer", "date", "product").parquet("/my/base/path/") 

当你需要在读取数据时,需要指定basepath-option这样的:

sqlContext.read.option("basePath", "/my/base/path/").parquet("/my/base/path/customer=*/date=*.2015/product=*/") 

以下的所有内容将被解释为Spark的列。在此处给出的示例中,Spark会将三列customer,dateproduct添加到数据框中。请注意,您可以根据需要为任何列使用通配符。

对于读取特定时间范围内的数据,您应该知道Spark使用谓词下推,因此它只会实际将数据加载到符合条件的内存中(如某些过滤器转换所指定的那样)。但是如果你真的想明确指定范围,你可以生成一个路径名列表,然后将其传递给读取函数。就像这样:

val pathsInMyRange = List("/my/path/customer=*/date=01.2015/product=*", 
          "/my/path/customer=*/date=02.2015/product=*", 
          "/my/path/customer=*/date=03.2015/product=*"..., 
          "/my/path/customer=*/date=09.2015/product=*") 

sqlContext.read.option("basePath", "/my/base/path/").parquet(pathsInMyRange:_*) 

无论如何,我希望这有助于:)

+0

谢谢!看起来不错,只是试了一下 - 它没有划分的工作...当我使用“df.write.partitionBy”我得到一个例外,请参阅上面编辑的代码。 –

+0

它现在工作!谢谢你的回答,@ glennie-helles-sindholt!由于我试图用一列(不切实际的测试用例)对一个表进行分区,所以发生异常,所以在这里你至少需要两列来使它工作! –