在分布式处理环境中,通常使用“part-000”等“part”文件名,是否可以编写某种扩展名来重命名个别输出文件名(例如per窗口文件名)Apache Beam?Apache Beam是否支持其输出的自定义文件名?
要做到这一点,人们可能必须能够为窗口指定名称或根据窗口内容推断文件名。我想知道这种方法是否可行。
至于溶液是否应该被流式传输或批次,流模式的例子是优选的
在分布式处理环境中,通常使用“part-000”等“part”文件名,是否可以编写某种扩展名来重命名个别输出文件名(例如per窗口文件名)Apache Beam?Apache Beam是否支持其输出的自定义文件名?
要做到这一点,人们可能必须能够为窗口指定名称或根据窗口内容推断文件名。我想知道这种方法是否可行。
至于溶液是否应该被流式传输或批次,流模式的例子是优选的
如果你想在文件名如何生成比默认政策允许,自定义FilenamePolicy也可以使用TextIO.Write.to(FilenamePolicy)设置更好地控制
是由JKFF建议您可以使用TextIO.write.to(FilenamePolicy)来实现此目的。
示例如下:
如果你想输出写入特定的本地文件,你可以使用:
lines.apply(TextIO.write()来(“/路径/到/ file.txt的“));
以下是使用前缀link编写输出的简单方法。这个例子是为谷歌存储,而不是你可以使用本地/ s3路径。
public class MinimalWordCountJava8 {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
// In order to run your pipeline, you need to make following runner specific changes:
//
// CHANGE 1/3: Select a Beam runner, such as BlockingDataflowRunner
// or FlinkRunner.
// CHANGE 2/3: Specify runner-required options.
// For BlockingDataflowRunner, set project and temp location as follows:
// DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
// dataflowOptions.setRunner(BlockingDataflowRunner.class);
// dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE");
// dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");
// For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions}
// for more details.
// options.as(FlinkPipelineOptions.class)
// .setRunner(FlinkRunner.class);
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
.apply(FlatMapElements
.into(TypeDescriptors.strings())
.via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
.apply(Filter.by((String word) -> !word.isEmpty()))
.apply(Count.<String>perElement())
.apply(MapElements
.into(TypeDescriptors.strings())
.via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
// CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to.
.apply(TextIO.write().to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
p.run().waitUntilFinish();
}
}
This example code会给你更多的控制上写输出:
/**
* A {@link FilenamePolicy} produces a base file name for a write based on metadata about the data
* being written. This always includes the shard number and the total number of shards. For
* windowed writes, it also includes the window and pane index (a sequence number assigned to each
* trigger firing).
*/
protected static class PerWindowFiles extends FilenamePolicy {
private final ResourceId prefix;
public PerWindowFiles(ResourceId prefix) {
this.prefix = prefix;
}
public String filenamePrefixForWindow(IntervalWindow window) {
String filePrefix = prefix.isDirectory() ? "" : prefix.getFilename();
return String.format(
"%s-%s-%s", filePrefix, formatter.print(window.start()), formatter.print(window.end()));
}
@Override
public ResourceId windowedFilename(int shardNumber,
int numShards,
BoundedWindow window,
PaneInfo paneInfo,
OutputFileHints outputFileHints) {
IntervalWindow intervalWindow = (IntervalWindow) window;
String filename =
String.format(
"%s-%s-of-%s%s",
filenamePrefixForWindow(intervalWindow),
shardNumber,
numShards,
outputFileHints.getSuggestedFilenameSuffix());
return prefix.getCurrentDirectory().resolve(filename, StandardResolveOptions.RESOLVE_FILE);
}
@Override
public ResourceId unwindowedFilename(
int shardNumber, int numShards, OutputFileHints outputFileHints) {
throw new UnsupportedOperationException("Unsupported.");
}
}
@Override
public PDone expand(PCollection<InputT> teamAndScore) {
if (windowed) {
teamAndScore
.apply("ConvertToRow", ParDo.of(new BuildRowFn()))
.apply(new WriteToText.WriteOneFilePerWindow(filenamePrefix));
} else {
teamAndScore
.apply("ConvertToRow", ParDo.of(new BuildRowFn()))
.apply(TextIO.write().to(filenamePrefix));
}
return PDone.in(teamAndScore.getPipeline());
}
谢谢!看起来这不是在2.1.0版本中。而且2.2.0还没有出来:https://issues.apache.org/jira/projects/BEAM/versions/12341044 –
我已经在本地环境中使用beam 2.1.0版本测试过了,它工作得很好。 –
根据2.1.0文档,可以在其中找到它:https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/TextIO.html –
这是完全与梁2.1.0有效的例子。你可以打电话给你的数据(例如PCollection)
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.display.DisplayData;
@SuppressWarnings("serial")
public class FilePolicyExample {
public static void main(String[] args) {
FilenamePolicy policy = new WindowedFilenamePolicy("somePrefix");
//data
data.apply(TextIO.write().to("your_DIRECTORY")
.withFilenamePolicy(policy)
.withWindowedWrites()
.withNumShards(4));
}
private static class WindowedFilenamePolicy extends FilenamePolicy {
final String outputFilePrefix;
WindowedFilenamePolicy(String outputFilePrefix) {
this.outputFilePrefix = outputFilePrefix;
}
@Override
public ResourceId windowedFilename(
ResourceId outputDirectory, WindowedContext input, String extension) {
String filename = String.format(
"%s-%s-%s-of-%s-pane-%s%s%s",
outputFilePrefix,
input.getWindow(),
input.getShardNumber(),
input.getNumShards() - 1,
input.getPaneInfo().getIndex(),
input.getPaneInfo().isLast() ? "-final" : "",
extension);
return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
}
@Override
public ResourceId unwindowedFilename(
ResourceId outputDirectory, Context input, String extension) {
throw new UnsupportedOperationException("Expecting windowed outputs only");
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder.add(DisplayData.item("fileNamePrefix", outputFilePrefix)
.withLabel("File Name Prefix"));
}
}
}
你可以给一些示例代码吗?当尝试这种方法时,我得到了一个ClassCastException ... –
请包括您的代码和你得到的错误的完整堆栈跟踪。 – jkff