0

我正在使用Spark JDBC从MS SQL数据库读取数据,但我得到了一些奇怪的结果。Apache Spark JDBC DataFrame计数问题

例如,下面是我的代码来从我的MS SQL数据库中读取记录。 请注意,我正在读取数据的表格不断插入记录。

//Extract Data from JDBC source 
    val jdbcTable = sqlContext.read.format("jdbc").options(
     Map(
     "url" -> jdcbUrl, 
     "driver" -> "net.sourceforge.jtds.jdbc.Driver", 
     "dbtable" -> 
      s"(SELECT COLUMNS WITH INNER JOINS WHERE tbl.ID > ${lastExtractUnixTime}) as t")) 
     .load 

    println(s"STEP 1] JDBC RECORDS COUNTS ---> ${jdbcTable.count()}") 

    val updateJdbcDF = jdbcTable 
     .withColumn("ID-COL1", trim($"COl1")) 
     .withColumn("ID-COL1", trim($"COl2")) 

    println(s"STEP 2] UPDATE JDBC RECORDS COUNTS ---> ${updateJdbcDF.count()}") 

我一次比一次我跑我的程序,例如我总是${updateJdbcDF.count()}计数>${jdbcTable.count()} 2个不同的计数值。

有人可以解释我为什么会发生这种情况?这在我的用例中创造了很多问题。如何在创建DataFrame后限制数量为jdbcTable。我试过jdbcTable.cache(),但没有运气。

当我对来自jdbcTable DataFrame的其他数据帧使用任何操作时,记录只会变得越来越大。每次使用从jdbcTable数据框派生的任何数据帧时,是否会调用jdbcTable数据帧?

+0

差异是否不变?或者你每次都得到不同的结果吗? – philantrovert

+0

@philantrovert没有区别是不恒定的,我每次都得到不同的计数。 – nilesh1212

+1

那么,如果“我正在读取数据的表格不断插入记录”,并且您的请求没有定义一个固定范围谓词,那么每次火花访问它时,表中的行数都不相同。所以你所看到的(改变计数)只是预料之中,不是吗? – GPI

回答

1

我能够通过应用jdbcTable.cache()来解决这个问题,现在任何派生自jdbcTable数据框的DF都不会给我一个比jdbcTable.count()更高的计数。所有的计算现在都可以。感谢您的解释@GPI

//Extract Data from JDBC source 
    val jdbcTable = sqlContext.read.format("jdbc").options(
     Map(
     "url" -> jdcbUrl, 
     "driver" -> "net.sourceforge.jtds.jdbc.Driver", 
     "dbtable" -> 
      s"(SELECT COLUMNS WITH INNER JOINS WHERE tbl.ID > ${lastExtractUnixTime}) as t")) 
     .load 

    jdbcTable.cache() 

    println(s"STEP 1] JDBC RECORDS COUNTS ---> ${jdbcTable.count()}") 


    val updateJdbcDF = jdbcTable 
     .withColumn("ID-COL1", trim($"COl1")) 
     .withColumn("ID-COL1", trim($"COl2")) 

    println(s"STEP 2] UPDATE JDBC RECORDS COUNTS ---> ${updateJdbcDF.count()}") 
    /** 
    * MORE DATA PROCESSING HERE 
    /** 

    jdbcTable.unpersist()