1
我是Flink的新手,我使用DataSet API工作。经过大量处理作为最后阶段之后,我需要将其中一个值除以其最大值来标准化。所以,我用.max()
运算符来取最大值,后来我将结果作为构造函数的参数传递给MapFunction。Flink执行数据流两次
这有效,但所有的处理都执行两次。执行一个作业来查找最大值,然后执行另一个作业以创建最终结果(从头开始执行)......是否有任何解决方法只能执行一次整个数据流?
final List<Tuple6<...>> maxValues = result.max(2).collect();
assert maxValues.size() == 1;
result.map(new NormalizeAttributes(maxValues.get(0))).writeAsCsv(...)
@FunctionAnnotation.ForwardedFields("f0; f1; f3; f4; f5")
@FunctionAnnotation.ReadFields("f2")
private static class NormalizeAttributes implements MapFunction<Tuple6<...>, Tuple6<...>> {
private final Tuple6<...> maxValues;
public NormalizeAttributes(Tuple6<...> maxValues) {
this.maxValues = maxValues;
}
@Override
public Tuple6<...> map(Tuple6<...> value) throws Exception {
value.f2 /= maxValues.f2;
return value;
}
}
非常感谢! ;) – kaser