2017-02-09 67 views
2

我收到以下错误(我认为是因为在我的应用程序分叉),“此结果对象不返回行”。分叉,sqlalchemy和范围会话

Traceback 
--------- 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/dask/async.py", line 263, in execute_task 
result = _execute_task(task, data) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/dask/async.py", line 245, in _execute_task 
return func(*args2) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/smg/analytics/services/impact_analysis.py", line 140, in _do_impact_analysis_mp 
Correlation.user_id.in_(user_ids)).all()) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2241, in all 
return list(self) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 65, in instances 
fetch = cursor.fetchall() 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 752, in fetchall 
self.cursor, self.context) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1027, in _handle_dbapi_exception 
util.reraise(*exc_info) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 746, in fetchall 
l = self.process_rows(self._fetchall_impl()) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 715, in _fetchall_impl 
self._non_result() 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 720, in _non_result 
"This result object does not return rows. " 

我正在使用dask和它的多处理调度程序(它使用multiprocessing.Pool)。 据我了解(基于文档),从范围会话对象创建的会话(通过scoped_session()创建)是线程安全的。这是因为他们是threadlocal。这将导致我相信,当我拨打Session()(或使用代理Session)时,我得到一个只存在的会话对象,并且只能从被调用的线程访问。 这看起来很简单。

我感到困惑的是,为什么我在分叉过程中遇到问题。据我所知,你不能 重复使用跨进程的引擎,所以我从文档遵循基于事件的解决方案,并逐字做到了这一点:

class _DB(object): 

    _engine = None 

    @classmethod 
    def _get_engine(cls, force_new=False): 
     if cls._engine is None or force_new is True: 
      cfg = Config.get_config() 
      user = cfg['USER'] 
      host = cfg['HOST'] 
      password = cfg['PASSWORD'] 
      database = cfg['DATABASE'] 
      engine = create_engine(
       'mysql://{}:{}@{}/{}?local_infile=1&' 
       'unix_socket=/var/run/mysqld/mysqld.sock'. 
        format(user, password, host, database), 
       pool_size=5, pool_recycle=3600) 
      cls._engine = engine 
     return cls._engine 



# From the docs, handles multiprocessing 
@event.listens_for(_DB._get_engine(), "connect") 
def connect(dbapi_connection, connection_record): 
    connection_record.info['pid'] = os.getpid() 

#From the docs, handles multiprocessing 
@event.listens_for(_DB._get_engine(), "checkout") 
def checkout(dbapi_connection, connection_record, connection_proxy): 
    pid = os.getpid() 
    if connection_record.info['pid'] != pid: 
     connection_record.connection = connection_proxy.connection = None 
     raise exc.DisconnectionError(
      "Connection record belongs to pid %s, " 
      "attempting to check out in pid %s" % 
      (connection_record.info['pid'], pid) 
     ) 


# The following is how I create the scoped session object. 

Session = scoped_session(sessionmaker(
    bind=_DB._get_engine(), autocommit=False, autoflush=False)) 

Base = declarative_base() 
Base.query = Session.query_property() 

所以我的假设(基于文档)有以下几种:

  1. 使用从范围的会话对象创建一个会话对象,它必须给我一个ThreadLocal会话(这在我的情况下,也只是子进程的主线程)。虽然没有在文档中,我想这应该适用,即使范围会话对象是在另一个进程中创建的。

  2. ThreadLocal的会议将获得通过发动机从池中的连接,如果连接没有这个过程中创建它会创建一个新的(基于上述connection()checkout()实现。)

如果这两件事情都是真的,那么一切都应该“正常工作”(AFAICT)。但情况并非如此。

我设法通过在每个新进程中创建一个新的有限会话会话对象,并在所有后续使用会话的呼叫中使用它。

顺便提一下,Base.query属性也需要从这个新的有作用域的会话对象中更新。

我想我上面的#1假设是不正确的。任何人都可以帮助我理解为什么我需要在每个进程中创建一个新的范围会话对象?

干杯。

+0

你可以发布一个最小的例子,包括分叉的代码,以及完整的堆栈跟踪?我怀疑连接池在fork之前已经连接到数据库,导致两个进程共享套接字。 – univerio

+0

我将添加一些示例代码。在完成任何分支之前,池肯定已经连接,但是使用池的子进程通过在使用它之前检查调用代码的pid来处理,或者创建一个新进程(按照上面的“checkout”方法)。或者至少这是意图的AIUI。 –

+0

您可以创建单节点分布式调度程序,而不是使用'dask.multiprocessing.get'。这将从更清洁的流程中预先分流,并且通常是更清洁的体验:http://dask.pydata.org/en/latest/scheduler-choice.html – MRocklin

回答

0

不清楚什么时候发生fork,但最常见的问题是引擎是在fork之前创建的,它使用pool_size = 5初始化与数据库的TCP连接,然后将其复制到新进程并导致多个进程与相同的物理套接字交互=>麻烦。

选项是:

  • 禁用池和使用按需连接:poolclass = NullPool
  • 重新创建后叉池:sqla_engine。dispose()
  • 延迟的create_engine直到叉后
+0

它肯定是在引擎创建后分叉的,但是AIUI是整个点自定义'checkout()'方法;使用一个池来处理多个进程。我怀疑这与dask推动这个过程的方式有关。 –