我喜欢在spark中将数据框写入dynamodb。 因此我使用rdd.saveAsHadoopDataset(JobConf)
。但rdd类型不匹配。它需要hadoopRDD类型的rdd。因此我喜欢将数据帧转换为rdd.I使用了df.rdd
,它给出了rdd而不是hadoopRDD。我正在使用spark-scala API。如果有更好的方法将Dataframe写入Dyanmodb的火花,这将有所帮助。如何将Spark中的DataFrame转换为HadoopRDD
1
A
回答
1
你不需要转换你的RDD。
因为Hadoop API是围绕键值对构建的,所以Spark围绕RDD自动包装PairRDDFunctions(它增加了额外的功能),其中数据存储在Tuple2
对象中。所以你只需要把你的数据放入RDD[(T,V)]
,那么你将有saveAsHadoopDataset
方法可用。
下面是一个例子:
import org.apache.hadoop.mapred.JobConf
val tupleRDD : RDD[(Int, Int)] = sc.parallelize(Array((1,2), (3,4), (5,6)))
val jobConf = new JobConf()
设置任何需要的设定。
tupleRDD.saveAsHadoopDataset(jobConf)
0
如果有人正在寻找从spark-scala到dyanmodb的数据框。然后下面可能会有所帮助。
import com.amazonaws.services.dynamodbv2.document.Item
import com.amazonaws.services.dynamodbv2.document.DynamoDB
var json_arr=df.toJSON.collect() //Convert dataframe to json array
val table = dynamoDB.getTable("table_name") //dynamoDB is connection to dynamodb
for (element <- json_arr) {
val item = Item.fromJSON(element)
table.putItem(item)
}
+0
dynamoDB是如何实例化的? –
相关问题
- 1. 如何将Spark Dataframe转换为JSONObject
- 2. 如何将Cassandra ResultSet转换为Spark DataFrame?
- 3. 如何将Spark Dataframe中的列从矢量转换为集合?
- 4. 如何将Spark DataFrame转换为Java中POJO的RDD
- 5. 如何将JavaPairInputDStream转换为Spark中的DataSet/DataFrame
- 6. Spark 2.0 - 将DataFrame转换为DataSet
- 7. 将Java ResultSet转换为Spark DataFrame
- 8. 将JDBC ResultSet转换为Spark RDD/DataFrame
- 9. 将Spark Dataframe转换为Scala Map集合
- 10. 将Spark Dataframe转换为XML文件
- 11. 将case类的DStream转换为joda.DateTime转换为Spark DataFrame
- 12. 在Spark中,如何使用SparseVector将DataFrame转换为RDD [Vector]?
- 13. 如何将HH:MM:SS:Ms的Spark Dataframe列转换为秒值?
- 14. Apache Spark:如何将Spark DataFrame转换为类型为RDD [(Type1,Type2,...)]的RDD?
- 15. 如何使用Scala/spark将矩阵转换为DataFrame?
- 16. 如何将Ignite队列转换为Spark Dataframe?
- 17. 如何将随机化转换应用于Spark中的DataFrame列?
- 18. 将变化的元组的RDD转换为Spark中的DataFrame
- 19. 将列表或RDD的列表转换为Spark-Scala中的DataFrame
- 20. 如何将JSON的RDD转换为Dataframe?
- 21. 如何将Pandas DataFrame转换为TimeSeries?
- 22. 如何将Pandas DataFrame转换为列表?
- 23. Spark Scala Dataframe将Struct的Array列转换为Map列
- 24. Spark Scala - 将一个记录和一列的Dataframe转换为Double
- 25. 将包含BigInt的RDD转换为Spark Dataframe
- 26. pandas,将DataFrame转换为MultiIndex'ed DataFrame
- 27. 如何将三重DataFrame转换为不重复行的新DataFrame?
- 28. 如何将带有字符串的DataFrame转换为带有Scala(Spark 2.0)中的向量的DataFrame
- 29. 将pandas.core.groupby.SeriesGroupBy转换为dataframe
- 30. 将Pandas Dataframe转换为RCV
谢谢你的答案。你有什么Spark-Scala的例子上面。其实我是新手火花,所以它会有所帮助。提前感谢。 – Yogesh
增加了一个什么样的RDD可以工作的例子。 – jamborta