2016-05-14 55 views
0

我使用带PySpark的Jupyter Notebook。在那里我有一个数据框架,这些数据架构有一个列名和类型(整数,...)的列。现在我使用flatMap这样的方法,但是这会返回一个没有固定类型的元组列表。有没有办法实现这一点?PySpark平面图应该返回带有类型值的元组

df.printSchema() 
root 
|-- name: string (nullable = true) 
|-- ... 
|-- ... 
|-- ratings: integer (nullable = true) 

然后我用flatMap做的额定值一些计算(这里混淆):

df.flatMap(lambda row: (row.id, 5 if (row.ratings > 5) else row.ratings)) 
y_rate.toDF().printSchema() 

现在我得到一个错误:

TypeError: Can not infer schema for type:

有什么办法通过保持模式使用map/flatMap/reduce?或者至少返回具有特定类型值的元组?

回答

1

首先,您使用的是错误的功能。 flatMapmapflatten所以假设你的数据是这样的:

df = sc.parallelize([("foo", 0), ("bar", 10)]).toDF(["id", "ratings"]) 

flatMap的输出将等同于:

sc.parallelize(['foo', 0, 'bar', 5]) 

因此,你看到的错误。如果你真的想使它工作,你应该使用map

df.rdd.map(lambda row: (row.id, 5 if (row.ratings > 5) else row.ratings)).toDF() 
## DataFrame[_1: string, _2: bigint] 

接着,在DataFrame映射不再2.0支持。您应首先提取rdd(请参阅上面的df.rdd.map)。

最后在Python和JVM之间传递数据效率极低。它不仅需要在Python和JVM之间传递数据以及相应的序列化/反序列化和模式推理(如果没有明确提供模式),这也会打破懒惰。这是更好地使用SQL表达式这样的事情:

from pyspark.sql.functions import when 

df.select(df.id, when(df.ratings > 5, 5).otherwise(df.ratings)) 

如果由于某种原因,你需要普通的Python代码的UDF可能是一个更好的选择。

+0

非常有帮助。感谢您的示例代码。我只是没有得到flatMap vs Map的部分。 – Matthias

+1

'flatMap'是一个函数'RDD [T] =>(T => Iterable [U])=> RDD [U]'。换句话说,它期望函数返回'Itereble'(Python元组),并连接这些(变平)结果。 – zero323

+0

有没有办法在该声明中给出when/otherwise列的名称?请参阅'df.select(df.id,when(df.ratings> 5,5).otherwise(df.ratings))'@ zero323 – Matthias

相关问题