23

我有一个反应器,从一个RabbitMQ的经纪人获取消息,并引发工人方法的过程池来处理这些消息,是这样的:如何处理ProcessPool中的SQLAlchemy连接?

Reactor

这是使用python asyncioloop.run_in_executor()concurrent.futures.ProcessPoolExecutor实施。

现在我想使用SQLAlchemy访问worker方法中的数据库。大多数处理将是非常简单快速的CRUD操作。

该反应器将处理每秒10-50消息在一开始,所以这是不能接受打开为每个请求一个新的数据库连接。相反,我想维护每个进程的一个持久连接。

我的问题是:我该怎么做?我可以将它们存储在全局变量中吗? SQA连接池会为我处理这个问题吗?反应堆停止时如何清理?

[更新]

  • 该数据库是MySQL搭配InnoDB的。

为什么选择这种模式与进程池?

当前实现使用每个消费者在自己的线程上运行不同的模式。不知何故,这不起作用。目前已有大约200名消费者在各自的线程中运行,并且系统正在快速增长。为了更好地扩展,这个想法是将关注点分开,并在I/O循环中使用消息并将处理委托给池。当然,整个系统的性能主要是I/O界限。但是,处理大型结果集时,CPU是个问题。

另一个原因是“易用性”。虽然消息的连接处理和消耗是异步实现的,但工作者中的代码可以是同步且简单的。

不久,很明显,从工人中访问通过持续的网络连接远程系统是一个问题。这就是CommunicationChannels的用途:在工作人员内部,我可以通过这些渠道向消息总线发出请求。

我的一个想法目前是处理类似的方式DB访问:通过队列事件循环在那里它们被发送到DB传递报表。但是,我不知道如何使用SQLAlchemy完成此操作。 入口点在哪里? 对象在通过队列时需要为pickled。我如何从SQA查询中获得这样的对象? 为了不阻塞事件循环,与数据库的通信必须异步工作。我可以使用例如aiomysql作为SQA的数据库驱动程序?

+0

那么每个工人都是自己的过程?那么不能共享连接,所以也许你应该用最大1或2个连接限制实例化每个(本地)SQA池。然后观察,也许通过数据库(哪个数据库?)什么连接正在产生/被杀死。受到严重的伤害 - 你不愿意做的是在SQA的顶端实施你自己的天真的连接池。或者尝试确定SQA conn是否关闭。 –

+0

@JLPeyret:我用你要求的信息更新了问题。不,我不打算实现我自己的连接池。 – roman

+0

因此,我想我记得连接不能跨进程(在OS的意义上说,与线程区分)。而且我知道关系根本就不好。你应该能够发送“死”(字符串)sql语句,但我相信你会很难通过db conns,我想可能包括SQA结果。对我的猜测,但在一定程度上玩奇怪的SQA用法来证明它。 –

回答

6

你的每个进程池进程一个数据库连接可以很容易满足,如果一些小心你如何实例化session,假设您正在使用ORM的工作,在工作进程的要求。

简单的解决办法是让你跨请求重用全球session

# db.py 
engine = create_engine("connection_uri", pool_size=1, max_overflow=0) 
DBSession = scoped_session(sessionmaker(bind=engine)) 

而且对工人的任务:

# task.py 
from db import engine, DBSession 
def task(): 
    DBSession.begin() # each task will get its own transaction over the global connection 
    ... 
    DBSession.query(...) 
    ... 
    DBSession.close() # cleanup on task end 

参数pool_sizemax_overflowcustomize所使用的默认QueuePool create_engine。 pool_size将确保您的进程只保留进程池中每个进程的1个连接。

如果您希望它重新连接,您可以使用DBSession.remove()这将从注册表中删除会话,并将使它在下一次DBSession使用时重新连接。您也可以使用recycle参数Pool在指定的时间段之后使连接重新连接。

在开发/ debbuging期间,您可以使用AssertionPool,如果从池中检出多个连接,将会引发异常,请参阅switching pool implementations关于如何执行此操作。

+0

所以你基本上是建议我不要担心,因为SQA池会处理这个问题。这会很好!我将在接下来的几天内将我们的主应用程序+200用户和+20000行代码移植到新的软件架构中,并查看它是否可行。 – roman

+0

@roman祝你好运,如果你有任何问题,请不要犹豫,在这里发表评论,如果你觉得我覆盖了你的问题,这将是很好的标记为接受:)。 – olokki

+0

似乎到目前为止工作得很好! :)应该提及文档中的这一部分,我认为http://docs.sqlalchemy.org/en/rel_1_1/core/pooling.html?highlight=multiprocessing#using-connection-pools-with-multiprocessing。人们必须特别关心多处理。 – roman

0

@roman:你在那里有很好的挑战。

我已经在类似的情况是之前,所以这是我的2美分:除非只是这种消费“读”“写”消息,没有做任何真正等待处理,你可以重新设计这个消费者作为消费者/生产者将会消耗消费消息,它将处理消息并且然后将结果放到另一个队列中,那个队列(处理的消息对于说)可以被1..N非读取 - 在本身的整个生命周期中打开数据库连接的异步过程。

我可以延伸我的答案,但我不知道这种方法是否适合您的需求,如果是这样,我可以给你更多关于扩展设计的细节。

+0

我正在考虑这样一种方法,但我认为这将是很难得到正确的交易处理。我想我不想尝试构建我自己的分布式事务管理器。 – roman

0

一个非常好的方法是使用web服务器来处理和扩展进程池。即使在默认状态下,flask-sqlalchemy也会保留连接池,并且不会在每个请求响应周期中关闭每个连接。

asyncio执行程序可以调用url终点来执行您的功能。额外的好处是,因为所有执行这项工作的进程都在url的后面,所以你可以在多台机器上扩展你的工作池,通过gunicorn或其他许多方法来扩展一个简单的wsgi服务器。再加上你获得了所有的容错善良。

不利的一面是,您可能会通过网络传递更多信息。不过,正如你所说的那样,问题是CPU绑定,你可能会传递更多的数据到数据库。

+0

当我说CPU是一个问题时,我并不是说主要的工作负载是CPU绑定的!这不是......与上面的其他方法一样,我在这里看到了交易处理的严重问题。在业务逻辑和持久层之间建立无状态网络连接听起来很可怕。 – roman