2016-11-30 59 views
1

我有以下形式的数据帧:如何在星火应用自定义过滤功能,数据帧

A_DF = |id_A: Int|concatCSV: String| 

和另一个问题:

B_DF = |id_B: Int|triplet: List[String]| 

concatCSV例子可能看起来像:

"StringD, StringB, StringF, StringE, StringZ" 
"StringA, StringB, StringX, StringY, StringZ" 
... 

triplet是这样的:

("StringA", "StringF", "StringZ") 
("StringB", "StringU", "StringR") 
... 

我想产生笛卡尔一套A_DFB_DF,例如;

| id_A: Int | concatCSV: String        | id_B: Int | triplet: List[String]   | 
|  14 | "StringD, StringB, StringF, StringE, StringZ" |  21 | ("StringA", "StringF", "StringZ")| 
|  14 | "StringD, StringB, StringF, StringE, StringZ" |  45 | ("StringB", "StringU", "StringR")| 
|  18 | "StringA, StringB, StringX, StringY, StringG" |  21 | ("StringA", "StringF", "StringZ")| 
|  18 | "StringA, StringB, StringX, StringY, StringG" |  45 | ("StringB", "StringU", "StringR")| 
| ... |            |   |         | 

然后保持这一点至少有两个(例如StringA, StringB)从A_DF("concatCSV")出现在B_DF("triplet"),即使用filter排除那些不符合这个条件记录。

第一个问题是:我可以做到这一点,而无需将DF转换成RDD?

第二个问题是:我可以做理想整个事情的join一步 - 为where条件?

我曾尝试用类似的实验:

val cartesianRDD = A_DF 
    .join(B_DF,"right") 
    .where($"triplet".exists($"concatCSV".contains(_))) 

where不能得到解决。我用filter而不是where尝试过,但仍然没有运气。此外,由于某种奇怪的原因,键入cartesianRDD的注释是SchemaRDD而不是DataFrame。我是怎么结束的?最后,我上面所尝试的(我写的短代码)是不完整的,因为它将保留仅在中发现的一个子字符串的记录,其在triplet中找到。

因此,第三个问题是:我应该改用RDDs并用自定义过滤函数解决它吗?

最后,最后一个问题:我可以对DataFrames使用自定义过滤功能吗?

感谢您的帮助。

+0

不应该的类型'triplet'是'List [(String,String,String)]'? –

+0

你还使用什么版本的Spark? –

+0

谢谢,修正它在示例 - 糟糕的措辞。我使用Spark 1.5.2 –

回答

2

功能CROSS JOINHive实现的,所以你可以先做交叉联接使用Hive SQL

A_DF.registerTempTable("a") 
B_DF.registerTempTable("b") 

// sqlContext should be really a HiveContext 
val result = sqlContext.sql("SELECT * FROM a CROSS JOIN b") 

然后你可以过滤到使用两个udf你期望的输出。一个你的字符串转换为字的阵列,并且第二个,让我们相交所得阵列列的长度和现有柱"triplet"

import scala.collection.mutable.WrappedArray 
import org.apache.spark.sql.functions.col 

val splitArr = udf { (s: String) => s.split(",").map(_.trim) } 
val commonLen = udf { (a: WrappedArray[String], 
         b: WrappedArray[String]) => a.intersect(b).length } 

val temp = (result.withColumn("concatArr", 
    splitArr(col("concatCSV"))).select(col("*"), 
    commonLen(col("triplet"), col("concatArr")).alias("comm")) 
    .filter(col("comm") >= 2) 
    .drop("comm") 
    .drop("concatArr")) 

temp.show 
+----+--------------------+----+--------------------+ 
|id_A|   concatCSV|id_B|    triplet| 
+----+--------------------+----+--------------------+ 
| 14|StringD, StringB,...| 21|[StringA, StringF...| 
| 18|StringA, StringB,...| 21|[StringA, StringF...| 
+----+--------------------+----+--------------------+ 
+1

完美的答案。谢谢! –