2017-04-26 502 views
0

我是Spark的新手。这是我想要做的事情。使用Spark和Scala将数据插入Hive Table的问题

我创建了两个数据流;第一个从文本文件读取数据并使用hivecontext将其注册为临时表。另一个持续从Kafka获得RDD,对于每个RDD,它创建数据流并将内容注册为可临时的。最后,我将这两个临时表连接在一个键上以获得最终结果集。我想将结果集插入配置单元表中。但我没有想法。试图遵循一些实例,但只能创建一个列中有一列的表格,并且不可读。你能告诉我如何将结果插入到特定的数据库和配置表中。请注意,我可以看到使用show函数进行连接的结果,因此真正的挑战在于插入到hive表中。

以下是我正在使用的代码。

imports..... 

    object MSCCDRFilter {   
    def main(args: Array[String]) { 
     val sparkConf = new SparkConf().setAppName("Flume, Kafka and Spark MSC CDRs Manipulation") 

     val sc = new SparkContext(sparkConf) 
     val sqlContext = new HiveContext(sc) 
     import sqlContext.implicits._ 
     val cgiDF = sc.textFile("file:///tmp/omer-learning/spark/dim_cells.txt").map(_.split(",")).map(p => CGIList(p(0).trim, p(1).trim, p(2).trim,p(3).trim)).toDF() 
     cgiDF.registerTempTable("my_cgi_list") 
     val CGITable=sqlContext.sql("select *"+ 
      " from my_cgi_list") 
     CGITable.show() // this CGITable is a structure I defined in the project 
       val streamingContext = new StreamingContext(sc, Seconds(10) 
     val zkQuorum="hadoopserver:2181" 
     val topics=Map[String, Int]("FlumeToKafka"->1) 

     val messages: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(streamingContext,zkQuorum,"myGroup",topics) 

     val logLinesDStream = messages.map(_._2) //获取数据 
     logLinesDStream.print() 
     val MSCCDRDStream = logLinesDStream.map(MSC_KPI.parseLogLine) // change MSC_KPI to MCSCDR_GO if you wanna change the class 
// MSCCDR_GO and MSC_KPI are structures defined in the project  
     MSCCDRDStream.foreachRDD(MSCCDR => { 
      println("+++++++++++++++++++++NEW RDD ="+ MSCCDR.count()) 

      if (MSCCDR.count() == 0) { 
      println("==================No logs received in this time interval=================") 
      } else { 

      val dataf=sqlContext.createDataFrame(MSCCDR) 
      dataf.registerTempTable("hive_msc") 
      cgiDF.registerTempTable("my_cgi_list") 
      val sqlquery=sqlContext.sql("select a.cdr_type,a.CGI,a.cdr_time, a.mins_int, b.Lat, b.Long,b.SiteID from hive_msc a left join my_cgi_list b" 
      +" on a.CGI=b.CGI") 
      sqlquery.show() 
      sqlContext.sql("SET hive.exec.dynamic.partition = true;") 
      sqlContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict;") 
      sqlquery.write.mode("append").partitionBy("CGI").saveAsTable("omeralvi.msc_data")  

      val FilteredCDR = sqlContext.sql("select p.*, q.* " + 
       " from MSCCDRFiltered p left join my_cgi_list q " + 
       "on p.CGI=q.CGI ") 

      println("======================print result =================") 
      FilteredCDR.show() 
     streamingContext.start() 
     streamingContext.awaitTermination() 
     } 
    } 
+0

如果您可以将示例中的代码减少到最小,可重现的示例,那么任何人都可以运行代码并自行观察问题将会很有帮助。此外,非关键线路混淆了问题可能出现的地方。我建议你编辑你的问题,使代码片段更有用。 –

回答

1

我已经取得了一些成功写入配置单元,使用以下:

dataFrame 
    .coalesce(n) 
    .write 
    .format("orc") 
    .options(Map("path" -> savePath)) 
    .mode(SaveMode.Append) 
    .saveAsTable(fullTableName) 

我们尝试使用分区不是通过注视着,因为我觉得有一些问题,我们需要的分区柱。

唯一的限制是并发写入,其中表不存在,那么任何任务尝试创建表(因为它在第一次尝试写入表时不存在)将例外。

请注意,在流式应用程序中写入Hive通常是不好的设计,因为您经常会写很多小文件,而这些文件的读取和存储效率非常低。因此,如果您对Hive的写入次数多于每隔一小时左右,则应确保包含压缩逻辑,或添加更适合事务数据的中间存储层。