Dive into secure and efficient coding practices with our curated list of the top 10 examples showcasing 'pymongo' in Python. Our advanced machine learning engine meticulously scans each line of code, cross-referencing millions of open source libraries to ensure your implementation is not just functional, but also robust and secure. Elevate your Python applications to new heights by mastering the art of handling side effects, API calls, and asynchronous operations with confidence and precision.
if k.startswith('MONGODB_'):
k = k[len('MONGODB_'):]
k = k.lower()
resolved_settings[k] = v
# Handle uri style connections
if "://" in resolved_settings.get('host', ''):
# this section pulls the database name from the URI
# PyMongo requires URI to start with mongodb:// to parse
# this workaround allows mongomock to work
uri_to_check = resolved_settings['host']
if uri_to_check.startswith('mongomock://'):
uri_to_check = uri_to_check.replace('mongomock://', 'mongodb://')
uri_dict = uri_parser.parse_uri(uri_to_check)
resolved_settings['db'] = uri_dict['database']
# Add a default name param or use the "db" key if exists
if resolved_settings.get('db'):
resolved_settings['name'] = resolved_settings.pop('db')
else:
resolved_settings['name'] = 'test'
# Add various default values.
resolved_settings['alias'] = resolved_settings.get('alias', mongoengine.DEFAULT_CONNECTION_NAME) # TODO do we have to specify it here? MongoEngine should take care of that
resolved_settings['host'] = resolved_settings.get('host', 'localhost') # TODO this is the default host in pymongo.mongo_client.MongoClient, we may not need to explicitly set a default here
resolved_settings['port'] = resolved_settings.get('port', 27017) # TODO this is the default port in pymongo.mongo_client.MongoClient, we may not need to explicitly set a default here
# Default to ReadPreference.PRIMARY if no read_preference is supplied
resolved_settings['read_preference'] = resolved_settings.get('read_preference', ReadPreference.PRIMARY)
def run(self):
current_idx = 0
total_frames = 0
env = False
new_ep = True
solutions = []
## Database connection
client = MongoClient()
db = client.retro_contest
## Put the database variable inside the class instance
self.collection = db[self.current_time]
self.fs = gridfs.GridFS(db)
current_level = self.get_level()
while True:
## Push in the database
if current_idx > PLAYOUTS:
self.add_db()
total_frames += current_idx
current_idx = 0
if total_frames > PLAYOUTS_PER_LEVEL:
for mongod in [master, slave]:
client = MongoClient(port=mongod.port, read_preference=ReadPreference.SECONDARY_PREFERRED)
mongod.dbhash = client.test.command("dbhash")
mongod.dict = mongod.dbhash["collections"]
global lost_in_slave, lost_in_master, screwy_in_slave, replicated_collections
replicated_collections += master.dict.keys()
for coll in replicated_collections:
if coll not in slave.dict and coll not in lost_in_slave:
lost_in_slave.append(coll)
mhash = master.dict[coll]
shash = slave.dict[coll]
if mhash != shash:
mTestDB = MongoClient(port=master.port).test
sTestDB = MongoClient(port=slave.port,
read_preference=ReadPreference.SECONDARY_PREFERRED).test
mCount = mTestDB[coll].count()
sCount = sTestDB[coll].count()
stats = {'hashes': {'master': mhash, 'slave': shash},
'counts':{'master': mCount, 'slave': sCount}}
try:
mDocs = list(mTestDB[coll].find().sort("_id", 1))
sDocs = list(sTestDB[coll].find().sort("_id", 1))
mDiffDocs = list()
sDiffDocs = list()
for left, right in izip(mDocs, sDocs):
if left != right:
mDiffDocs.append(left)
sDiffDocs.append(right)
def db_setup():
    """Prepare the ``zoo.servers`` collection in the ``test`` database.

    Opens a ``Connection`` with default arguments, drops the
    ``zoo.servers`` collection from the ``test`` database, and returns
    the collection handle for the tests to use.
    """
    test_db = Connection()["test"]
    collection = test_db["zoo.servers"]
    # Drop before returning so leftovers from earlier runs are removed.
    test_db.drop_collection(collection)
    return collection
