2016-07-21 86 views
0

我正在尝试使用mapGroups执行聚合,它返回SparseMatrix作为其中一列,然后对列进行求和。如何在Spark数据集中创建一个TypedColumn并对其进行操作?

我为映射行创建了一个case class模式以提供列名称。矩阵列输入org.apache.spark.mllib.linalg.Matrix。如果在执行汇总(select(sum("mycolumn"))之前未运行toDF,则会出现一个类型不匹配错误(required: org.apache.spark.sql.TypedColumn[MySchema,?])。如果我包括toDF,我会得到另一个类型不匹配错误:cannot resolve 'sum(mycolumn)' due to data type mismatch: function sum requires numeric types, not org.apache.spark.mllib.linalg.MatrixUDT。那么,正确的做法是什么?

回答

1

看起来你至少在这里遇到两个不同的问题。让我们假设你有Dataset这样的:使用o.a.s.sql.functions.col

ds.select(col("_1").as[String]) 
  • val ds = Seq(
        ("foo", Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))), 
        ("foo", Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))) 
    ).toDS 
    

    选择TypedColumn:使用隐式转换

    • $

      ds.select(col("_1").as[String]) 
      

    添加矩阵:

    • MLLib MatrixMatrixUDT不执行加法。这意味着你将无法sum功能或+
    • 减少你可以使用第三方的线性代数库,但在此不星火SQL支持/星火数据集

    如果你真的想这样做与Datsets你可以尝试做这样的事情:

    ds.groupByKey(_._1).mapGroups(
        (key, values) => { 
        val matrices = values.map(_._2.toArray) 
        val first = matrices.next 
        val sum = matrices.foldLeft(first)(
         (acc, m) => acc.zip(m).map { case (x, y) => x + y } 
        ) 
        (key, sum) 
    }) 
    

    ,并映射回矩阵但我个人只想转换为RDD并使用breeze

  • +0

    谢谢。请你可以建议一个解决方案的附加问题?这就是我现在卡住的地方。 – Emre

    +0

    矩阵密集或稀疏吗?大小是多少? – zero323

    +0

    相当小而稀疏;小到足以放在节点上。 – Emre

    相关问题