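OutOfMemoryError occurred when using TextIO.Read
I'm new to Cloud Dataflow.
I use DataflowPipelineRunner to read a CSV file and output the result to BigQuery. It works well when the CSV file is small (only 20 records, less than 1 MB), but it fails with an OOM error when the file becomes huge (over 10 million records, about 616.42 MB).
Here is the error message: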
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at com.google.cloud.dataflow.sdk.util.StreamUtils.getBytes(StreamUtils.java:63)
    at co.coder.MyCoder.decode(MyCoder.java:54)
    at co.coder.MyCoder.decode(MyCoder.java:1)
    at com.google.cloud.dataflow.sdk.io.TextIO$TextSource$TextBasedReader.decodeCurrentElement(TextIO.java:1065)
    at com.google.cloud.dataflow.sdk.io.TextIO$TextSource$TextBasedReader.readNextRecord(TextIO.java:1052)
    at com.google.cloud.dataflow.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:536)
    at com.google.cloud.dataflow.sdk.io.OffsetBasedSource$OffsetBasedReader.advance(OffsetBasedSource.java:287)
    at com.google.cloud.dataflow.sdk.runners.worker.WorkerCustomSources$BoundedReaderIterator.advance(WorkerCustomSources.java:541)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation$SynchronizedReaderIterator.advance(ReadOperation.java:425)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:217)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:182)
    at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:69)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:284)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:220)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:170)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:192)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:172)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:159)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
According to the error message, the error occurred at [MyCoder.java:54]. MyCoder is my own subclass of CustomCoder; its job is to convert the CSV file from Shift_JIS to UTF-8:
53: @Override
54: public String decode(InputStream inStream, Context context) throws CoderException, IOException {
55:     if (context.isWholeStream) {
56:         byte[] bytes = StreamUtils.getBytes(inStream);
57:         return new String(bytes, Charset.forName("Shift_JIS"));
58:     } else {
59:         try {
60:             return readString(new DataInputStream(inStream));
61:         } catch (EOFException | UTFDataFormatException exn) {
62:             // These exceptions correspond to decoding problems, so change
63:             // what kind of exception they're branded as.
64:             throw new CoderException(exn);
65:         }
66:     }
67: }
Also, here is how I run the DataflowPipelineRunner:
DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
options.setProject(projectId);
options.setStagingLocation(stagingFolderPathInGCS);
options.setWorkerMachineType("n1-highmem-4");
options.setMaxNumWorkers(5);
Pipeline p = Pipeline.create(options);
// read csv from gcs
PCollection<String> lines = p.apply(TextIO.Read.named("csv input")
        .from("gs://" + bucketName + "/original/" + fileName).withCoder(MyCoder.of()));
lines.apply(TextIO.Write.named("csv output").to("gs://" + bucketName + "/encoded/" + fileName)
        .withCoder(StringUtf8Coder.of()).withoutSharding().withHeader("test Header"));
p.run();
Since Dataflow is a scalable cloud service for big data, I'm a bit confused by this OOM error. Can anyone explain why the [OutOfMemoryError] happened and how to resolve it?
Thanks a lot!
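You should try to read your file line by line and process it that way. At the moment, a single String is being created from the 600 MB file, and that is where you get the exception. – ZeusNet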
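A minimal plain-JDK sketch of the line-by-line idea in the comment above, outside of Dataflow. The class name ShiftJisTranscoder and the method transcode are hypothetical, chosen only for illustration; the sketch assumes the records are newline-terminated and that the JDK ships the Shift_JIS charset. It is not a drop-in replacement for MyCoder.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: converts Shift_JIS text to UTF-8 one line at a time.
final class ShiftJisTranscoder {
    private static final Charset SHIFT_JIS = Charset.forName("Shift_JIS");

    /**
     * Copies every line from `in` (Shift_JIS) to `out` (UTF-8) and returns the
     * number of lines. Only the current line is held as a String, so memory use
     * depends on the longest line rather than on the file size.
     * The caller owns both streams; they are not closed here.
     */
    static long transcode(InputStream in, OutputStream out) {
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(in, SHIFT_JIS));
            Writer writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
            long lines = 0;
            String line;
            // readLine() accepts \n, \r, or \r\n as a line terminator.
            while ((line = reader.readLine()) != null) {
                writer.write(line);
                writer.write('\n'); // line endings are normalized to \n
                lines++;
            }
            writer.flush();
            return lines;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Example: java ShiftJisTranscoder < input_sjis.csv > output_utf8.csv
        transcode(System.in, System.out);
    }
}
```

The design choice here is to let the reader do the charset decoding incrementally instead of collecting all bytes first, which is the part of MyCoder.decode that shows up in the stack trace.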
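Hi ZeusNet, thanks for your reply. I set the GCE machine type to "n1-highmem-4", which means the VM should have 13 GB of memory. Even if not reading line by line is the problem, a 600 MB string doesn't seem to reach the memory limit... – xialin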
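A rough back-of-the-envelope sketch of why buffering a whole file needs noticeably more heap than the file size. The stack trace shows the failure inside ByteArrayOutputStream.grow, which in OpenJDK doubles the buffer and copies it with Arrays.copyOf, so the old and new arrays are alive at the same time. The class HeapEstimate and its methods are hypothetical, and the model assumes a buffer that starts at 32 bytes and only ever doubles. It ignores the further copy into the String, and it says nothing about the worker's actual JVM heap limit, which is not stated in this thread.

```java
// Hypothetical estimator for a doubling byte buffer; a simplified model,
// not a measurement of the Dataflow worker.
final class HeapEstimate {
    /** Smallest capacity reachable by doubling from `initial` that holds `n` bytes. */
    static long finalCapacity(long n, long initial) {
        long cap = initial;
        while (cap < n) {
            cap <<= 1;
        }
        return cap;
    }

    /** Bytes live during the last grow step: the old buffer plus the doubled one. */
    static long peakDuringGrow(long n, long initial) {
        long cap = finalCapacity(n, initial);
        return cap == initial ? cap : cap + (cap >> 1);
    }

    public static void main(String[] args) {
        long fileBytes = 616_420_000L; // roughly the 616.42 MB file from the question
        System.out.println(finalCapacity(fileBytes, 32));  // prints 1073741824
        System.out.println(peakDuringGrow(fileBytes, 32)); // prints 1610612736
    }
}
```

Under these assumptions the byte buffer alone briefly needs about 1.5 GiB for a 616 MB input, before any String is built from it. That is still well below 13 GB, so this is only part of the picture.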
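Nothing in your pipeline definition would cause the OOM. Using 'withoutSharding()' will severely limit performance, because the write cannot be parallelized, but it should not cause a crash. What does the 'readString()' called in your coder do? –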