2017-02-14 328 views
0

我有一个Spark作业,它可以非常快速地处理数据,但是当它试图将结果写入postgresql数据库时,它非常缓慢。以下是大部分相关代码:pyspark + psycopg2将结果写入数据库的速度很慢

import psycopg2 

def save_df_to_db(records): 
    # each item in record is a dictionary with 'url', 'tag', 'value' as keys 
    db_conn = psycopg2.connect(connect_string) 
    db_conn.autocommit = True 
    cur = db_conn.cursor(cursor_factory=psycopg2.extras.DictCursor) 
    upsert_query = """INSERT INTO mytable (url, tag, value) 
         VALUES (%(url)s, %(tag)s, %(value)s) ON CONFLICT (url, tag) DO UPDATE SET value = %(value)s""" 

    try: 
     cursor.executemany(upsert_query, records) 
    except Exception as e: 
     print "Error in executing save_df_to_db: ", e.message 

data = [...] # initial data 
rdd = sc.parallelize(data) 
rdd = ... # Some simple RDD transforms... 
rdd.foreachPartition(save_df_to_db) 

该表还对url +标记是唯一的约束。我正在寻找解决方案来提高此代码的速度。任何建议或建议是受欢迎的。

+0

作为此时Psycopg 2.7,它提供'execute_values',仍处于测试阶段。现在使用适合2.6的解决方案:http://stackoverflow.com/a/30985541/131874 –

回答

0

感谢您的回复。由于我使用的psycopg2版本不支持批处理执行,因此我不得不依赖使用copy命令的稍微不同的方法。我写下了一个小功能,帮助将保存时间从20分钟缩短到30秒左右。这是功能。它采用熊猫数据帧作为输入,并将其写入一个表(CURSO):

import StringIO 
import pandas as pd 

def write_dataframe_to_table(cursor, table, dataframe, batch_size=100, null='None'): 
    """ 
    Write a pandas dataframe into a postgres table. 
    It only works if the table columns have the same name as the dataframe columns. 
    :param cursor: the psycopg2 cursor object 
    :param table: the table name 
    :param dataframe: the dataframe 
    :param batch_size: batch size 
    :param null: textual representation of NULL in the file. The default is the string None. 
    """ 
    for i in range(0, len(dataframe), batch_size): 
     chunk_df = dataframe[i: batch_size + i] 
     content = "\n".join(chunk_df.apply(lambda x: "\t".join(map(str, x)), axis=1)) 
     cursor.copy_from(StringIO.StringIO(content), table, columns=list(chunk_df.columns), null=null) 
1

我相信主要的瓶颈是cursor.executemanyconnection.autocommit的组合。因为它是executemany

官方文档中解释在目前的实现这个方法是不是比执行汉在一个循环中执行​​更快。

既然你把它和connection.autocommit结合起来,你可以在每次插入后有效地提交。

Psycopg提供fast execution helpers

可以被用于执行批处理操作

。手动处理提交也更有意义。

另外还有可能会使用大量的并发写入和索引更新来限制数据库服务器。通常我会建议写入磁盘并使用COPY执行批量导入,但不能保证在此处提供帮助。

由于您使用不带时间戳的可变记录,因此您不能只删除索引并在导入后重新创建它作为提高性能的另一种方式。