1

我正在过滤来自输入parquet文件的下列逻辑的整数列,并试图修改此逻辑以添加其他验证以查看是否有任何一个输入列有计数等于输入parquet文件rdd计数。我想过滤掉这样的列。筛选列的计数等于输入文件rdd Spark

更新

列名称的输入文件的数量不会是一成不变的,它会改变我们每次得到文件的时间。 其目标是也过滤出其计数等于输入文件rdd计数的列。以下逻辑已经实现了过滤整数列。

e.g input parquet file count = 100 
    count of values in column A in the input file = 100 

过滤掉任何这样的列。

当前逻辑

//Get array of structfields 

val columns = df.schema.fields.filter(x => 
       x.dataType.typeName.contains("integer")) 

    //Get the column names 
    val z = df.select(columns.map(x => col(x.name)): _*) 

    //Get array of string 
    val m = z.columns 

新逻辑是这样

val cnt = spark.read.parquet("inputfile").count() 

    val d = z.column.where column count is not equals cnt 

我不想明确地传递列名到新的条件下,由于具有数等于列输入文件将改变(val d = ..以上) 我们如何为此编写逻辑?

+0

所有你过滤,将有相同数量,即行数的整列将是相同的。因此,如果计数与输入地板文件行数相匹配,那么数据框中不会有任何行?那是你要的吗? –

+0

谢谢Ramesh看看这个。所有Integer列将不会有相同的计数,如果您采用approx_count_distinct,则每列的计数将根据其中的不同值而有所不同,现在列的计数或多或少与输入parquet文件计数相似(在我的情况下为100)应该被过滤掉。 – sabby

+0

你可以使用select和where函数我猜:) –

回答

2

根据我对你的问题的理解,您在列与integer为的dataType试图filter,其distinct count是不是在另一个输入parquet文件等于rowscount。如果我的理解是正确的,你可以在你现有的过滤器添加列数过滤器

val cnt = spark.read.parquet("inputfile").count() 
val columns = df.schema.fields.filter(x => 
    x.dataType.typeName.contains("string") && df.select(x.name).distinct().count() != cnt) 

代码的其余部分应遵循它。

我希望答案是有帮助的。

+0

谢谢Ramesh!棒极了! :) – sabby

+0

@sabby我的荣幸:) –

0

Jeanr和Ramesh提出正确的做法,这里是我做过什么,以获得所需的输出,它的工作:)

cnt = (inputfiledf.count()) 

val r = df.select(df.col("*")).where(df.col("MY_COLUMN_NAME").<(cnt))