我开始使用spark来学习。我做了一个基于this document的简单程序。SparkSQL DataFrame:使用缓存时sql查询不起作用
我的程序从文件(在HDFS集群上)读取支付日志,将其传输到一个数据框,并在一些sql查询中使用这个数据框。我在两种情况下运行我的程序:使用和不使用cache()方法。我遇到了一个奇怪的问题,因为描述波纹管:
- 不使用高速缓存():
我试图运行一些查询和一切都很好。 (log_zw是我的表名)
val num_records = sqlContext.sql("select * from log_zw").count
val num_acc1 = sqlContext.sql("select * from log_zw where ACN = 'acc1' ").count
- 使用高速缓存()
我也使用上述两个查询。第一个查询返回正确的值,但第二个是而不是,它返回。
然而,当我在另一种方法查询它:
val num_acc1 = log_zw.filter(log_zw("ACN").contains("acc1")).count
它返回正确的结果。
我对Spark和集群计算系统非常陌生,我没有任何想法,为什么它像那样工作。任何人都可以请向我解释这个问题,特别是使用SQL查询和火花方法时的不同。
编辑:这是模式,它非常简单。
root
|-- PRODUCT_ID: string (nullable = true)
|-- CHANNEL: string (nullable = true)
|-- ACN: string (nullable = true)
|-- AMOUNT_VND: double (nullable = false)
|-- TRANS_ID: string (nullable = true)
EDIT2:这是使用高速缓存()时,我的代码:(我跑了一些查询,结果表明:在代码中的注释)
// read tsv files
case class LogZW(
PRODUCT_ID: String,
PLATFORM: String,
CHANNEL: String,
ACN: String,
AMOUNT_VND: Double,
TRANS_ID: String)
def loadLog(filename: String): DataFrame = {
sc.textFile(filename).map(line => line.split("\t")).map(p =>
LogZW(p(1), p(3), p(4), p(5), p(9).toDouble, p(10).substring(0,8))).toDF()
}
// generate schema
val schemaString = "PRODUCT_ID PLATFORM CHANNEL ACN AMOUNT_VND TRANS_ID"
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// read all files
val HDFSFolder = "hdfs://master:54310/user/lqthang/data/*"
val log = loadLog(HDFSFolder)
// register table
log.registerTempTable("log")
log.show()
// select a subset of log table
val log_zw = sqlContext.sql("select PRODUCT_ID, CHANNEL, ACN, AMOUNT_VND, TRANS_ID from log where PLATFORM = 'zingwallet' and CHANNEL not in ('CBZINGDEAL', 'VNPT') and PRODUCT_ID not in ('ZingCredit', 'zingcreditdbg') ")
// register new table
log_zw.show()
log_zw.registerTempTable("log_zw")
// cache table
log_zw.cache()
// this query returns incorrect value!!
val num_acc1 = sqlContext.sql("select * from log_zw where ACN = 'acc1' ").count
// this query returns correct value!
val num_acc2 = sqlContext.sql("select * from log_zw where trim(ACN) = 'acc1' ").count
// uncache data and try another query
log_zw.unpersist()
// this query also returns the correct value!!!
val num_acc2 = sqlContext.sql("select * from log_zw where ACN = 'acc1' ").count
EDIT3:我试着将另一个缓存()方法添加到log
数据帧:
// register table
log.registerTempTable("log")
log.show()
log.cache()
以下代码是相同的如上(log_zw.cache()
)。所以重要的结果是:
// this query returns the CORRECT value!!
val num_acc1 = sqlContext.sql("select * from log_zw where ACN = 'acc1' ").count
你能告诉我们的模式? –
@AlbertoBonsanto我添加了架构。 –