def test_reconnect_in_case_connection_closed_by_mongo(self):
    """Expect AutoReconnect after the pooled socket has been closed."""
    client = self.asyncio_client(maxPoolSize=1, retryReads=False)
    yield from client.admin.command('ping')

    # Imitate a lost connection to the mongo server: take the socket out
    # of the primary pool, close its underlying ``sock``, and push it
    # back onto the left end of the pool. The resulting failure should
    # be reported as AutoReconnect, not IncompleteReadError.
    primary_pool = get_primary_pool(client)
    sock_info = primary_pool.sockets.pop()
    sock_info.sock.close()
    primary_pool.sockets.appendleft(sock_info)

    with self.assertRaises(pymongo.errors.AutoReconnect):
        yield from client.motor_test.test_collection.find_one()
text_type(db_user),
text_type(db_password),
{},
'admin')
c.delegate._cache_credentials('test', credentials, connect=False)
# Cause a network error on the actual socket.
pool = get_primary_pool(c)
socket_info = one(pool.sockets)
socket_info.sock.close()
# In __check_auth, the client authenticates its socket with the
# new credential, but gets a socket.error. Should be reraised as
# AutoReconnect.
with self.assertRaises(pymongo.errors.AutoReconnect):
yield c.test.collection.find_one()
# No semaphore leak, the pool is allowed to make a new socket.
yield c.test.collection.find_one()
def test_validate_collection(self):
    """Exercise ``validate_collection`` with bad, missing and valid targets."""
    database = self.client.pymongo_test

    # An int and None must both be rejected with TypeError.
    for bad_arg in (5, None):
        with self.assertRaises(TypeError):
            database.validate_collection(bad_arg)

    database.test.insert_one({"dummy": u"object"})

    # A collection that does not exist fails, whether it is named by
    # string or passed as a collection object.
    with self.assertRaises(OperationFailure):
        database.validate_collection("test.doesnotexist")
    missing = database.test.doesnotexist
    with self.assertRaises(OperationFailure):
        database.validate_collection(missing)

    # The collection that just received a document validates by name...
    self.assertTrue(database.validate_collection("test"))

    # ...and by object, with every keyword combination of the flags.
    existing = database.test
    flag_sets = (
        {},
        {"full": True},
        {"scandata": True},
        {"scandata": True, "full": True},
    )
    for flags in flag_sets:
        self.assertTrue(database.validate_collection(existing, **flags))

    # Same flags again, this time supplied positionally.
    self.assertTrue(database.validate_collection(existing, True, True))
def test_fetch_next_exception(self):
    """Check that ``fetch_next`` raises OperationFailure for a bogus cursor id."""
    cursor = self.collection.find()
    # Plant a cursor id that is not valid on the server.
    cursor.delegate._Cursor__id = 1234

    with self.assertRaises(OperationFailure):
        yield from cursor.fetch_next

    # Clear the id again so the cursor does not try to close itself
    # when it goes out of scope.
    cursor.delegate._Cursor__id = None
def clear_transactions(self):
    """Call ``commands.rollback()``, tolerating one specific failure.

    An ``OperationFailure`` whose error message contains
    ``messages.NO_TRANSACTION_ERROR`` is swallowed; any other
    ``OperationFailure`` is re-raised unchanged.
    """
    try:
        commands.rollback()
    except OperationFailure as exc:
        if messages.NO_TRANSACTION_ERROR in utils.get_error_message(exc):
            return
        raise
def test_mongo_write_failure():
    """Check that calling ``_func_w_bad_mongo`` surfaces an OperationFailure."""
    # The whole sequence runs under pytest.raises: the test passes only
    # if an OperationFailure escapes from somewhere inside this block.
    with pytest.raises(OperationFailure):
        first = _func_w_bad_mongo(1, 2)
        second = _func_w_bad_mongo(1, 2)
        assert first == second