4

从数据帧获得一个重复计数我有一个看起来像这样使用Apache星火

+--------------+---------+-------+---------+ 
|  dataOne|OtherData|dataTwo|dataThree| 
+--------------+---------|-------+---------+ 
|   Best|  tree|  5|  533| 
|   OK|  bush|  e|  3535| 
|   MEH|  cow|  -|  3353| 
|   MEH|  oak| none|  12| 
+--------------+---------+-------+---------+ 

数据,我想进入它的

+--------------+---------+ 
|  dataOne| Count| 
+--------------+---------| 
|   Best|  1| 
|   OK|  1| 
|   Meh|  2| 
+--------------+---------+ 

输出我没问题将dataOne自己获取到数据框中并显示它的内容以确保我只抓取dataOne列 但是,我似乎无法找到将该sql查询转换为数据的正确语法我需要。我试图创建从整个数据创建临时鉴于这种下列数据框设置

Dataset<Row> dataOneCount = spark.sql("select dataOne, count(*) from 
dataFrame group by dataOne"); 
dataOneCount.show(); 

但引发 我能找到这只是演示了如何做这类聚集的火花1.6和以前这样的文档任何帮助,将不胜感激。

这是我得到的错误消息,但是我检查了数据,并且在那里没有索引错误。

java.lang.ArrayIndexOutOfBoundsException: 11 

我也尝试应用功能()方法countDistinct

Column countNum = countDistinct(dataFrame.col("dataOne")); 
Dataset<Row> result = dataOneDataFrame.withColumn("count",countNum); 
result.show(); 

其中dataOneDataFrame是从运行

select dataOne from dataFrame 

创建的数据帧,但它返回一个分析异常,我米仍然是新的火花,所以我不知道如何/当我正在评估countDistinct方法有错误

编辑:为了澄清,显示的第一个表是我从阅读的文本文件,并应用自定义模式,以它所创建的数据帧的结果(他们仍然是所有的字符串)

Dataset<Row> dataFrame 

这里是我的完整代码

public static void main(String[] args) { 


    SparkSession spark = SparkSession 
      .builder() 
      .appName("Log File Reader") 
      .getOrCreate(); 

    //args[0] is the textfile location 
    JavaRDD<String> logsRDD = spark.sparkContext() 
      .textFile(args[0],1) 
      .toJavaRDD(); 

    String schemaString = "dataOne OtherData dataTwo dataThree"; 

    List<StructField> fields = new ArrayList<>(); 
    String[] fieldName = schemaString.split(" "); 


    for (String field : fieldName){ 
     fields.add(DataTypes.createStructField(field, DataTypes.StringType, true)); 
    } 
    StructType schema = DataTypes.createStructType(fields); 

    JavaRDD<Row> rowRDD = logsRDD.map((Function<String, Row>) record -> { 
     String[] attributes = record.split(" "); 
     return RowFactory.create(attributes[0],attributes[1],attributes[2],attributes[3]); 
    }); 


    Dataset<Row> dF = spark.createDataFrame(rowRDD, schema); 

    //first attempt 
    dF.groupBy(col("dataOne")).count().show(); 

    //Trying with a sql statement 
    dF.createOrReplaceTempView("view"); 
    dF.sparkSession().sql("select command, count(*) from view group by command").show(); 

想到的最可能的事情是使用RowFactory返回行的lambda函数?这个想法听起来很合理,但我不确定它是如何坚持下去的,或者有另一种方式可以做到。除此之外,我挺纳闷的

样本数据

best tree 5 533 
OK bush e 3535 
MEH cow - 3353 
MEH oak none 12 

回答

2

使用Scala的语法方便。这是非常类似于Java的语法:

// Input data 
val df = { 
    import org.apache.spark.sql._ 
    import org.apache.spark.sql.types._ 
    import scala.collection.JavaConverters._ 

    val simpleSchema = StructType(
    StructField("dataOne", StringType) :: 
    StructField("OtherData", StringType) :: 
    StructField("dataTwo", StringType) :: 
    StructField("dataThree", IntegerType) :: Nil) 

    val data = List(
    Row("Best", "tree", "5", 533), 
    Row("OK", "bush", "e", 3535), 
    Row("MEH", "cow", "-", 3353), 
    Row("MEH", "oak", "none", 12) 
) 

    spark.createDataFrame(data.asJava, simpleSchema) 
} 

df.show 
+-------+---------+-------+---------+ 
|dataOne|OtherData|dataTwo|dataThree| 
+-------+---------+-------+---------+ 
| Best|  tree|  5|  533| 
|  OK|  bush|  e|  3535| 
| MEH|  cow|  -|  3353| 
| MEH|  oak| none|  12| 
+-------+---------+-------+---------+ 
df.groupBy(col("dataOne")).count().show() 
+-------+-----+ 
|dataOne|count| 
+-------+-----+ 
| MEH| 2| 
| Best| 1| 
|  OK| 1| 
+-------+-----+ 

我可以提交以上S3的四列数据文件如下给出的Java代码它工作正常:

$SPARK_HOME/bin/spark-submit \ 
    --class sparktest.FromStackOverflow \ 
    --packages "org.apache.hadoop:hadoop-aws:2.7.3" \ 
    target/scala-2.11/sparktest_2.11-1.0.0-SNAPSHOT.jar "s3a://my-bucket-name/sample.txt" 
+0

我已经在我的java程序中尝试了这种方法,并且它返回java.lang.ArrayIndexOutOfBoundsException:11 – Sentinel

+0

你能用一个小的隔离示例来重现它吗?您使用的是什么版本的Spark?你尝试过别人吗? – clay

+0

使用spark 2.1添加了完整的代码,我不想退回到以前版本的火花,尽管 – Sentinel