2017-04-14 124 views
1

我尝试将wordcount结果保存在文件中。值saveAsTextFile不是org.apache.spark.streaming.dstream.DStream [(String,Long)]的成员]

val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _) 
wordCounts.saveAsTextFile("/home/hadoop/datafile1") 

却是露出

value saveAsTextFile is not a member of org.apache.spark.streaming.dstream.DStream[(String, Long)]    [error]  wordCounts.saveAsTextFile("/home/hadoop/datafile1") 

我使用的火花2.1。我展示了一个建议老火花版本的答案,但我想在火花2.1中做。谢谢。

回答

0

您在DStream上使用的定义方法为RDD

这是RDD方法:

def saveAsTextFile(path: String): Unit 

...与描述 “保存此RDD为一个文本文件,使用元素的字符串表示。”

这是DStream方法:在此DSTREAM作为文本文件

saveAsTextFiles(prefix: String, suffix: String = ""): Unit 

...与描述“保存每个RDD,使用元素的字符串表示在每批间隔的文件名是。根据前缀和后缀生成:“prefix-TIME_IN_MS.suffix。

因此该方法的签名是不同的。 - 无论是在名称和参数

在你的代码,wordCounts显然是DStream,因此它不具有saveAsTextFile方法

然而,我得到你对抽象概念感到困惑的感觉,并且确实想要写出包含在DStream microbatch中的个人RDD。要做到这一点:

counts.foreachRDD { rdd => 
    ...   
    rdd.saveAsTextFiles(s"/home/hadoop/datafile-$timestamp") 

} 
0

API documentation提到API为 “saveAsTextFiles”

saveAsTextFiles(String prefix, String suffix) 

保存每个RDD在此DSTREAM如在文本文件中,使用字符串元素的 表示。

相关问题