我想使用RxPy打开一个(csv)文件并逐行处理文件。我正是我设想有以下步骤RxPy读取csv文件和进程行
- 提供一个文件名到流
- 打开由线
- 文件
- 读取文件中的行删除与注释(例如#开头的行.. 。)
- 申请CSV阅读
- 筛选记录满足某种条件
到目前为止,我有:
def to_file(filename):
f = open(filename)
return Observable.using(
lambda: AnonymousDisposable(lambda: f.close()),
lambda d: Observable.just(f)
)
def to_reader(f):
return csv.reader(f)
def print_rows(reader):
for row in reader:
print(row)
这工作
Observable.from_(["filename.csv", "filename2.csv"])
.flat_map(to_file).**map**(to_reader).subscribe(print_rows)
这不:ValueError异常:I/O操作上关闭的文件
Observable.from_(["filename.csv", "filename2.csv"])
.flat_map(to_file).**flat_map**(to_rows).subscribe(print)
第二届不会因为工作(见https://github.com/ReactiveX/RxPY/issues/69)
When the observables from the first flatmap is merged by the second flatmap, the inner subscriptions will be disposed when they complete. Thus the files will be closed, even if the file handles are on_next'ed into the new observable set up by the second flatmap.
任何想法我如何能实现: 喜欢的东西:
Observable.from_(["filename.csv", "filename2.csv"]
).flat_map(to_file
).filter(comment_lines
).filter(empty_lines
).map(to_csv_reader
).filter(filter_by..)
).do whatever
非常感谢您的帮助
克林斯曼