回答
假设您有一个N × M矩阵。
如果N和M都很小,以至于在存储器中可以容纳N个× M项目,那么使用RDD没什么意义。但换位很容易:
val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
val transposed = sc.parallelize(rdd.collect.toSeq.transpose)
如果N或M是如此之大,你不能在内存中保留N或M个条目,那么你就不能有这种规模的RDD线。在这种情况下,原始矩阵或转置矩阵不可能表示。
N和M可能是中等大小:您可以在内存中保存N或M个条目,但不能容纳N × M条目。在这种情况下,你必须炸毁矩阵,并把它再次一起:
val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
// Split the matrix into one number per line.
val byColumnAndRow = rdd.zipWithIndex.flatMap {
case (row, rowIndex) => row.zipWithIndex.map {
case (number, columnIndex) => columnIndex -> (rowIndex, number)
}
}
// Build up the transposed matrix. Group and sort by column index first.
val byColumn = byColumnAndRow.groupByKey.sortByKey().values
// Then sort by row index.
val transposed = byColumn.map {
indexedRow => indexedRow.toSeq.sortBy(_._1).map(_._2)
}
的第一份草案,而无需使用收集(),所以一切运行者侧,并没有什么在驱动程序进行:
val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
rdd.flatMap(row => (row.map(col => (col, row.indexOf(col))))) // flatMap by keeping the column position
.map(v => (v._2, v._1)) // key by column position
.groupByKey.sortByKey // regroup on column position, thus all elements from the first column will be in the first row
.map(_._2) // discard the key, keep only value
该解决方案的问题在于,如果在分布式系统中执行操作,转置矩阵中的列将最终混洗。会想到一个改进版本
我的想法是,除了将'列号'附加到矩阵的每个元素,我们还附加'行号'。因此,我们可以按列位置键和按键重新分组,然后我们可以对行号中的每一行重新排序,然后从结果中去除行数/列数。 我只是没有办法在将文件导入RDD时知道行号。
您可能认为将矩阵列和行号附加到每个矩阵元素很重要,但我想这是付出代价才有可能以分布式方式处理您的输入,从而处理巨大矩阵。
当我找到排序问题的解决方案时,将更新答案。
我的遗漏部分Daniel的答案中的zipWithIndex是什么。不知道这个,所以谢谢让我学到一些东西。 没有测试他的解决方案,但确实zipWithIndex给你缺少的行号信息,因此可以用来重新排序转置的行。 – Martin 2015-04-01 14:04:57
我已经试过丹尼尔的解决方案,它是正确的。你说你错过了zipWithIndex.Thank你的答案~~ – 2015-04-07 03:38:51
伟大的解决方案@Martin。您能否告诉我如何为Java-7编写相同的代码(没有lambda表达式)。 – 2015-05-01 23:20:11
从Spark 1.6开始,您可以使用DataFrame上的pivot operation,具体取决于数据的实际形状,如果将其放入DF中,您可以将列转换为行,以下databricks blog非常有用,因为它详细描述一些代码示例的透视用例
你的第一个链接已经死了(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.GroupedData) – 2017-06-14 15:14:56
- 1. 在星火RDD
- 2. 如何测试星火RDD
- 3. 星火RDD容错
- 4. 星火RDD不Elasticsearch
- 5. 星火:保存RDD在HDFS
- 6. 星火RDD写入HBase的
- 7. 星火RDD:设置差异
- 8. 星火RDD外部存储
- 9. 星火多维RDD分区
- 10. 转换RDD在星火数据帧/斯卡拉
- 11. 转换RDD到数据集在Java中星火
- 12. 星火:一个RDD的格式转换成数据帧
- 13. 星火:RDD(按键,列表)来RDD(键,值)的扩展
- 14. 转换一个RDD成数据帧星火
- 15. 读取XML嵌套标记成星火RDD,并转化成JSON
- 16. 如何在Apache的星火
- 17. 如何星火
- 18. 星火:按多个值的RDD在一个元组/列
- 19. 获取RDD的类型在斯卡拉/星火
- 20. 如何处理在星火
- 21. 如何将矩阵转换为火花中的RDD [矢量]
- 22. 从RDD扫描星火采用指数在Hadoop数据库表
- 23. 数据分布,而在星火重新分区RDD
- 24. 删除的RDD的第一个和最后一行星火
- 25. 如何将选项[RDD]转换为RDD
- 26. 星火1.5.1,斯卡拉2.10.5:如何扩大的RDD [数组[字符串],矢量]
- 27. 保存Neo4j的数据以星火RDD(或)数据帧
- 28. 如何将火花RDD转换为mahout DRM?
- 29. 筛选火花RDD
- 30. 火花:RDD列出
实际上,我在文本文件中有一个非常大的矩阵,100000 * 100000。 – 2015-04-01 13:09:54
在文本文件中,就像我在问题中所说的一样,1 2 3 \ n 4 5 6 \ n 7 8 9这样,现在我必须调换textFile,我不认为你的方法可以工作,它可能内存不足(100000 * 100000的数组)对于内存来说太大。你有另一种方法吗? – 2015-04-01 13:12:32
你说得对,我没有考虑过这种情况。我会更新答案,希望有用的东西! – 2015-04-01 13:16:36