2017-08-01 1177 views
1

我对scala和spark非常陌生,我一直在试图为这个问题找到一整天的解决方案 - 它在做我的头。我尝试了以下代码的20种不同变体,并在尝试对列进行计算时不断得到type mismatch错误。Spark(scala)dataframes - 检查列中的字符串是否包含集合中的任何项目

我有一个火花数据框,我想检查一个特定列中的每个字符串是否包含来自预定义的List(或Set)字的任何数量的字。

这里是复制一些示例数据:

// sample data frame 
val df = Seq(
     (1, "foo"), 
     (2, "barrio"), 
     (3, "gitten"), 
     (4, "baa")).toDF("id", "words") 

// dictionary Set of words to check 
val dict = Set("foo","bar","baaad") 

现在,我想创建一个比较结果的第三列,看看内他们在$"words"列中的字符串包含任何的dict单词组词。所以结果应该是:

+---+-----------+-------------+ 
| id|  words| word_check| 
+---+-----------+-------------+ 
| 1|  foo|   true|  
| 2|  bario|   true| 
| 3|  gitten|  false| 
| 4|  baa|  false| 
+---+-----------+-------------+ 

首先,我想看看我是否能本身做没有使用UDF的,因为字典设置实际上是一个大字典> 40K字,按照我的理解这个会比UDF更高效:

df.withColumn("word_check", dict.exists(d => $"words".contains(d))) 

,但我得到的错误:

type mismatch; 
found : org.apache.spark.sql.Column 
required: Boolean 

我也试图创建一个UDF做到这一点(同时使用mutable.Setmutable.WrappedArray来形容设置 - 不知道这是正确的,但既没有工作):

val checker: ((String, scala.collection.mutable.Set[String]) => Boolean) = (col: String, array: scala.collection.mutable.Set[String]) => array.exists(d => col.contains(d)) 

val udf1 = udf(checker) 

df.withColumn("word_check", udf1($"words", dict)).show() 

却得到另一种类型不匹配:

found : scala.collection.immutable.Set[String] 
required: org.apache.spark.sql.Column 

如果设置为一个固定的号码,我应该能够使用Lit(Int)的表达式?但我不明白在一列上执行更复杂的功能,通过在scala中混合不同的数据类型。

任何帮助非常感谢,特别是如果它可以有效地完成(这是一个大于5米行df)。

回答

1

如果你的字典很大,你不应该只在你的udf中引用它,因为整个字典是通过网络发送的每一个任务。我会广播你的字典与udf结合使用:

import org.apache.spark.broadcast.Broadcast 

def udf_check(words: Broadcast[scala.collection.immutable.Set[String]]) = { 
    udf {(s: String) => words.value.exists(s.contains(_))} 
} 

df.withColumn("word_check", udf_check(sparkContext.broadcast(dict))($"words")) 
5

无论效率的,这似乎工作:

df.withColumn("word_check", dict.foldLeft(lit(false))((a, b) => a || locate(b, $"words") > 0)).show 

+---+------+----------+ 
| id| words|word_check| 
+---+------+----------+ 
| 1| foo|  true| 
| 2|barrio|  true| 
| 3|gitten|  false| 
| 4| baa|  false| 
+---+------+----------+ 
+0

This Works,thank you。但就效率而言,使用40K字典列表可能会非常昂贵。 – renegademonkey

6

这里是你如何使用UDF做到这一点:

val checkerUdf = udf { (s: String) => dict.exists(s.contains(_)) } 

df.withColumn("word_check", checkerUdf($"words")).show() 

实现流程的错误是,你已经创建了一个UDF需要两个参数,这意味着在应用它时必须通过两个Column - 但在您的DataFrame中dict不是Column,而是本地可用的。

+0

字典也可以播放它很大 –

+0

aha!这真的能够澄清事情,并且是一个很好的答案,但是我认为我可能不得不采用@拉斐尔罗斯的建议,因为在这种情况下效率将非常重要。 – renegademonkey

相关问题