
I am converting my Scala code to PySpark as shown below, but I get a different count for the final RDD; the output of the Scala Spark job and the PySpark job is inconsistent.

My Scala code:

import scala.collection.mutable.ListBuffer
import scala.util.control.Breaks._
import org.apache.spark.sql.Row

val scalaRDD = rowRDD.map { row: Row =>
  // Key each row by the concatenation of columns 1 and 6.
  val rowList: ListBuffer[Row] = ListBuffer()
  rowList += row
  (row.getString(1) + "_" + row.getString(6), rowList)
}.reduceByKey { (list1, list2) =>
  // Try to merge every row of list1 into a matching row of list2;
  // rows that cannot be merged are collected and appended at the end.
  val rowList: ListBuffer[Row] = ListBuffer()
  for (i <- 0 to list1.length - 1) {
    val row1 = list1(i)

    var foundMatch = false

    breakable {
      for (j <- 0 to list2.length - 1) {
        val row2 = list2(j)
        val result = mergeRow(row1, row2)
        if (result._1) {
          list2(j) = result._2
          foundMatch = true
          break
        }
      } // for j loop
    } // breakable for j

    if (!foundMatch) {
      rowList += row1
    }
  }

  list2 ++= rowList

  list2
}.flatMap { t => t._2 }

where

def mergeRow(row1: Row, row2: Row): (Boolean, Row) = {
  val z: Array[String] = new Array[String](row1.length)
  var hasDiff = false

  for (k <- 1 to row1.length - 2) {
    // k = 0 : ID, always different
    // k = 43 : last field, which is not important

    // Keep the larger ID and its last field.
    if (row1.getString(0) < row2.getString(0)) {
      z(0) = row2.getString(0)
      z(43) = row2.getString(43)
    } else {
      z(0) = row1.getString(0)
      z(43) = row1.getString(43)
    }

    if (Option(row2.getString(k)).getOrElse("").isEmpty && !Option(row1.getString(k)).getOrElse("").isEmpty) {
      // row2 is missing this field: fill it in from row1.
      z(k) = row1.getString(k)
      hasDiff = true
    } else if (!Option(row1.getString(k)).getOrElse("").isEmpty && !Option(row2.getString(k)).getOrElse("").isEmpty && row1.getString(k) != row2.getString(k)) {
      // Both fields are set but conflict: the rows cannot be merged.
      return (false, null)
    } else {
      z(k) = row2.getString(k)
    }
  } // for k loop

  if (hasDiff) {
    (true, Row.fromSeq(z))
  } else {
    (true, row2)
  }
}

I then tried to convert it to PySpark as follows:

pySparkRDD = rowRDD.map(
    lambda row: singleRowList(row)
).reduceByKey(lambda list1, list2: mergeList(list1, list2)).flatMap(lambda x: x[1])

where I have:

from pyspark.sql import Row

def mergeRow(row1, row2):
    z = [None] * len(row1)
    hasDiff = False

    # for (k <- 1 to row1.length - 2) {
    for k in xrange(1, len(row1) - 2):
        # k = 0 : ID, always different
        # k = 43 : last field, which is not important

        # Keep the larger ID and its last field.
        if row1[0] < row2[0]:
            z[0] = row2[0]
            z[43] = row2[43]
        else:
            z[0] = row1[0]
            z[43] = row1[43]

        if not row2[k] and row1[k]:
            # row2 is missing this field: fill it in from row1.
            z[k] = row1[k].strip()
            hasDiff = True
        elif row1[k] and row2[k] and row1[k].strip() != row2[k].strip():
            # Both fields are set but conflict: the rows cannot be merged.
            return (False, None)
        else:
            z[k] = row2[k].strip() if row2[k] else row2[k]

    if hasDiff:
        return (True, Row(*z))
    else:
        return (True, row2)

def singleRowList(row):
    myList = []
    myList.append(row)

    return (row[1] + "_" + row[6], myList)

def mergeList(list1, list2):
    # Try to merge every row of list1 into a matching row of list2;
    # rows that cannot be merged are collected and appended at the end.
    rowList = []
    for i in xrange(0, len(list1) - 1):
        row1 = list1[i]
        foundMatch = False
        for j in xrange(0, len(list2) - 1):
            row2 = list2[j]
            resultBool, resultRow = mergeRow(row1, row2)
            if resultBool:
                list2[j] = resultRow
                foundMatch = True
                break

        if not foundMatch:
            rowList.append(row1)

    list2.extend(rowList)

    return list2
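
For clarity, here is the merge behaviour I intend, illustrated on two made-up 44-field rows (the real 44-column schema is not shown here); the larger ID wins, and an empty field in one row is filled from the other:

# Made-up 44-field rows purely for illustration; mergeRow only uses
# len() and positional indexing, so plain lists work here.
a = ["9"] + ["x"] * 42 + ["last_a"]
b = ["7"] + ["x"] * 42 + ["last_b"]
b[5] = ""                   # b is missing field 5

ok, merged = mergeRow(a, b)
# ok is True, merged[5] == "x" (filled from a), merged[0] == "9"

a[7] = "y"                  # fields 7 now conflict
ok, merged = mergeRow(a, b)
# ok is False, merged is None: the rows cannot be merged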

By the way, rowRDD is converted from a DataFrame, i.e. rowRDD = myDF.rdd.

However, I get different counts for scalaRDD and pySparkRDD. I have gone over the code many times but cannot figure out what I am missing. Does anyone have any ideas? Thanks!

Answer


Consider this:

scala> (1 to 5).length 
res1: Int = 5 

And this:

>>> len(xrange(1, 5)) 
4 

In Scala, you should use "until" to reproduce Python's "range": (1 to 5) -> 1, 2, 3, 4, 5; (1 until 5) -> 1, 2, 3, 4 – FLab
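
Applied to the loops in the question (assuming 44 fields per row, as the comments in mergeRow suggest), every xrange upper bound is one short: mergeRow skips field 42 and mergeList never examines the last row of each list, which would explain the different counts. A minimal sketch of the check:

n = 44                           # row length implied by the mergeRow comments

# Scala: for (k <- 1 to row1.length - 2) visits 1 .. 42 inclusive
print(len(xrange(1, n - 2)))     # 41 -- the posted port misses k = 42
print(len(xrange(1, n - 1)))     # 42 -- matches the Scala loop

m = 3                            # example list length for mergeList
# Scala: for (i <- 0 to list1.length - 1) visits every index
print(len(xrange(0, m - 1)))     # 2 -- the posted port misses the last row
print(len(xrange(0, m)))         # 3 -- matches the Scala loop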