2016-12-29

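RxPy: read a CSV file and process its lines

I want to use RxPy to open a (CSV) file and process it line by line. Specifically, I envision the following steps: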

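  1. Provide a filename to the stream
  2. Open the file
  3. Read the file line by line
  4. Remove comment lines (e.g. lines starting with #)
  5. Apply the CSV reader
  6. Filter the records that meet some condition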
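For comparison, here is a stdlib-only sketch of the same six steps using plain Python generators instead of RxPY observables (a swapped-in technique, not an RxPY solution). The names read_lines, pipeline and the keep predicate are hypothetical. What it illustrates is that each file is opened inside the generator, so it stays open until all of its lines have been consumed.

```python
import csv
import os
import tempfile


def read_lines(filenames):
    """Steps 1-3: take filenames and yield their lines. Each file stays open
    only while its own lines are being consumed."""
    for name in filenames:
        with open(name, newline="") as f:
            yield from f


def pipeline(filenames, keep):
    lines = read_lines(filenames)
    # Step 4: drop comment lines (starting with '#') and blank lines.
    lines = (line for line in lines if line.strip() and not line.startswith("#"))
    # Step 5: csv.reader accepts any iterable of strings, not only a file.
    rows = csv.reader(lines)
    # Step 6: keep only the records accepted by the (hypothetical) predicate.
    return (row for row in rows if keep(row))


if __name__ == "__main__":
    # Demo with a throwaway file; its contents are made up for illustration.
    with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as tmp:
        tmp.write("# comment\niata,state\n00M,MS\n\n00R,TX\n")
    try:
        for row in pipeline([tmp.name], keep=lambda row: row[1] != "TX"):
            print(row)
    finally:
        os.remove(tmp.name)
```

Filtering lines before parsing assumes no quoted field spans several lines; a blank or #-prefixed continuation line would be dropped.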

So far I have:

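import csv
from rx import Observable
from rx.disposables import AnonymousDisposable

def to_file(filename):
    f = open(filename)
    # close the file when the subscription to this observable is disposed
    return Observable.using(
        lambda: AnonymousDisposable(lambda: f.close()),
        lambda d: Observable.just(f)
    )

def to_reader(f):
    return csv.reader(f)

def print_rows(reader):
    for row in reader:
        print(row)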

This works, with map as the second operator:

Observable.from_(["filename.csv", "filename2.csv"]) \
    .flat_map(to_file).map(to_reader).subscribe(print_rows)

This, with flat_map as the second operator, does not work and fails with ValueError: I/O operation on closed file

Observable.from_(["filename.csv", "filename2.csv"]) \
    .flat_map(to_file).flat_map(to_rows).subscribe(print)

The second one does not work because of the following (see https://github.com/ReactiveX/RxPY/issues/69):

When the observables from the first flatmap are merged by the second flatmap, the inner subscriptions will be disposed when they complete. Thus the files will be closed, even if the file handles are on_next'ed into the new observable set up by the second flatmap.
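The same lifetime problem can be reproduced without RxPY. In this stdlib-only sketch, the helper emit_handle is hypothetical and only loosely mimics Observable.using plus Observable.just: it emits one handle and closes it once the producer completes. Reading the lines while the producer is still suspended works; collecting the handles first lets the producer complete, so the handle is already closed when it is read.

```python
import io


def emit_handle(text):
    """Yield one file-like handle and close it when the producer completes,
    roughly like a resource disposed at the end of Observable.using."""
    f = io.StringIO(text)
    try:
        yield f
    finally:
        f.close()  # runs once the generator is exhausted or closed


# Consuming lines while the handle is still open works:
lines = [line for f in emit_handle("a\nb\n") for line in f]
print(lines)

# Collecting the handles first lets the producer complete, which closes them:
handles = list(emit_handle("a\nb\n"))
try:
    handles[0].readline()
except ValueError as exc:
    print("ValueError:", exc)
```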

Any ideas how I can achieve something like this?

Observable.from_(["filename.csv", "filename2.csv"]
    ).flat_map(to_file
    ).filter(comment_lines
    ).filter(empty_lines
    ).map(to_csv_reader
    ).filter(filter_by..
    )  # ...then do whatever
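One possible set of definitions for the helpers named in the chain above, written as ordinary functions so they could be passed to filter and map unchanged. These bodies are hypothetical, not the asker's code, and they assume the stream emits individual lines rather than file objects. to_csv_reader parses a single line by handing csv.reader a one-element list, so it cannot handle quoted fields that span several lines.

```python
import csv


def comment_lines(line):
    """Keep a line unless it starts with '#'."""
    return not line.startswith("#")


def empty_lines(line):
    """Keep a line unless it is empty or whitespace only."""
    return bool(line.strip())


def to_csv_reader(line):
    """Parse one CSV line into a list of fields."""
    return next(csv.reader([line]))


# Usage with the builtin filter/map on a few sample lines:
sample = [
    "# a comment\n",
    '"00M","Thigpen ","Bay Springs","MS"\n',
    "\n",
    '"00R","Livingston Municipal","Livingston","TX"\n',
]
rows = list(map(to_csv_reader, filter(empty_lines, filter(comment_lines, sample))))
print(rows)
```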

Thanks a lot for your help,

Klinsmann

Answer


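I only recently started working with RxPy and needed to do the same thing. I was surprised nobody had answered your question yet, so I decided to answer in case someone else needs to know. Suppose you have a CSV file like this: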

$ cat datafile.csv 
"iata","airport","city","state","country","lat","long" 
"00M","Thigpen ","Bay Springs","MS","USA",31.95376472,-89.23450472 
"00R","Livingston Municipal","Livingston","TX","USA",30.68586111,-95.01792778 
"00V","Meadow Lake","Colorado Springs","CO","USA",38.94574889,-104.5698933 
"01G","Perry-Warsaw","Perry","NY","USA",42.74134667,-78.05208056 
"01J","Hilliard Airpark","Hilliard","FL","USA",30.6880125,-81.90594389 

Here is a solution:

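from csv import DictReader
from rx import Observable

# Observable.from_ emits the rows synchronously here, so the with block
# keeps the file open until subscribe() has printed every row.
with open('datafile.csv', 'r', newline='') as f:
    Observable.from_(DictReader(f)) \
        .subscribe(lambda row:
                   print("{0:3}\t{1:<35}".format(row['iata'], row['airport'][:35])))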
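The same idea extends to the comment-stripping and filtering steps from the question, because csv.DictReader accepts any iterable of lines, not only a file object. The stdlib-only sketch below drops comment and blank lines before parsing and filters rows after parsing. The function name airports_in_state is hypothetical, and instead of datafile.csv it reads a few of the sample rows above from an in-memory string, with a comment line added for illustration. The resulting iterator could be passed to Observable.from_ in the same way the answer passes the DictReader.

```python
import csv
import io

# A few rows from datafile.csv above, plus an illustrative comment line.
DATA = '''# airports sample
"iata","airport","city","state","country","lat","long"
"00M","Thigpen ","Bay Springs","MS","USA",31.95376472,-89.23450472
"00R","Livingston Municipal","Livingston","TX","USA",30.68586111,-95.01792778
"01J","Hilliard Airpark","Hilliard","FL","USA",30.6880125,-81.90594389
'''


def airports_in_state(lines, state):
    # Drop comment and blank lines before the CSV parser sees them.
    data_lines = (line for line in lines if line.strip() and not line.startswith("#"))
    # DictReader takes the first remaining line as the header row.
    for row in csv.DictReader(data_lines):
        if row["state"] == state:
            yield row


for row in airports_in_state(io.StringIO(DATA), "TX"):
    print("{0:3}\t{1:<35}".format(row["iata"], row["airport"][:35]))
```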