1

有人可以提供一个使用pyspark的例子来说明如何运行自定义的Apache Phoenix SQL查询并将查询的结果存储在RDD或DF中。注意:我正在查找自定义查询,而不是要将整个表读入RDD。在PySpark中运行自定义的Apache Phoenix SQL查询

从凤凰文件,加载整个表我可以用这个:

table = sqlContext.read \ 
     .format("org.apache.phoenix.spark") \ 
     .option("table", "<TABLENAME>") \ 
     .option("zkUrl", "<hostname>:<port>") \ 
     .load() 

我想知道什么是使用自定义的SQL

sqlResult = sqlContext.read \ 
      .format("org.apache.phoenix.spark") \ 
      .option("sql", "select * from <TABLENAME> where <CONDITION>") \ 
      .option("zkUrl", "<HOSTNAME>:<PORT>") \ 
      .load() 

由于相应的等价物。

回答

1

这可以使用凤凰作为一个JDBC数据源,如下进行:

sql = '(select COL1, COL2 from TABLE where COL3 = 5) as TEMP_TABLE' 

df = sqlContext.read.format('jdbc')\ 
     .options(driver="org.apache.phoenix.jdbc.PhoenixDriver", url='jdbc:phoenix:<HOSTNAME>:<PORT>', dbtable=sql).load() 

df.show() 

然而,应该指出的是,如果在SQL语句中列的别名则。 show()语句会抛出一个异常(如果你使用.select()来选择没有别名的列,它将会工作),这是Phoenix中的一个可能的错误。

+0

这是问题的答案还是问题的一部分? – YOU

+0

两者。它使用JDBC来实现我想要做的事情,但使用Phoenix Spark选项会更好,因此我尝试使用它以及相应的错误消息。 –

+0

问题应该在第一篇文章中编辑,因为这是答案部分。 stackoverflow不像普通的论坛。 – YOU

0

在这里您需要使用.sql来处理自定义查询。这里是语法

dataframe = sqlContext.sql("select * from <table> where <condition>") 
dataframe.show() 
+0

这是行不通的,因为我们不告诉Spark在任何地方使用Phoenix。这是Phoenix文档的链接https://phoenix.apache.org/phoenix_spark.html –