
I want to concatenate two columns, adding a double quote as prefix and suffix around each of them. The code works, but it gives me extra double quotes. How do I include double quotes in a Spark SQL CONCAT?

Input:

campaign_file_name_1, campaign_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89, 1 
campaign_file_name_1, campaign_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk, 2 

Expected output:

campaign_file_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89, "campaign_name_1"="1", 2017-06-06 17:09:31 
campaign_file_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk, "campaign_name_1"="2", 2017-06-06 17:09:31 

Actual output from the code:

campaign_file_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89, """campaign_name_1""=""1""", 2017-06-06 17:09:31 
campaign_file_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk, """campaign_name_1""=""2""", 2017-06-06 17:09:31 

Spark code:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.DataFrame
import org.slf4j.LoggerFactory

object campaignResultsMergerETL extends BaseETL {

  val now = ApplicationUtil.getCurrentTimeStamp()
  val conf = new Configuration()
  val fs = FileSystem.get(conf)
  val log = LoggerFactory.getLogger(this.getClass.getName)

  def main(args: Array[String]): Unit = {
    // ---------------------
    // code for sqlContext initialization
    // ---------------------

    val campaignResultsDF = sqlContext.read.format("com.databricks.spark.avro").load(campaignResultsLoc)
    campaignResultsDF.registerTempTable("campaign_results")

    val campaignGroupedDF = sqlContext.sql(
      """
        |SELECT campaign_file_name,
        |       campaign_name,
        |       tracker_id,
        |       SUM(campaign_measure) AS campaign_measure
        |FROM campaign_results
        |GROUP BY campaign_file_name, campaign_name, tracker_id
      """.stripMargin)

    campaignGroupedDF.registerTempTable("campaign_results_full")

    val campaignMergedDF = sqlContext.sql(
      s"""
        |SELECT campaign_file_name,
        |       tracker_id,
        |       CONCAT('\"', campaign_name, '\"', '=', '\"', campaign_measure, '\"'),
        |       "$now" AS audit_timestamp
        |FROM campaign_results_full
      """.stripMargin)

    saveAsCSVFiles(campaignMergedDF, campaignResultsExportLoc, numPartitions)
  }

  def saveAsCSVFiles(campaignMeasureDF: DataFrame, hdfs_output_loc: String, numPartitions: Int): Unit = {
    log.info("saveAsCSVFile method started")
    // Remove any previous run's output before writing the new files.
    if (fs.exists(new Path(hdfs_output_loc))) {
      fs.delete(new Path(hdfs_output_loc), true)
    }
    campaignMeasureDF.repartition(numPartitions).write.format("com.databricks.spark.csv").save(hdfs_output_loc)
    log.info("saveAsCSVFile method ended")
  }
}
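
A quick way to narrow down where the extra quotes come from (a minimal diagnostic sketch, not part of the original code) is to print the merged DataFrame before it is written out:

// If the concatenated column already looks correct here, the doubling is
// introduced later, when the CSV writer escapes embedded quote characters.
campaignMergedDF.show(false)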

Can someone help me solve this?

Answer

It looks like you've applied unnecessary escaping around the = in your CONCAT arguments. Try:

|CONCAT('"', campaign_name, '"', '=', '"', campaign_measure, '"'), 
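
For context, a minimal sketch of how that line would slot into the second query (the campaign_data alias is an assumption for illustration; the original query left the concatenated column unnamed):

val campaignMergedDF = sqlContext.sql(
  s"""
    |SELECT campaign_file_name,
    |       tracker_id,
    |       CONCAT('"', campaign_name, '"', '=', '"', campaign_measure, '"') AS campaign_data,
    |       "$now" AS audit_timestamp
    |FROM campaign_results_full
  """.stripMargin)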

[Update]

Perhaps your Spark version differs from mine, but it seems to work as expected for me:

val df = Seq(("x", "y")).toDF("a", "b") 

df.createOrReplaceTempView("df") 

val df2 = spark.sqlContext.sql("""SELECT a, b, CONCAT('"', a, '"="', b, '"') as a_eq_b FROM df""") 

df2.show 
+---+---+-------+ 
|  a|  b| a_eq_b| 
+---+---+-------+ 
|  x|  y|"x"="y"| 
+---+---+-------+ 

df2.coalesce(1).write.option("header", "true").csv("/path/to/df2.csv") 

/path/to/df2.csv content: 
a,b,a_eq_b 
x,y,"\"x\"=\"y\"" 

The escaped quotes in that output come from the CSV writer, which escapes any quote characters embedded in a field. If you want, you can make the quoting a no-op by setting the quote character to the null character, like below:

df2.coalesce(1).write.option("header", "true").option("quote", "\u0000").csv("/path/to/df2null.csv") 

/path/to/df2null.csv content: 
a,b,a_eq_b 
x,y,"x"="y" 

Note, though, that if you need to read the CSV back into Spark, make sure you include the same quote option.
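
For example, a minimal sketch of reading the file back under the same assumptions (hypothetical path, Spark 2.x reader):

// Use the same null quote character on read, so the embedded double quotes
// are treated as plain data rather than as field delimiters.
val df3 = spark.read.option("header", "true").option("quote", "\u0000").csv("/path/to/df2null.csv")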

@Leo: I did the same, but I still get the incorrect output.

@Surender Raja, please see my expanded answer.

This is a perfect answer. Thank you very much.
