18
与我发表的另一篇文章类似,这回答了该帖子并创建了一个新问题。创建数据库连接并维护多个进程(多处理)
回顾:我需要更新空间数据库中的每个记录,其中有一组覆盖多边形数据集的点的数据集。对于每个点要素,我想分配一个键以将其与它所在的面要素关联起来。因此,如果我的观点'纽约市'位于美国多边形和美国多边形'GID = 1'内,我将为我的观点纽约市分配'gid_fkey = 1'。
好吧,这已经实现了使用多处理。我注意到使用这个速度增加了150%,所以它工作。但我认为有一些不必要的开销,因为每个记录需要一个数据库连接。
所以这里是代码:
import multiprocessing, time, psycopg2
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
proc_name = self.name
while True:
next_task = self.task_queue.get()
if next_task is None:
print 'Tasks Complete'
self.task_queue.task_done()
break
answer = next_task()
self.task_queue.task_done()
self.result_queue.put(answer)
return
class Task(object):
def __init__(self, a):
self.a = a
def __call__(self):
pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
pyConn.set_isolation_level(0)
pyCursor1 = pyConn.cursor()
procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a)
pyCursor1.execute(procQuery)
print 'What is self?'
print self.a
return self.a
def __str__(self):
return 'ARC'
def run(self):
print 'IN'
if __name__ == '__main__':
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
num_consumers = multiprocessing.cpu_count() * 2
consumers = [Consumer(tasks, results) for i in xrange(num_consumers)]
for w in consumers:
w.start()
pyConnX = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
pyConnX.set_isolation_level(0)
pyCursorX = pyConnX.cursor()
pyCursorX.execute('SELECT count(*) FROM cities WHERE gid_fkey IS NULL')
temp = pyCursorX.fetchall()
num_job = temp[0]
num_jobs = num_job[0]
pyCursorX.execute('SELECT city_id FROM city WHERE gid_fkey IS NULL')
cityIdListTuple = pyCursorX.fetchall()
cityIdListList = []
for x in cityIdListTuple:
cityIdList.append(x[0])
for i in xrange(num_jobs):
tasks.put(Task(cityIdList[i - 1]))
for i in xrange(num_consumers):
tasks.put(None)
while num_jobs:
result = results.get()
print result
num_jobs -= 1
它看起来是每个连接0.3和1.5秒之间,因为我有“时间”模块测量。
有没有办法让每个进程都建立一个数据库连接,然后只用city_id info作为一个变量,我可以将这个变量提供给这个open中的游标查询?这样我说四个进程与每个数据库连接,然后放下我city_id以某种方式处理。
伴侣得到成功的治疗。没有荣誉给你批准的机会,但代码是绝对的魔法。摆脱固定的数据库连接可以轻松地将速度提高50%。在某些情况下可能接近100%。再次感谢。 –
@EnE_:我很高兴它帮助你:)。你应该接受答案,你有权这样做,因为你是问题的主人。 –
好吧,我不得不承认,我认为我应该按下向上的箭头而不是剔。 '批准的滴答声'是一个不幸的自我谴责的转向= D –