警告:我是spark和scala的新手。我发现堆栈溢出的一些问题与我的非常相似,但一直没能将这些问题转化为我的问题。Spark Scala创建具有排序日期值的RDD对
上下文。我有一对RDD最初与窗体(id,日期)的记录,我想创建窗体(ID,last_date_seen)的RDD。在原始数据中,日期是一个字符串,我使用Joda转换为DateTime
。
我已经使用combineByKey
成功完成了这项工作,并且我明白groupByKey
效率低下,在大的情况下这可能不实用,但我试图了解使用调用范围的方法。
我想要做的是groupByKey
然后mapValues
,以groupByKey
产生的值列表获得列表中的最大值。
我已经试过:(我创建了一个基于不同的堆栈溢出的问题上的日期时间排序,所以排序)
我已经尝试了许多方法,并大多数都给我一个例外,即任务不可序列化。一个例子是,
rdd.groupByKey().mapValues(_.toList.sorted.last)
我已经试过任何数量的这种变种;)没有toList
,我得到整理例外不是Iterable[org.joda.time.DateTime]
成员。我成功地使用了mapValues
并做了更简单的事情,但是一旦我尝试添加排序,事情就会变糟糕。我试过sortBy
并指定Ordering
。
深入了解为什么发送到排序方法的东西不可序列化,这对我总体上会有帮助。当我陷入陷阱时,我不知道该如何识别。
其中一个类似的堆栈溢出问题表明,您可以仅使用sortBy
而不是使用mapValues
,并指定它位于第二个元素上,因此.sortBy(_._2)
。这对我来说也失败了。理想情况下,如果这样做是有道理的,我也想知道。
这似乎是一个非常简单而且可能很常见的事情,所以我觉得我错过了一些东西。
编辑 - 为例外的更多细节添加。请注意,虽然我无法重现此错误。
错误堆栈中的不可序列化错误表明我在另一个堆栈溢出中使用的隐式排序是罪魁祸首。请注意,我无法重现这个让我几个小时感到困扰的错误(请参阅答案)。
Caused by: java.io.NotSerializableException:
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Joda$
Serialization stack:
- object not serializable (class:
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Joda$,
value: $i[email protected]6fc2db37)
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC,
name: Joda$module, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Joda$)
Joda模块刚刚定义过。
object Joda {
implicit def dateTimeOrdering: Ordering[DateTime] =
Ordering.fromLessThan(_ isBefore _)
}
只是'rdd.reduceByKey((x,y)=> if(x.isAfter(y))x else y)'并确保阅读http://stackoverflow.com/a/33439328/1560062 – zero323
谢谢@ zero323。另一个更紧凑的combineByKey版本,我还没有尝试过。似乎在Spark中遇到序列化问题是很常见的,所以尽管我现在为我的小练习获得了多个解决方案,但我想更好地理解为什么要使用toList和排序抛出错误。也许我的问题应该重写到导致序列化错误的原因。 –
对于序列化问题,您应该检查链接的问题。 Joda类实际上不是Spark友好的。 – zero323