2017-04-06 77 views
1

我喜欢在spark中将数据框写入dynamodb。 因此我使用rdd.saveAsHadoopDataset(JobConf)。但rdd类型不匹配。它需要hadoopRDD类型的rdd。因此我喜欢将数据帧转换为rdd.I使用了df.rdd,它给出了rdd而不是hadoopRDD。我正在使用spark-scala API。如果有更好的方法将Dataframe写入Dyanmodb的火花,这将有所帮助。如何将Spark中的DataFrame转换为HadoopRDD

回答

1

你不需要转换你的RDD。

因为Hadoop API是围绕键值对构建的,所以Spark围绕RDD自动包装PairRDDFunctions(它增加了额外的功能),其中数据存储在Tuple2对象中。所以你只需要把你的数据放入RDD[(T,V)],那么你将有saveAsHadoopDataset方法可用。

下面是一个例子:

import org.apache.hadoop.mapred.JobConf 
val tupleRDD : RDD[(Int, Int)] = sc.parallelize(Array((1,2), (3,4), (5,6))) 
val jobConf = new JobConf() 

设置任何需要的设定。

tupleRDD.saveAsHadoopDataset(jobConf) 
+0

谢谢你的答案。你有什么Spark-Scala的例子上面。其实我是新手火花,所以它会有所帮助。提前感谢。 – Yogesh

+0

增加了一个什么样的RDD可以工作的例子。 – jamborta

0

如果有人正在寻找从spark-scala到dyanmodb的数据框。然后下面可能会有所帮助。

import com.amazonaws.services.dynamodbv2.document.Item 
import com.amazonaws.services.dynamodbv2.document.DynamoDB 

var json_arr=df.toJSON.collect() //Convert dataframe to json array 
val table = dynamoDB.getTable("table_name") //dynamoDB is connection to dynamodb 
for (element <- json_arr) { 
     val item = Item.fromJSON(element) 
     table.putItem(item) 
    } 
+0

dynamoDB是如何实例化的? –

相关问题