MongoDB Adapters¶
This page documents the MongoDB adapters for data operations. The adapters wrap a MongoDB connection and expose methods for retrieving, inserting, updating, and deleting documents. Both a synchronous and an asynchronous implementation are provided.
Synchronous MongoDB Adapter¶
This class provides a MongoDB adapter for data operations in a synchronous manner.
Implementation¶
- class SyncMongoDB¶
Bases: object
- __init__(URI=None)¶
Initialize the MongoDB connection.
- Parameters:
URI (str) – Connection URI for the MongoDB database. If None, the URI is read from the MONGODB_URI environment variable.
Example
Get an instance of the SyncMongoDB class:
>>> mongo = SyncMongoDB(URI="mongodb://user:pass@host:port/")
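The fallback to the MONGODB_URI environment variable described above could be sketched as follows. This is an illustration only, not the adapter's actual code: the helper name `resolve_uri` and the choice to raise `ValueError` when neither source is available are assumptions.

```python
import os


def resolve_uri(URI=None):
    """Return the explicit URI, or fall back to the MONGODB_URI environment variable."""
    if URI is not None:
        return URI
    env_uri = os.environ.get("MONGODB_URI")
    if env_uri is None:
        # Assumption: fail loudly rather than silently connect to some default host.
        raise ValueError("No URI given and MONGODB_URI is not set")
    return env_uri
```

An explicit argument takes precedence over the environment, so a test or script can override the deployed configuration without changing it.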
- close()¶
Close the MongoDB connection and free up resources.
Example
Close the MongoDB connection:
>>> mongo.close()
- property db_ip¶
Returns the IP address of the MongoDB server we are connected to.
- Returns:
str – The IP address of the MongoDB server we are connected to.
- delete(collection_name, query={}, emptyDelete=False)¶
Delete documents from a MongoDB collection.
- Parameters:
collection_name (str) – Collection name.
query (dict, optional) – Query filter. If empty, emptyDelete must be True.
emptyDelete (bool, optional) – Allow deletion of all documents if True.
- Returns:
DeleteResult – Result of the delete operation.
- Raises:
ValueError – If query is not a dict, or if it is empty and emptyDelete is False.
RuntimeError – If there is an error during deletion.
Example
Delete documents matching a query:
>>> result = mongo.delete(collection_name="mycollection", query={"field": "value"})
Delete all documents in a collection (use with caution):
>>> result = mongo.delete(collection_name="mycollection", emptyDelete=True)
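The safeguard around empty queries can be pictured as a small validation step that runs before anything is sent to the server. The sketch below is a guess at how such a check might look, based only on the documented `ValueError` conditions; `check_delete_query` is a hypothetical name, not part of the adapter.

```python
def check_delete_query(query, emptyDelete=False):
    """Validate a delete filter; an empty filter is rejected unless emptyDelete is True."""
    if not isinstance(query, dict):
        raise ValueError("query must be a dict")
    if not query and not emptyDelete:
        # An empty filter matches every document, so require an explicit opt-in.
        raise ValueError("empty query requires emptyDelete=True")
    return query
```

Requiring a second, explicit flag means a filter that accidentally ends up empty cannot wipe a collection.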
- get(collection_name, query={}, projection=None, limit=None, sort=None)¶
Retrieve documents from a MongoDB collection as a list of dictionaries.
- Parameters:
collection_name (str) – Collection name.
query (dict, optional) – Query filter. Defaults to all documents.
projection (dict, optional) – Fields to include/exclude.
limit (int, optional) – Max number of documents to retrieve.
sort (list[tuple[str, int]], optional) – Sorting criteria as a list of (field, direction) tuples.
- Returns:
list[dict] – Retrieved documents.
- Raises:
RuntimeError – If there is an error during retrieval.
Example
Retrieve documents from a collection with a specific query and limit:
>>> docs = mongo.get(collection_name="mycollection", query={"field": "value"}, limit=10)
Retrieve all documents from a collection:
>>> docs = mongo.get(collection_name="mycollection")
Retrieve documents with sorting:
>>> docs = mongo.get(collection_name="mycollection", sort=[("field", 1)])
- get_iter(collection_name, query={}, projection=None, limit=None, sort=None, batch_size=100)¶
Retrieve documents from a MongoDB collection in batches, as a generator that yields lists of dictionaries.
- Parameters:
collection_name (str) – Collection name.
query (dict, optional) – Query filter.
projection (dict, optional) – Fields to include/exclude.
limit (int, optional) – Max number of documents to retrieve.
sort (list[tuple[str, int]], optional) – Sorting criteria as a list of (field, direction) tuples.
batch_size (int, optional) – Number of documents per batch.
- Returns:
Generator[list[dict], None, None] – Iterator yielding batches of documents as lists of dictionaries.
- Raises:
RuntimeError – If there is an error during retrieval.
Example
Retrieve documents in batches of size 50:
>>> for batch in mongo.get_iter(collection_name="mycollection", batch_size=50):
...     process(batch)  # Process each batch of documents
Retrieve at most 200 documents in batches of size 100:
>>> for batch in mongo.get_iter(collection_name="mycollection", batch_size=100, limit=200):
...     process(batch)  # Process each batch of documents
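How batch_size and limit interact is easiest to see on a plain iterable. The sketch below groups any iterable into lists of at most batch_size items and stops after limit items; it is a stand-in for the behaviour described above, not the adapter's implementation, and `iter_batches` is a hypothetical helper.

```python
from itertools import islice


def iter_batches(documents, batch_size=100, limit=None):
    """Yield lists of at most batch_size items, stopping after limit items in total."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    iterator = iter(documents)
    if limit is not None:
        # Cap the total number of items before batching.
        iterator = islice(iterator, limit)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch
```

With batch_size=100 and limit=200, a source holding at least 200 documents produces exactly two full batches. The last batch is shorter whenever the total is not a multiple of batch_size.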
- insert(collection_name, documents, upsert_fields=None)¶
Insert documents into a MongoDB collection.
- Parameters:
collection_name (str) – Collection name.
documents (dict, DataFrame, or list of dict) – Documents to insert.
upsert_fields (list[str], optional) – Fields for upsert (bulk upsert if provided).
- Returns:
InsertManyResult or BulkWriteResult – Result of the operation.
- Raises:
ValueError – If documents are not in the correct format or empty.
RuntimeError – If there is an error during insertion.
Example
Insert multiple documents into a collection:
>>> result = mongo.insert(collection_name="mycollection", documents=[{"field": "value1"}, {"field": "value2"}])
Upsert documents based on specific fields:
>>> result = mongo.insert(collection_name="mycollection", documents=[{"_id": 1, "field": "value1"}, {"_id": 2, "field": "value2"}], upsert_fields=["_id"])
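One plausible reading of upsert_fields is that each document is matched on the values of those fields and then written with upsert enabled. The sketch below builds the (filter, update) pairs such a bulk upsert would need. The function name, the use of `$set`, and the error on missing fields are all assumptions; the adapter may, for example, replace whole documents instead.

```python
def build_upsert_specs(documents, upsert_fields):
    """Return one (filter, update) pair per document, matching on upsert_fields."""
    if isinstance(documents, dict):
        documents = [documents]  # Accept a single document as well as a list.
    specs = []
    for doc in documents:
        missing = [field for field in upsert_fields if field not in doc]
        if missing:
            raise ValueError(f"document is missing upsert fields: {missing}")
        match_filter = {field: doc[field] for field in upsert_fields}
        # Assumption: existing documents are updated with $set rather than replaced.
        specs.append((match_filter, {"$set": doc}))
    return specs
```

With PyMongo, each pair could be passed to `UpdateOne(filter, update, upsert=True)` and the resulting operations sent through `Collection.bulk_write`, which would account for the `BulkWriteResult` return type listed above.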
- select_db(db_name)¶
Select a different database within the same MongoDB server.
- Parameters:
db_name (str) – Name of the database to select.
- update(collection_name, query, update)¶
Update documents in a MongoDB collection.
- Parameters:
collection_name (str) – Collection name.
query (dict) – Query filter.
update (dict) – Update operation.
- Returns:
UpdateResult – Result of the update operation.
- Raises:
ValueError – If update is not a non-empty dict.
RuntimeError – If there is an error during the update.
Example
Update documents matching a query:
>>> result = mongo.update(collection_name="mycollection", query={"field": "value"}, update={"field": "new_value"})
Update documents using MongoDB update operators:
>>> result = mongo.update(collection_name="mycollection", query={"field": "value"}, update={"$inc": {"counter": 1}})
Example Usage¶
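A minimal sketch of a typical workflow, written against the method signatures documented above. The function name `run_basic_workflow` is hypothetical, and the adapter is passed in as an argument because the import path for SyncMongoDB is not shown on this page.

```python
def run_basic_workflow(mongo, collection_name="mycollection"):
    """Insert, read, update, and delete documents through a SyncMongoDB-like adapter."""
    # Insert two documents.
    mongo.insert(
        collection_name=collection_name,
        documents=[{"field": "value1"}, {"field": "value2"}],
    )
    # Read back the documents that match a filter.
    docs = mongo.get(collection_name=collection_name, query={"field": "value1"}, limit=10)
    # Change the matched documents with an explicit update operator.
    mongo.update(
        collection_name=collection_name,
        query={"field": "value1"},
        update={"$set": {"field": "new_value"}},
    )
    # Remove the updated documents again.
    mongo.delete(collection_name=collection_name, query={"field": "new_value"})
    return docs
```

In practice the caller would create the adapter, call `run_basic_workflow(mongo)`, and then call `mongo.close()` in a `finally` block so the connection is released even if one of the operations raises.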
Asynchronous MongoDB Adapter¶
This class provides a MongoDB adapter for data operations in an asynchronous manner.
Implementation¶
- class AsyncMongoDB¶
Bases: object
- __init__(URI=None)¶
Initialize the MongoDB connection.
- Parameters:
URI (str) – Connection URI for the MongoDB database. If None, the URI is read from the MONGODB_URI environment variable.
- async close()¶
Close the MongoDB connection and free up resources.
- property db_ip¶
Returns the IP address of the MongoDB server we are connected to.
- async delete(collection_name, query={}, emptyDelete=False)¶
Delete documents from a MongoDB collection. Returns a dict with the result summary.
- Parameters:
collection_name (str) – Name of the MongoDB collection.
query (dict) – The selection criteria for the delete operation.
emptyDelete (bool) – If True, allows deletion of all documents when query is empty.
- Returns:
dict –
- A dictionary containing the result summary with key:
"deleted_count": Number of documents deleted.
- Raises:
ValueError – If the query is not a dictionary or if it’s empty and emptyDelete is False.
RuntimeError – If the delete operation fails.
- async get(collection_name, query={}, projection=None, limit=None, sort=None)¶
Retrieve documents from a MongoDB collection as a list of dictionaries.
- Parameters:
collection_name (str) – Name of the MongoDB collection.
query (dict) – The selection criteria for the query.
projection (Optional[dict]) – Fields to include or exclude in the returned documents.
limit (Optional[int]) – Maximum number of documents to retrieve.
sort (Optional[list[tuple[str, int]]]) – List of (field, direction) tuples specifying the sort order.
- Returns:
list[dict] – A list of documents matching the query.
- Raises:
RuntimeError – If the retrieval operation fails.
- async get_iter(collection_name, query={}, projection=None, limit=None, sort=None, batch_size=100)¶
Retrieve documents from a MongoDB collection in batches, as an asynchronous generator that yields lists of dictionaries.
- Parameters:
collection_name (str) – Name of the MongoDB collection.
query (dict) – The selection criteria for the query.
projection (Optional[dict]) – Fields to include or exclude in the returned documents.
limit (Optional[int]) – Maximum number of documents to retrieve.
sort (Optional[list[tuple[str, int]]]) – List of (field, direction) tuples specifying the sort order.
batch_size (int) – Number of documents to include in each batch.
- Yields:
AsyncGenerator[list[dict], None] – An asynchronous generator yielding lists of documents in batches.
- Raises:
RuntimeError – If the retrieval operation fails.
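Because get_iter is documented as an asynchronous generator, it would be consumed with `async for` rather than awaited directly. The sketch below counts documents batch by batch. `count_documents_in_batches` is a hypothetical helper, and it assumes only that `mongo.get_iter(...)` returns an async iterator of lists, as described above.

```python
async def count_documents_in_batches(mongo, collection_name, batch_size=100):
    """Count documents by iterating an AsyncMongoDB-like adapter batch by batch."""
    total = 0
    # An async generator is iterated with "async for"; the call itself is not awaited.
    async for batch in mongo.get_iter(collection_name=collection_name, batch_size=batch_size):
        total += len(batch)
    return total
```

From synchronous code, the coroutine could be driven with `asyncio.run(count_documents_in_batches(mongo, "mycollection"))`.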
- async insert(collection_name, documents, upsert_fields=None)¶
Insert documents into a MongoDB collection. Supports upsert operations.
- Parameters:
collection_name (str) – Name of the MongoDB collection.
documents (Union[dict, pd.DataFrame, list[dict]]) – The document(s) to insert.
upsert_fields (Optional[list[str]]) – List of fields to use for upsert matching. If provided, documents will be upserted based on these fields.
- Returns:
Union[dict, None] –
- If upsert_fields is provided, returns a dict with the result summary:
"matched_count": Number of documents matched by the upsert.
"modified_count": Number of documents modified.
"upserted_count": Number of documents upserted.
"upserted_ids": IDs of the upserted documents.
- If upsert_fields is not provided, returns a dict with:
"inserted_ids": List of IDs of the inserted documents.
- async select_db(db_name)¶
Select a different database within the same MongoDB server.
- async update(collection_name, query, update)¶
Update documents in a MongoDB collection and return the result summary.
- Parameters:
collection_name (str) – Name of the MongoDB collection.
query (dict) – The selection criteria for the update operation.
update (dict) – The update operations to be applied. If no MongoDB operator is specified, “$set” will be used by default.
- Returns:
dict –
- A dictionary containing the result summary with keys:
"matched_count": Number of documents matched by the query.
"modified_count": Number of documents modified.
"upserted_id": The ID of the upserted document, if any.
- Raises:
ValueError – If the update operation is not a non-empty dictionary.
RuntimeError – If the update operation fails.
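The default of wrapping an operator-free update in “$set” can be illustrated with a small normalization step. This is a sketch of the documented behaviour, not the adapter's code: `normalize_update` is a hypothetical name, and treating any top-level key that starts with `$` as an operator is an assumption.

```python
def normalize_update(update):
    """Wrap a plain field mapping in $set; leave operator-style updates unchanged."""
    if not isinstance(update, dict) or not update:
        raise ValueError("update must be a non-empty dict")
    if any(str(key).startswith("$") for key in update):
        # Already uses update operators such as $set or $inc.
        return update
    return {"$set": update}
```

Under this reading, `update={"field": "new_value"}` and `update={"$set": {"field": "new_value"}}` lead to the same operation, while `{"$inc": {"counter": 1}}` is passed through untouched.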