2016-03-02

Spark SQL: CROSS JOIN with subqueries

I am trying to run the following SQL query in pyspark (on Spark 1.5.0):

SELECT * FROM (SELECT obj as origProperty1 FROM a LIMIT 10) tab1 CROSS JOIN (SELECT obj AS origProperty2 FROM b LIMIT 10) tab2 
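For context, the CROSS JOIN above pairs every row of tab1 with every row of tab2, so the 10-row subqueries should yield 10 × 10 = 100 rows. A minimal pure-Python sketch of that Cartesian-product semantics (the values are made up for illustration, not taken from the actual tables):

```python
from itertools import product

# Hypothetical 'obj' values standing in for the first 10 rows of tables a and b
tab1 = ["a%d" % i for i in range(10)]  # plays the role of origProperty1
tab2 = ["b%d" % i for i in range(10)]  # plays the role of origProperty2

# A cross join emits every (origProperty1, origProperty2) pair
rows = [(p1, p2) for p1, p2 in product(tab1, tab2)]

print(len(rows))  # 100 rows: 10 from tab1 times 10 from tab2
print(rows[0])    # ('a0', 'b0')
```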

This is what the pyspark commands look like:

from pyspark.sql import SQLContext 
sqlCtx = SQLContext(sc) 

a = sqlCtx.parquetFile("hdfs/path/to/table/a.parquet") 
a.registerTempTable("a") 

b = sqlCtx.parquetFile("hdfs/path/to/table/b.parquet") 
b.registerTempTable("b") 

result = sqlCtx.sql("SELECT * FROM (SELECT obj as origProperty1 FROM a LIMIT 10) tab1 CROSS JOIN (SELECT obj AS origProperty2 FROM b LIMIT 10) tab2").collect() 

However, it produces the following error:

Traceback (most recent call last): 
File "<stdin>", line 1, in <module> 
File "/usr/lib/spark/python/pyspark/sql/context.py", line 552, in sql 
return DataFrame(self._ssql_ctx.sql(sqlQuery), self) 
File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ 
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 36, in deco 
return f(*a, **kw) 
File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling o19.sql. 
: java.lang.RuntimeException: [1.67] failure: ``union'' expected but identifier CROSS found 

SELECT * FROM (SELECT obj as origProperty1 FROM a LIMIT 10) tab1 CROSS JOIN (SELECT obj AS origProperty2 FROM b LIMIT 10) tab2 
                   ^
at scala.sys.package$.error(package.scala:27) 
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:36) 
at org.apache.spark.sql.catalyst.DefaultParserDialect.parse(ParserDialect.scala:67) 
at org.apache.spark.sql.SQLContext$$anonfun$3.apply(SQLContext.scala:175) 
at org.apache.spark.sql.SQLContext$$anonfun$3.apply(SQLContext.scala:175) 
at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:115) 
at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:114) 
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) 
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) 
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) 
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) 
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) 
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) 
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) 
at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202) 
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) 
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) 
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) 
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) 
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) 
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890) 
at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110) 
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34) 
at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:172) 
at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:172) 
at org.apache.spark.sql.execution.datasources.DDLParser.parse(DDLParser.scala:42) 
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:195) 
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) 
at py4j.Gateway.invoke(Gateway.java:259) 
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
at py4j.commands.CallCommand.execute(CallCommand.java:79) 
at py4j.GatewayConnection.run(GatewayConnection.java:207) 
at java.lang.Thread.run(Thread.java:745) 

As far as I know, CROSS JOIN as well as subqueries (in the FROM clause) should be supported by Spark SQL. Do you have any suggestions on how to fix this?

Thanks, Timo

Answer


Using HiveContext instead of SQLContext seems to solve the problem. This solution works fine:

from pyspark.sql import HiveContext 
sqlCtx = HiveContext(sc) 

a = sqlCtx.parquetFile("hdfs/path/to/table/a.parquet") 
a.registerTempTable("a") 

b = sqlCtx.parquetFile("hdfs/path/to/table/b.parquet") 
b.registerTempTable("b") 

result = sqlCtx.sql("SELECT * FROM (SELECT obj as origProperty1 FROM a LIMIT 10) tab1 CROSS JOIN (SELECT obj AS origProperty2 FROM b LIMIT 10) tab2").collect() 

However, in my understanding this should also work with SQLContext, since Spark SQL supports both subqueries and cross joins...