2017-04-02 80 views
3

我想使用Spark来处理来自JDBC源的一些数据。但是,首先,我不想从JDBC读取原始表,而是想在JDBC端运行一些查询来过滤列和连接表,并将查询结果作为Spark SQL中的表加载。如何在jdbc数据源中使用dbtable选项的子查询?

以下语法来加载原始JDBC表对我的作品:

df_table1 = sqlContext.read.format('jdbc').options(
    url="jdbc:mysql://foo.com:3306", 
    dbtable="mydb.table1", 
    user="me", 
    password="******", 
    driver="com.mysql.jdbc.Driver" # mysql JDBC driver 5.1.41 
).load() 
df_table1.show() # succeeded 

据星火documentation(我使用PySpark 1.6.3):

DBTABLE:该JDBC表应该阅读。请注意,可以使用任何在SQL查询的FROM子句中有效的 。例如,您可以在圆括号中使用子查询而不是 全表。

所以只是为了实验,我试着像这样简单的东西:

df_table1 = sqlContext.read.format('jdbc').options(
    url="jdbc:mysql://foo.com:3306", 
    dbtable="(SELECT * FROM mydb.table1) AS table1", 
    user="me", 
    password="******", 
    driver="com.mysql.jdbc.Driver" 
).load() # failed 

它抛出以下异常:

com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'table1 WHERE 1=0' at line 1 

我也尝试了语法的一些其他变化(增加/删除括号,删除'as'子句,切换案例等),没有任何运气。那么正确的语法是什么?我在哪里可以找到更详细的语法文档?此外,错误信息中这个奇怪的“WHERE 1 = 0”来自哪里?谢谢!

+0

从我的角度来看,你只需要指定你试图拉入的表格,这样就省略了选择的表述。 0 = 1来自您未指定的参数。看看[Dataframe Reader]的源代码(https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader。 scala#L207-L216) –

+0

这个http://stackoverflow.com/q/32628717/1305344看起来很相似,但它与PostgreSQL(不是MySQL)相似。 –

回答

1

对于使用SQL星火SQL查询从JDBC源读取数据,你可以尝试这样的事:

val df_table1 = sqlContext.read.format("jdbc").options(Map(
    ("url" -> "jdbc:postgresql://localhost:5432/mydb"), 
    ("dbtable" -> "(select * from table1) as table1"), 
    ("user" -> "me"), 
    ("password" -> "******"), 
    ("driver" -> "org.postgresql.Driver")) 
).load() 

我尝试了使用PostgreSQL。您可以根据MySQL进行修改。

+0

看来我和你的答案一样,除了我使用python。我的PySpark特有的代码可能有一些语法错误? – Dichen

0

我认为这可能是Spark SQL中的一个错误。

似乎thisthis line给你的错误。两者都使用Scala字符串插值来替换tabledbtable

s"SELECT * FROM $table WHERE 1=0" 

这就是你可以从你所面临由于上述模式将成为错误发现table1 WHERE 1=0

SELECT * FROM (select * from table1) as table1 WHERE 1=0 

看起来不正确。

有的确是一个MySQL特有的方言 - MySQLDialect - 重写getTableExistsQueryits own

override def getTableExistsQuery(table: String): String = { 
    s"SELECT 1 FROM $table LIMIT 1" 
} 

,所以我的办法是,其他方法getSchemaQuery是错误的来源。如果您使用Spark 1.6.3,而此方法有@Since("2.1.0")标记,则这种方法非常不可靠。

我强烈建议检出MySQL数据库的日志并查看导致错误消息的执行的查询。

相关问题