我有一个反应器,从一个RabbitMQ的经纪人获取消息,并引发工人方法的过程池来处理这些消息,是这样的:如何处理ProcessPool中的SQLAlchemy连接?
这是使用python asyncio
,loop.run_in_executor()
和concurrent.futures.ProcessPoolExecutor
实施。
现在我想使用SQLAlchemy访问worker方法中的数据库。大多数处理将是非常简单快速的CRUD操作。
该反应器将处理每秒10-50消息在一开始,所以这是不能接受打开为每个请求一个新的数据库连接。相反,我想维护每个进程的一个持久连接。
我的问题是:我该怎么做?我可以将它们存储在全局变量中吗? SQA连接池会为我处理这个问题吗?反应堆停止时如何清理?
[更新]
- 该数据库是MySQL搭配InnoDB的。
为什么选择这种模式与进程池?
当前实现使用每个消费者在自己的线程上运行不同的模式。不知何故,这不起作用。目前已有大约200名消费者在各自的线程中运行,并且系统正在快速增长。为了更好地扩展,这个想法是将关注点分开,并在I/O循环中使用消息并将处理委托给池。当然,整个系统的性能主要是I/O界限。但是,处理大型结果集时,CPU是个问题。
另一个原因是“易用性”。虽然消息的连接处理和消耗是异步实现的,但工作者中的代码可以是同步且简单的。
不久,很明显,从工人中访问通过持续的网络连接远程系统是一个问题。这就是CommunicationChannels的用途:在工作人员内部,我可以通过这些渠道向消息总线发出请求。
我的一个想法目前是处理类似的方式DB访问:通过队列事件循环在那里它们被发送到DB传递报表。但是,我不知道如何使用SQLAlchemy完成此操作。 入口点在哪里? 对象在通过队列时需要为pickled
。我如何从SQA查询中获得这样的对象? 为了不阻塞事件循环,与数据库的通信必须异步工作。我可以使用例如aiomysql作为SQA的数据库驱动程序?
那么每个工人都是自己的过程?那么不能共享连接,所以也许你应该用最大1或2个连接限制实例化每个(本地)SQA池。然后观察,也许通过数据库(哪个数据库?)什么连接正在产生/被杀死。受到严重的伤害 - 你不愿意做的是在SQA的顶端实施你自己的天真的连接池。或者尝试确定SQA conn是否关闭。 –
@JLPeyret:我用你要求的信息更新了问题。不,我不打算实现我自己的连接池。 – roman
因此,我想我记得连接不能跨进程(在OS的意义上说,与线程区分)。而且我知道关系根本就不好。你应该能够发送“死”(字符串)sql语句,但我相信你会很难通过db conns,我想可能包括SQA结果。对我的猜测,但在一定程度上玩奇怪的SQA用法来证明它。 –