2017-04-13 72 views
1

我有一个火花数据帧 DF与模式作为这样:如何在Spark数据框中进行组合并进行过滤?

[id:string, label:string, tags:string] 

id | label | tag 
---|-------|----- 
1 | h  | null 
1 | w  | x 
1 | v  | null 
1 | v  | x 
2 | h  | x 
3 | h  | x 
3 | w  | x 
3 | v  | null 
3 | v  | null 
4 | h  | null 
4 | w  | x 
5 | w  | x 

(H,W,V是标签x可以是任何非空值)

每个ID,有最多只有一个标签“h”或“w”,但可能会有多个“v”。我想选择所有符合以下条件的ID:

每个ID都有: 1.一个标签“h”及其标签= null, 2.一个标签“w”及其标签!= null, 3.每个ID至少有一个标签“v”。

我在想,我需要创建三列检查上述条件。然后我需要通过“id”做一个组。

val hCheck = (label: String, tag: String) => {if (label=="h" && tag==null) 1 else 0} 
val udfHCheck = udf(hCheck) 
val wCheck = (label: String, tag: String) => {if (label=="w" && tag!=null) 1 else 0} 
val udfWCheck = udf(wCheck) 
val vCheck = (label: String) => {if (label==null) 1 else 0} 
val udfVCheck = udf(vCheck) 

dfx = df.withColumn("hCheck", udfHCheck(col("label"), col("tag"))) 
     .withColumn("wCheck", udfWCheck(col("label"), col("tag"))) 
     .withColumn("vCheck", udfVCheck(col("label"))) 
     .select("id","hCheck","wCheck","vCheck") 
     .groupBy("id") 

不知何故我需要组三列{ “H检查”, “W检查”, “需求,vCheck”}入列表的矢量[X,0,0],[0,X,0],[0, 0,x]中。并检查这些向量是否包含全部三个{[1,0,0],[0,1,0],[0,0,1]}

我还没有能够解决这个问题呢。并且可能会有比这更好的方法。希望有人能给我建议。由于

回答

2

要转换三个检查向量可以这样做: 具体来说,你可以这样做:

val df1 = df.withColumn("hCheck", udfHCheck(col("label"), col("tag"))) 
      .withColumn("wCheck", udfWCheck(col("label"), col("tag"))) 
      .withColumn("vCheck", udfVCheck(col("label"))) 
      .select($"id",array($"hCheck",$"wCheck",$"vCheck").as("vec")) 

下一步GROUPBY返回上您需要执行聚合组合对象。具体要得到所有你应该做的事情,如向量:

.groupBy("id").agg(collect_list($"vec")) 

你也不必为各种检查的UDF。你可以用列语义来做到这一点。

with($"label" == lit("h") && tag.isnull 1).otherwise(0) 

顺便说一句,你说你想要一个标签“V”每个但在需求,vCheck你只是检查标签是否为空:例如udfHCheck可以作为被写入。

更新:替代解决方案

在再次寻找关于这个问题,我会做这样的事情:

val grouped = df.groupBy("id", "label").agg(count("$label").as("cnt"), first($"tag").as("tag")) 
val filtered1 = grouped.filter($"label" === "v" || $"cnt" === 1) 
val filtered2 = filtered.filter($"label" === "v" || ($"label" === "h" && $"tag".isNull) || ($"label" === "w" && $"tag".isNotNull)) 
val ids = filtered2.groupBy("id").count.filter($"count" === 3) 

的想法是,首先我们GROUPBY两个编号和标签,所以我们有信息在组合上。我们收集的信息是多少个值(cnt)和第一个元素(哪个不重要)。

现在我们做两个过滤步骤: 1.我们需要正好一个h和一个w以及任意数量的v,所以第一个过滤器可以获得这些情况。 2.我们确保每个案例都符合所有规则。

现在我们只有id和label匹配的规则组合,所以为了使id合法,我们需要正好有三个标签实例。这导致了第二个组,它简单地计数匹配规则的标签的数量。我们需要三个合法的(即符合所有规则)。

+0

我找到了解决方法,将数据帧转换回rdd。但是你的解决方案很好,我喜欢它。非常感谢你! – neikusc

相关问题