如何在pymongo中执行批量上插?我想更新一堆条目,并且一次只做一条,速度很慢。pymongo中的快速或批量上插
这个问题的答案几乎相同的问题就在这里:Bulk update/upsert in MongoDB?
接受的答案实际上并没有回答这个问题。它只是提供了一个链接到mongo CLI进行导入/导出。
我也将开放给别人解释为什么做一个散装UPSERT是没有可能的/没有最好的做法,但我们解释一下这种问题的首选解决方案是什么。
谢谢!
如何在pymongo中执行批量上插?我想更新一堆条目,并且一次只做一条,速度很慢。pymongo中的快速或批量上插
这个问题的答案几乎相同的问题就在这里:Bulk update/upsert in MongoDB?
接受的答案实际上并没有回答这个问题。它只是提供了一个链接到mongo CLI进行导入/导出。
我也将开放给别人解释为什么做一个散装UPSERT是没有可能的/没有最好的做法,但我们解释一下这种问题的首选解决方案是什么。
谢谢!
答案是一样的:散装upserts不支持。
,您可以更新使用多=真符合您查询指定的所有文件。
有一个关于做一个批处理命令你所希望的方式错误here。
的MongoDB 2.6+有批量操作的支持。这包括批量插入,upserts,更新等。这样做的目的是减少/消除按记录操作(按文档逐个记录)的往返等待时间的延迟是正确的。
那么,如何工作的呢?在Python的例子,因为这是我参与的工作。
>>> import pymongo
>>> pymongo.version
'2.7rc0'
要使用此功能,我们创建了一个“散”对象,文档添加到它,然后调用执行它,它会发送所有更新立刻。注意事项:所收集操作的BSON大小(bsonsizes的总和)不得超过16 MB的文档大小限制。当然,操作次数因此可能会有很大差异,您的里程可能会有所不同。
实施例的散装UPSERT操作Pymongo:
import pymongo
conn = pymongo.MongoClient('myserver', 8839)
db = conn['mydbname']
coll = db.myCollection
bulkop = coll.initialize_ordered_bulk_op()
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':1})
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':2})
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':3})
retval = bulkop.execute()
这是必不可少的方法。更多信息请访问:
是否有任何方法可以避免在每个操作中查找?考虑我想插入一个嵌入到文档中的json对象列表,这样,我必须每次都找到这个文档并插入,这看起来效率不高。 – SpiXel 2016-07-11 10:48:48
这只是MongoDB写作的方式,而不是我的想法。在任何upsert或更新中,您必须找到要更新的文档,并且该查找指定要采取哪些操作。此外,它应该对大量行为有效,但不一定针对您的特定应用程序,因此,正如他们所说,您的里程可能会有所不同。 – 2016-07-21 16:00:08
'initialize_ordered_bulk_op()'语法现在已被弃用:http://api.mongodb.com/python/current/changelog.html?highlight=initialize_ordered_bulk_op – duhaime 2018-01-21 15:58:53
仅在pymongo 2.7+
pymongo的现代版本(比3.X以上)在降级其中一个一致的界面包裹批量操作服务器版本不支持批量操作。这在MongoDB官方支持的驱动程序中现在是一致的。
因此,编码的首选方法是使用bulk_write()
,而不是使用UpdateOne
其他适当的操作操作。现在当然,最好使用自然语言列表,而不是一个具体的建设者
旧机制的文档的直接翻译:
from pymongo import UpdateOne
operations = [
UpdateOne({ "field1": 1},{ "$push": { "vals": 1 } },upsert=True),
UpdateOne({ "field1": 1},{ "$push": { "vals": 2 } },upsert=True),
UpdateOne({ "field1": 1},{ "$push": { "vals": 3 } },upsert=True)
]
result = collection.bulk_write(operations)
或者经典的文档转换循环:
import random
from pymongo import UpdateOne
random.seed()
operations = []
for doc in collection.find():
# Set a random number on every document update
operations.append(
UpdateOne({ "_id": doc["_id"] },{ "$set": { "random": random.randint(0,10) } })
)
# Send once every 1000 in batch
if (len(operations) == 1000):
collection.bulk_write(operations,ordered=False)
operations = []
if (len(operations) > 0):
collection.bulk_write(operations,ordered=False)
返回的结果为BulkWriteResult
,其中将包含匹配和更新文档的计数器以及发生的任何“upserts”的返回_id
值。
对于批量操作数组的大小存在一些误解。发送到服务器的实际请求不能超过16MB BSON限制,因为该限制也适用于发送到使用BSON格式的服务器的“请求”。
但是,这并不能控制您可以构建的请求数组的大小,因为实际操作只能以批处理方式发送和处理1000。唯一真正的限制是那1000条操作指令本身并不实际创建大于16MB的BSON文档。这确实是一个很高的命令。
批量方法的一般概念是“较少流量”,因为一次发送很多东西,只处理一个服务器响应。减少每个更新请求的开销可以节省大量时间。
不好意思,但我不明白,为什么你把你的批次分成你的循环,如果你知道你的司机自己做。我的意思是在你的例子中,有1000个'UpdateOne({“_id”:doc [“_ id”]},{“$ set”:{“random”:random.randint(0,10)}})'比16 Mb,这是显而易见的,因此pymongo可以拆分您的数据,并且请求将会成功。你为什么用手分割数据? – 2017-04-27 14:54:53
@Budulianin,因为它迭代了一个游标,它是非常多的反模式将整个集合加载到内存中。这基本上是为什么他们在合理的大小批量。此外,这需要重新考虑,特别是考虑到异步环境(基本技术保持不变),您基本上可以通过网络发送数据并在等待确认时构建另一批。如果你认为你通过“一个大批量”而不是“几个合理的”获得了某些东西,那么在大多数情况下,我会说这不太可能 – 2017-06-30 12:24:08
与Python 3.5 +最快的批量更新,电机和ASYNCIO:
import asyncio
import datetime
import logging
import random
import time
import motor.motor_asyncio
import pymongo.errors
async def execute_bulk(bulk):
try:
await bulk.execute()
except pymongo.errors.BulkWriteError as err:
logging.error(err.details)
async def main():
cnt = 0
bulk = db.initialize_unordered_bulk_op()
tasks = []
async for document in db.find({}, {}, no_cursor_timeout=True):
cnt += 1
bulk.find({'_id': document['_id']}).update({'$set': {"random": random.randint(0,10)}})
if not cnt % 1000:
task = asyncio.ensure_future(execute_bulk(bulk))
tasks.append(task)
bulk = db.initialize_unordered_bulk_op()
if cnt % 1000:
task = asyncio.ensure_future(bulk.execute(bulk))
tasks.append(task)
logging.info('%s processed', cnt)
await asyncio.gather(*tasks)
logging.basicConfig(level='INFO')
db = motor.motor_asyncio.AsyncIOMotorClient()['database']['collection']
start_time = time.time()
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
execution_time = time.time() - start_time
logging.info('Execution time: %s', datetime.timedelta(seconds=execution_time))
为什么模1000?我认为mongo批量操作“自动”执行2000年的操作... – duhaime 2017-03-16 14:23:43
@duhaime很好的问题。我可以参考[文档](https://docs.mongodb.com/manual/reference/limits/#Bulk-Operation-Size),但看起来像[BSON的16MB限制](http://stackoverflow.com/ a/24238349/4249707)将会更精确。不幸的是,我不知道如何衡量请求的大小 – 2017-03-16 19:31:56
@duhaime顺便说一句,根据最新的电机[文档](http://motor.readthedocs.io/en/stable/examples/bulk.html#bulk-insert)我们可以根本不用担心 – 2017-03-16 19:38:05
如果你有很多的数据,你想,如果存在数据用“_id”的判断,
你可以试试...
import pymongo
from pymongo import UpdateOne
client = pymongo.MongoClient('localhost', 27017)
db=client['sampleDB']
collectionInfo = db.sample
#sample data
datas=[
{"_id":123456,"name":"aaa","N":1,"comment":"first sample","lat":22,"lng":33},
{"_id":234567,"name":"aaa","N":1,"comment":"second sample","lat":22,"lng":33},
{"_id":345678,"name":"aaa","N":1,"comment":"xxx sample","lat":22,"lng":33},
{"_id":456789,"name":"aaa","N":1,"comment":"yyy sample","lat":22,"lng":33},
{"_id":123456,"name":"aaaaaaaaaaaaaaaaaa","N":1,"comment":"zzz sample","lat":22,"lng":33},
{"_id":11111111,"name":"aaa","N":1,"comment":"zzz sample","lat":22,"lng":33}
]
#you should split judge item and other data
ids=[data.pop("_id") for data in datas]
operations=[UpdateOne({"_id":idn},{'$set':data},upsert=True) for idn ,data in zip(ids,datas)]
collectionInfo.bulk_write(operations)
我的英语很差,如果你不明白我说什么,我很抱歉
这是很可悲的。 – ComputationalSocialScience 2011-03-14 16:44:04
这不再是这种情况。看凯文的答案。 – 2016-04-23 05:33:37
Bulk.find.upsert()https://docs.mongodb.com/manual/reference/method/Bulk.find.upsert/index.html – Tanuj 2017-08-25 18:49:40