2016-05-31 156 views
1

遇到一个相当令人沮丧的错误,当我的一个api端点被访问时弹出。为了给出上下文,我正在使用的应用程序Flask应用程序使用SQLAlchemy将数据存储在一个PostgreSQL数据库集以保存1000个连接。TimeoutError:QueuePool限制大小5溢出10达到,连接超时,超时30

用户可以查询所述数据的方式之一是通过端点/timeseries端点。数据以json的形式返回,该数据由查询数据库返回的ResultProxies组装而成。

的希望是,通过使用多线程,我能把由/时间序列运行速度更快的视图控制器调用的方法,因为我们原来设定的时间太长,其将返回大量数据的查询。

由于没有正确清理会话,我读过许多其他帖子,但我感觉好像我已经覆盖了相同的问题。我编写的代码有什么明显的错误?

该应用程序使用AWS弹性beanstalk进行部署。

@classmethod 
def timeseries_all(cls, table_names, agg_unit, start, end, geom=None): 
    """ 
    For each candidate dataset, query the matching timeseries and push datasets with nonempty 
    timeseries into a list to convert to JSON and display. 

    :param table_names: list of tables to generate timetables for 
    :param agg_unit: a unit of time to divide up the data by (day, week, month, year) 
    :param start: starting date to limit query 
    :param end: ending date to limit query 
    :param geom: geometric constraints of the query 

    :returns: timeseries list to display 
    """ 

    threads = [] 
    timeseries_dicts = [] 

    # set up engine for use with threading 
    psql_db = create_engine(DATABASE_CONN, pool_size=10, max_overflow=-1, pool_timeout=100) 
    scoped_sessionmaker = scoped_session(sessionmaker(bind=psql_db, autoflush=True, autocommit=True)) 

    def fetch_timeseries(t_name): 
     _session = scoped_sessionmaker() 
     # retrieve MetaTable object to call timeseries from 
     table = MetaTable.get_by_dataset_name(t_name) 
     # retrieve ResultProxy from executing timeseries selection 
     rp = _session.execute(table.timeseries(agg_unit, start, end, geom)) 

     # empty results will just have a header 
     if rp.rowcount > 0: 

      timeseries = { 
       'dataset_name': t_name, 
       'items': [], 
       'count': 0 
      } 

      for row in rp.fetchall(): 
       timeseries['items'].append({'count': row.count, 'datetime': row.time_bucket.date()}) 
       timeseries['count'] += row.count 

      # load to outer storage 
      timeseries_dicts.append(timeseries) 

     # clean up session 
     rp.close() 
     scoped_sessionmaker.remove() 

    # create a new thread for every table to query 
    for name in table_names: 
     thread = threading.Thread(target=fetch_timeseries, args=(name,)) 
     threads.append(thread) 

    # start all threads 
    for thread in threads: 
     thread.start() 

    # wait for all threads to finish 
    for thread in threads: 
     thread.join() 

    # release all connections associated with this engine 
    psql_db.dispose() 

    return timeseries_dicts 
+0

嗯,是有可能,你简单地试图同时查询超过15个(或10个,从你的代码)表? – univerio

回答

3

我认为你是以一种迂回的方式讨论这个问题的。这里有一些关于充分利用postgres连接的建议(我在生产中使用了这种配置)。

  1. 我将使用Flask-SQLAlchemy扩展来处理到您的Postgres实例的连接。如果您查看SQLAlchemy文档,您将看到author highly recommends使用它来处理数据库连接生命周期,而不是滚动您自己的数据库。
  2. 处理大量请求的更高性能的方法是将Flask应用程序放在一个wsgi服务器后面,如gunicornuwsgi。这些服务器将能够产生你的应用程序的多个实例。然后,当有人点击你的端点时,你的连接将在这些实例之间进行负载平衡。

    因此,举例来说,如果你有uwsgi的配置运行5个进程,那么你将能够同时门把手50个DB连接(5个应用程序的每个×10个池)