我试图用PyMongo连接器保存一个Spark-DataFrame。 以下是我的代码,但每一次我运行代码我得到一个错误:使用Spark-DataFrame保存HDFS到MongoDB
java.io.IOException: No FileSystem for scheme: mongodb
下面是我的代码:
import pymongo
import pymongo_spark
pymongo_spark.activate()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext=SQLContext(sc)
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
path = "hdfs://localhost:9000/home/hadoop/h_data/sales_ord_univ.csv"
df=sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load(path)
collections=df.collect()
df.write.format('mongodb://localhost:27017/test.sales_order_2').save()
我有一个很天真的代码,因为我是个新手,这,但对此的任何帮助将不胜感激。即时通讯使用火花2.0.0,2.7.6的Python,MongoDB的:3.2.9
这是一个很好的解决方案。但是,我们可以在PySpark中使用此Spark连接器执行异常处理吗?因为,有可能数据框可以轻松地超过MongoDB的文档大小限制16MB –
您可以随时将其放在'try/except'语句中。请注意,CSV行将是单个文档,而不是整个CSV成为单个文档。另请参阅[MongoDB文档]的定义(https://docs.mongodb.com/manual/core/document/)。如果CSV行值超过16MB,则可能需要重新考虑架构/模型。 –