2016-07-29 96 views
0

使用mapPartition比方说,我有以下数据框:星火:使用Scala

var randomData = Seq(("a",8),("h",5),("f",3),("a",2),("b",8),("c",3) 
val df = sc.parallelize(randomData,2).toDF() 

,我有这个功能,这将是为mapPartition输入:

def trialIterator(row:Iterator[(String,Int)]): Iterator[(String,Int)] = 
    row.toArray.tail.toIterator 

而且使用地图分区:

df.mapPartition(trialIterator) 

我有以下错误信息:

类型不匹配,预期(迭代[行])=>迭代[NotInferedR],则实际:迭代[(字符串,强度)=>迭代[(字符串,整数)]

我可以明白这是由于我的函数的输入,输出类型,但如何解决这个问题呢?

回答

2

如果你想获得强类型输入在这个特殊的情况下不使用Dataset[Row]DataFrame),但Dataset[T]其中T(String, Int)。还没有转换为Array,如果分区是空的不叫盲目tail不知道:

def trialIterator(iter: Iterator[(String, Int)]) = iter.drop(1) 

randomData 
    .toDS // org.apache.spark.sql.Dataset[(String, Int)] 
    .mapPartitions(trialIterator _) 

randomData.toDF // org.apache.spark.sql.Dataset[Row] 
    .as[(String, Int)] // org.apache.spark.sql.Dataset[(String, Int)] 
    .mapPartitions(trialIterator _) 
+0

感谢您的回答。这里的功能只是为了说明我的问题。不是我想用的那个。为什么我不应该使用数据框? –

+2

因为实际应用'DataFrame'只是一个'数据集[Seq [Any]]',所以你可以简单地认为它是无类型的/不安全的。 – zero323

0

你需要使用类型为Iterator[(String,Int)],而你应该期望Iterator[Row]

def trialIterator(row:Iterator[Row]): Iterator[(String,Int)] = { 
    row.next() 
    row //seems to do the same thing w/o all the conversions 
}