2016-08-23 230 views
1

我是Flink的新手,我使用DataSet API工作。经过大量处理作为最后阶段之后,我需要将其中一个值除以其最大值来标准化。所以,我用.max()运算符来取最大值,后来我将结果作为构造函数的参数传递给MapFunction。Flink执行数据流两次

这有效,但所有的处理都执行两次。执行一个作业来查找最大值,然后执行另一个作业以创建最终结果(从头开始执行)......是否有任何解决方法只能执行一次整个数据流?

final List<Tuple6<...>> maxValues = result.max(2).collect(); 
    assert maxValues.size() == 1; 
    result.map(new NormalizeAttributes(maxValues.get(0))).writeAsCsv(...) 

@FunctionAnnotation.ForwardedFields("f0; f1; f3; f4; f5") 
@FunctionAnnotation.ReadFields("f2") 
private static class NormalizeAttributes implements MapFunction<Tuple6<...>, Tuple6<...>> { 

    private final Tuple6<...> maxValues; 

    public NormalizeAttributes(Tuple6<...> maxValues) { 
     this.maxValues = maxValues; 
    } 

    @Override 
    public Tuple6<...> map(Tuple6<...> value) throws Exception { 
     value.f2 /= maxValues.f2; 
     return value; 
    } 
} 

回答

0

collect()立即触发程序的执行直到由collect()请求的数据集。如果您稍后再次拨打env.execute()collect(),则该程序将再次执行。

除了执行的副作用之外,使用collect()将值分配给后续转换还具有数据传输到客户端以及稍后返回到群集中的缺点。 Flink提供所谓的广播变量,将DataSet作为侧面输入发送到另一个转换中。在你的程序

使用广播变量将如下所示:

DataSet maxValues = result.max(2); 
result 
    .map(new NormAttrs()).withBroadcastSet(maxValues, "maxValues") 
    .writeAsCsv(...); 

NormAttrs功能应该是这样的:

private static class NormAttr extends RichMapFunction<Tuple6<...>, Tuple6<...>> { 

    private Tuple6<...> maxValues; 

    @Override 
    public void open(Configuration config) { 
    maxValues = (Tuple6<...>)getRuntimeContext().getBroadcastVariable("maxValues").get(1); 
    } 

    @Override 
    public PredictedLink map(Tuple6<...> value) throws Exception { 
    value.f2 /= maxValues.f2; 
    return value; 
    } 
} 

您可以找到有关在documentation广播变量的详细信息。

+0

非常感谢! ;) – kaser