2017-02-20 132 views
1

我想模拟一些dna-sequencing读取,并且为了加速代码,我需要并行运行它。基本上,我想要做的是以下几点:我从人类基因组中取样读取,并且我认为从多处理模块的两个过程中的一个尝试从相同的文件(人类基因组)中获取数据进程被破坏,无法获得所需的DNA序列。我尝试过不同的事情,但我对并行编程非常陌生,并且我无法解决我的问题多进程,读取同一文件的各种进程

当我使用一个内核运行脚本时,它工作正常。

这是我调用该函数

if __name__ == '__main__': 
    jobs = [] 
    # init the processes 
    for i in range(number_of_cores): 
     length= 100 
     lock = mp.Manager().Lock() 
     p = mp.Process(target=simulations.sim_reads,args=(lock,FastaFile, "/home/inigo/msc_thesis/genome_data/hg38.fa",length,paired,results_dir,spawn_reads[i],temp_file_names[i])) 
     jobs.append(p) 
     p.start() 
    for p in jobs: 
     p.join() 

的方式,这是我使用来获取读取功能,分别处理数据写入到不同的文件。

def sim_single_end(lc,fastafile,chr,chr_pos_start,chr_pos_end,read_length, unique_id): 

    lc.acquire() 
    left_split_read = fastafile.fetch(chr, chr_pos_end - (read_length/2), chr_pos_end) 
    right_split_read = fastafile.fetch(chr, chr_pos_start, chr_pos_start + (read_length/2)) 
    reversed_left_split_read = left_split_read[::-1] 
    total_read = reversed_left_split_read + right_split_read 
    seq_id = "id:%s-%s|left_pos:%s-%s|right:%s-%s " % (unique_id,chr, int(chr_pos_end - (read_length/2)), int(chr_pos_end), int(chr_pos_start),int(chr_pos_start + (read_length/2))) 
    quality = "I" * read_length 
    fastq_string = "@%s\n%s\n+\n%s\n" % (seq_id, total_read, quality) 
    lc.release() 
    new_record = SeqIO.read(StringIO(fastq_string), "fastq") 
    return(new_record) 

这里是回溯:

Traceback (most recent call last): 
    File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap 
    self.run() 
    File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run 
    self._target(*self._args, **self._kwargs) 
    File "/home/inigo/Dropbox/PycharmProjects/circ_dna/simulations.py", line 107, in sim_ecc_reads 
    new_read = sim_single_end(lc,fastafile, chr, chr_pos_start, chr_pos_end, read_length, read_id) 
    File "/home/inigo/Dropbox/PycharmProjects/circ_dna/simulations.py", line 132, in sim_single_end 
    new_record = SeqIO.read(StringIO(fastq_string), "fastq") 
    File "/usr/local/lib/python3.5/dist-packages/Bio/SeqIO/__init__.py", line 664, in read 
    first = next(iterator) 
    File "/usr/local/lib/python3.5/dist-packages/Bio/SeqIO/__init__.py", line 600, in parse 
for r in i: 
    File "/usr/local/lib/python3.5/dist-packages/Bio/SeqIO/QualityIO.py", line 1031, in FastqPhredIterator 
for title_line, seq_string, quality_string in FastqGeneralIterator(handle): 
    File "/usr/local/lib/python3.5/dist-packages/Bio/SeqIO/QualityIO.py", line 951, in FastqGeneralIterator 
% (title_line, seq_len, len(quality_string))) 

ValueError: Lengths of sequence and quality values differs for id:6-chr1_KI270707v1_random|left_pos:50511537-50511587|right:50511214-50511264 (0 and 100). 
+0

您还没有表现出回溯的错误。如果进程每次写入不同的文件,则可以事先随机化所有读取的行(确保不重叠),将该块列出并传递给每个进程。只要您阻止他们同时读取文件,每个进程只会查看其预先分配的块 – roganjosh

+1

您已经包含了追踪,除了错误本身。 – roganjosh

+0

@roganjosh嗨!我添加了回溯。我试图阻止他们用'Lock()'同时读取文件。然而它没有奏效。我应该如何防止他们同时读取文件? – Praderas

回答

0

我这个答案,我做了差不多一年前的OP。问题是我用来阅读人类基因组文件(pysam)的软件包失败了。调用多处理时,这个问题是一个错字。

从respose作者,这应该工作:

p = mp.Process(target=get_fasta, args=(genome_fa,)) 

注意“”以确保您传递一个元组

详情请参阅https://github.com/pysam-developers/pysam/issues/409