0

乡亲您好我有一个加载一些S3位置的数据集,并返回有趣的数据有条件申请`filter` /`where`到火花`Dataset` /`Dataframe`

private def filterBrowseIndex(spark: SparkSession, s3BrowseIndex: String, mids: Seq[String] = Seq(), indices: Seq[String] = Seq()): Dataset[BrowseIndex] = { 
import spark.implicits._ 

spark 
    .sparkContext.textFile(s3BrowseIndex) 
    // split text dataset 
    .map(line => line.split("\\s+")) 
    // get types for attributes 
    .map(BrowseIndex.strAttributesToBrowseIndex) 
    // cast it to a dataset (requires implicit conversions) 
    .toDS() 
    // pick rows for the given marketplaces 
    .where($"mid".isin(mids: _*)) 
    // pick rows for the given indices 
    .where($"index".isin(indices: _*)) 

功能}

如果有人提供mids = Seq()indices = Seq(),此实现将过滤所有内容。另一方面,我希望语义只有在“mids非空时才应用此where子句”(与indices相同),以便在函数的用户提供空序列时不发生过滤。

有没有一个很好的功能方法来做到这一点?

回答

2

拉斐尔·罗斯的回答是一个很好的选择应用过滤器的具体问题,如果你不介意稍微复杂的逻辑。一般的解决方法,这对于任何有条件改造工程(不只是过滤,而不是只是在做该决定的一个分支没有),就是用transform,例如,

spark 
    .sparkContext.textFile(s3BrowseIndex) 
    // split text dataset 
    .map(line => line.split("\\s+")) 
    // get types for attributes 
    .map(BrowseIndex.strAttributesToBrowseIndex) 
    // cast it to a dataset (requires implicit conversions) 
    .toDS() 
    .transform { ds => 
    // pick rows for the given marketplaces 
    if (mids.isEmpty) ds 
    else ds.where($"mid".isin(mids: _*)) 
    } 
    .transform { ds => 
    // pick rows for the given indices 
    if (indices.isEmpty) ds 
    else ds.where($"index".isin(indices: _*)) 
    } 

如果您使用的是稳定的类型的数据集(或dataframes,这是Dataset[Row]),transform是非常有用的,你可以建立的转换函数序列,然后再应用:

transformations.foldLeft(ds)(_ transform _) 

在许多情况下,这种做法与代码重用和可测试性的帮助。

+1

对我表示感谢! –

1

可以使用短路计算,这应该只适用于过滤器,如果提供Seq s为不为空:

import org.apache.spark.sql.functions.lit 

spark 
    .sparkContext.textFile(s3BrowseIndex) 
    // split text dataset 
    .map(line => line.split("\\s+")) 
    // get types for attributes 
    .map(BrowseIndex.strAttributesToBrowseIndex) 
    // cast it to a dataset (requires implicit conversions) 
    .toDS() 
    // pick rows for the given marketplaces 
    .where(lit(mids.isEmpty) or $"mid".isin(mids: _*)) 
    // pick rows for the given indices 
    .where(lit(indices.isEmpty) or $"index".isin(indices: _*)) 
+0

谢谢拉斐尔,你的解决方案有效。我upvoted它!尽管Sim的回答是一般性和简单的逻辑,但我会选择Sim的回答作为这个问题的答案。 –