
Spark: why does creating a DataFrame read the input file twice in Python but not in Scala?

I am using Spark v1.5.2. I wrote a program in Python and I don't understand why it reads the input file twice; the same program written in Scala reads it only once.

I use an accumulator to count the number of times map() is called, and from the accumulator value I infer how many times the input file has been read.
The input file contains 3 lines of text.

Python:

from pyspark import SparkContext 
from pyspark.sql import SQLContext 
from pyspark.sql.types import * 

def createTuple(record): # used with map() 
    global map_acc 
    map_acc += 1 
    return (record[0], record[1].strip()) 

sc   = SparkContext(appName='Spark test app') # appName is shown in the YARN UI 
sqlContext = SQLContext(sc) 
map_acc = sc.accumulator(0) 
lines  = sc.textFile("examples/src/main/resources/people.txt") 
people_rdd = lines.map(lambda l: l.split(",")).map(createTuple) #.cache() 
fieldNames = 'name age' 
fields  = [StructField(field_name, StringType(), True) for field_name in fieldNames.split()] 
schema  = StructType(fields) 
df   = sqlContext.createDataFrame(people_rdd, schema) 
print 'record count DF:', df.count() 
print 'map_acc:', map_acc.value 
#people_rdd.unpersist() 
$ spark-submit --master local[1] test.py 2> err 
record count DF: 3 
map_acc: 6    ##### why 6 instead of 3?? 

Scala:

import org.apache.spark._ 
import org.apache.spark.sql.Row; 
import org.apache.spark.sql.types.{StructType,StructField,StringType}; 

object SimpleApp { 
  def main(args: Array[String]) { 
    def createTuple(record: Array[String], map_acc: Accumulator[Int]) = { // used with map() 
      map_acc += 1 
      Row(record(0), record(1).trim) 
    } 
    val conf       = new SparkConf().setAppName("Scala Test App") 
    val sc         = new SparkContext(conf) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    val map_acc    = sc.accumulator(0) 
    val lines      = sc.textFile("examples/src/main/resources/people.txt") 
    val people_rdd = lines.map(_.split(",")).map(createTuple(_, map_acc)) 
    val fieldNames = "name age" 
    val schema     = StructType( 
      fieldNames.split(" ").map(fieldName => StructField(fieldName, StringType, true))) 
    val df         = sqlContext.createDataFrame(people_rdd, schema) 
    println("record count DF: " + df.count) 
    println("map_acc: " + map_acc.value) 
  } 
} 
$ spark-submit --class SimpleApp --master local[1] test.jar 2> err 
record count DF: 3 
map_acc: 3 

If I uncomment those lines in the Python program and cache the RDD (unpersisting it afterwards), the input file is no longer read twice. However, I don't think I should have to cache the RDD, right? The Scala version doesn't need it.

people_rdd = lines.map(lambda l: l.split(",")).map(createTuple).cache() 
... 
people_rdd.unpersist() 
$ spark-submit --master local[1] test.py 2> err 
record count DF: 3 
map_acc: 3 
$ hdfs dfs -cat examples/src/main/resources/people.txt 
Michael, 29 
Andy, 30 
Justin, 19 

Answer


This happens because in 1.5 createDataFrame eagerly validates the provided schema on a few elements:

elif isinstance(schema, StructType): 
    # take the first few rows to verify schema 
    rows = rdd.take(10) 
    for row in rows: 
        _verify_type(row, schema) 
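
That rdd.take(10) is an action: it launches its own job over the RDD lineage before the DataFrame even exists, and that is the first of the two reads of the file. A minimal sketch of how you could observe this (hypothetical; run it right after people_rdd is defined, before createDataFrame is called):

# take() alone forces one pass over the uncached lineage, so map() runs once per line
sample = people_rdd.take(10) 
print 'map_acc after take():', map_acc.value   # should already show 3 for the 3-line file 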

The second read happens when df.count() evaluates the DataFrame, which is why map_acc ends up at 6 rather than 3, and why caching people_rdd brings it back to 3. In contrast, current versions validate the schema for all elements, but they do it lazily, so you won't see the same behavior there. For example, this fails immediately in 1.5:

from pyspark.sql.types import * 

rdd = sc.parallelize([("foo",)]) 
schema = StructType([StructField("foo", IntegerType(), False)]) 
sqlContext.createDataFrame(rdd, schema) 

The 2.0 equivalent, on the other hand, fails only when you try to evaluate the DataFrame.
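
A minimal sketch of what that lazy failure looks like in 2.0 (an illustration under assumptions, not code from the original answer; it presumes the 2.0 shell where both sc and a SparkSession named spark are available):

from pyspark.sql.types import StructType, StructField, IntegerType 

rdd = sc.parallelize([("foo",)]) 
schema = StructType([StructField("foo", IntegerType(), False)]) 
df = spark.createDataFrame(rdd, schema)   # no error here: verification is deferred 
df.collect()                              # the schema mismatch only surfaces at evaluation 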

More generally, you shouldn't expect Python and Scala code to behave identically unless you strictly limit yourself to interactions with the SQL API. PySpark:

  • implements almost all RDD methods natively, so the same chain of transformations can result in a different DAG (see the lineage sketch below);
  • may require eager evaluation when interacting with the Java API, in order to provide type information for the Java classes.
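
If you want to see how the DAGs differ, one way (a common inspection technique, not something from the original answer) is to print the RDD lineage right after people_rdd is defined; the Scala program has the equivalent people_rdd.toDebugString:

# Prints the Python-side lineage, which typically shows a PythonRDD layered on top of 
# the HadoopRDD reading people.txt; the Scala lineage for the same code looks different. 
print people_rdd.toDebugString() 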