我有以下形式的数据帧:如何在星火应用自定义过滤功能,数据帧
A_DF = |id_A: Int|concatCSV: String|
和另一个问题:
B_DF = |id_B: Int|triplet: List[String]|
的concatCSV
例子可能看起来像:
"StringD, StringB, StringF, StringE, StringZ"
"StringA, StringB, StringX, StringY, StringZ"
...
而triplet
是这样的:
("StringA", "StringF", "StringZ")
("StringB", "StringU", "StringR")
...
我想产生笛卡尔一套A_DF
和B_DF
,例如;
| id_A: Int | concatCSV: String | id_B: Int | triplet: List[String] |
| 14 | "StringD, StringB, StringF, StringE, StringZ" | 21 | ("StringA", "StringF", "StringZ")|
| 14 | "StringD, StringB, StringF, StringE, StringZ" | 45 | ("StringB", "StringU", "StringR")|
| 18 | "StringA, StringB, StringX, StringY, StringG" | 21 | ("StringA", "StringF", "StringZ")|
| 18 | "StringA, StringB, StringX, StringY, StringG" | 45 | ("StringB", "StringU", "StringR")|
| ... | | | |
然后保持这一点至少有两个子(例如StringA, StringB
)从A_DF("concatCSV")
出现在B_DF("triplet")
,即使用filter
排除那些不符合这个条件的记录。
第一个问题是:我可以做到这一点,而无需将DF转换成RDD?
第二个问题是:我可以做理想整个事情的join
一步 - 为where
条件?
我曾尝试用类似的实验:
val cartesianRDD = A_DF
.join(B_DF,"right")
.where($"triplet".exists($"concatCSV".contains(_)))
但where
不能得到解决。我用filter
而不是where
尝试过,但仍然没有运气。此外,由于某种奇怪的原因,键入cartesianRDD
的注释是SchemaRDD
而不是DataFrame
。我是怎么结束的?最后,我上面所尝试的(我写的短代码)是不完整的,因为它将保留仅在中发现的一个子字符串的记录,其在triplet
中找到。
因此,第三个问题是:我应该改用RDDs并用自定义过滤函数解决它吗?
最后,最后一个问题:我可以对DataFrames使用自定义过滤功能吗?
感谢您的帮助。
不应该的类型'triplet'是'List [(String,String,String)]'? –
你还使用什么版本的Spark? –
谢谢,修正它在示例 - 糟糕的措辞。我使用Spark 1.5.2 –