2016-09-15 35 views
3

我在Zeppelin笔记本上使用了Spark,而groupByKey()似乎没有工作。Spark错误:无法找到存储在数据集中的类型的编码器

此代码:

df.groupByKey(row => row.getLong(0)) 
    .mapGroups((key, iterable) => println(key)) 

给我这个错误(可能是一个编译错误,因为它显示了在任何时间,而我的工作数据集是相当大的):

error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. 

我想补充的情况下阶层和地图我所有的行成,但仍然得到了同样的错误

import spark.implicits._ 

case class DFRow(profileId: Long, jobId: String, state: String) 

def getDFRow(row: Row):DFRow = { 
    return DFRow(row.getLong(row.fieldIndex("item0")), 
       row.getString(row.fieldIndex("item1")), 
       row.getString(row.fieldIndex("item2"))) 
} 

df.map(DFRow(_)) 
    .groupByKey(row => row.getLong(0)) 
    .mapGroups((key, iterable) => println(key)) 

我的数据框的模式是:

root 
|-- item0: long (nullable = true) 
|-- item1: string (nullable = true) 
|-- item2: string (nullable = true) 

回答

5

你试图mapGroups与函数(Long, Iterator[Row]) => Unit并没有EncoderUnit(而不是它将使意义有一个)。

Dataset API未集中在SQL DSL(DataFrame => DataFrameDataFrame => RelationalGroupedDatasetRelationalGroupedDataset => DataFrameRelationalGroupedDataset => RelationalGroupedDataset)的通用部分需要的输出值隐式或显式的编码器。

由于对象没有预定义的编码器,所以使用Dataset[Row]和静态类型数据的方法设计没有多大意义。作为一个经验法则,你应该总是转换为静态类型的变异第一:

df.as[(Long, String, String)] 

参见Encoder error while trying to map dataframe row to updated row

+0

谢谢您的完整的答案,@ zero323。我目前正在接受Spark,并且即使这个问题是愚蠢的,你对编码器的介绍也是非常有帮助的。但是,一个简单的问题是,如果不存在编码器,我该如何执行计算并获得像List这样的基本数据类型? – Wahbivic

+0

对于初学者来看看https://spark.apache.org/docs/latest/sql-programming-guide.html#data-types。如果桌面上有东西在被修复。案例类的处理方式以及字段与SQL类型相对应。其余部分:http://stackoverflow.com/q/36648128/1560062 – zero323

相关问题