2017-10-09

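In distributed processing environments, output files usually get generic names such as "part-000". Is it possible to write some kind of extension in Apache Beam to rename individual output files (for example, one file name per window)? Does Apache Beam support custom file names for its output?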

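To do this, one would probably need to be able to assign a name to a window, or to infer the file name from the window's contents. I'd like to know whether this approach is feasible.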

As for whether the solution should be streaming or batch, a streaming-mode example would be preferred.

Answers

Yes. Per the documentation of TextIO:

If you want better control over how file names are generated than the default policy allows, a custom FilenamePolicy can also be set using TextIO.Write.to(FilenamePolicy).

Could you give some example code? When I tried this approach, I got a ClassCastException...

Please include your code and the full stack trace of the error you got. – jkff

As jkff suggested, you can use TextIO.write().to(FilenamePolicy) to achieve this.

Here are some examples.

If you want to write the output to a specific local file, you can use:

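// withoutSharding() makes TextIO write exactly one file with this name;
// without it, the path is treated as a prefix and shard suffixes are appended.
lines.apply(TextIO.write().to("/path/to/file.txt").withoutSharding());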
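As I understand TextIO's default naming, the string passed to to() is a filename prefix, and unless sharding is disabled a shard-name template is appended to it, by default -SSSSS-of-NNNNN. The sketch below is a simplified, standalone re-implementation of that expansion in plain Java, just to show which names to expect; the class ShardNames and its expand method are hypothetical and not part of Beam.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical, simplified re-implementation of shard-name template expansion
 * (NOT Beam's code): a run of 'S' becomes the zero-padded shard index and a run
 * of 'N' becomes the zero-padded shard count; everything else is copied as-is.
 */
final class ShardNames {

    private static final Pattern RUN = Pattern.compile("S+|N+");

    static String expand(String prefix, String template, int shard, int numShards) {
        Matcher m = RUN.matcher(template);
        StringBuilder out = new StringBuilder(prefix);
        int last = 0;
        while (m.find()) {
            // Copy the literal text between placeholder runs (e.g. "-of-").
            out.append(template, last, m.start());
            String run = m.group();
            int value = run.charAt(0) == 'S' ? shard : numShards;
            // Pad to the width of the run, e.g. "SSSSS" -> 5 digits.
            out.append(String.format("%0" + run.length() + "d", value));
            last = m.end();
        }
        out.append(template, last, template.length());
        return out.toString();
    }

    public static void main(String[] args) {
        for (int shard = 0; shard < 3; shard++) {
            System.out.println(expand("/path/to/file.txt", "-SSSSS-of-NNNNN", shard, 3));
        }
    }
}
```

With three shards this prints /path/to/file.txt-00000-of-00003 through /path/to/file.txt-00002-of-00003, which is why a plain to("/path/to/file.txt") does not produce a file literally named file.txt.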

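Here is a simple way to write output using a filename prefix. This example writes to Google Cloud Storage, but you can use a local or S3 path instead.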

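import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

public class MinimalWordCountJava8 {

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        // In order to run your pipeline, you need to make the following runner-specific changes:
        //
        // CHANGE 1/3: Select a Beam runner, such as BlockingDataflowRunner
        // or FlinkRunner.
        // CHANGE 2/3: Specify runner-required options.
        // For BlockingDataflowRunner, set project and temp location as follows:
        //   DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
        //   dataflowOptions.setRunner(BlockingDataflowRunner.class);
        //   dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE");
        //   dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");
        // For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions}
        // for more details.
        //   options.as(FlinkPipelineOptions.class)
        //       .setRunner(FlinkRunner.class);

        Pipeline p = Pipeline.create(options);

        p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
            // Split each line into words on runs of non-letter characters.
            .apply(FlatMapElements
                .into(TypeDescriptors.strings())
                .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
            // Drop empty tokens (e.g. produced by leading punctuation).
            .apply(Filter.by((String word) -> !word.isEmpty()))
            // Count occurrences of each distinct word.
            .apply(Count.<String>perElement())
            // Format each (word, count) pair as "word: count".
            .apply(MapElements
                .into(TypeDescriptors.strings())
                .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
            // CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to.
            // The path is a prefix: shard suffixes are appended to it.
            .apply(TextIO.write().to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));

        p.run().waitUntilFinish();
    }
}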
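To make clear what the pipeline above actually writes, here is a hypothetical, Beam-free version of the same transform chain on an in-memory list (LocalWordCount is an illustrative name, not part of the Beam examples). It uses the same splitting regex, drops empty tokens, counts per word, and formats each result as "word: count". The TreeMap only exists to make the output order deterministic; Beam makes no ordering guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory illustration of the MinimalWordCountJava8 transforms (no Beam involved). */
final class LocalWordCount {

    static List<String> wordCounts(List<String> lines) {
        // Sorted map only for deterministic output order.
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            // Same regex as the FlatMapElements step: split on runs of non-letters.
            for (String word : line.split("[^\\p{L}]+")) {
                if (!word.isEmpty()) {                 // mirrors Filter.by(...)
                    counts.merge(word, 1L, Long::sum); // mirrors Count.perElement()
                }
            }
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            out.add(e.getKey() + ": " + e.getValue()); // mirrors the MapElements formatting
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(wordCounts(Arrays.asList("To be, or not to be", "that is the question")));
    }
}
```

Counting is case-sensitive, as in the pipeline, so "To" and "to" are counted separately.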

This example code gives you more control over how the output is written:

    /**
     * A {@link FilenamePolicy} produces a base file name for a write based on metadata about the data
     * being written. This always includes the shard number and the total number of shards. For
     * windowed writes, it also includes the window and pane index (a sequence number assigned to each
     * trigger firing).
     *
     * <p>Note: these method signatures are the post-2.1.0 (Beam 2.2.0+) FilenamePolicy API.
     */
    protected static class PerWindowFiles extends FilenamePolicy {

        // Prints window bounds as HH:mm (Joda-Time org.joda.time.format.DateTimeFormatter,
        // created via org.joda.time.format.ISODateTimeFormat.hourMinute()).
        private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinute();

        private final ResourceId prefix;

        public PerWindowFiles(ResourceId prefix) {
            this.prefix = prefix;
        }

        // "<prefix>-<windowStart>-<windowEnd>"; the prefix part is empty if the prefix is a directory.
        public String filenamePrefixForWindow(IntervalWindow window) {
            String filePrefix = prefix.isDirectory() ? "" : prefix.getFilename();
            return String.format(
                "%s-%s-%s", filePrefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
        }

        @Override
        public ResourceId windowedFilename(int shardNumber,
                                           int numShards,
                                           BoundedWindow window,
                                           PaneInfo paneInfo,
                                           OutputFileHints outputFileHints) {
            // Assumes the input uses IntervalWindows (e.g. fixed or sliding windows);
            // any other window type, such as GlobalWindow, makes this cast throw ClassCastException.
            IntervalWindow intervalWindow = (IntervalWindow) window;
            String filename =
                String.format(
                    "%s-%s-of-%s%s",
                    filenamePrefixForWindow(intervalWindow),
                    shardNumber,
                    numShards,
                    outputFileHints.getSuggestedFilenameSuffix());
            return prefix.getCurrentDirectory().resolve(filename, StandardResolveOptions.RESOLVE_FILE);
        }

        @Override
        public ResourceId unwindowedFilename(
            int shardNumber, int numShards, OutputFileHints outputFileHints) {
            // This policy only supports windowed writes (.withWindowedWrites()).
            throw new UnsupportedOperationException("Unsupported.");
        }
    }

    // Excerpt from the enclosing transform in the Beam examples (it uses
    // WriteToText.WriteOneFilePerWindow); windowed, filenamePrefix and BuildRowFn are defined there.
    @Override
    public PDone expand(PCollection<InputT> teamAndScore) {
        if (windowed) {
            teamAndScore
                .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
                .apply(new WriteToText.WriteOneFilePerWindow(filenamePrefix));
        } else {
            teamAndScore
                .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
                .apply(TextIO.write().to(filenamePrefix));
        }
        return PDone.in(teamAndScore.getPipeline());
    }
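The file names PerWindowFiles produces can be checked without running a pipeline. Below is a plain-Java sketch of the same string formatting, under these assumptions: java.time stands in for Joda-Time, window bounds are formatted as HH:mm in UTC (assumed to match ISODateTimeFormat.hourMinute() applied to Beam's window bounds), and the suffix argument stands in for OutputFileHints.getSuggestedFilenameSuffix(). PerWindowName is a hypothetical helper, not a Beam class.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/**
 * Hypothetical sketch of the PerWindowFiles naming scheme:
 * "<prefix>-<windowStart>-<windowEnd>-<shard>-of-<numShards><suffix>".
 */
final class PerWindowName {

    // Assumption: HH:mm in UTC, standing in for Joda's ISODateTimeFormat.hourMinute().
    private static final DateTimeFormatter HOUR_MINUTE =
            DateTimeFormatter.ofPattern("HH:mm").withZone(ZoneOffset.UTC);

    static String filename(String prefix, Instant start, Instant end,
                           int shard, int numShards, String suffix) {
        // Same shape as filenamePrefixForWindow(...)
        String windowPart = String.format("%s-%s-%s",
                prefix, HOUR_MINUTE.format(start), HOUR_MINUTE.format(end));
        // Same shape as windowedFilename(...)
        return String.format("%s-%s-of-%s%s", windowPart, shard, numShards, suffix);
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2017-10-09T10:00:00Z");
        System.out.println(filename("out", start, start.plusSeconds(300), 0, 2, ".txt"));
    }
}
```

For a 5-minute window starting at 10:00 UTC, shard 0 of 2 and suffix .txt, this gives out-10:00-10:05-0-of-2.txt. The colons come from the HH:mm pattern; on file systems that reject ':' in names, a pattern such as HHmm may be a safer choice.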

Thanks! It looks like this isn't in version 2.1.0, and 2.2.0 isn't out yet: https://issues.apache.org/jira/projects/BEAM/versions/12341044

I have tested it in a local environment with Beam 2.1.0, and it works fine.

According to the 2.1.0 documentation, you can find it here: https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/TextIO.html

Here is an example that works with Beam 2.1.0. You can call it on your data (e.g. a PCollection).

import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollection;

@SuppressWarnings("serial")
public class FilePolicyExample {

    // Call this on your data: a PCollection<String> that has already been windowed (e.g. via Window.into).
    public static void writeWindowed(PCollection<String> data) {
        FilenamePolicy policy = new WindowedFilenamePolicy("somePrefix");

        data.apply(TextIO.write().to("your_DIRECTORY")
            .withFilenamePolicy(policy)
            .withWindowedWrites()
            .withNumShards(4));
    }

    private static class WindowedFilenamePolicy extends FilenamePolicy {

        final String outputFilePrefix;

        WindowedFilenamePolicy(String outputFilePrefix) {
            this.outputFilePrefix = outputFilePrefix;
        }

        // Beam 2.1.0 signature of FilenamePolicy.windowedFilename.
        @Override
        public ResourceId windowedFilename(
                ResourceId outputDirectory, WindowedContext input, String extension) {
            // <prefix>-<window>-<shard>-of-<numShards - 1>-pane-<index>[-final]<extension>
            String filename = String.format(
                "%s-%s-%s-of-%s-pane-%s%s%s",
                outputFilePrefix,
                input.getWindow(),
                input.getShardNumber(),
                input.getNumShards() - 1,
                input.getPaneInfo().getIndex(),
                input.getPaneInfo().isLast() ? "-final" : "",
                extension);
            return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
        }

        @Override
        public ResourceId unwindowedFilename(
                ResourceId outputDirectory, Context input, String extension) {
            throw new UnsupportedOperationException("Expecting windowed outputs only");
        }

        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            builder.add(DisplayData.item("fileNamePrefix", outputFilePrefix)
                .withLabel("File Name Prefix"));
        }
    }
}
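The naming scheme of WindowedFilenamePolicy can likewise be sketched as a pure string function. WindowedName is a hypothetical helper; the window is passed as a plain string (in the policy above it comes from input.getWindow().toString()), and paneIndex and isLastPane stand in for PaneInfo.getIndex() and PaneInfo.isLast(). Note that the policy prints numShards - 1, so the last of four shards is labelled 3-of-3 rather than 3-of-4.

```java
/**
 * Hypothetical sketch of the WindowedFilenamePolicy naming scheme:
 * "<prefix>-<window>-<shard>-of-<numShards - 1>-pane-<paneIndex>[-final]<extension>".
 */
final class WindowedName {

    static String filename(String prefix, String window, int shard, int numShards,
                           long paneIndex, boolean isLastPane, String extension) {
        return String.format("%s-%s-%s-of-%s-pane-%s%s%s",
                prefix,
                window,
                shard,
                numShards - 1,              // highest shard index, so shards read 0-of-3 .. 3-of-3
                paneIndex,
                isLastPane ? "-final" : "", // marks the last firing for the window
                extension);
    }

    public static void main(String[] args) {
        for (int shard = 0; shard < 4; shard++) {
            System.out.println(filename("somePrefix", "w1", shard, 4, 0L, true, ".txt"));
        }
    }
}
```

For example, shard 0 of 4 in pane 0, marked final, with extension .txt gives somePrefix-w1-0-of-3-pane-0-final.txt. Because a window's toString() may contain characters such as '[' or ':', formatting the window bounds explicitly, as PerWindowFiles does, is probably the more portable choice.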