2017-03-01 58 views
0

我在Stackoverflow上找到类似的帖子。但是,我无法解决我的问题所以,这就是我写这篇文章的原因。sparks scala typesafe config安全迭代特定列名的值

目的

目的是在加载SQL表(I使用SQL Server),以执行列投影[投影=过滤柱]。

根据斯卡拉食谱这是过滤colums的方式[使用数组]:

sqlContext.read.jdbc(url,"person",Array("gender='M'"),prop) 

不过,我不想硬编码阵列(所有“col1”,“col2上”,... )在我的Scala代码中,这就是为什么我使用带有类型安全的配置文件(参见下文)。

配置文件

dataset { 
    type = sql 
    sql{ 
     url = "jdbc://host:port:user:name:password" 
     tablename = "ClientShampooBusinesLimited" 
     driver = "driver" 
     other = "i have a lot of other single string elements in the config file..." 
     columnList = [ 
     { 
      colname = "id" 
      colAlias = "identifient" 
     } 
     { 
      colname = "name" 
      colAlias = "nom client" 
     } 
     { 
      colname = "age" 
      colAlias = "âge client" 
     } 
     ] 
    } 
} 

让我们专注于 'columnList':在SQL列的名称exatecly对应于 'colname的'。 'colAlias'是我稍后会用到的一个字段。

​​

configFromFile是由自己在其他自定义类创建data.scala文件。但这并不重要。 columnList的类型是“ConfigList”,这个类型来自类型安全。

主文件

def loadDataSQL(): DataFrame = { 

val url = datasetConfig.dbUrl 
val dbTablename = datasetConfig.DbTableName 
val dbDriver = datasetConfig.DriverName 
val columns = // I need help to solve this 


/* EDIT 2 march 2017 
    This code should not be used. Have a look at the accepted answer. 
*/ 
sparkSession.read.format("jdbc").options(
    Map("url" -> url, 
    "dbtable" -> dbTablename, 
    "predicates" -> columns, 
    "driver" -> dbDriver)) 
    .load() 
} 

所以我所有的问题是为了把它们放在一个合适的阵列来提取“colnames的价值观。有人可以帮我写出'val列'的正确的操作符吗?

感谢

回答

1

如果你正在寻找一种方式来读取colname值列表为斯卡拉阵列 - 我觉得这个做的:

import scala.collection.JavaConverters._ 

val columnList = configFromFile.getConfigList("dataset.sql.columnList") 
val colNames: Array[String] = columnList.asScala.map(_.getString("colname")).toArray 

使用随机提供的文件,这将导致Array(id, name, age)

编辑: 至于你的实际目标,我居然不知道叫predication任何选项(我也不能使用Spark 2.0.2在源代码中找到证据)。

JDBC数据源根据所用查询中选择的实际列执行“投影下推”。换句话说 - 只有选择列会从数据库中读取,这样你就可以在select使用colNames阵列立即DF创建以下,如:

import org.apache.spark.sql.functions._ 

sparkSession.read 
    .format("jdbc") 
    .options(Map("url" -> url, "dbtable" -> dbTablename, "driver" -> dbDriver)) 
    .load() 
    .select(colNames.map(col): _*) // selecting only desired columns 
+0

亲爱Tzach琐,这是exactely我一直在寻找对于。非常感谢你的帮助。 – S12000

+0

但是,在“预测” - >列中出现错误,它表示“过载”方法。你知道什么是问题吗?谢谢 – S12000

+0

不确定你所指的错误,但我更新了我的答案,希望能帮助你实际读取DB –