2016-12-29 85 views
-2

我想用spark创建数据报告。 我想要做的概念如下。Spark,Scala - 从rdd映射输出

case class output(txt: String) 
outputList: List[output] = .. 
myrdd 
    .filter(..) 
    .map( 
     some processing 
     outputList ::= output(..) 
    ) 

// this is why I cannot just union rdd with rdd 
anotherRdd.map(
    ...some processing... 
    val rdd = ..make rdd from rdd.. 
    rddinrdd.map( 
     ...some processing... 
     outputList ::= output(..) 
    ) 
) 

// save it as text 
..save outputList somehow.. 

我知道它不会因为outputList工作将被存储的所有输出之前保存的,有没有办法做到这一点?

+0

很难理解的结果。你可以请教一下这段文字。你也可以举一个输入的例子和你想要达到的预期输出吗? – marios

+0

为什么在'anotherRdd'中''处理''处理后''两个RDD' –

回答

1

你正在试图做未在星火支持两件事情:

  1. 变异驾驶员侧的对象(outputList)的RDD改造内部
  2. 工作与RDD改造内部RDDS( rddinrdd不支持 - RDDS在驾驶员侧的存在,他们的转化都是在做执行者)

很难不建议更具体的要求的替代品,但一般来说,你应该将RDD转换为RDD [输出]:这就是Spark的使用方式 - 不要尝试构造outputList,尝试通过转换构建outputRDD

  • 对于第一RDD,看起来简单 - 而不是增加output(..)到列表中,只是使该函数的返回值 - 那么你map操作的结果将是一个RDD[output]

    val outRdd1: RDD[output] = myrdd 
    .filter(..) 
    .map( 
        some processing 
        output(..) 
    ) 
    
  • 对于第二个rdd,您可能需要加入某些键上的两个RDD,假设“rdd中的..make rdd”使用anotherRdd中的当前记录,因此一般情况下它看起来像这样:

    val outRdd2: RDD[output] = anotherRdd 
        .keyBy(..extract join key..) 
        .join(myrdd.keyBy(..extract join key..)) 
        .map(
        ...some processing... 
        output(..) 
    ) 
    
  • 最后,您可以联合所产生的RDDS和保存使用saveAsTextFile