2016-09-16 115 views
1

警告:我是spark和scala的新手。我发现堆栈溢出的一些问题与我的非常相似,但一直没能将这些问题转化为我的问题。Spark Scala创建具有排序日期值的RDD对

上下文。我有一对RDD最初与窗体(id,日期)的记录,我想创建窗体(ID,last_date_seen)的RDD。在原始数据中,日期是一个字符串,我使用Joda转换为DateTime

我已经使用combineByKey成功完成了这项工作,并且我明白groupByKey效率低下,在大的情况下这可能不实用,但我试图了解使用调用范围的方法。

我想要做的是groupByKey然后mapValues,以groupByKey产生的值列表获得列表中的最大值。

我已经试过:(我创建了一个基于不同的堆栈溢出的问题上的日期时间排序,所以排序)

我已经尝试了许多方法,并大多数都给我一个例外,即任务不可序列化。一个例子是,

rdd.groupByKey().mapValues(_.toList.sorted.last) 

我已经试过任何数量的这种变种;)没有toList,我得到整理例外不是Iterable[org.joda.time.DateTime]成员。我成功地使用了mapValues并做了更简单的事情,但是一旦我尝试添加排序,事情就会变糟糕。我试过sortBy并指定Ordering

深入了解为什么发送到排序方法的东西不可序列化,这对我总体上会有帮助。当我陷入陷阱时,我不知道该如何识别。

其中一个类似的堆栈溢出问题表明,您可以仅使用sortBy而不是使用mapValues,并指定它位于第二个元素上,因此.sortBy(_._2)。这对我来说也失败了。理想情况下,如果这样做是有道理的,我也想知道。

这似乎是一个非常简单而且可能很常见的事情,所以我觉得我错过了一些东西。

编辑 - 为例外的更多细节添加。请注意,虽然我无法重现此错误。

错误堆栈中的不可序列化错误表明我在另一个堆栈溢出中使用的隐式排序是罪魁祸首。请注意,我无法重现这个让我几个小时感到困扰的错误(请参阅答案)。

Caused by: java.io.NotSerializableException:  
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Joda$ 
Serialization stack: 
    - object not serializable (class: 
    $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Joda$, 
    value:  $i[email protected]6fc2db37) 
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, 
name: Joda$module, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Joda$) 

Joda模块刚刚定义过。

object Joda { 
implicit def dateTimeOrdering: Ordering[DateTime] = 
    Ordering.fromLessThan(_ isBefore _) 
} 
+1

只是'rdd.reduceByKey((x,y)=> if(x.isAfter(y))x else y)'并确保阅读http://stackoverflow.com/a/33439328/1560062 – zero323

+0

谢谢@ zero323。另一个更紧凑的combineByKey版本,我还没有尝试过。似乎在Spark中遇到序列化问题是很常见的,所以尽管我现在为我的小练习获得了多个解决方案,但我想更好地理解为什么要使用toList和排序抛出错误。也许我的问题应该重写到导致序列化错误的原因。 –

+1

对于序列化问题,您应该检查链接的问题。 Joda类实际上不是Spark友好的。 – zero323

回答

0

我原来的问题确实有两个组成部分: *如何改造利用GROUPBY的RDD以检索每个ID中的最后查看日期和 *为什么我会想尽了办法给人一种“任务不可序列化“的错误。

不幸的是,在重新启动spark-shell并回溯我的步骤后,我无法重现此错误。我在问题中列出的代码与我已经建立的DateTime顺序配对,工作得很好。最近我又遇到了另一个类似的问题,我可以将它追溯到隐式值的冲突中,这是我之前在shell中为完全不同的目的设置的。我怀疑这也是这里的罪魁祸首,但无法证实这一点。

评论中引用的另一个堆栈溢出问题表示Joda引起了其他问题。

为了完整起见,我能够进行转换并从几种方式中提取出最后一个日期。 @ zero323在使用reduceByKey的评论中给出的最直接的方式。

使用groupByKey,在问题的代码

rdd.groupByKey().mapValues(_.toList.sorted.last) 

正常工作时下面的隐式排序是到位:

object Joda { 
    implicit def dateTimeOrdering: Ordering[DateTime] = 
    Ordering.fromLessThan(_ isBefore _)} 
import Joda._ 

同样,

rdd.groupByKey.mapValues(_.toList.max) 

产生相同的。

我也使用定义的顺序复制结果并传递给显式排序。

不幸的是,我无法确定为什么对象乔达在第一次会议中抛出了一个异常,并没有在我尝试的下一个。