2016-12-26

OutOfMemoryError when using TextIO.Read

I am new to Cloud Dataflow. I use DataflowPipelineRunner to read a CSV file and output the result to BigQuery. It works well when the CSV file is small (only 20 records, less than 1 MB), but fails with an OutOfMemoryError when the file becomes huge (over 10 million records, about 616.42 MB).

Here is the error message:

java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at com.google.cloud.dataflow.sdk.util.StreamUtils.getBytes(StreamUtils.java:63)
    at co.coder.MyCoder.decode(MyCoder.java:54)
    at co.coder.MyCoder.decode(MyCoder.java:1)
    at com.google.cloud.dataflow.sdk.io.TextIO$TextSource$TextBasedReader.decodeCurrentElement(TextIO.java:1065)
    at com.google.cloud.dataflow.sdk.io.TextIO$TextSource$TextBasedReader.readNextRecord(TextIO.java:1052)
    at com.google.cloud.dataflow.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:536)
    at com.google.cloud.dataflow.sdk.io.OffsetBasedSource$OffsetBasedReader.advance(OffsetBasedSource.java:287)
    at com.google.cloud.dataflow.sdk.runners.worker.WorkerCustomSources$BoundedReaderIterator.advance(WorkerCustomSources.java:541)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation$SynchronizedReaderIterator.advance(ReadOperation.java:425)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:217)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:182)
    at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:69)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:284)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:220)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:170)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:192)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:172)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:159)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

According to the error message, the error occurs at [MyCoder.java:54]. MyCoder is my subclass of CustomCoder; it decodes the CSV file from Shift_JIS to UTF-8:

53:@Override 
54:public String decode(InputStream inStream, Context context) throws CoderException, IOException { 
55: if (context.isWholeStream) { 
56:  byte[] bytes = StreamUtils.getBytes(inStream); 
57:  return new String(bytes, Charset.forName("Shift_JIS")); 
58: } else { 
59:  try { 
60:   return readString(new DataInputStream(inStream)); 
61:  } catch (EOFException | UTFDataFormatException exn) { 
62:   // These exceptions correspond to decoding problems, so change 
63:   // what kind of exception they're branded as. 
64:   throw new CoderException(exn); 
65:  } 
66: } 
67:} 
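The rest of MyCoder is not shown above, but the pipeline below calls MyCoder.of(), which suggests the usual static-factory pattern for a coder. The following is only a minimal, hypothetical sketch of what the full class might look like: only decode() is taken from the question; of(), encode(), and readString() are assumptions, with the nested-context branches simply delegating to StringUtf8Coder as a stand-in.

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UTFDataFormatException;
import java.nio.charset.Charset;

import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.CustomCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.util.StreamUtils;

// Hypothetical sketch: only decode() comes from the original question.
public class MyCoder extends CustomCoder<String> {

    private static final MyCoder INSTANCE = new MyCoder();

    // Static factory referenced as MyCoder.of() in the pipeline below.
    public static MyCoder of() {
        return INSTANCE;
    }

    @Override
    public void encode(String value, OutputStream outStream, Context context)
            throws CoderException, IOException {
        if (context.isWholeStream) {
            // Whole-stream context: this element's bytes are the entire stream.
            outStream.write(value.getBytes(Charset.forName("UTF-8")));
        } else {
            // Nested context: delegate to StringUtf8Coder, assuming the
            // readString() helper below matches its wire format.
            StringUtf8Coder.of().encode(value, outStream, context);
        }
    }

    @Override
    public String decode(InputStream inStream, Context context)
            throws CoderException, IOException {
        // decode() exactly as shown in the question above.
        if (context.isWholeStream) {
            byte[] bytes = StreamUtils.getBytes(inStream);
            return new String(bytes, Charset.forName("Shift_JIS"));
        } else {
            try {
                return readString(new DataInputStream(inStream));
            } catch (EOFException | UTFDataFormatException exn) {
                throw new CoderException(exn);
            }
        }
    }

    // readString() is not shown in the question; here it simply stands in
    // for StringUtf8Coder's nested-context decoding.
    private static String readString(DataInputStream dis)
            throws CoderException, IOException {
        return StringUtf8Coder.of().decode(dis, Context.NESTED);
    }
}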

And also, here is how I run the DataflowPipelineRunner:

DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
options.setProject(projectId);
options.setStagingLocation(stagingFolderPathInGCS);
options.setWorkerMachineType("n1-highmem-4");
options.setMaxNumWorkers(5);
Pipeline p = Pipeline.create(options);
// read csv from gcs
PCollection<String> lines = p.apply(TextIO.Read.named("csv input")
    .from("gs://" + bucketName + "/original/" + fileName).withCoder(MyCoder.of()));
lines.apply(TextIO.Write.named("csv output").to("gs://" + bucketName + "/encoded/" + fileName)
    .withCoder(StringUtf8Coder.of()).withoutSharding().withHeader("test Header"));
p.run();

Since Dataflow is a scalable big-data cloud service, I am a bit confused by this OOM error. Could anyone explain to me why the OutOfMemoryError happened and how to solve it?

Thanks a lot!

You should try to read your file line by line and process it that way. At the moment it looks like a single 600 MB string is being created, and that is where you get the exception. – ZeusNet

Hello ZeusNet, thank you for your reply. I set the GCE machine type to "n1-highmem-4", which means the VM should have 13 GB of memory. Even if the problem were that the file is not read line by line, a 600 MB string does not seem to reach that memory limit... – xialin

Nothing in your pipeline definition would cause an OOM. Using 'withoutSharding()' will severely limit performance, since the write cannot be parallelized, but it will not cause a crash. What does 'readString()' in your coder call? –

Answer


I do not claim to fully understand it, but I solved the problem as follows:

but fails with an OutOfMemoryError when the file becomes huge (over 10 million records, about 616.42 MB).

That was because I had made the test data simply by copying the small file (only 20 records, less than 1 MB) over and over; in other words, those 10 million records contained only 20 distinct keys. So I switched to different test data with many distinct keys (that is, without so much duplicated data).

Also, I followed Kenn Knowles' suggestion to let Dataflow manage the job automatically, for example by removing the following code (the reworked setup is sketched below):

withoutSharding() 
options.setWorkerMachineType("n1-highmem-4"); 
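
With those two calls removed, the job definition ended up roughly like the following. This is only a minimal sketch of the reworked setup, reusing MyCoder and the projectId, stagingFolderPathInGCS, bucketName and fileName variables from the question; the only changes are that no worker machine type is set and the write is sharded by default:

DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
options.setProject(projectId);
options.setStagingLocation(stagingFolderPathInGCS);
// No setWorkerMachineType(...): let the Dataflow service choose the worker type.
options.setMaxNumWorkers(5);
Pipeline p = Pipeline.create(options);
// read csv from gcs
PCollection<String> lines = p.apply(TextIO.Read.named("csv input")
    .from("gs://" + bucketName + "/original/" + fileName).withCoder(MyCoder.of()));
// No withoutSharding(): let Dataflow parallelize the write across shards.
lines.apply(TextIO.Write.named("csv output").to("gs://" + bucketName + "/encoded/" + fileName)
    .withCoder(StringUtf8Coder.of()).withHeader("test Header"));
p.run();

Note that without withoutSharding() the output is written as multiple shard files rather than a single file.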

Finally the Dataflow job worked well (the machine type was automatically set to n1-standard-1)!

Further information about Dataflow's dynamic work rebalancing can be found here: https://cloud.google.com/dataflow/service/dataflow-service-desc#Autotuning