10
SparkSession 
    .builder 
    .master("local[*]") 
    .config("spark.sql.warehouse.dir", "C:/tmp/spark") 
    .config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark/spark-checkpoint") 
    .appName("my-test") 
    .getOrCreate 
    .readStream 
    .schema(schema) 
    .json("src/test/data") 
    .cache 
    .writeStream 
    .start 
    .awaitTermination 

Executing this example in Spark 2.1.0 I get an error. Without the .cache option it works as expected, but with the .cache option I get the exception below. Why does using cache on a streaming Dataset fail with "AnalysisException: Queries with streaming sources must be executed with writeStream.start()"?

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();; 
FileSource[src/test/data] 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:196) 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35) 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:33) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128) 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:33) 
    at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:58) 
    at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:69) 
    at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:67) 
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73) 
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73) 
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79) 
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75) 
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84) 
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84) 
    at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:102) 
    at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:65) 
    at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:89) 
    at org.apache.spark.sql.Dataset.persist(Dataset.scala:2479) 
    at org.apache.spark.sql.Dataset.cache(Dataset.scala:2489) 
    at org.me.App$.main(App.scala:23) 
    at org.me.App.main(App.scala)

Any ideas?

+1

Sorry, but I don't think that simply not using cache is the solution. –

+1

Martin, feel free to chime in with a comment on [SPARK-20927](https://issues.apache.org/jira/browse/SPARK-20927?focusedCommentId=16334363&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16334363) about the need for caching in streaming computations – mathieu

Answer

10

Your (very interesting) case boils down to the following line (that you can execute in spark-shell):

scala> :type spark 
org.apache.spark.sql.SparkSession 

scala> spark.readStream.text("files").cache 
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();; 
FileSource[files] 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297) 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36) 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127) 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34) 
    at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63) 
    at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74) 
    at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72) 
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78) 
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78) 
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84) 
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80) 
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89) 
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89) 
    at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:104) 
    at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:68) 
    at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:92) 
    at org.apache.spark.sql.Dataset.persist(Dataset.scala:2603) 
    at org.apache.spark.sql.Dataset.cache(Dataset.scala:2613) 
    ... 48 elided 

The reason for this turns out to be quite simple to explain (no pun intended with Spark SQL's explain).

spark.readStream.text("files") creates a so-called streaming Dataset:

scala> val files = spark.readStream.text("files") 
files: org.apache.spark.sql.DataFrame = [value: string] 

scala> files.isStreaming 
res2: Boolean = true 

Streaming Datasets are the foundation of Spark SQL's Structured Streaming.
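For contrast, a plain batch read of the same location is not streaming, and cache works there as usual. A minimal sketch (same placeholder "files" path as above):

// batch read: isStreaming is false, so cache/persist behave as usual
val batch = spark.read.text("files")
assert(!batch.isStreaming)
batch.cache   // fine: the plan just gets registered with Spark SQL's CacheManager

// whereas on the streaming files Dataset above, cache throws the AnalysisException
// files.cache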

As you can read in Structured Streaming's Quick Example:

and then start the streaming computation using start().

Quoting the scaladoc of DataStreamWriter's start:

start(): StreamingQuery Starts the execution of the streaming query, which will continually output results to the given path as new data arrives.

So you have to use start (or foreach) to start the execution of the streaming query. You knew that already.
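To make that concrete, here is a minimal sketch of how the pipeline from the question could be terminated; it reuses the schema and the src/test/data path from the question, while the console sink and the checkpoint path are only example choices:

val query = spark.readStream
  .schema(schema)                      // file sources need an explicit schema
  .json("src/test/data")               // streaming source from the question
  .writeStream
  .format("console")                   // example sink
  .option("checkpointLocation", "/tmp/spark-checkpoint")  // example path
  .start()                             // this is what actually runs the streaming query

query.awaitTermination()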

But... there are Unsupported Operations in Structured Streaming:

In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset.

If you try any of these operations, you will see an AnalysisException like "operation XYZ is not supported with streaming DataFrames/Datasets".
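For example (a sketch against the streaming files Dataset created above), eager actions plan and run the query immediately, so they trip the very same check:

files.count()    // throws AnalysisException: Queries with streaming sources must be executed with writeStream.start()
files.collect()  // same
files.show()     // same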

That looks familiar, doesn't it?

cache is not in the list of the unsupported operations, but that is because it has simply been overlooked (I reported SPARK-20927 to fix it).

cache should have been in the list, since it does execute a query before the query gets registered in Spark SQL's CacheManager.

Let's go deep into the depths of Spark SQL... hold your breath...

cache is persist, and persist requests the current CacheManager to cache the query:

sparkSession.sharedState.cacheManager.cacheQuery(this) 

And while caching the query, CacheManager does execute it:

sparkSession.sessionState.executePlan(planToCache).executedPlan 

which we know is not allowed, since it is start (or foreach) that is supposed to do that.
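You can see that chain with public APIs alone (a sketch against the files Dataset from above; queryExecution is a developer API): merely forcing the physical plan, which is exactly what CacheManager.cacheQuery does, is enough to trigger the error.

files.queryExecution.analyzed       // fine: analyzing a streaming plan is allowed
files.queryExecution.executedPlan   // throws the same AnalysisException as files.cache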

Problem solved!

+1

I thought it was a bug, which is why I reported it even earlier at https://issues.apache.org/jira/browse/SPARK-20865. I just needed confirmation for my puzzle. Thanks. –

+0

Linking to master is not really relevant, since the code being targeted may change. I think that is what happened to your links – crak

+0

@crak Correct. I should not have linked to master. What do you think would be better? I looked into linking to a specific version in the past but could not figure out how to do it on GitHub. Mind giving me a hand? I'd appreciate it. –
