2016-10-10 114 views
1

我试图用PyMongo连接器保存一个Spark-DataFrame。 以下是我的代码,但每一次我运行代码我得到一个错误:使用Spark-DataFrame保存HDFS到MongoDB

java.io.IOException: No FileSystem for scheme: mongodb 

下面是我的代码:

import pymongo 
import pymongo_spark 
pymongo_spark.activate() 
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SQLContext 
conf = SparkConf() 
sc = SparkContext(conf=conf) 
sqlContext=SQLContext(sc) 
from pyspark.sql import SparkSession 
from pyspark.sql import SparkSession 
path = "hdfs://localhost:9000/home/hadoop/h_data/sales_ord_univ.csv" 
df=sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load(path) 
collections=df.collect() 
df.write.format('mongodb://localhost:27017/test.sales_order_2').save() 

我有一个很天真的代码,因为我是个新手,这,但对此的任何帮助将不胜感激。即时通讯使用火花2.0.0,2.7.6的Python,MongoDB的:3.2.9

回答

1

I'm trying to save a Spark-DataFrame using PyMongo connector

你可以尝试使用MongoDB Connector for Spark。使用Apache Spark v2.0.x,Python的v2.7.x和MongoDB为V3.2.x您的安装环境中,你可以做一些象下面这样:

from pyspark.sql import SparkSession 
spark = SparkSession.builder.appName("Application Name").getOrCreate() 
dataframe = spark.read.csv("path/to/file.csv", header=True, mode="DROPMALFORMED") 
dataframe.write.format("com.mongodb.spark.sql.DefaultSource")\ 
       .option("spark.mongodb.output.uri", "mongodb://localhost:27017/database.collection")\ 
       .save() 

Python示例文件的完整版本可以在MongoDB PySpark Docker: examples.py找到。其中包括一个在Spark中使用MongoDB Aggregation的示例,以及Spark SQL

如果您熟悉docker,您可以使用docker-compose执行git项目MongoDB PySpark Docker并运行一些PySpark示例。

您可能会发现下面的资源非常有用:

+0

这是一个很好的解决方案。但是,我们可以在PySpark中使用此Spark连接器执行异常处理吗?因为,有可能数据框可以轻松地超过MongoDB的文档大小限制16MB –

+0

您可以随时将其放在'try/except'语句中。请注意,CSV行将是单个文档,而不是整个CSV成为单个文档。另请参阅[MongoDB文档]的定义(https://docs.mongodb.com/manual/core/document/)。如果CSV行值超过16MB,则可能需要重新考虑架构/模型。 –