0

我有一个包含许多字段的JSON文件。我在java中使用spark的Dataset读取文件。在Spark数据集中使用custome UDF withColumn <Row>; java.lang.String不能转换为org.apache.spark.sql.Row

  • 星火版本2.2.0

  • JAVA JDK 1.8.0_121

下面是代码。

SparkSession spark = SparkSession 
       .builder() 
       .appName("Java Spark SQL basic example") 
       .config("spark.some.config.option", "some-value") 
       .master("local") 
       .getOrCreate(); 

Dataset<Row> df = spark.read().json("jsonfile.json"); 

我想使用withColumn函数与自定义的UDF添加一个新的列。

UDF1 someudf = new UDF1<Row,String>(){ 
     public String call(Row fin) throws Exception{ 
      String some_str = fin.getAs("String"); 
      return some_str; 
     } 
    }; 
spark.udf().register("some_udf", someudf, DataTypes.StringType); 
df.withColumn("procs", callUDF("some_udf", col("columnx"))).show(); 

当我运行上面的代码时出现转换错误。 java.lang.String中不能被强制转换为org.apache.spark.sql.Row

问题:

1 - 正在读入行的数据集的唯一选择?我可以将df转换为字符串的df。但我无法选择字段。

2 - 试过但未能定义用户定义的数据类型。我无法使用此自定义UDDatatype注册UDF。我需要用户定义的数据类型吗?

3 - 和主要问题,我怎么能从字符串投到行?日志

以下部分被复制:

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row 
    at Risks.readcsv$1.call(readcsv.java:1) 
    at org.apache.spark.sql.UDFRegistration$$anonfun$27.apply(UDFRegistration.scala:512) 
     ... 16 more 

Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$27: (string) => string) 

您的帮助将不胜感激。

回答

3

由于UDF将在列的数据类型(不是)上执行,因此您正在获取该异常。考虑我们有Dataset<Row> ds其中有两列col1col2都是字符串类型。现在,如果我们想使用UDFcol2的值转换为大写。

我们可以注册并拨打电话UDF,如下所示。

spark.udf().register("toUpper", toUpper, DataTypes.StringType); 
ds.select(col("*"),callUDF("toUpper", col("col2"))).show(); 

或者使用withColumn

ds.withColumn("Upper",callUDF("toUpper", col("col2"))).show(); 

而且UDF应该像下面。

private static UDF1 toUpper = new UDF1<String, String>() { 
    public String call(final String str) throws Exception { 
     return str.toUpperCase(); 
    } 
}; 
+0

太棒了,我需要仔细阅读文档。非常感谢 – valearner

相关问题