
Connecting to Spark with the Apache Phoenix Spark connector and running a custom SQL query

Can someone show how to connect to Spark using the Phoenix Spark connector instead of using Phoenix as a JDBC data source? (It works as a JDBC source, but performance is the concern: https://phoenix.apache.org/phoenix_spark.html)

This is what I am trying to do with the Phoenix driver, but it throws a "Table Not Found" exception:

sql = '(select COL1, COL2 from TABLE where COL3 = 5) as TEMP_TABLE' 

df2 = sqlContext.read\ 
       .format("org.apache.phoenix.spark")\ 
       .option("table", sql)\ 
       .option("zkUrl", "<HOSTNAME>:<PORT>")\ 
       .load() 

Result:

Traceback (most recent call last): 
File "<stdin>", line 1, in <module> 
File "/usr/lib/spark/python/pyspark/sql/readwriter.py", line 139, in load 
return self._df(self._jreader.load()) 
File "/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__ 
    File "/usr/lib/spark/python/pyspark/sql/utils.py", line 45, in deco 
    return f(*a, **kw) 
    File "/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling o1625.load. 
: org.apache.phoenix.schema.TableNotFoundException: ERROR 1012 (42M03): Table undefined. tableName=sql 
    at org.apache.phoenix.schema.PMetaDataImpl.getTableRef(PMetaDataImpl.java:244) 
    at org.apache.phoenix.jdbc.PhoenixConnection.getTable(PhoenixConnection.java:441) 
    at org.apache.phoenix.util.PhoenixRuntime.getTable(PhoenixRuntime.java:379) 
    at org.apache.phoenix.util.PhoenixRuntime.generateColumnInfo(PhoenixRuntime.java:405) 
    at org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getSelectColumnMetadataList(PhoenixConfigurationUtil.java:279) 
    at org.apache.phoenix.spark.PhoenixRDD.toDataFrame(PhoenixRDD.scala:105) 
    at org.apache.phoenix.spark.PhoenixRelation.schema(PhoenixRelation.scala:57) 
    at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:37) 
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125) 
    at sun.reflect.GeneratedMethodAccessor102.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:209) 
    at java.lang.Thread.run(Thread.java:745) 

Can you verify whether the table exists?


@cricket_007 Yes, the table exists, and I can use it by treating Phoenix as a JDBC data source.


@cricket_007, this is how it works with JDBC: 'sql = '(select COL1, COL2 from TABLE where COL3 = 5) as TEMP_TABLE'; df = sqlContext.read.format('jdbc').options(driver="org.apache.phoenix.jdbc.PhoenixDriver", url='jdbc:phoenix:', dbtable=sql).load()'

Answer


The sql variable is pointless here. You need to build the full DataFrame from TABLE; the "table" option is not a SQL statement.

Then use Spark's DataFrame API to select COL1 and COL2 and filter on COL3 = 5.

You can see examples here where TABLE1 is created, then used in the option, and everything after that (in the Scala examples) is DataFrame operations.

Phoenix Spark

In your case, once you load the table correctly without the sql variable, you would have this:

df3 = df2.select('COL1', 'COL2').where('COL3 = 5') 
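
For completeness, here is a minimal end-to-end sketch of that approach, assuming the same placeholder table name TABLE and ZooKeeper quorum <HOSTNAME>:<PORT> as in the question; the column pruning and filter are expressed through the DataFrame API rather than a SQL subquery:

from pyspark import SparkContext 
from pyspark.sql import SQLContext 

sc = SparkContext(appName="phoenix-spark-example") 
sqlContext = SQLContext(sc) 

# Load the whole table through the Phoenix Spark connector 
df = sqlContext.read\ 
       .format("org.apache.phoenix.spark")\ 
       .option("table", "TABLE")\ 
       .option("zkUrl", "<HOSTNAME>:<PORT>")\ 
       .load() 

# Select and filter with the DataFrame API; the connector 
# can push column pruning and predicates down to Phoenix 
result = df.select('COL1', 'COL2').where('COL3 = 5') 
result.show() 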

Or maybe you are looking for how the Spark SQL API works outside of Phoenix...

Something like Running queries programmatically

You can use the DataFrame for raw queries, rather than building the DataFrame from a query:

df = sqlContext.read\ 
      .format("org.apache.phoenix.spark")\ 
      .option("table", "TABLE")\ 
      .option("zkUrl", "<HOSTNAME>:<PORT>")\ 
      .load() 
# Register the DataFrame so sqlContext.sql can see it; the name must 
# match the one used in the FROM clause below (Spark 1.x API) 
df.registerTempTable("TABLE") 

sqlDF = sqlContext.sql("SELECT COL1, COL2 FROM TABLE where COL3 = 5") 
sqlDF.show() 
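
Note that sqlContext.sql only resolves names registered in Spark's catalog, not the underlying Phoenix tables themselves, which is why the temp table registration above is needed and why its name must match the FROM clause.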

I am interested in understanding whether the Phoenix Spark connector supports what the JDBC connector allows me to do. For example, the following statement achieves the goal using JDBC: 'sql = '(select COL1, COL2 from TABLE where COL3 = 5) as TEMP_TABLE'; df = sqlContext.read.format('jdbc').options(driver="org.apache.phoenix.jdbc.PhoenixDriver", url='jdbc:phoenix:', dbtable=sql).load()'
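
For reference, here is the JDBC variant from that comment cleaned up as a runnable sketch; the subquery-as-table trick works because the JDBC source accepts any relation expression for dbtable. The 'jdbc:phoenix:' URL is kept as in the comment; a ZooKeeper quorum would normally follow the colon.

# JDBC path: Phoenix as a plain JDBC data source, with the 
# filter pushed into the dbtable subquery 
sql = '(select COL1, COL2 from TABLE where COL3 = 5) as TEMP_TABLE' 

df = sqlContext.read.format('jdbc')\ 
       .options(driver="org.apache.phoenix.jdbc.PhoenixDriver", 
                url='jdbc:phoenix:', 
                dbtable=sql)\ 
       .load() 
df.show() 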
