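Spark: Why does Python read the input file twice, but Scala doesn't, when creating a DataFrame?
I am using Spark v1.5.2. I wrote a program in Python, and I don't understand why it reads the input file twice. The same program written in Scala reads the input file only once.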
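I use an accumulator to count the number of times map() is called. From the accumulator value, I infer how many times the input file is read.
The input file contains 3 lines of text.
Python: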
from pyspark import SparkContext, SQLContext
from pyspark.sql.types import *
def createTuple(record):  # used with map()
    global map_acc
    map_acc += 1
    return (record[0], record[1].strip())
sc = SparkContext(appName='Spark test app') # appName is shown in the YARN UI
sqlContext = SQLContext(sc)
map_acc = sc.accumulator(0)
lines = sc.textFile("examples/src/main/resources/people.txt")
people_rdd = lines.map(lambda l: l.split(",")).map(createTuple) #.cache()
fieldNames = 'name age'
fields = [StructField(field_name, StringType(), True) for field_name in fieldNames.split()]
schema = StructType(fields)
df = sqlContext.createDataFrame(people_rdd, schema)
print 'record count DF:', df.count()
print 'map_acc:', map_acc.value
#people_rdd.unpersist()
$ spark-submit --master local[1] test.py 2> err
record count DF: 3
map_acc: 6 ##### why 6 instead of 3??
Scala:
import org.apache.spark._
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.{StructType,StructField,StringType};
object SimpleApp {
  def main(args: Array[String]) {
    def createTuple(record: Array[String], map_acc: Accumulator[Int]) = { // used with map()
      map_acc += 1
      Row(record(0), record(1).trim)
    }
    val conf = new SparkConf().setAppName("Scala Test App")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val map_acc = sc.accumulator(0)
    val lines = sc.textFile("examples/src/main/resources/people.txt")
    val people_rdd = lines.map(_.split(",")).map(createTuple(_, map_acc))
    val fieldNames = "name age"
    val schema = StructType(
      fieldNames.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    val df = sqlContext.createDataFrame(people_rdd, schema)
    println("record count DF: " + df.count)
    println("map_acc: " + map_acc.value)
  }
}
$ spark-submit --class SimpleApp --master local[1] test.jar 2> err
record count DF: 3
map_acc: 3
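If I uncomment the .cache() and unpersist() calls in the Python program so that the RDD is cached, the input file is no longer read twice. However, I don't think I should have to cache the RDD, right? In the Scala version, I don't need to cache the RDD.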
people_rdd = lines.map(lambda l: l.split(",")).map(createTuple).cache()
...
people_rdd.unpersist()
$ spark-submit --master local[1] test.py 2> err
record count DF: 3
map_acc: 3
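The effect of .cache() on the counter can be reproduced without Spark. The sketch below is plain Python, not Spark code: LazyPipeline, run and create_tuple are hypothetical names made up for illustration, and it simply assumes that two actions are evaluated against the same lineage. It does not establish which extra action PySpark triggers internally.

```python
# Plain-Python stand-in for an RDD lineage (hypothetical, not a Spark API).
# Without caching, every action re-applies the mapped function to the input.

class LazyPipeline(object):
    def __init__(self, source, func):
        self.source = source       # stands in for the lines of the input file
        self.func = func           # function applied to each record
        self._use_cache = False
        self._cached = None        # filled by the first action after cache()

    def cache(self):
        self._use_cache = True
        return self

    def _compute(self):
        if self._use_cache:
            if self._cached is None:
                self._cached = [self.func(x) for x in self.source]
            return self._cached
        # Uncached: recompute from the source on every action.
        return [self.func(x) for x in self.source]

    def count(self):
        return len(self._compute())


def run(use_cache):
    calls = [0]  # mutable counter playing the role of the accumulator

    def create_tuple(line):
        calls[0] += 1
        name, age = line.split(",")
        return (name, age.strip())

    lines = ["Michael, 29", "Andy, 30", "Justin, 19"]
    pipeline = LazyPipeline(lines, create_tuple)
    if use_cache:
        pipeline.cache()
    pipeline.count()               # assumed first action on the lineage
    n = pipeline.count()           # assumed second action on the lineage
    return n, calls[0]


print(run(False))  # (3, 6): two passes over three records
print(run(True))   # (3, 3): the second action reuses the cached result
```

Under that assumption, dividing the counter by the record count gives the number of passes over the input: 6 / 3 = 2 without caching and 3 / 3 = 1 with it, which matches the two outputs above.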
$ hdfs dfs -cat examples/src/main/resources/people.txt
Michael, 29
Andy, 30
Justin, 19