
I have huge CSV files (100MB+) on Amazon S3 and I want to read them in chunks and process them with the Ruby CSV library. I'm having a hard time creating the right IO object for the CSV processing: buffered/ring-buffer IO in Ruby plus non-blocking chunked reads from Amazon S3.

buffer = TheRightIOClass.new
bytes_received = 0
RightAws::S3Interface.new(<access_key>, <access_secret>).retrieve_object(bucket, key) do |chunk|
  bytes_received += buffer.write(chunk)
  if bytes_received >= 1 * MEGABYTE
    bytes_received = 0
    csv(buffer).each do |row|
      process_csv_record(row)
    end
  end
end

def csv(io)
  @csv ||= CSV.new(io, headers: true)
end

I don't know what the right setup should be here, or what TheRightIOClass is. I don't want to load the entire file into memory with StringIO. Is there a buffered IO or ring buffer in Ruby that can do this? If anyone has a good solution using threads (no processes) and pipes, I'd love to see it.
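For reference, the threads-plus-pipe idea asked about above can be sketched with a plain IO.pipe: a producer thread writes S3 chunks into one end while CSV blocks on the other, and the OS pipe buffer provides backpressure so memory stays bounded. This is a minimal sketch, not a tested solution; access_key, access_secret, bucket, key, and process_csv_record are placeholders carried over from the question.

require 'csv'
require 'right_aws'

reader, writer = IO.pipe

# Producer: stream S3 chunks into the pipe. The OS pipe buffer (~64KB)
# blocks the writer whenever the consumer falls behind.
producer = Thread.new do
  begin
    s3 = RightAws::S3Interface.new(access_key, access_secret)
    s3.retrieve_object(bucket, key) do |chunk|
      writer.write(chunk)
    end
  ensure
    writer.close # EOF on the read end tells CSV the stream is finished
  end
end

# Consumer: CSV blocks on the pipe until data arrives, so it only ever
# sees complete rows, never a partially written buffer.
CSV.new(reader, headers: true).each do |row|
  process_csv_record(row)
end

producer.join

Closing the write end inside the ensure block is what signals EOF to the CSV reader; without it the each loop would block forever.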

Answers


You can use StringIO with some careful error handling to make sure you have whole rows in a chunk before processing it. The packer class in this example just accumulates the parsed rows in memory until you flush them to disk or a database.

require 'csv'
require 'aws-sdk' # v1 API: AWS::S3

MEGABYTE = 1024 * 1024

packer = Packer.new
object = AWS::S3.new.buckets[bucket].objects[path]
io  = StringIO.new
csv = ::CSV.new(io, headers: true)
object.read do |chunk|
  # Append the most recent chunk and rewind the IO
  io << chunk
  io.rewind
  last_offset = 0
  begin
    while (row = csv.shift)
      # Store the parsed row unless we're at the end of the chunk,
      # where the last row may be truncated mid-line
      unless io.eof?
        last_offset = io.pos
        packer << row.to_hash
      end
    end
  rescue ArgumentError, ::CSV::MalformedCSVError => e
    # Only rescue malformed UTF-8 and CSV errors if we're at the end of a chunk
    raise e unless io.eof?
  end
  # Seek to our last offset, reopen the StringIO with just the partial row,
  # and advance the cursor to the end so the next chunk is appended after it
  io.seek(last_offset)
  io.reopen(io.read)
  io.read
  # Flush our accumulated rows to disk every 1 MB
  packer.flush if packer.bytes > 1 * MEGABYTE
end
# Read the last row, which the loop above skipped as a potential partial
io.rewind
packer << csv.shift.to_hash
packer
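The Packer class itself isn't shown in the answer. A minimal stand-in that satisfies the interface used above (<<, bytes, and flush) might look like the sketch below; the newline-delimited JSON output format and the file path are assumptions, not part of the original answer.

require 'json'

# Minimal stand-in for the Packer used above. Only the interface
# (<<, bytes, flush) is taken from the answer; the rest is assumed.
class Packer
  def initialize(path = 'rows.ndjson')
    @path  = path
    @rows  = []
    @bytes = 0
  end

  attr_reader :bytes

  # Accumulate a parsed row (a Hash) in memory
  def <<(row)
    @rows << row
    @bytes += row.to_s.bytesize # rough running size of the buffered rows
    self
  end

  # Write buffered rows out (here: one JSON object per line) and reset
  def flush
    File.open(@path, 'a') do |f|
      @rows.each { |row| f.puts(row.to_json) }
    end
    @rows.clear
    @bytes = 0
  end
end

Tracking a rough byte count as rows are appended is what lets the main loop decide when to flush without re-measuring the whole buffer each time.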