SparkSession
.builder
.master("local[*]")
.config("spark.sql.warehouse.dir", "C:/tmp/spark")
.config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark/spark-checkpoint")
.appName("my-test")
.getOrCreate
.readStream
.schema(schema)
.json("src/test/data")
.cache
.writeStream
.start
.awaitTermination
I get an error when running this example on Spark 2.1.0. Without the .cache option it works as expected, but with the .cache option it fails. Why does using cache on a streaming Dataset fail with "AnalysisException: Queries with streaming sources must be executed with writeStream.start()"? The exception is:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[src/test/data]
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:196)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:33)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:33)
        at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:58)
        at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:69)
        at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:69)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
        at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:102)
        at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:65)
        at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:89)
        at org.apache.spark.sql.Dataset.persist(Dataset.scala:2479)
        at org.apache.spark.sql.Dataset.cache(Dataset.scala:2489)
        at org.me.App$.main(App.scala:23)
        at org.me.App.main(App.scala)
Any ideas?
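For reference, below is a self-contained sketch of the same pipeline with the .cache call removed, which (per the question) runs as expected. The schema fields and the console sink are assumptions added for illustration; the original snippet does not show the schema definition or a sink format, so adjust both to match your actual data and output target.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructType}

object App {
  def main(args: Array[String]): Unit = {
    // Hypothetical schema for the JSON test files; replace with your own fields.
    val schema = new StructType()
      .add("id", LongType)
      .add("value", StringType)

    SparkSession
      .builder
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "C:/tmp/spark")
      .config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark/spark-checkpoint")
      .appName("my-test")
      .getOrCreate
      .readStream
      .schema(schema)
      .json("src/test/data")
      // .cache  // calling cache on a streaming Dataset triggers the AnalysisException above
      .writeStream
      .format("console") // assumed sink, chosen so the sketch runs without a path
      .start
      .awaitTermination
  }
}
```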
Sorry, but I don't think simply not using cache is a solution. –
Martin, feel free to join the discussion in the comments on [SPARK-20927](https://issues.apache.org/jira/browse/SPARK-20927?focusedCommentId=16334363&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16334363) about the need for caching on streaming computations – mathieu