2016-04-25 77 views
0

我有型Seq[Seq[(Double, Double)]]的变量:如何使用期望RDD [(Double,Double)]作为Seq [Seq [(Double,Double)]]的输入的函数?

val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches) 

现在我想申请功能RegressionMetrics这需要RDD[(Double, Double)]作为输入:

val metrics = new RegressionMetrics(output) 

如何变换Seq[Seq[(Double, Double)]]到RDD [(双人间, Double)]`为了能够使用类RegressionMetrics的函数?

回答

1

RDD就是Apache斯巴克的一个分布式弹性数据集

要创建一个RDD你需要的SparkContext一个实例,它可以被看作是一个“连接”或“处理”到抽象运行Apache Spark的集群

假设

  • 你有一个实例SparkContext
  • 你要正确对待你的输入作为(Double, Double)值的“扁平化”的序列,忽略的方式,这些都是目前“拆分”成在Seq[Seq[(Double, Double)]]中的子序列

您可以创建一个RDD,如下所示:

val sc: SparkContext = ??? 
val output: Seq[Seq[(Double, Double)]] = ??? 

val rdd: RDD[(Double, Double)] = sc.parallelize(output.flatten) 
+0

好的,谢谢。我正在使用Seq进行测试。那么,我是否理解正确,如果使用磁盘上存储的一些数据,我可以将它读入RDD并替换Seq? – Klue

+0

正确!对于测试 - 首先,您可以使用Spark的本地模式轻松创建独立的SparkContext;其次,实际上你可以按照这里所建议的使用'parallelize',或者直接通过'SparkContext.textFile'从文件中加载数据 –

相关问题