2017-07-29 128 views
3

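Spark cache difference between 2.0.2 and 2.1.1

I'm a bit confused by Spark's caching behavior. I want to compute a dependent dataset (b), cache it, and unpersist the source dataset (a). Here is my code: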

val spark = SparkSession.builder().appName("test").master("local[4]").getOrCreate() 
import spark.implicits._ 
val a = spark.createDataset(Seq(("a", 1), ("b", 2), ("c", 3))) 
a.createTempView("a") 
a.cache 
println(s"Is a cached: ${spark.catalog.isCached("a")}") 
val b = a.filter(x => x._2 < 3) 
b.createTempView("b") 
// calling action 
b.cache.first 
println(s"Is b cached: ${spark.catalog.isCached("b")}") 

spark.catalog.uncacheTable("a") 
println(s"Is b cached after a was unpersisted: ${spark.catalog.isCached("b")}") 

With Spark 2.0.2 it works as expected:

Is a cached: true 
Is b cached: true 
Is b cached after a was unpersisted: true 

But with 2.1.1:

Is a cached: true 
Is b cached: true 
Is b cached after a was unpersisted: false 

How can I achieve the same behavior in 2.1.1?

Thanks.

Answer

1

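I don't know how it is supposed to behave. Judging by the tests, Spark 2.1.1 works as intended, but a couple of comments in the test code show some doubt about it. Perhaps you could open a JIRA in the Spark project to clarify the situation.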

CachedTableSuite.scala

test("uncaching temp table") { 
    testData.select('key).createOrReplaceTempView("tempTable1") 
    testData.select('key).createOrReplaceTempView("tempTable2") 
    spark.catalog.cacheTable("tempTable1") 

    assertCached(sql("SELECT COUNT(*) FROM tempTable1")) 
    assertCached(sql("SELECT COUNT(*) FROM tempTable2")) 

    // Is this valid? 
    spark.catalog.uncacheTable("tempTable2") 

    // Should this be cached? 
    assertCached(sql("SELECT COUNT(*) FROM tempTable1"), 0) 
} 

The assertCached method checks that the number of InMemoryRelation nodes in the query's plan equals its second argument, numCachedTables.

QueryTest.scala

/** 
* Asserts that a given [[Dataset]] will be executed using the given number of cached results. 
*/ 
def assertCached(query: Dataset[_], numCachedTables: Int = 1): Unit = { 
    val planWithCaching = query.queryExecution.withCachedData 
    val cachedData = planWithCaching collect {
        case cached: InMemoryRelation => cached
    }

    assert(
        cachedData.size == numCachedTables,
        s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" +
        planWithCaching)
} 
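
The same check can be run outside the test suite to see whether a query would actually read from a cache. The following is a minimal sketch, not code from Spark itself: the object name CachedPlanCheck and the helper cachedRelationCount are made up for illustration, and it assumes a Spark 2.x build where InMemoryRelation lives in org.apache.spark.sql.execution.columnar.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.execution.columnar.InMemoryRelation

// Hypothetical helper object, named for this example only.
object CachedPlanCheck {
  // Counts the InMemoryRelation nodes in the plan after cached data
  // has been substituted in, mirroring what assertCached inspects.
  def cachedRelationCount(query: Dataset[_]): Int =
    query.queryExecution.withCachedData.collect {
      case cached: InMemoryRelation => cached
    }.size

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("cached-plan-check")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    val a = spark.createDataset(Seq(("a", 1), ("b", 2), ("c", 3)))
    a.cache()

    // withCachedData is memoized per Dataset, so inspect a query
    // that was built after the cache call.
    val b = a.filter(x => x._2 < 3)
    println(s"Cached relations used by b: ${cachedRelationCount(b)}")

    spark.stop()
  }
}
```

Unlike spark.catalog.isCached, which looks a table up by name, this counts how many cached relations the plan of a particular query would use.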
+1

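Yes, the Spark folks say this behavior is 'by design' - https://issues.apache.org/jira/browse/SPARK-21478. It is still unclear, though, how to deal with large cached datasets that are no longer needed. Any ideas? – yanik1984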
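
One possible direction, offered only as an untested sketch: if the invalidation is driven by b's logical plan still referring to a, then rebuilding b from its RDD gives it a plan that no longer mentions a, so caching that copy might survive uncaching a. This is an assumption about how the cache manager matches plans, not something confirmed in the thread or the JIRA, and the names DetachedCacheSketch and detachAndCache are made up for this example. The round trip through an RDD also adds deserialization and serialization overhead.

```scala
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

// Hypothetical helper object, named for this example only.
object DetachedCacheSketch {
  // Rebuilds the dataset from its RDD so the new logical plan does not
  // contain the source plan, then caches and materializes the copy.
  // Assumption: cache invalidation matches on the logical plan.
  def detachAndCache[T: Encoder](spark: SparkSession, ds: Dataset[T]): Dataset[T] = {
    val detached = spark.createDataset(ds.rdd)
    detached.cache()
    detached.count() // materialize while the source is still cached
    detached
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("detached-cache-sketch")
      .master("local[4]")
      .getOrCreate()
    import spark.implicits._

    val a = spark.createDataset(Seq(("a", 1), ("b", 2), ("c", 3)))
    a.createOrReplaceTempView("a")
    a.cache()

    val b = detachAndCache(spark, a.filter(x => x._2 < 3))
    b.createOrReplaceTempView("b")

    spark.catalog.uncacheTable("a")
    println(s"Is b cached after a was uncached: ${spark.catalog.isCached("b")}")

    spark.stop()
  }
}
```

Whether the last line prints true on 2.1.1 needs to be checked on that version. Dataset.checkpoint (available since Spark 2.1) is another way to truncate the plan, at the cost of writing the data to the checkpoint directory.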