How to efficiently rebuild a pandas HDFStore table when an append fails

I am using a pandas HDFStore to handle DataFrames coming from an ongoing iterative process. On each iteration I append to a table in the HDFStore. Here is a toy example:

import pandas as pd
from pandas import HDFStore
import numpy as np
from string import ascii_letters

alphanum = np.array(list(ascii_letters) + [str(d) for d in range(0, 9)])

def hdfstore_append(storefile, key, df, format="t", columns=None, data_columns=None):
    if df is None:
        return
    if key[0] != '/':
        key = '/' + key
    with HDFStore(storefile) as store:
        if key not in store.keys():
            store.put(key, df, format=format, columns=columns,
                      data_columns=data_columns)
        else:
            try:
                store.append(key, df)
            except Exception:
                # Append failed (e.g. itemsize mismatch): rebuild the table by
                # reading it back, concatenating, and rewriting with put().
                df = pd.concat([store.get(key), df])
                store.put(key, df, format=format, columns=columns,
                          data_columns=data_columns)

storefile = "db.h5"
for i in range(0, 100):
    df = pd.DataFrame([dict(n=np.random.randn(),
                            s=''.join(alphanum[np.random.randint(0, len(alphanum),
                                                                 np.random.randint(1, 2 * (i + 1)))]))],
                      index=[i])
    hdfstore_append(storefile, '/SO/df', df, columns=df.columns, data_columns=True)

The hdfstore_append function guards against the various exceptions hdfstore.append throws and rebuilds the table when necessary. The problem with this approach is that it becomes very slow once the table in the store gets large.

Is there a more efficient way to do this?


You shouldn't catch exceptions like this; they usually mean you are doing something wrong – Jeff


In this case, the exceptions refer to the appended DataFrame being somehow incompatible (itemsize, inferred dtypes, etc.) with the PyTables table in the hdfstore; it doesn't just 'pass' on the exception when the new DataFrame is completely incompatible with the old one. I got a similar suggestion from an old PR of yours, https://github.com/pydata/pandas/issues/824. Do you have a better recipe? – jmerkow


If it's incompatible then it lets you know that this is the case, and that really is a user error. Yes, you can rebuild like you are doing, but if you are doing this repeatedly it is better to specify min_itemsize and/or the column order up front; otherwise you are hiding errors – Jeff
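
A minimal sketch of the min_itemsize suggestion above (the 64-character width and the sample frames are illustrative, not from the question):

import pandas as pd
from pandas import HDFStore

df = pd.DataFrame(dict(n=[0.5], s=['abc']), index=[0])
df2 = pd.DataFrame(dict(n=[1.5], s=['a much longer string value']), index=[1])

with HDFStore('db_fixed.h5') as store:
    # Reserve string width up front so later appends never hit an
    # itemsize-mismatch exception, even for longer strings.
    store.put('/SO/df', df, format='table', data_columns=True,
              min_itemsize={'s': 64})
    store.append('/SO/df', df2)  # fits within the reserved 64 characters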

Answer


Here is an example of an efficient way to build a pandas HDFStore. The key is to cache the frame numbers once the file gets large. Rather than appending, any pre-existing data for a frame is removed first, so each write is essentially a put.

from __future__ import (absolute_import, division, print_function, 
         unicode_literals) 
import six 
import logging 
import os 
from abc import ABCMeta, abstractmethod, abstractproperty 
import warnings 

import pandas as pd 

logger = logging.getLogger(__name__) 


class FramewiseData(six.with_metaclass(ABCMeta, object)):
    "Abstract base class defining a data container with framewise access."

    @abstractmethod 
    def put(self, df): 
     pass 

    @abstractmethod 
    def get(self, frame_no): 
     pass 

    @abstractproperty 
    def frames(self): 
     pass 

    @abstractmethod 
    def close(self): 
     pass 

    @abstractproperty 
    def t_column(self): 
     pass 

    def __getitem__(self, frame_no): 
     return self.get(frame_no) 

    def __len__(self): 
     return len(self.frames) 

    def dump(self, N=None): 
     """Return data from all, or the first N, frames in a single DataFrame 
     Parameters 
     ---------- 
     N : integer 
      optional; if None, return all frames 
     Returns 
     ------- 
     DataFrame 
     """ 
     if N is None: 
      return pd.concat(iter(self)) 
     else: 
      i = iter(self) 
      return pd.concat((next(i) for _ in range(N))) 

    @property 
    def max_frame(self): 
     return max(self.frames) 

    def _validate(self, df): 
     if self.t_column not in df.columns: 
      raise ValueError("Cannot write frame without a column " 
          "called {0}".format(self.t_column)) 
     if df[self.t_column].nunique() != 1: 
      raise ValueError("Found multiple values for 'frame'. " 
          "Write one frame at a time.") 

    def __iter__(self): 
     return self._build_generator() 

    def _build_generator(self): 
     for frame_no in self.frames: 
      yield self.get(frame_no) 

    def __enter__(self): 
     return self 

    def __exit__(self, type, value, traceback): 
     self.close() 

KEY_PREFIX = 'Frame_' 
len_key_prefix = len(KEY_PREFIX) 


def code_key(frame_no): 
    "Turn the frame_no into a 'natural name' string idiomatic of HDFStore" 
    key = '{0}{1}'.format(KEY_PREFIX, frame_no) 
    return key 


def decode_key(key): 
    frame_no = int(key[len_key_prefix:]) 
    return frame_no 
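
# Illustrative round trip of the helpers above:
#   code_key(7)            -> 'Frame_7'
#   decode_key('Frame_7')  -> 7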


class PandasHDFStore(FramewiseData): 
    """An interface to an HDF5 file with framewise access, using pandas. 
    Save each frame's data to a node in a pandas HDFStore. 
    Any additional keyword arguments to the constructor are passed to 
    pandas.HDFStore(). 
    """ 

    def __init__(self, filename, mode='a', t_column='frame', **kwargs): 
     self.filename = os.path.abspath(filename) 
     self._t_column = t_column 
     self.store = pd.HDFStore(self.filename, mode, **kwargs) 

    @property 
    def t_column(self): 
     return self._t_column 

    @property 
    def max_frame(self): 
     return max(self.frames) 

    def put(self, df): 
     if len(df) == 0: 
      warnings.warn('An empty DataFrame was passed to put(). Continuing.') 
      return 
     frame_no = df[self.t_column].values[0] # validated to be all the same 
     key = code_key(frame_no) 
     # Store data as tabular instead of fixed-format. 
     # Make sure to remove any pre-existing data, so we don't really 'append'.
     try: 
      self.store.remove(key) 
     except KeyError: 
      pass 
     self.store.put(key, df, format='table') 

    def get(self, frame_no): 
     key = code_key(frame_no) 
     frame = self.store.get(key) 
     return frame 

    @property 
    def frames(self): 
     """Returns sorted list of integer frame numbers in file""" 
     return self._get_frame_nos() 

    def _get_frame_nos(self): 
     """Returns sorted list of integer frame numbers in file""" 
     # Pandas' store.keys() scans the entire file looking for stored Pandas 
     # structures. This is very slow for large numbers of frames. 
     # Instead, scan the root level of the file for nodes with names 
     # matching our scheme; we know they are DataFrames. 
     r = [decode_key(key) for key in self.store.root._v_children.keys() if 
      key.startswith(KEY_PREFIX)] 
     r.sort() 
     return r 

    def close(self): 
     self.store.close() 


class PandasHDFStoreBig(PandasHDFStore): 
    """Like PandasHDFStore, but keeps a cache of frame numbers. 
    This can give a large performance boost when a file contains thousands 
    of frames. 
    If a file was made in PandasHDFStore, opening it with this class 
    and then closing it will add a cache (if mode != 'r'). 
    Any additional keyword arguments to the constructor are passed to 
    pandas.HDFStore(). 
    """ 

    def __init__(self, filename, mode='a', t_column='frame', **kwargs): 
     self._CACHE_NAME = '_Frames_Cache' 
     self._frames_cache = None 
     self._cache_dirty = False # Whether _frames_cache needs to be written out 
     super(PandasHDFStoreBig, self).__init__(filename, mode, t_column, 
               **kwargs) 

    @property 
    def frames(self): 
     # Hit memory cache, then disk cache 
     if self._frames_cache is not None: 
      return self._frames_cache 
     else: 
      try: 
       self._frames_cache = list(self.store[self._CACHE_NAME].index.values) 
       self._cache_dirty = False 
      except KeyError: 
       self._frames_cache = self._get_frame_nos() 
       self._cache_dirty = True # In memory, but not in file 
      return self._frames_cache 

    def put(self, df): 
     self._invalidate_cache() 
     super(PandasHDFStoreBig, self).put(df) 

    def rebuild_cache(self): 
     """Delete cache on disk and rebuild it.""" 
     self._invalidate_cache() 
     _ = self.frames # Compute cache 
     self._flush_cache() 

    def _invalidate_cache(self): 
     self._frames_cache = None 
     try: 
      del self.store[self._CACHE_NAME] 
     except KeyError:
      pass

    def _flush_cache(self): 
     """Writes frame cache if dirty and file is writable.""" 
     if (self._frames_cache is not None and self._cache_dirty 
       and self.store.root._v_file._iswritable()): 
      self.store[self._CACHE_NAME] = pd.DataFrame({'dummy': 1}, 
                 index=self._frames_cache) 
      self._cache_dirty = False 

    def close(self): 
     """Updates cache, writes if necessary, then closes file.""" 
     if self.store.root._v_file._iswritable(): 
      _ = self.frames # Compute cache 
      self._flush_cache() 
     super(PandasHDFStoreBig, self).close() 
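
# Note on the cache round trip: _flush_cache() persists a dummy DataFrame
# indexed by the frame numbers under '_Frames_Cache'; on the next open,
# `frames` reads that index back instead of rescanning the HDF5 root, which
# is what makes this class fast for files containing thousands of frames.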


class PandasHDFStoreSingleNode(FramewiseData): 
    """An interface to an HDF5 file with framewise access, 
    using pandas, that is faster for cross-frame queries. 
    This implementation is more complex than PandasHDFStore, 
    but it simplifies (speeds up?) cross-frame queries, 
    like queries for a single probe's entire trajectory. 
    Any additional keyword arguments to the constructor are passed to 
    pandas.HDFStore(). 
    """ 

    def __init__(self, filename, key='FrameData', mode='a', t_column='frame', 
       use_tabular_copy=False, **kwargs): 
     self.filename = os.path.abspath(filename) 
     self.key = key 
     self._t_column = t_column 
     self.store = pd.HDFStore(self.filename, mode, **kwargs) 

     with pd.HDFStore(self.filename) as store:
      try: 
       store[self.key] 
      except KeyError: 
       pass 
      else: 
       self._validate_node(use_tabular_copy) 

    @property 
    def t_column(self): 
     return self._t_column 

    def put(self, df): 
     if len(df) == 0: 
      warnings.warn('An empty DataFrame was passed to put(). Continuing.') 
      return 
     self._validate(df) 
     self.store.append(self.key, df, data_columns=True) 

    def get(self, frame_no): 
     frame = self.store.select(self.key, '{0} == {1}'.format(
      self._t_column, frame_no)) 
     return frame 

    def dump(self, N=None): 
     """Return data from all, or the first N, frames in a single DataFrame 
     Parameters 
     ---------- 
     N : integer 
      optional; if None, return all frames 
     Returns 
     ------- 
     DataFrame 
     """ 
     if N is None: 
      return self.store.select(self.key) 
     else: 
      Nth_frame = self.frames[N - 1] 
      return self.store.select(self.key, '{0} <= {1}'.format(
       self._t_column, Nth_frame)) 

    def close(self): 
     self.store.close() 

    def __del__(self): 
     if hasattr(self, 'store'): 
      self.close() 

    @property 
    def frames(self): 
     """Returns sorted list of integer frame numbers in file""" 
     # I assume one column can fit in memory, which is not ideal. 
     # Chunking does not seem to be implemented for select_column. 
     frame_nos = self.store.select_column(self.key, self.t_column).unique() 
     frame_nos.sort() 
     return frame_nos 

    def _validate_node(self, use_tabular_copy): 
     # The HDFStore might be non-tabular, which means we cannot select a 
     # subset, and this whole structure will not work. 
     # For convenience, this can rewrite the table into a tabular node. 
     if use_tabular_copy: 
      self.key = _make_tabular_copy(self.store, self.key)

     pandas_type = getattr(getattr(getattr(
      self.store._handle.root, self.key, None), '_v_attrs', None), 
      'pandas_type', None) 
     if not pandas_type == 'frame_table': 
      raise ValueError("This node is not tabular. Call with " 
          "use_tabular_copy=True to proceed.") 


def _make_tabular_copy(store, key): 
    """Copy the contents nontabular node in a pandas HDFStore 
    into a tabular node""" 
    tabular_key = key + '/tabular' 
    logger.info("Making a tabular copy of %s at %s", (key, tabular_key)) 
    store.append(tabular_key, store.get(key), data_columns=True) 
    return tabular_key
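
Usage is roughly as follows. This driver is a hypothetical sketch (the file name, frame count, and 'x' column are made up for illustration), not part of the answer itself:

import numpy as np
import pandas as pd

with PandasHDFStoreBig('frames.h5') as s:
    for frame_no in range(1000):
        df = pd.DataFrame({'x': np.random.randn(10),
                           'frame': frame_no})  # one frame number per put()
        s.put(df)
    print(len(s.frames))  # computes the frame list once, then caches it
    first = s.get(0)      # fetch a single frame by its number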