使用Mongo-Hadoop连接器通过Apache Spark更新MongoDb中的集合

我想通过Spark in Java更新MongoDb中的特定集合。 我正在使用MongoDB Connector for Hadoop在Java中检索Apache Spark到MongoDb的信息。

在关注Sampo Niskanen关于通过Spark检索和保存MongoDb集合的优秀post后,我对更新集合感到困惑 。

MongoOutputFormat.java包含一个构造函数,它使用String [] updateKeys,我猜这是指一个可能的键列表,用于比较现有集合并执行更新。 但是,使用Spark的saveAsNewApiHadoopFile()方法和参数MongoOutputFormat.class ,我想知道如何使用该更新构造函数。

 save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config); 

在此之前, MongoUpdateWritable.java用于执行集合更新。 从我在Hadoop上看到的例子来看,这通常是在mongo.job.output.valuemongo.job.output.value ,在Spark中可能是这样的:

 save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, MongoUpdateWritable.class, MongoOutputFormat.class, config); 

但是,我仍然想知道如何在MongoUpdateWritable.java指定更新密钥。

不可否认,作为一种hacky方式,我将对象的“_id”设置为我的文档的KeyValue,以便在执行保存时,集合将覆盖与_id具有相同KeyValue的文档。

 JavaPairRDD analyticsResult; //JavaPairRdd of (mongoObject,result) JavaPairRDD save = analyticsResult.mapToPair(s -> { BSONObject o = (BSONObject) s._1; //for all keys, set _id to key:value_ String id = ""; for (String key : o.keySet()){ id += key + ":" + (String) o.get(key) + "_"; } o.put("_id", id); o.put("result", s._2); return new Tuple2(null, o); }); save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config); 

我想使用MongoOutputFormatMongoUpdateWritableConfiguration通过Spark执行mongodb集合更新,理想情况下使用saveAsNewAPIHadoopFile()方法。 可能吗? 如果没有,有没有其他方法不涉及专门设置_id到我想要更新的键值?

我尝试了几个config.set("mongo.job.output.value","....")和几个组合的组合

 .saveAsNewAPIHadoopFile( "file:///bogus", classOf[Any], classOf[Any], classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], mongo_config ) 

而且他们都没有工作。

我通过使用MongoUpdateWritable类作为map方法的输出使其工作:

 items.map(row => { val mongo_id = new ObjectId(row("id").toString) val query = new BasicBSONObject() query.append("_id", mongo_id) val update = new BasicBSONObject() update.append("$set", new BasicBSONObject().append("field_name", row("new_value"))) val muw = new MongoUpdateWritable(query,update,false,true) (null, muw) }) .saveAsNewAPIHadoopFile( "file:///bogus", classOf[Any], classOf[Any], classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], mongo_config ) 

在mongo中执行的原始查询是这样的:

 2014-11-09T13:32:11.609-0800 [conn438] update db.users query: { _id: ObjectId('5436edd3e4b051de6a505af9') } update: { $set: { value: 10 } } nMatched:1 nModified:0 keyUpdates:0 numYields:0 locks(micros) w:24 3ms