2011-09-29 130 views
1

问题同时使用线程/多处理器

我写一个程序,读取了一组从语料库文件(每行一个文件)如何使用简单的SQLAlchemy电话。使用功能processdocument处理每个文档,分配一个唯一的ID,然后写入数据库。理想情况下,我们希望使用多个流程来完成此操作。逻辑如下:

  1. 主例程创建一个新的数据库并设置一些表。
  2. 主例程设置一组进程/线程将运行一个辅助函数。
  3. 主例程启动所有进程。
  4. 主例程读取语料库,将文档添加到队列中。
  5. 每个进程的工作函数循环,从队列中读取文档,使用processdocument从中提取信息,并将信息写入数据库表中的新条目。
  6. 一旦队列为空并且主例程设置了相应的标志(一旦没有更多文档添加到队列中),工作程序就会中断。

问题

我是比较新的SQLAlchemy的(和数据库一般)。我认为用于在主例程中设置数据库的代码工作正常,从我可以告诉的。我被卡住的地方是我不确定如何将每个进程的工作者函数写入数据库而不与其他人冲突。

没有什么特别复杂的事情发生:每个进程都获得一个唯一的值,以便从受Lock保护的multiprocessing.Value对象分配给一个条目。如果有的话,我只是不确定是否应该传递给工作函数(除队列外)。我是否会传递我在主例程中创建的sqlalchemy.Engine实例?元数据实例?我是否为每个流程创建一个新的引擎?有没有其他规范的方式来做到这一点?我需要记住一些特别的东西吗?

附加评论

我很清楚我能不能与多费心,但在一个单一的过程中做到这一点,但我会写出具有几个进程读取数据库后代码在,所以我不妨想出如何做到这一点。

在此先感谢您的帮助!

回答

4

MetaData及其Table对象的集合应该被视为应用程序的固定的,不可变的结构,与您的函数和类定义不同。正如你在分叉子进程时所知道的那样,你的应用程序的所有模块级结构仍然存在于进程边界之内,而表的defs通常在这个类别中。

然而,引擎是指一个DBAPI连接池,通常是TCP/IP连接,有时也是文件句柄。 DBAPI连接本身通常不能在子进程边界上移植,因此您希望为每个子进程创建一个新的Engine,或者使用非池化引擎,这意味着您使用的是NullPool。

你也不应该做任何形式的MetaData与引擎的关联,即“绑定”元数据。这种做法虽然在各种过时的教程和博客文章中很突出,但实际上并不是一个通用目的,我尽可能不去强调这种工作方式。

如果您使用的是ORM,则存在类似于“程序结构/活动工作”的二分法,您的映射类当然是在所有子进程之间共享的,但您肯定希望Session对象对于特定子进程是本地的 - 这些对应于一个实际的DBAPI连接以及大量的其他可变状态,这些状态最好保存在操作本地。

+0

您可能想要提到的是,在OP决定使用线程而不是进程的情况下,会话对象不是线程安全的。整个“提交”操作很大程度上取决于在提交期间对象是不可变的。 – patrys

+0

感谢您的回复,但您可能需要更详细地解释此问题。让我们假装,为了争辩,我真的很愚蠢,不明白这里使用的大部分词汇......我不知道我是否正确理解这一点:我不需要做任何特别的事情所有?不要将Table实例作为参数传递给worker函数,只需添加并提交即可? –

+0

我没有使用ORM,因为我还没有真正想出详细的工作方式(对于所有这些都很新颖),但是我可能需要,因为我基本上正在处理两个表格,并且在ManytoMany关系中有行。这需要使用ORM,对吧? –