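Using concurrent.futures.ProcessPoolExecutor to run simultaneous and independent ABAQUS models

I want to run a total of nAnalysis = 25 Abaqus models, each using X cores, and I can run nParallelLoops = 5 of these models concurrently. When one of the 5 running analyses finishes, another analysis should start, until all nAnalysis analyses are complete.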
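I implemented the code below based on the solutions posted in the following two questions. However, I am missing something: all nAnalysis analyses try to start "at once", the code deadlocks, and no analysis ever completes, because many of them may want to use the same cores that an already-started analysis is using.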
- Using Python's Multiprocessing module to execute simultaneous and separate SEAWAT/MODFLOW model runs
- How to parallelize this nested loop in Python that calls Abaqus
def runABQfile(*args):
    import subprocess
    import os
    inpFile,path,jobVars = args
    prcStr1 = (path+'/runJob.sh')
    process = subprocess.check_call(prcStr1, stdin=None, stdout=None, stderr=None, shell=True, cwd=path)
def safeABQrun(*args):
    import os
    try:
        runABQfile(*args)
    except Exception as e:
        print("Thread Error: %s runABQfile(*%r)" % (e, args))
def errFunction(ppos, *args):
    import os
    from concurrent.futures import ProcessPoolExecutor
    from concurrent.futures import as_completed
    from concurrent.futures import wait
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(0,nAnalysis)) # 5Nodes
        wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')
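So far, the only way I have been able to make it run is to modify errFunction to run exactly 5 analyses at a time, as below. However, this approach sometimes results in one analysis in a group (one per ProcessPoolExecutor call) taking much longer than the other 4, so the next group of 5 does not start even though resources (cores) are available. Ultimately this makes all 25 models take longer to complete.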
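The cost of grouping can be estimated with an idealized model that ignores start-up overhead: each group lasts as long as its slowest analysis, whereas a single pool hands every new analysis to whichever worker frees up first. The durations below are made up (one slow model in every group of 5) purely to illustrate the effect.

```python
import heapq

def batch_makespan(durations, n_workers):
    """Wall time when jobs run in fixed groups of n_workers (one pool per group)."""
    total = 0
    for start in range(0, len(durations), n_workers):
        total += max(durations[start:start + n_workers])   # group waits for its slowest job
    return total

def pool_makespan(durations, n_workers):
    """Wall time when one pool starts the next job as soon as any worker is free."""
    finish_times = [0] * n_workers          # time at which each worker becomes free
    heapq.heapify(finish_times)
    for d in durations:
        free_at = heapq.heappop(finish_times)   # earliest-available worker
        heapq.heappush(finish_times, free_at + d)
    return max(finish_times)

durations = [10, 1, 1, 1, 1] * 5     # hypothetical run times for 25 models
print(batch_makespan(durations, 5))  # 50
print(pool_makespan(durations, 5))   # 19
```

In this example the grouped version pays for the slow model five times over, while the single pool overlaps each slow model with the short ones that follow it.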
def errFunction(ppos, *args):
    import os
    from concurrent.futures import ProcessPoolExecutor
    from concurrent.futures import as_completed
    from concurrent.futures import wait
    # Group 1
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(0,5)) # 5Nodes
        wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')
    # Group 2
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(5,10)) # 5Nodes
        wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')
    # Group 3
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(10,15)) # 5Nodes
        wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')
    # Group 4
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(15,20)) # 5Nodes
        wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')
    # Group 5
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(20,25)) # 5Nodes
        wait(future_to_file,timeout=None,return_when='ALL_COMPLETED')
I tried using the as_completed function, but it does not seem to work either.
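For reference, a minimal sketch of the usual as_completed pattern, again with a hypothetical fake_analysis in place of runABQfile and a thread pool for easy testing. as_completed yields each future as soon as that analysis finishes, regardless of submission order, and the dict maps it back to its index. Note that an exception only reaches f.exception() if the task lets it propagate: a wrapper like safeABQrun, which catches and prints every exception, makes every future look successful.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def fake_analysis(k):
    """Hypothetical stand-in for runABQfile: sleeps, and fails for k == 3."""
    time.sleep(0.01 * (k % 4))
    if k == 3:
        raise RuntimeError("analysis %d failed" % k)
    return k

def run_all(n_analysis, n_parallel):
    finished, failed = [], {}
    with ThreadPoolExecutor(max_workers=n_parallel) as executor:
        # Map each future back to its analysis index.
        futures = {executor.submit(fake_analysis, k): k for k in range(n_analysis)}
        for f in as_completed(futures):       # yields futures in completion order
            k = futures[f]
            if f.exception() is not None:
                failed[k] = f.exception()
                print("analysis %d raised: %s" % (k, f.exception()))
            else:
                finished.append(f.result())
                print("analysis %d finished" % k)
    return finished, failed

finished, failed = run_all(25, 5)
```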
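Could you please help me figure out the proper parallelization, so that I can run all nAnalysis analyses with nParallelLoops always running concurrently? Your help is appreciated. I am using Python 2.7.
Best, David P.
UPDATE JULY 30, 2016: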
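I introduced a loop in safeABQrun that manages 5 different "queues". The loop is necessary to avoid the case of an analysis trying to run on a node while another analysis is still running there. The analyses are pre-configured to run on one of the requested nodes before any actual analysis starts.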
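A hedged alternative to fixed per-node queues, assuming runJob.sh could be told at launch time which node to use (a hypothetical change; here the node is fixed beforehand): keep a queue of idle node names, and have each task take one before it starts and hand it back when it finishes. No node is ever used by two analyses at once, and unlike fixed queues, a node that finishes early immediately picks up the next analysis. The sketch uses threads and queue.Queue; with ProcessPoolExecutor the queue would need to be a multiprocessing.Manager().Queue(), because a plain multiprocessing.Queue cannot be passed to submit(). The node names are hypothetical.

```python
import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor

NODES = ["node%d" % i for i in range(1, 6)]   # hypothetical node names

def run_on_free_node(k, free_nodes, log, log_lock):
    """Take an idle node, run analysis k there, then hand the node back."""
    node = free_nodes.get()                 # blocks until some node is idle
    try:
        with log_lock:
            log.append(("start", k, node))
        time.sleep(0.01 * (1 + k % 3))      # stand-in for launching runJob.sh on `node`
        with log_lock:
            log.append(("end", k, node))
    finally:
        free_nodes.put(node)                # node becomes available to the next analysis
    return node

def no_node_overlap(log):
    """True if no node ever starts an analysis while another is still running on it."""
    busy = set()
    for event, _, node in log:
        if event == "start":
            if node in busy:
                return False
            busy.add(node)
        else:
            busy.discard(node)
    return True

free_nodes = queue.Queue()
for node in NODES:
    free_nodes.put(node)
log, log_lock = [], threading.Lock()

with ThreadPoolExecutor(max_workers=len(NODES)) as executor:
    futures = [executor.submit(run_on_free_node, k, free_nodes, log, log_lock)
               for k in range(25)]
    used_nodes = [f.result() for f in futures]

print(no_node_overlap(log))   # True
```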
def safeABQrun(*list_args):
    import os
    inpFiles,paths,jobVars = list_args
    nA = len(inpFiles)
    for k in range(0,nA):
        args = (inpFiles[k],paths[k],jobVars[k])
        try:
            runABQfile(*args) # Actual Run Function
        except Exception as e:
            print("Thread Error: %s runABQfile(*%r)" % (e, args))
def errFunction(ppos, *list_args):
    from concurrent.futures import ProcessPoolExecutor
    from concurrent.futures import as_completed
    # each item of list_args: (inpFiles, paths, jobVars, trainIndex)
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        futures = dict((executor.submit(safeABQrun, inpF, aPth, jVrs), k) for inpF, aPth, jVrs, k in list_args) # 5Nodes
        for f in as_completed(futures):
            print("|=== Finish Process Train %d ===|" % futures[f])
            if f.exception() is not None:
                print('%r generated an exception: %s' % (futures[f], f.exception()))
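The updated errFunction unpacks each argument as a tuple (inpF, aPth, jVrs, k): three per-train lists plus the train index. A minimal sketch of building those tuples, assuming (hypothetically) that analysis i was pre-configured for train i % nParallelLoops; the real split has to match however the analyses were actually assigned to nodes. The file names and paths are made up.

```python
def make_trains(inp_files, paths, job_vars, n_trains):
    """Split per-analysis lists into n_trains round-robin trains.

    Returns one (inpF, aPth, jVrs, k) tuple per train, the shape
    that errFunction unpacks.
    """
    trains = []
    for k in range(n_trains):
        trains.append((inp_files[k::n_trains],   # analyses k, k+n, k+2n, ...
                       paths[k::n_trains],
                       job_vars[k::n_trains],
                       k))
    return trains

# Hypothetical inputs for 25 analyses.
inp_files = ["job%02d.inp" % i for i in range(25)]
paths = ["/scratch/job%02d" % i for i in range(25)]
job_vars = [{"id": i} for i in range(25)]

trains = make_trains(inp_files, paths, job_vars, 5)
print(trains[1][0])   # ['job01.inp', 'job06.inp', 'job11.inp', 'job16.inp', 'job21.inp']
# errFunction(ppos, *trains) would then run the 5 trains concurrently.
```

Because each train is fixed up front, one slow analysis still delays the rest of its own train, which is the same imbalance the grouped version had, only on a per-node basis.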