2017-03-07 190 views
3

我有写RDD数据MongoDB的火花应用程序,我得到一个MongoBulkWriteException。以前,我在使用MongoDB标准驱动程序中的bulkWrite()方法,但我已经开始使用MongoSpark驱动程序中的write()方法。MongoSpark节省重复键错误E11000

别的之前,我使用的Apache 星火1.6.0MongoDB的3.2.11

此异常跟踪:

com.mongodb.MongoBulkWriteException: Bulk write operation error on server 
10.1.101.146:27017. Write errors: [BulkWriteError{index=0, code=11000, 
message='E11000 duplicate key error collection: collection-test 
index: _id_ dup key: { : "636253651-2017-03-07" }', details={ }}] 

产生它的代码是:

JavaRDD<Document> rddInsertRecords = rddGrouped.map(new Function<Tuple2<String, BasicRecord>, Document>() { 
private static final long serialVersionUID = 1L; 
    @Override 
    public Document call(Tuple2<String, BasicRecord> tuple2) throws Exception { 
      Document json = tuple2._2.toBSONDocument(); 
      return json; 
     } 
}); 
MongoSpark.save(rddInsertRecords, WriteConfig.create(sc.getConf())); 

我用我的旧代码的替代解决方案,但我想用MongoSpark写。

我在MongoDB的JIRA(https://jira.mongodb.org/browse/SERVER-14322)中看到过这个问题,但我不确定我怎么能绕过这个问题。

UPDATE:我忘了提及第一次没有发生故障(即没有mongodb上的数据,集合为空)。第二次运行作业时失败。从技术上讲,司机应该做一个补充,我是对的吗?

回答

2

Spark连接器不知道如何插入RDD<T>其中T可以是任何类型 - 它如何获得id值?

然而,数据集/ DataFrames具有与它们指示哪个字段是_id字段并能自动被用于upserts模式信息。这在SPARK-66完成。 Datasets/DataFrames的另一个好处是它们更高效,并且可以提高Spark作业的性能。

如果你必须使用RDD的,那么你可以通过编程访问MongoDB的收集,并通过MongoConnector类创建一个更新插入操作。

+0

罗杰那。我不能将RDD切换到数据集,所以我想我会使用MongoConnector方法。感谢您的澄清。 – cabreracanal