2017-04-10 66 views
0

我的SQL数据库拥有数以百万计的记录他们中的一些表有一千几百万,我的主要选择是约4000行代码,但结构是这样的:最佳方式使用Pyspark与SQL数据库

SELECT A.seq field1, field2, field3, field4, 
     (select field from tableX X... where A.seq = X.seq ...) field5, 
     (select field from tableY Y... where A.seq = Y.seq ...) field6, 
     (select field from tableN Z... where A.seq = Z.seq ...) field7, 
     field8, field9 
    FROM tableA A, tableB B, tableN N 
WHERE A.seq = B.seq 
    AND A.req_seq = N.req_seq; 

我的想法是做这样的事情:

# load the tables in the cluster separately 

conf = SparkConf().setAppName("MyApp") 
sc = SparkContext(master="local[*]", conf=conf) 
sql = HiveContext(sc)  

dataframeA = sql.read.format("jdbc").option("url", 
            "db_url")\ 
    .option("driver", "myDriver")\ 
    .option("dbtable", tableA)\ 
    .option("user", "myuser")\ 
    .option("password", "mypass").load() 

dataframeB = sql.read.format("jdbc").option("url", 
            "db_url")\ 
    .option("driver", "myDriver")\ 
    .option("dbtable", tableC)\ 
    .option("user", "myuser")\ 
    .option("password", "mypass").load() 

dataframeC = sql.read.format("jdbc").option("url", 
            "db_url")\ 
    .option("driver", "myDriver")\ 
    .option("dbtable", tableC)\ 
    .option("user", "myuser")\ 
    .option("password", "mypass").load() 

# then do the needed joins 

df_aux = dataframeA.join(dataframeB, dataframeA.seq == dataframeB.seq) 

df_res_aux = df_aux.join(dataframeC, df_aux.req_seq == dataframeC.req_seq) 


# then with that dataframe calculate the subselect fields 

def calculate_field5(seq): 
    # load the table in the cluster as with the main tables 
    # and query the datafame 
    # or make the query to DB and return the field 
    return field 

df_res = df_res_aux.withColumn('field5', calculate_field5(df_res_aux.seq)) 
# the same for the rest of fields 

这是一个好办法吗? 我应该采用不同的方式吗?

任何意见将真的,真的很感激

回答

0

好,

如果wanto在执行使用MySQL,这是做到这一点的方式。

但得到一些说明,也许你的执行将花费大量的时间来运行,由于mySql查询时间。 MySql不是分布式数据库,因此您可以花费大量时间从mySql检索数据。

我建议你。

尝试将数据检索到hdfs(如果您有HDFS),请尝试使用SqoopHere一个例子如何以增量的方式使用它。

尝试转换存储为Orc的数据。请参阅示例here

这个建议是为了减少数据库的执行时间。每次你从你的MySql中直接请求数据。您正在使用MySql的资源将数据发送到Spark。按照我的建议,您可以将您的数据库复制到HDFS并将这些数据提交给Spark进行处理。这不会导致您的数据库执行时间。

为什么要使用Orc? Orc是将数据转换为紧凑和柱状结构的理想选择。这会增加您的数据检索和搜索。

+0

感谢您的回答!我会看看这些技术。 因此,最好将所有需要的表检索到文件系统或内存中,然后应用过滤器 –

相关问题