2015-07-10 335 views
15

我有一个示例应用程序可以将csv文件读取到数据框中。数据帧可以使用方法 df.saveAsTable(tablename,mode)以镶木地板格式存储到Hive表格中。将Spark数据框保存为Hive中的动态分区表

上面的代码工作正常,但我每天都有这么多数据,我想基于creationdate(表中的列)动态分区配置单元表。

有没有办法动态分区数据框并将其存储到配置单元仓库。希望避免使用hivesqlcontext.sql(insert into table partittioin by(date)....)对插入语句进行硬编码。

问题可以被视为一个扩展:How to save DataFrame directly to Hive?

任何的帮助深表感谢。

回答

12

我相信它的工作原理是这样的:

df是年,月等栏目

df.write.partitionBy('year', 'month').saveAsTable(...) 

df.write.partitionBy('year', 'month').insertInto(...) 
+0

尝试过这种方法Partitionby。它只能在RDD级别上工作,一旦创建了数据框,大多数方法都是DBMS样式的,例如, groupby,orderby但他们不提供写入Hive上不同分区文件夹的目的。 – Chetandalal

+4

好吧,所以能够用1.4版本解决它。 。df.write()模式(SaveMode.Append).partitionBy( “日期”)saveAsTable( “表名”); 。但是,这会将我的日期字段更改为整数值并删除实际日期。例如列中有9个唯一日期,但现在它们存储为1,2,3 ....并且文件夹名称为date = 1,2,3,...而不是date = 20141121。让我知道是否有办法做到这一点。 – Chetandalal

+0

@ subramaniam-ramasubramanian:请回答OP的问题作为答案,而不是编辑现有的答案 –

22

我能写分区蜂巢一个数据帧表使用df.write().mode(SaveMode.Append).partitionBy("colname").saveAsTable("Table")

我必须启用fo降低属性使其工作。

 
hiveContext.setConf("hive.exec.dynamic.partition", "true") 
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict") 
+0

我应该在哪里设置上述2个参数?我尝试登录配置单元shell并运行上面的命令,它失败了。我相信我做错了。你能告诉我在哪里可以设置这些属性? –

+2

@VrushankDoshi在创建hiveContext之后,您可以在spark程序中设置它。 val sparkConf = new SparkConf() val sc = new SparkContext(sparkConf) val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) hiveContext.setConf(“hive.exec.dynamic.partition” ,“true”)hiveContext.setConf(“hive.exec.dynamic。分区模式“,”非严格“) – MV23

3

我也面临同样的事情,但使用我解决的以下技巧。

  1. 当我们将任何表分区时,分区列变得区分大小写。

  2. 分区列应该以相同的名称出现在DataFrame中(区分大小写)。代码:

    var dbName="your database name" 
    var finaltable="your table name" 
    
    // First check if table is available or not.. 
    if (sparkSession.sql("show tables in " + dbName).filter("tableName='" +finaltable + "'").collect().length == 0) { 
        //If table is not available then it will create for you.. 
        println("Table Not Present \n Creating table " + finaltable) 
        sparkSession.sql("use Database_Name") 
        sparkSession.sql("SET hive.exec.dynamic.partition = true") 
        sparkSession.sql("SET hive.exec.dynamic.partition.mode = nonstrict ") 
        sparkSession.sql("SET hive.exec.max.dynamic.partitions.pernode = 400") 
        sparkSession.sql("create table " + dbName +"." + finaltable + "(EMP_ID  string,EMP_Name   string,EMP_Address    string,EMP_Salary bigint) PARTITIONED BY (EMP_DEP STRING)") 
        //Table is created now insert the DataFrame in append Mode 
        df.write.mode(SaveMode.Append).insertInto(empDB + "." + finaltable) 
    } 
    
+0

df.write.mode(SaveMode.Append).insertInto(empDB +”。“+ finaltable)不需要提及partitionBy?示例df.write.mode(SaveMode.Append)。 partitionBy(“EMP_DEP”).insertInto(empDB +“。”+ finaltable) –

+0

没有必要..其可选 –

+0

还没有为我工作,表计数为零 –