2016-07-05 54 views
3

我每天都会将大约2至250万条记录加载到Postgres数据库中。将熊猫数据框转换为内存中类似文件的对象?

然后我用pd.read_sql读取这个数据,将它变成一个数据帧,然后我做一些列操作和一些次要的合并。我将这些修改后的数据保存为一个单独的表格供其他人使用。

当我做pd.to_sql它需要永远。如果我在Postgres中保存一个csv文件并使用COPY FROM,整个过程只需要几分钟,但服务器位于另一台机器上,并且在那里传输文件是一件痛苦的事情。

使用psycopg2,它看起来像我可以使用copy_expert从批量复制中受益,但仍然使用python。如果可能,我想尽量避免写一个实际的csv文件。我可以用熊猫数据框在内存中执行此操作吗?

这是我的熊猫代码的一个例子。如果可能的话,我想添加copy_expert或其他东西以使数据保存得更快。

for date in required_date_range: 
     df = pd.read_sql(sql=query, con=pg_engine, params={'x' : date}) 
     ... 
     do stuff to the columns 
     ... 
     df.to_sql('table_name', pg_engine, index=False, if_exists='append', dtype=final_table_dtypes) 

有人可以帮助我的例子代码?我宁愿仍然使用熊猫,这将是很好的记忆。如果不是的话,我只会写一个csv临时文件并且这样做。

编辑 - 这里是我的最终代码,它的工作原理。每个日期(数百万行)只需要几百秒而不是几个小时。

to_sql = “” “COPY%S FROM STDIN WITH CSV HEADER” “”

def process_file(conn, table_name, file_object): 
    fake_conn = cms_dtypes.pg_engine.raw_connection() 
    fake_cur = fake_conn.cursor() 
    fake_cur.copy_expert(sql=to_sql % table_name, file=file_object) 
    fake_conn.commit() 
    fake_cur.close() 


#after doing stuff to the dataframe 
    s_buf = io.StringIO() 
    df.to_csv(s_buf) 
    process_file(cms_dtypes.pg_engine, 'fact_cms_employee', s_buf) 
+1

我不知道psycopg2,但你可以尝试如下:'s_buf = io.StringIO()','df.to_csv(s_buf)',它会将你的df存储在类似文件的缓冲区中。那么也许'cur.copy_from(s_buf,...)'而不是'copy_expert'。 – ptrj

+0

强壮的工作!尽管我仍然保留着复制专家。当我使用普通的pandas.to_sql时,它只花费了100秒而不是10000秒。做一个真实的答案,所以我可以接受 – trench

+0

很高兴我能帮上忙。 – ptrj

回答

9

Python模块iodocs)具有用于类文件对象必需的工具。

import io 

# text buffer 
s_buf = io.StringIO() 

# saving a data frame to a buffer (same as with a regular file): 
df.to_csv(s_buf) 

编辑。 (我忘了),以便事后从缓冲区读取,它的位置应设置为开头:

s_buf.seek(0) 

我不熟悉psycopg2但根据docscopy_expertcopy_from可以使用,例如:

cur.copy_from(s_buf, table) 

(对于Python 2,参见StringIO。)

+0

谢谢。此外,我正在循环查看每个日期以查询该日期。每次我再次连接到数据库。有没有更好的方法不必连接/重新连接每个循环?就像我会连接一次,然后我只是在我的循环中更改查询? – trench

+0

我不确定我是否理解,但是不能只查询一次所有日期(范围内的所有日期)。如果这太大,那么可能会查询大块。我想它会添加日期到数据框的另一列。然后,如果您不需要它,或者选择并使用子帧,或者按日期“groupby”并遍历组,则可以删除此列。或者,如果您想完全避免使用'pd.read_sql',可以使用'copy_expert' /'copy_to'将数据复制到字符串缓冲区,并使用'pd.read_csv'将其加载到数据帧中。这只是从我的头顶开始。 – ptrj

3

我不得不实现从溶液中的问题ptrj。

我认为这个问题源于熊猫设置缓冲区的位置到最后。

见如下:

from StringIO import StringIO 
df = pd.DataFrame({"name":['foo','bar'],"id":[1,2]}) 
s_buf = StringIO() 
df.to_csv(s_buf) 
s_buf.__dict__ 

# Output 
# {'softspace': 0, 'buflist': ['foo,1\n', 'bar,2\n'], 'pos': 12, 'len': 12, 'closed': False, 'buf': ''} 

注意,POS是12.我不得不把POS机设置为0,以使后续copy_from命令工作

s_buf.pos = 0 
cur = conn.cursor() 
cur.copy_from(s_buf, tablename, sep=',') 
conn.commit() 
+0

我在周末运行了该代码,并没有得到任何错误。当我星期一到办公室时,我的桌子是空的,被吸了。我最终编写了完整工作的临时csv文件。所以,如果我做to_csv(s_buf),然后s_buf.pos = 0,那么它会工作,而不写一个csv? – trench

+1

@a_bigbadwolf @trench好点!这是文件/流缓冲区的标准行为(即不是熊猫的错误​​)。我只是忘了包括它,我的歉意。将位置设置为开头的惯用方法是运行's_buf.seek(0)'。 – ptrj