2014-11-05 64 views
2
val sc = new SparkContext("local[4]", "wc") 

    val lines: RDD[String] = sc.textFile("/tmp/inputs/*") 
    val errors = lines.filter(line => line.contains("ERROR")) 

    // Count all the errors 
    println(errors.count()) 

上面的代码会计算包含单词ERROR的行数。是否有一个与“contains”类似的简化函数,它将返回该单词的出现次数?apache spark中每个单词的发生次数

表示该文件是在Gigs方面,我想用parallelalize使用火花集群的努力。

回答

1

就指望每行的实例,总结那些在一起:

val errorCount = lines.map{line => line.split("[\\p{Punct}\\p{Space}]").filter(_ == "ERROR").size}.reduce(_ + _) 
+1

我认为'map'和'filter'应该由星火得到流水线在一起,所以我会感到惊讶,如果这些优化减少了内存消费。 Spark应该永远不会实现完整的中间'lines.map(...)'数据集;我认为简单的'lines.filter(...)。count()'应该非常高效。 – 2014-11-05 16:45:32

+0

@JoshRosen有趣的。谢谢!。关于内存消耗,我指的是RDD的大小,因为RDD [Int]的大小应该大于RDD [String],这可能是有益的,假设RDD被进一步使用,但是filter(.. )。count“确实很简单。 – maasg 2014-11-05 17:11:56

+0

Tokenizer.scala:39:value collect is not the member of Int [error] val errors = lines.flatMap {line => if(line.contains(“ERROR”))Some(1)else None} .reduce( _ + _)。collect ????但上面的代码片段显示了一种计算包含ERROR的行数而不是ERROR出现次数的方法。 – Siva 2014-11-06 05:46:43

相关问题