pyspark + psycopg2 is slow writing results to the database

I have a Spark job that processes data very quickly, but when it tries to write the results to a PostgreSQL database, it is very slow. Here is most of the relevant code:
import psycopg2
import psycopg2.extras

def save_df_to_db(records):
    # each item in records is a dictionary with 'url', 'tag', 'value' as keys
    db_conn = psycopg2.connect(connect_string)
    db_conn.autocommit = True
    cur = db_conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    upsert_query = """INSERT INTO mytable (url, tag, value)
        VALUES (%(url)s, %(tag)s, %(value)s)
        ON CONFLICT (url, tag) DO UPDATE SET value = %(value)s"""
    try:
        cur.executemany(upsert_query, records)
    except Exception as e:
        print "Error in executing save_df_to_db: ", e.message

data = [...]  # initial data
rdd = sc.parallelize(data)
rdd = ...  # Some simple RDD transforms...
rdd.foreachPartition(save_df_to_db)
The table also has a unique constraint on url + tag. I am looking for solutions to improve the speed of this code. Any suggestions or advice are welcome.
As of this writing, Psycopg 2.7, which provides execute_values, is still in beta. For now, use the solution that works with 2.6: http://stackoverflow.com/a/30985541/131874
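Building on that comment, here is a minimal sketch of what the partition writer could look like with execute_values once Psycopg 2.7 is available. The page_size value and the dict-to-tuple conversion are illustrative choices, not part of the original question; EXCLUDED is standard PostgreSQL syntax referring to the row that hit the conflict.

import psycopg2
import psycopg2.extras

def save_df_to_db(records):
    records = list(records)  # materialize the partition iterator
    if not records:
        return
    db_conn = psycopg2.connect(connect_string)
    db_conn.autocommit = True
    cur = db_conn.cursor()
    # One multi-row VALUES statement per batch instead of one
    # round trip per record, which is what executemany does.
    upsert_query = """INSERT INTO mytable (url, tag, value)
        VALUES %s
        ON CONFLICT (url, tag) DO UPDATE SET value = EXCLUDED.value"""
    rows = [(r['url'], r['tag'], r['value']) for r in records]
    psycopg2.extras.execute_values(cur, upsert_query, rows, page_size=1000)
    cur.close()
    db_conn.close()

This replaces the per-record round trips with one INSERT per page_size rows, which is usually where the time goes when writing out of Spark partitions.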