从数据帧获得一个重复计数我有一个看起来像这样使用Apache星火
+--------------+---------+-------+---------+
| dataOne|OtherData|dataTwo|dataThree|
+--------------+---------|-------+---------+
| Best| tree| 5| 533|
| OK| bush| e| 3535|
| MEH| cow| -| 3353|
| MEH| oak| none| 12|
+--------------+---------+-------+---------+
数据,我想进入它的
+--------------+---------+
| dataOne| Count|
+--------------+---------|
| Best| 1|
| OK| 1|
| Meh| 2|
+--------------+---------+
输出我没问题将dataOne自己获取到数据框中并显示它的内容以确保我只抓取dataOne列 但是,我似乎无法找到将该sql查询转换为数据的正确语法我需要。我试图创建从整个数据创建临时鉴于这种下列数据框设置
Dataset<Row> dataOneCount = spark.sql("select dataOne, count(*) from
dataFrame group by dataOne");
dataOneCount.show();
但引发 我能找到这只是演示了如何做这类聚集的火花1.6和以前这样的文档任何帮助,将不胜感激。
这是我得到的错误消息,但是我检查了数据,并且在那里没有索引错误。
java.lang.ArrayIndexOutOfBoundsException: 11
我也尝试应用功能()方法countDistinct
Column countNum = countDistinct(dataFrame.col("dataOne"));
Dataset<Row> result = dataOneDataFrame.withColumn("count",countNum);
result.show();
其中dataOneDataFrame是从运行
select dataOne from dataFrame
创建的数据帧,但它返回一个分析异常,我米仍然是新的火花,所以我不知道如何/当我正在评估countDistinct方法有错误
编辑:为了澄清,显示的第一个表是我从阅读的文本文件,并应用自定义模式,以它所创建的数据帧的结果(他们仍然是所有的字符串)
Dataset<Row> dataFrame
这里是我的完整代码
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("Log File Reader")
.getOrCreate();
//args[0] is the textfile location
JavaRDD<String> logsRDD = spark.sparkContext()
.textFile(args[0],1)
.toJavaRDD();
String schemaString = "dataOne OtherData dataTwo dataThree";
List<StructField> fields = new ArrayList<>();
String[] fieldName = schemaString.split(" ");
for (String field : fieldName){
fields.add(DataTypes.createStructField(field, DataTypes.StringType, true));
}
StructType schema = DataTypes.createStructType(fields);
JavaRDD<Row> rowRDD = logsRDD.map((Function<String, Row>) record -> {
String[] attributes = record.split(" ");
return RowFactory.create(attributes[0],attributes[1],attributes[2],attributes[3]);
});
Dataset<Row> dF = spark.createDataFrame(rowRDD, schema);
//first attempt
dF.groupBy(col("dataOne")).count().show();
//Trying with a sql statement
dF.createOrReplaceTempView("view");
dF.sparkSession().sql("select command, count(*) from view group by command").show();
想到的最可能的事情是使用RowFactory返回行的lambda函数?这个想法听起来很合理,但我不确定它是如何坚持下去的,或者有另一种方式可以做到。除此之外,我挺纳闷的
样本数据
best tree 5 533
OK bush e 3535
MEH cow - 3353
MEH oak none 12
我已经在我的java程序中尝试了这种方法,并且它返回java.lang.ArrayIndexOutOfBoundsException:11 – Sentinel
你能用一个小的隔离示例来重现它吗?您使用的是什么版本的Spark?你尝试过别人吗? – clay
使用spark 2.1添加了完整的代码,我不想退回到以前版本的火花,尽管 – Sentinel