2016-08-01 85 views
1

有我的RPC方法两个操作:如何使用asyncio和postgres在python中进行交易?

async def my_rpc(self, data): 
    async with self.Engine() as conn: 
     await conn.execute("SELECT ... FROM MyTable"); 
     ... # It seems the table MyTable can be changed by another RPC 
     await conn.execute("UPDATA MyTable ..."); 

另一个RPC方法可以操作前“my_rpc”将完成(SQL查询两个等待着之间)改变DB。如何避免这种情况?

self.Engine的代码(带发动机调用aiopg.sa.create_engine):

class ConnectionContextManager(object): 
    def __init__(self, engine): 
     self.conn = None 
     self.engine = engine 

    async def __aenter__(self): 
     if self.engine: 
      self.conn = await self.engine.acquire() 
      return self.conn 

    async def __aexit__(self, exc_type, exc, tb): 
     try: 
      self.engine.release(self.conn) 
      self.conn.close() 
     finally: 
      self.conn = None 
      self.engine = None 
+0

你可以显示self.Engine的代码吗? – jsbueno

+0

@jsbueno我添加了代码 – Broly

回答

2

首先,aiopg以自动提交模式工作,这意味着您必须在手动模式下使用交易。 Read more details。其次,您必须使用SELECT FOR UPDATE来读取第一个语句中读取的锁定行。 SELECT FOR UPDATE锁定选择行,直到事务完成。 Read more details

async def my_rpc(self, data): 
    async with self.Engine() as conn: 
     await conn.execute("BEGIN") 
     await conn.execute("SELECT ... FROM MyTable WHERE some_clause = some_value FOR UPDATE") 
     ... # It seems the table MyTable can be changed by another RPC 
     await conn.execute("UPDATE MyTable SET some_clause=...") 
     await conn.execute("""COMMIT""") 
1

它看起来像,以避免混乱的唯一方法是让每个交易发生在一个单独的数据库连接(Python端游标赢得't do) 要做到这一点的方法是拥有一个连接池 - 并让您的Engine方法为每个“异步线程”提供不同的连接。

如果Postgresql本身的连接器是异步感知的(你使用哪个驱动程序,btw?),那会更容易一些。或者它上面的数据库包装层。如果不是,你将不得不自己实现这个连接池。我认为Sqlalchemy连接池将适用于这种情况,因为独立于共同程序使用,连接将仅在async with区块结束时释放。

+0

我还没有测试过我的代码。 但是,使用“aiopg.sa”的单独连接。 也许这正是我需要的? – Broly

+0

是的。因为你正在通过'async with'获取你的连接,并使用一个准备好asyncio的连接器,这应该就是你所需要的。 – jsbueno

+0

我想所以不是所有的...你必须手动使用事务并且还需要更新 – Petr