2016-11-18 100 views
3

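Memory error when reading large CSV files into a dictionary

I am a PhD student studying market microstructure. I need to work with very large data sets (hundreds of GB of millisecond data). I have been using SAS, which handles big data in data-frame format very well, but it is expensive. I would like to use Python for my study/research instead. I have some Python skills, though not advanced ones. I have heard that pandas is very efficient at handling data frames, but it is limited by RAM, which does not suit my purposes.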

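What I have tried: I iterated over the data line by line, processed each row, and stored the results in dictionaries, but this runs into a memory limit. I get a MemoryError, and I can see Python eating all the RAM (I have 32 GB). This data set is still very small (500 MB) compared with what I will have to process later (50 to 100 GB). Besides, some things are hard to do line by line, such as regressions and charts. So my question is: how should I process and store this kind of data?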

The input data looks like this:

#RIC Date[L]  Time[L] Type Price Volume Bid Price Ask Price 
TPI.AX 20140820 00:11.7 Quote        0.91 
TPI.AX 20140820 00:11.7 Trade 0.91 10000  
TPI.AX 20140820 00:21.5 Quote        0.91 
TPI.AX 20140820 00:22.1 Quote     0.905 
TPI.AX 20140820 00:42.2 Quote     0.905 
TPI.AX 20140820 00:42.6 Trade 0.9075 117  
TPI.AX 20140820 00:43.1 Trade 0.9075 495  
TPI.AX 20140820 00:49.6 Quote     0.905 
TPI.AX 20140820 00:57.6 Quote     0.905 
TPI.AX 20140820 00:57.6 Quote     0.905 
TPI.AX 20140820 00:58.3 Quote     0.905 
TPI.AX 20140820 01:02.6 Quote        0.91 
TPI.AX 20140820 01:02.6 Quote        0.91 
TPI.AX 20140820 01:02.6 Quote     0.905 
TPI.AX 20140820 01:02.6 Trade 0.91 9365   
TPI.AX 20140820 01:02.6 Trade 0.91 9041   

Here is my code:

import csv
from collections import defaultdict
from math import ceil


def spread_calculation(input_file_list, output_file):
    """This function calculates the spreads for securities in input_file_list 
    input: trade and quote data from TRTH 
    2 parameters: 1. list of file names, 2.output file name 
    output: csv file contains spreads""" 
    # Set variables: 
    date = None 
    exchange_bbo = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(float))))) 
    effective_spread = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(float))))) 
    time_bucket = [i * 100000.0 for i in range(0, (16 * 60 * 60 * 1000) * 1000/100000)] 
    for file in input_file_list: 
     file_to_open = '%s.csv' % file 
     reader = csv.DictReader(open(file_to_open, 'rb')) 
     for i in reader: 
      if not bool(date): 
       date = i['Date[L]'][0:4] + "-" + i['Date[L]'][4:6] + "-" + i['Date[L]'][6:8] 
      if i['Type'] == 'Quote' and (time_to_milli(i['Time[L]']) <= (16*60*60*1000)*1000): 
       security = i['#RIC'].split('.')[0] 
       exchange = i['#RIC'].split('.')[1] 
       timestamp = float(time_to_milli(i['Time[L]'])) 
       bucket = ceil(float(time_to_milli(i['Time[L]']))/100000.0) * 100000.0 
       if i['Bid Price'] == "": 
        bid = 0.0 
       else: 
        bid = float(i['Bid Price']) 
       if i['Ask Price'] == "": 
        ask = 0.0 
       else: 
        ask = float(i['Ask Price']) 
       if bid < ask < 199999.99: 
        if not bool(exchange_bbo[security][exchange][date][bucket]['ask']): 
         exchange_bbo[security][exchange][date][bucket]['ask'] = ask 
         exchange_bbo[security][exchange][date][bucket]['diff_ask'] = bucket - timestamp 
        elif exchange_bbo[security][exchange][date][bucket]['diff_ask'] > bucket - timestamp: 
         exchange_bbo[security][exchange][date][bucket]['ask'] = ask 
         exchange_bbo[security][exchange][date][bucket]['diff_ask'] = bucket - timestamp 
        if not bool(exchange_bbo[security][exchange][date][bucket]['bid']): 
         exchange_bbo[security][exchange][date][bucket]['bid'] = bid 
         exchange_bbo[security][exchange][date][bucket]['diff_bid'] = bucket - timestamp 
        elif exchange_bbo[security][exchange][date][bucket]['diff_bid'] > bucket - timestamp: 
         exchange_bbo[security][exchange][date][bucket]['bid'] = bid 
         exchange_bbo[security][exchange][date][bucket]['diff_bid'] = bucket - timestamp 
      if i['Type'] == 'Trade' and i['Price'] != "" and i['Price'] != 0.0: 
       timestamp = float(time_to_milli(i['Time[L]'])) 
       bucket = ceil(float(time_to_milli(i['Time[L]']))/100000.0) * 100000.0 
       security = i['#RIC'].split('.')[0] 
       exchange = i['#RIC'].split('.')[1] 
       price = float(i['Price']) 
       volume= float(i['Volume']) 
       if not bool(exchange_bbo[security][exchange][date][bucket]['price']): 
        exchange_bbo[security][exchange][date][bucket]['price'] = price 
        exchange_bbo[security][exchange][date][bucket]['volume'] = volume 
        exchange_bbo[security][exchange][date][bucket]['time_diff'] = bucket - timestamp 
       elif exchange_bbo[security][exchange][date][bucket]['time_diff'] > bucket - timestamp and price != 0.0: 
        exchange_bbo[security][exchange][date][bucket]['price'] = price 
        exchange_bbo[security][exchange][date][bucket]['volume'] = volume 
        exchange_bbo[security][exchange][date][bucket]['time_diff'] = bucket - timestamp 

     # Fill the empty buckets - exchange level 
     for security in exchange_bbo: 
      for exchange in exchange_bbo[security]: 
       for date in exchange_bbo[security][exchange]: 
        for bucket in time_bucket: 
         previous = bucket - 100000.0 
         # best offer 
         bo_t = exchange_bbo[security][exchange][date][bucket]['ask'] 
         bo_t1 = exchange_bbo[security][exchange][date][previous]['ask'] 
         if bo_t == 0.0 and bo_t1 != 0.0: 
          exchange_bbo[security][exchange][date][bucket]['ask'] = bo_t1 
         # best bid 
         bb_t = exchange_bbo[security][exchange][date][bucket]['bid'] 
         bb_t1 = exchange_bbo[security][exchange][date][previous]['bid'] 
         if bb_t == 0.0 and bb_t1 != 0.0: 
          exchange_bbo[security][exchange][date][bucket]['bid'] = bb_t1 

     for security in exchange_bbo: 
      for exchange in exchange_bbo[security]: 
       for date in exchange_bbo[security][exchange]: 
        for bucket in exchange_bbo[security][exchange][date]: 
         if not bool(exchange_bbo[security][exchange][date][bucket]['price']): 
          nbo = exchange_bbo[security][exchange][date][bucket]['ask'] 
          nbb = exchange_bbo[security][exchange][date][bucket]['bid'] 
          midpoint = (nbo + nbb)/2.0 
          price = exchange_bbo[security][exchange][date][bucket]['price'] 
          volume= exchange_bbo[security][exchange][date][bucket]['volume'] 
          # print security, exchange, bucket, price, midpoint 
          if price > 0.0 and midpoint != 0.0: 
           effective_spread[security][exchange][date][bucket]['espread_bps'] = 2.0 * abs(price - midpoint)/midpoint 
           effective_spread[security][exchange][date][bucket]['volume']=volume 
           effective_spread[security][exchange][date]['count'] += 1.0 

     data_writer = csv.DictWriter(open(output_file, 'wb'), 
            fieldnames=['security', 'exchange', 'date', 'bucket', 'espread_bps', 'volume', 'count']) 

     data_writer.writeheader() 

     for security in effective_spread: 
      for exchange in effective_spread[security]: 
       for date in effective_spread[security][exchange]: 
        for bucket in effective_spread[security][exchange][date]: 
         espread_bps = effective_spread[security][exchange][date][bucket]['espread_bps'] 
         volume = effective_spread[security][exchange][date][bucket]['volume'] 
         count = effective_spread[security][exchange][date][bucket]['count'] 
         data_writer.writerow({'security': security, 'exchange': exchange, 'date': date, 'bucket': bucket, 
               'espread_bps': espread_bps, 'volume': volume, 'count': count}) 

input_files = ['ScandinavianTAQ'] 

Thank you very much.

Answers

0

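100 GB is not that much data. A SQL database plus pandas should be all you need. You will have to learn how to write SQL queries, and I recommend getting a copy of Wes McKinney's book. I have not gone through your code, but it looks to me as though the biggest problem is that you process everything line by line instead of grouping your operations.
Also, check out Dask.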
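
To make the "SQL database plus grouped operations" suggestion concrete, here is a minimal sketch using the standard-library sqlite3 module. It is only an illustration under assumptions: the table name `ticks`, the column `time_ms` (time of day already converted to integer milliseconds), the one-minute bucket width, and the helper names `build_db` and `volume_by_bucket` are all made up for this example and are not from the question's code. The point is that the database does the grouping, so Python never holds one dictionary entry per bucket.

```python
import sqlite3

# Hypothetical sample rows: (ric, date, time_ms, type, price, volume).
# time_ms is an assumed integer "milliseconds since midnight" column.
rows = [
    ("TPI.AX", "20140820", 11700, "Trade", 0.91, 10000),
    ("TPI.AX", "20140820", 42600, "Trade", 0.9075, 117),
    ("TPI.AX", "20140820", 43100, "Trade", 0.9075, 495),
    ("TPI.AX", "20140820", 62600, "Trade", 0.91, 9365),
    ("TPI.AX", "20140820", 62600, "Trade", 0.91, 9041),
]


def build_db(rows, path=":memory:"):
    """Create the (hypothetical) ticks table and bulk-insert the rows."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE ticks (ric TEXT, date TEXT, time_ms INTEGER, "
        "type TEXT, price REAL, volume REAL)"
    )
    conn.executemany("INSERT INTO ticks VALUES (?, ?, ?, ?, ?, ?)", rows)
    conn.commit()
    return conn


def volume_by_bucket(conn, bucket_ms=60000):
    """Aggregate trades per (ric, date, time bucket) inside the database."""
    query = """
        SELECT ric, date, time_ms / ? AS bucket,
               COUNT(*) AS n_trades,
               SUM(volume) AS total_volume,
               SUM(price * volume) / SUM(volume) AS vwap
        FROM ticks
        WHERE type = 'Trade'
        GROUP BY ric, date, bucket
        ORDER BY ric, date, bucket
    """
    # Integer division of time_ms by bucket_ms assigns each trade to a bucket.
    return conn.execute(query, (bucket_ms,)).fetchall()


conn = build_db(rows)
result = volume_by_bucket(conn)
for row in result:
    print(row)
```

With a file path instead of ":memory:", the table lives on disk and only the aggregated rows come back into Python. Those rows are small enough to hand to pandas (for instance via pandas.read_sql_query) for regressions or charts.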

0

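If you plan to work with large dictionaries kept in external storage, I would check out elastic. It is suited to big data and has a moderate learning curve.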

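For files larger than memory, you can look at memmap or lazy reading, so that the file is consumed line by line. Iterating is usually an acceptable approach.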
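
As a rough sketch of the lazy-reading idea: a generator yields one parsed row at a time, and only a small running total per symbol is kept in memory. The column names come from the csv.DictReader keys used in the question. The comma-separated layout of the sample, the helper names `iter_trades` and `vwap_per_ric`, and the choice of a volume-weighted average price as the statistic are assumptions for illustration only.

```python
import csv
import io
from collections import defaultdict


def iter_trades(handle):
    """Lazily yield (ric, price, volume) for each trade row in an open CSV."""
    for row in csv.DictReader(handle):
        if row["Type"] == "Trade" and row["Price"]:
            yield row["#RIC"], float(row["Price"]), float(row["Volume"])


def vwap_per_ric(handle):
    """Keep two running sums per symbol instead of storing every row."""
    totals = defaultdict(lambda: [0.0, 0.0])  # ric -> [sum(price*vol), sum(vol)]
    for ric, price, volume in iter_trades(handle):
        totals[ric][0] += price * volume
        totals[ric][1] += volume
    return {ric: pv / vol for ric, (pv, vol) in totals.items() if vol > 0}


# Small in-memory stand-in for a real file (assumed comma-separated layout).
sample = io.StringIO(
    "#RIC,Date[L],Time[L],Type,Price,Volume,Bid Price,Ask Price\n"
    "TPI.AX,20140820,00:11.7,Quote,,,,0.91\n"
    "TPI.AX,20140820,00:11.7,Trade,0.91,10000,,\n"
    "TPI.AX,20140820,00:42.6,Trade,0.9075,117,,\n"
)
result = vwap_per_ric(sample)
print(result)
```

For a real file, the same function can be given a handle from open(path, newline=""). Memory use then depends on the number of distinct symbols, not on the number of rows.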

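Grouping operations would also help in your context; for example, consider whether there are independent operations that can run in parallel. For that, look at some example SO posts, such as this. It would also be worth talking with domain experts in your field about optimizing the computation.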
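
One way the "independent operations in parallel" idea could look, assuming the work splits cleanly by symbol (or by file) so that no group needs data from another. The function names `summarise_trades` and `summarise_all` and the `(price, volume)` tuple layout are hypothetical. The sketch uses concurrent.futures from the standard library.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def summarise_trades(trades):
    """Return (trade count, total volume, VWAP) for one independent group.

    trades is a list of (price, volume) tuples (an assumed layout).
    """
    total_volume = sum(volume for _, volume in trades)
    notional = sum(price * volume for price, volume in trades)
    vwap = notional / total_volume if total_volume else float("nan")
    return len(trades), total_volume, vwap


def summarise_all(groups, executor_cls=ProcessPoolExecutor, max_workers=None):
    """Summarise every group in a worker pool; groups maps key -> trades."""
    keys = list(groups)
    with executor_cls(max_workers=max_workers) as pool:
        # map() preserves input order, so results line up with keys.
        results = list(pool.map(summarise_trades, (groups[k] for k in keys)))
    return dict(zip(keys, results))
```

When using the default process pool from a script, the call to summarise_all should sit under an `if __name__ == "__main__":` guard, since worker processes may re-import the main module. Passing executor_cls=ThreadPoolExecutor avoids that requirement, but threads mainly help when the work is I/O-bound.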

Do you also have access to an external server? If you do, and it is a distributed system, you have even more options.