2015-04-01 67 views

回答

15

假设您有一个N × M矩阵。

如果N和M都很小,以至于在存储器中可以容纳N个× M项目,那么使用RDD没什么意义。但换位很容易:

val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9))) 
val transposed = sc.parallelize(rdd.collect.toSeq.transpose) 

如果N或M是如此之大,你不能在内存中保留N或M个条目,那么你就不能有这种规模的RDD线。在这种情况下,原始矩阵或转置矩阵不可能表示。

N和M可能是中等大小:您可以在内存中保存N或M个条目,但不能容纳N × M条目。在这种情况下,你必须炸毁矩阵,并把它再次一起:

val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9))) 
// Split the matrix into one number per line. 
val byColumnAndRow = rdd.zipWithIndex.flatMap { 
    case (row, rowIndex) => row.zipWithIndex.map { 
    case (number, columnIndex) => columnIndex -> (rowIndex, number) 
    } 
} 
// Build up the transposed matrix. Group and sort by column index first. 
val byColumn = byColumnAndRow.groupByKey.sortByKey().values 
// Then sort by row index. 
val transposed = byColumn.map { 
    indexedRow => indexedRow.toSeq.sortBy(_._1).map(_._2) 
} 
+1

实际上,我在文本文件中有一个非常大的矩阵,100000 * 100000。 – 2015-04-01 13:09:54

+1

在文本文件中,就像我在问题中所说的一样,1 2 3 \ n 4 5 6 \ n 7 8 9这样,现在我必须调换textFile,我不认为你的方法可以工作,它可能内存不足(100000 * 100000的数组)对于内存来说太大。你有另一种方法吗? – 2015-04-01 13:12:32

+0

你说得对,我没有考虑过这种情况。我会更新答案,希望有用的东西! – 2015-04-01 13:16:36

4

的第一份草案,而无需使用收集(),所以一切运行者侧,并没有什么在驱动程序进行:

val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9))) 

rdd.flatMap(row => (row.map(col => (col, row.indexOf(col))))) // flatMap by keeping the column position 
    .map(v => (v._2, v._1)) // key by column position 
    .groupByKey.sortByKey // regroup on column position, thus all elements from the first column will be in the first row 
    .map(_._2)    // discard the key, keep only value 

该解决方案的问题在于,如果在分布式系统中执行操作,转置矩阵中的列将最终混洗。会想到一个改进版本

我的想法是,除了将'列号'附加到矩阵的每个元素,我们还附加'行号'。因此,我们可以按列位置键和按键重新分组,然后我们可以对行号中的每一行重新排序,然后从结果中去除行数/列数。 我只是没有办法在将文件导入RDD时知道行号。

您可能认为将矩阵列和行号附加到每个矩阵元素很重要,但我想这是付出代价才有可能以分布式方式处理您的输入,从而处理巨大矩阵。

当我找到排序问题的解决方案时,将更新答案。

+0

我的遗漏部分Daniel的答案中的zipWithIndex是什么。不知道这个,所以谢谢让我学到一些东西。 没有测试他的解决方案,但确实zipWithIndex给你缺少的行号信息,因此可以用来重新排序转置的行。 – Martin 2015-04-01 14:04:57

+0

我已经试过丹尼尔的解决方案,它是正确的。你说你错过了zipWithIndex.Thank你的答案~~ – 2015-04-07 03:38:51

+0

伟大的解决方案@Martin。您能否告诉我如何为Java-7编写相同的代码(没有lambda表达式)。 – 2015-05-01 23:20:11

4

从Spark 1.6开始,您可以使用DataFrame上的pivot operation,具体取决于数据的实际形状,如果将其放入DF中,您可以将列转换为行,以下databricks blog非常有用,因为它详细描述一些代码示例的透视用例

+0

你的第一个链接已经死了(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.GroupedData) – 2017-06-14 15:14:56