2015-10-19 106 views
0

我开始使用spark来学习。我做了一个基于this document的简单程序。SparkSQL DataFrame:使用缓存时sql查询不起作用

我的程序从文件(在HDFS集群上)读取支付日志,将其传输到一个数据框,并在一些sql查询中使用这个数据框。我在两种情况下运行我的程序:使用和不使用cache()方法。我遇到了一个奇怪的问题,因为描述波纹管:

  1. 不使用高速缓存():

我试图运行一些查询和一切都很好。 (log_zw是我的表名)

val num_records = sqlContext.sql("select * from log_zw").count 
val num_acc1 = sqlContext.sql("select * from log_zw where ACN = 'acc1' ").count 
  • 使用高速缓存()
  • 我也使用上述两个查询。第一个查询返回正确的值,但第二个是而不是,它返回。

    然而,当我在另一种方法查询它:

    val num_acc1 = log_zw.filter(log_zw("ACN").contains("acc1")).count 
    

    它返回正确的结果。

    我对Spark和集群计算系统非常陌生,我没有任何想法,为什么它像那样工作。任何人都可以请向我解释这个问题,特别是使用SQL查询和火花方法时的不同。

    编辑:这是模式,它非常简单。

    root 
    |-- PRODUCT_ID: string (nullable = true) 
    |-- CHANNEL: string (nullable = true) 
    |-- ACN: string (nullable = true) 
    |-- AMOUNT_VND: double (nullable = false) 
    |-- TRANS_ID: string (nullable = true) 
    

    EDIT2:这是使用高速缓存()时,我的代码:(我跑了一些查询,结果表明:在代码中的注释)

    // read tsv files 
    case class LogZW(
        PRODUCT_ID: String, 
        PLATFORM: String, 
        CHANNEL: String, 
        ACN: String, 
        AMOUNT_VND: Double, 
        TRANS_ID: String) 
    
    def loadLog(filename: String): DataFrame = { 
        sc.textFile(filename).map(line => line.split("\t")).map(p => 
        LogZW(p(1), p(3), p(4), p(5), p(9).toDouble, p(10).substring(0,8))).toDF() 
    } 
    
    // generate schema 
    val schemaString = "PRODUCT_ID PLATFORM CHANNEL ACN AMOUNT_VND TRANS_ID" 
    val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) 
    
    // read all files 
    val HDFSFolder = "hdfs://master:54310/user/lqthang/data/*" 
    val log = loadLog(HDFSFolder) 
    
    // register table 
    log.registerTempTable("log") 
    log.show() 
    
    // select a subset of log table 
    val log_zw = sqlContext.sql("select PRODUCT_ID, CHANNEL, ACN, AMOUNT_VND, TRANS_ID from log where PLATFORM = 'zingwallet' and CHANNEL not in ('CBZINGDEAL', 'VNPT') and PRODUCT_ID not in ('ZingCredit', 'zingcreditdbg') ") 
    
    // register new table 
    log_zw.show() 
    log_zw.registerTempTable("log_zw") 
    
    // cache table 
    log_zw.cache() 
    
    // this query returns incorrect value!! 
    val num_acc1 = sqlContext.sql("select * from log_zw where ACN = 'acc1' ").count 
    
    // this query returns correct value! 
    val num_acc2 = sqlContext.sql("select * from log_zw where trim(ACN) = 'acc1' ").count 
    
    // uncache data and try another query 
    log_zw.unpersist() 
    
    // this query also returns the correct value!!! 
    val num_acc2 = sqlContext.sql("select * from log_zw where ACN = 'acc1' ").count 
    

    EDIT3:我试着将另一个缓存()方法添加到log数据帧:

    // register table 
    log.registerTempTable("log") 
    log.show() 
    log.cache() 
    

    以下代码是相同的如上(log_zw.cache())。所以重要的结果是:

    // this query returns the CORRECT value!! 
    val num_acc1 = sqlContext.sql("select * from log_zw where ACN = 'acc1' ").count 
    
    +0

    你能告诉我们的模式? –

    +0

    @AlbertoBonsanto我添加了架构。 –

    回答

    0

    我们没有很多关于数据是什么的细节,但我注意到你的两个代码段做了不同的事情。

    在第一个,你做ACN = 'acc1',但在第二个,你检查ACN 包含'acc1'。

    所以第二位(带过滤器),将匹配如果ACN是“ACC1”或“ACC1”或“ACC1”

    换句话说,我敢打赌,如果你添加一个微调到您的SQL查询你会得到不同的结果。

    那么试试这个:
    val num_records = sqlContext.sql("select * from log_zw").count val num_acc1 = sqlContext.sql("select * from log_zw where trim(ACN) = 'acc1' ").count

    +0

    感谢您的回答。 使用** trim **给了我一个正确的结果。但是,它不回答我的问题。而且,当不使用** cache()**和** trim **时:sqlContext.sql(“select * from log_zw where ACN ='acc1'”)。count也返回正确的结果。 –

    +0

    我需要更多信息。你可以减少一个样本,给出不正确的计数(你可以更改私人信息)?这样我们可以尝试重现这一点。你在哪里放了'''cache()'''调用?请更新您的2号码(使用缓存)。 – kanielc

    +0

    根据你的建议,我添加了整个代码,并只显示了一个错误结果的样本。 –