2017-08-31 58 views
1

并行化的HTTP请求我使用pyspark做一些数据转换:类似如下:与Pyspark

df_systems_tree_users = sqlContext.read.format("jdbc") \ 
    .option("dbtable", 
      "(select ID as SYSTEMUID,M_EXTERNAL_ID,metric,DATATRANSMISSIONFREQUENCY,MODEL,BRAND,BUILDING FROM SYSTEM INNER JOIN SENSOR ON SYSTEM.ID=SENSOR.SYSTEMID WHERE LPWANOPERATOR='Objenious' AND M_EXTERNAL_ID!='None')") \ 
    .option("url", "jdbc:phoenix:master1:2181:/hbase-unsecure") \ 
    .option("driver", "org.apache.phoenix.jdbc.PhoenixDriver") \ 
    .load() 
objRDD = df_systems_tree_users.rdd.map(lambda x: getStatesAndUplink(x)) 

getStatesAndUplink方法使用请求python库来执行HTTP GET请求外部API。

我在4个执行器上运行4个执行器,每个执行器有4个内核,但运行30分钟需要很多时间。

我的问题是如何优化我的代码以高效的方式并行化我的http请求?

+0

你的getStatesAndUplink(x)函数到底是什么?坚持使用数据框和使用UDF可能会更快。 – MaFF

回答

1

正如documentation提到的,你必须指定4个参数:

  • partitionColumn
  • lowerBound
  • upperBound
  • numPartitions

只有这些选项星火会并行读入其他情况下,它将在一个线程中完成。

编辑:存在Phoenix Spark plugin。它将并行读取而不指定这4个参数。

编辑2:如果数据不平衡,则方法getStatesAndUplink可能会限制外部服务。它可能只是“卡住”在一个节点上的处理