3

我在想如果有可能创建一个UDF接收两个参数Column和另一个变量(Object,Dictionary或任何其他类型),然后做一些操作并返回结果。如何使用UDF将列与值进行比较?

其实,我试图做到这一点,但我得到了一个例外。因此,我想知道是否有办法避免这个问题。

df = sqlContext.createDataFrame([("Bonsanto", 20, 2000.00), 
           ("Hayek", 60, 3000.00), 
           ("Mises", 60, 1000.0)], 
           ["name", "age", "balance"]) 

comparatorUDF = udf(lambda c, n: c == n, BooleanType()) 

df.where(comparatorUDF(col("name"), "Bonsanto")).show() 

而且我得到以下错误:

AnalysisException: u"cannot resolve 'Bonsanto' given input columns name, age, balance;"

所以,很明显的是,UDF“看到” string“Bonsanto”作为列名,实际上我想比较的纪录值与第二个参数。

在另一方面,我知道这是可能使用一些运营商where子句中(但实际上我想知道,如果它是可以实现使用UDF),具体如下:

df.where(col("name") == "Bonsanto").show() 

#+--------+---+-------+ 
#| name|age|balance| 
#+--------+---+-------+ 
#|Bonsanto| 20| 2000.0| 
#+--------+---+-------+ 

回答

9

一切,传递给UDF被解释为列/列名称。如果你想通过文字,你有两个选择:

  1. 传递参数,使用钻营:

    def comparatorUDF(n): 
        return udf(lambda c: c == n, BooleanType()) 
    
    df.where(comparatorUDF("Bonsanto")(col("name"))) 
    

    这可以与任何类型的参数,只要它是序列化的使用。

  2. 使用SQL文本和当前实现:

    from pyspark.sql.functions import lit 
    
    df.where(comparatorUDF(col("name"), lit("Bonsanto"))) 
    

    这仅适用于支持的类型(字符串,数字,布尔值)。

相关问题