2015-06-20 78 views
4

我们需要高效的转换键/值对的大名单,像这样:转弯列表到每个键的值列表火花

val providedData = List(
     (new Key("1"), new Val("one")), 
     (new Key("1"), new Val("un")), 
     (new Key("1"), new Val("ein")), 
     (new Key("2"), new Val("two")), 
     (new Key("2"), new Val("deux")), 
     (new Key("2"), new Val("zwei")) 
) 

到每个关键值的列表,这样:

val expectedData = List(
    (new Key("1"), List(
    new Val("one"), 
    new Val("un"), 
    new Val("ein"))), 
    (new Key("2"), List(
    new Val("two"), 
    new Val("deux"), 
    new Val("zwei"))) 
) 

的关键值对是从一个大的键/值存储(Accumulo),所以各键将被排序,但通常将横火花分区边界。每个密钥可以有数百万个密钥和数百个值。

我认为这个工作的正确工具是spark的combineByKey操作,但一直只能找到带泛型类型(比如Int)的简洁示例,我一直无法推广到用户定义的类型,比如上面。

因为我怀疑许多其他人会有同样的问题,我希望有人可以提供完全指定(详细)和scala语法的例子,用于使用combineByKey与上面的用户定义类型,或可能点找出我错过的更好的工具。

+0

对不起,虚惊一场。它的编译代码很好,只是不在REPL中。我现在很好。谢谢! – Bradjcox

+1

这是一个公开的问题https://issues.apache.org/jira/browse/SPARK-2620 –

回答

4

我不是一个真正的星火专家,但基于this question,我认为你可以做到以下几点:

val rdd = sc.parallelize(providedData) 

rdd.combineByKey(
    // createCombiner: add first value to a list 
    (x: Val) => List(x), 
    // mergeValue: add new value to existing list 
    (acc: List[Val], x) => x :: acc, 
    // mergeCominber: combine the 2 lists 
    (acc1: List[Val], acc2: List[Val]) => acc1 ::: acc2 
) 

使用aggregateByKey

rdd.aggregateByKey(List[Val]())(
    (acc, x) => x :: acc, 
    (acc1, acc2) => acc1 ::: acc2 
) 
+0

嗯; (1,List(ein)),(1,List(one)),(1,List,List(ein))Array((Key,List [Val])] (un)),(2,List(deux)),(2,List(two)),(2,List(zwei))) – Bradjcox

+0

@Bradjcox你实现了'Key'作为case类还是normal类?在后一种情况下,您应该重写equals方法。尝试'case class key(key:String)' –

+0

这里是我正在使用的Key类:@SerialVersionUID(123L) case class Key(v:String)extends Serializable {val_l = v; 重写def toString:String = {return n; } 倍率DEF等于(○:任何)= O匹配{ 情况下:密钥=> that.n.equals(this.n) 情况_ =>假 } 倍率DEF的hashCode = n.hashCode } – Bradjcox