beanie.odm.queries.update

UpdateQuery

class UpdateQuery(UpdateMethods,  SessionMethods)

Update Query base class

Inherited from:

UpdateQuery.update

 | update(*args: Mapping[str, Any], session: Optional[ClientSession] = None, bulk_writer: Optional[BulkWriter] = None, **pymongo_kwargs) -> "UpdateQuery"

Provide modifications to the update query.

Arguments:

  • args: Union[dict, Mapping] - the modifications to apply.
  • session: Optional[ClientSession]
  • bulk_writer: Optional[BulkWriter]
  • pymongo_kwargs: pymongo native parameters for the update operation

Returns:

UpdateMany query
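
Each positional argument to update is a mapping in the shape of a MongoDB update document, for example {"$set": {...}}. As a rough illustration of how several such mappings could combine into one update document, here is a minimal pure-Python sketch. The helper name merge_updates is hypothetical and not part of Beanie, and Beanie's own merging logic may differ.

```python
from typing import Any, Dict, Mapping


def merge_updates(*expressions: Mapping[str, Any]) -> Dict[str, Any]:
    """Merge several update expressions into one MongoDB-style update document.

    Hypothetical helper for illustration only; not a Beanie function.
    """
    merged: Dict[str, Any] = {}
    for expression in expressions:
        for operator, fields in expression.items():
            # Group field changes under their operator ($set, $inc, ...).
            merged.setdefault(operator, {}).update(fields)
    return merged


update_doc = merge_updates(
    {"$set": {"status": "active"}},
    {"$inc": {"logins": 1}},
    {"$set": {"name": "Ada"}},
)
print(update_doc)
```

Both $set expressions end up under a single $set key, which is the shape MongoDB expects for one update call.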

UpdateQuery.upsert

 | upsert(*args: Mapping[str, Any], on_insert: "DocType", session: Optional[ClientSession] = None, bulk_writer: Optional[BulkWriter] = None, **pymongo_kwargs) -> "UpdateQuery"

Provide modifications to the upsert query.

Arguments:

  • args: Union[dict, Mapping] - the modifications to apply.
  • on_insert: DocType - document to insert if there is no matched document in the collection
  • session: Optional[ClientSession]
  • pymongo_kwargs: pymongo native parameters for the update operation

Returns:

UpdateMany query
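
The upsert behaviour described above (apply the modifications, or insert on_insert when nothing matches) can be modelled with a toy in-memory collection. This is a sketch under simplifying assumptions: the collection is a plain list of dicts, criteria are equality-only, and the function upsert and its return strings are hypothetical, not Beanie's API.

```python
from typing import Any, Dict, List


def upsert(
    collection: List[Dict[str, Any]],
    criteria: Dict[str, Any],
    changes: Dict[str, Any],
    on_insert: Dict[str, Any],
) -> str:
    """Apply `changes` to every matching document, or insert `on_insert`.

    Toy model over a list of dicts; supports only equality criteria.
    """
    matched = [
        doc for doc in collection
        if all(doc.get(key) == value for key, value in criteria.items())
    ]
    if matched:
        for doc in matched:
            doc.update(changes)
        return "updated"
    # Nothing matched: insert a copy of the fallback document.
    collection.append(dict(on_insert))
    return "inserted"


users = [{"name": "Ada", "visits": 1}]
first = upsert(users, {"name": "Ada"}, {"visits": 2}, {"name": "Ada", "visits": 1})
second = upsert(users, {"name": "Bob"}, {"visits": 2}, {"name": "Bob", "visits": 1})
print(first, second, users)
```

The two possible outcomes mirror why awaiting the query is typed as returning either an UpdateResult or an InsertOneResult.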

UpdateQuery.__await__

 | __await__() -> Generator[Any, None, Union[UpdateResult, InsertOneResult]]

Run the query

Returns:

Union[UpdateResult, InsertOneResult]

UpdateMany

class UpdateMany(UpdateQuery)

Update Many query class

Inherited from:

UpdateMany.update_many

 | update_many(*args: Mapping[str, Any], session: Optional[ClientSession] = None, bulk_writer: Optional[BulkWriter] = None, **pymongo_kwargs)

Provide modifications to the update query

Arguments:

  • args: Union[dict, Mapping] - the modifications to apply.
  • session: Optional[ClientSession]
  • bulk_writer: "BulkWriter" - Beanie bulk writer
  • pymongo_kwargs: pymongo native parameters for the update operation

Returns:

UpdateMany query

UpdateOne

class UpdateOne(UpdateQuery)

Update One query class

Inherited from:

UpdateOne.update_one

 | update_one(*args: Mapping[str, Any], session: Optional[ClientSession] = None, bulk_writer: Optional[BulkWriter] = None, **pymongo_kwargs)

Provide modifications to the update query. The same as update()

Arguments:

  • args: Union[dict, Mapping] - the modifications to apply.
  • session: Optional[ClientSession]
  • bulk_writer: "BulkWriter" - Beanie bulk writer
  • pymongo_kwargs: pymongo native parameters for the update operation

Returns:

UpdateOne query

beanie.odm.queries.cursor

BaseCursorQuery

class BaseCursorQuery(Generic[CursorResultType])

BaseCursorQuery class. A wrapper over AsyncIOMotorCursor that parses results with the model.

BaseCursorQuery.to_list

 | async to_list(length: Optional[int] = None) -> List[CursorResultType]

Get list of documents

Arguments:

  • length: Optional[int] - length of the list

Returns:

Union[List[BaseModel], List[Dict[str, Any]]]
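
To illustrate the length parameter, the sketch below uses a toy asynchronous cursor. ToyCursor is a hypothetical stand-in written for this example and is not a Beanie or Motor class. It assumes that length=None means "collect everything".

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional


class ToyCursor:
    """Hypothetical stand-in for a cursor query; not a Beanie class."""

    def __init__(self, documents: List[Dict[str, Any]]) -> None:
        self._documents = documents

    async def __aiter__(self) -> AsyncIterator[Dict[str, Any]]:
        for document in self._documents:
            await asyncio.sleep(0)  # yield control, as real I/O would
            yield document

    async def to_list(self, length: Optional[int] = None) -> List[Dict[str, Any]]:
        """Collect up to `length` documents; collect all when length is None."""
        result: List[Dict[str, Any]] = []
        async for document in self:
            if length is not None and len(result) >= length:
                break
            result.append(document)
        return result


cursor = ToyCursor([{"n": i} for i in range(5)])
print(asyncio.run(cursor.to_list(2)))
print(len(asyncio.run(cursor.to_list())))
```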

beanie.odm.queries.delete

DeleteQuery

class DeleteQuery(SessionMethods)

Deletion Query

DeleteMany

class DeleteMany(DeleteQuery)

DeleteMany.__await__

 | __await__() -> Generator[DeleteResult, None, Optional[DeleteResult]]

Run the query

Returns:

Optional[DeleteResult]

DeleteOne

class DeleteOne(DeleteQuery)

DeleteOne.__await__

 | __await__() -> Generator[DeleteResult, None, Optional[DeleteResult]]

Run the query

Returns:

Optional[DeleteResult]

beanie.odm.queries.aggregation

AggregationQuery

class AggregationQuery(
    Generic[AggregationProjectionType], 
    BaseCursorQuery[AggregationProjectionType], 
    SessionMethods)

Aggregation Query

Inherited from:

beanie.odm.queries.find

FindQuery

class FindQuery(Generic[FindQueryResultType],  UpdateMethods,  SessionMethods)

Find Query base class

Inherited from:

FindQuery.get_filter_query

 | get_filter_query() -> Mapping[str, Any]

Returns: MongoDB filter query
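
A find query can accumulate several search criteria before it runs, so something has to fold them into a single filter. The sketch below shows one plausible way to do that, combining criteria with $and. Both the helper name combine_criteria and the $and strategy are assumptions made for illustration, and Beanie's actual result may be shaped differently.

```python
from typing import Any, Dict, Mapping


def combine_criteria(*criteria: Mapping[str, Any]) -> Dict[str, Any]:
    """Fold several search criteria into one MongoDB filter document.

    Hypothetical helper; assumes criteria are combined with $and.
    """
    if not criteria:
        return {}  # an empty filter matches every document
    if len(criteria) == 1:
        return dict(criteria[0])
    return {"$and": [dict(item) for item in criteria]}


print(combine_criteria({"price": {"$gt": 10}}, {"category": "tea"}))
```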

FindQuery.update

 | update(*args: Mapping[str, Any], session: Optional[ClientSession] = None, bulk_writer: Optional[BulkWriter] = None, **pymongo_kwargs)

Create an Update query with the given modifications and pass this query's search criteria to it

Arguments:

  • args: *Mapping[str,Any] - the modifications to apply.
  • session: Optional[ClientSession]
  • bulk_writer: Optional[BulkWriter]

Returns:

UpdateMany query

FindQuery.upsert

 | upsert(*args: Mapping[str, Any], on_insert: "DocType", session: Optional[ClientSession] = None, bulk_writer: Optional[BulkWriter] = None, **pymongo_kwargs)

Create an Update query with the given modifications and pass this query's search criteria to it

Arguments:

  • args: *Mapping[str,Any] - the modifications to apply.
  • on_insert: DocType - document to insert if there is no matched document in the collection
  • session: Optional[ClientSession]

Returns:

UpdateMany query

FindQuery.delete

 | delete(session: Optional[ClientSession] = None, bulk_writer: Optional[BulkWriter] = None, **pymongo_kwargs) -> Union[DeleteOne, DeleteMany]

Provide search criteria to the Delete query

Arguments:

  • session: Optional[ClientSession]

Returns:

Union[DeleteOne, DeleteMany]

FindQuery.project

 | project(projection_model)

Apply projection parameter

Arguments:

  • projection_model: Optional[Type[BaseModel]] - projection model

Returns:

self

FindQuery.count

 | async count() -> int

Number of found documents

Returns:

int

FindQuery.exists

 | async exists() -> bool

Whether the find query would return any documents

Returns:

bool

FindMany

class FindMany(
    FindQuery[FindQueryResultType], 
    BaseCursorQuery[FindQueryResultType], 
    AggregateMethods)

Find Many query class

Inherited from:

FindMany.find_many

 | find_many(*args: Union[Mapping[str, Any], bool], projection_model: Optional[Type[FindQueryProjectionType]] = None, skip: Optional[int] = None, limit: Optional[int] = None, sort: Union[None, str, List[Tuple[str, SortDirection]]] = None, session: Optional[ClientSession] = None, ignore_cache: bool = False, fetch_links: bool = False, **pymongo_kwargs) -> Union[
 |         "FindMany[FindQueryResultType]", "FindMany[FindQueryProjectionType]"
 |     ]

Find many documents by criteria

Arguments:

  • args: Mapping[str, Any] - search criteria
  • skip: Optional[int] - The number of documents to omit.
  • limit: Optional[int] - The maximum number of results to return.
  • sort: Union[None, str, List[Tuple[str, SortDirection]]] - A key or a list of (key, direction) pairs specifying the sort order for this query.
  • projection_model: Optional[Type[BaseModel]] - projection model
  • session: Optional[ClientSession] - pymongo session
  • ignore_cache: bool
  • pymongo_kwargs: pymongo native parameters for the find operation (if the Document class contains links, these parameters must fit the respective parameters of the aggregate MongoDB function)

Returns:

FindMany - query instance

FindMany.project

 | project(projection_model: Optional[Type[FindQueryProjectionType]]) -> Union[
 |         "FindMany[FindQueryResultType]", "FindMany[FindQueryProjectionType]"
 |     ]

Apply projection parameter

Arguments:

  • projection_model: Optional[Type[BaseModel]] - projection model

Returns:

self

FindMany.find

 | find(*args: Union[Mapping[str, Any], bool], projection_model: Optional[Type[FindQueryProjectionType]] = None, skip: Optional[int] = None, limit: Optional[int] = None, sort: Union[None, str, List[Tuple[str, SortDirection]]] = None, session: Optional[ClientSession] = None, ignore_cache: bool = False, fetch_links: bool = False, **pymongo_kwargs) -> Union[
 |         "FindMany[FindQueryResultType]", "FindMany[FindQueryProjectionType]"
 |     ]

The same as find_many(...)

FindMany.sort

 | sort(*args: Optional[
 |             Union[
 |                 str, Tuple[str, SortDirection], List[Tuple[str, SortDirection]]
 |             ]
 |         ]) -> "FindMany[FindQueryResultType]"

Add sort parameters

Arguments:

  • args: Union[str, Tuple[str, SortDirection], List[Tuple[str, SortDirection]]] - A key or a tuple (key, direction) or a list of (key, direction) pairs specifying the sort order for this query.

Returns:

self
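
Because sort accepts a key, a (key, direction) tuple, or a list of such tuples, the arguments have to be normalized into one list of pairs at some point. The sketch below shows one way this could work. normalize_sort is a hypothetical helper, the integer constants stand in for SortDirection, and the "+key" / "-key" string prefixes are an assumed convention that the text above does not state.

```python
from typing import List, Tuple, Union

ASCENDING = 1    # same numeric values as pymongo.ASCENDING / pymongo.DESCENDING
DESCENDING = -1

SortPair = Tuple[str, int]
SortArg = Union[None, str, SortPair, List[SortPair]]


def normalize_sort(*args: SortArg) -> List[SortPair]:
    """Flatten mixed sort arguments into a list of (key, direction) pairs."""
    pairs: List[SortPair] = []
    for arg in args:
        if arg is None:
            continue  # None means "no sort requested" for this argument
        if isinstance(arg, str):
            # Assumed convention: "-key" is descending, "+key" or "key" ascending.
            if arg.startswith("-"):
                pairs.append((arg[1:], DESCENDING))
            elif arg.startswith("+"):
                pairs.append((arg[1:], ASCENDING))
            else:
                pairs.append((arg, ASCENDING))
        elif isinstance(arg, tuple):
            pairs.append(arg)
        elif isinstance(arg, list):
            pairs.extend(arg)
        else:
            raise TypeError(f"Unsupported sort argument: {arg!r}")
    return pairs


print(normalize_sort("-price", ("name", ASCENDING), [("sku", DESCENDING)]))
```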

FindMany.skip

 | skip(n: Optional[int]) -> "FindMany[FindQueryResultType]"

Set skip parameter

Arguments:

  • n: int

Returns:

self

FindMany.limit

 | limit(n: Optional[int]) -> "FindMany[FindQueryResultType]"

Set limit parameter

Arguments:

  • n: int

Returns:

self

FindMany.update_many

 | update_many(*args: Mapping[str, Any], session: Optional[ClientSession] = None, bulk_writer: Optional[BulkWriter] = None, **pymongo_kwargs) -> UpdateMany

Provide search criteria to the UpdateMany query

Arguments:

  • args: *Mapping[str,Any] - the modifications to apply.
  • session: Optional[ClientSession]

Returns:

UpdateMany query

FindMany.delete_many

 | delete_many(session: Optional[ClientSession] = None, bulk_writer: Optional[BulkWriter] = None, **pymongo_kwargs) -> DeleteMany

Provide search criteria to the DeleteMany query

Arguments:

  • session: Optional[ClientSession]

Returns:

DeleteMany query

FindMany.aggregate

 | aggregate(aggregation_pipeline: List[Any], projection_model: Optional[Type[FindQueryProjectionType]] = None, session: Optional[ClientSession] = None, ignore_cache: bool = False, **pymongo_kwargs) -> Union[
 |         AggregationQuery[Dict[str, Any]],
 |         AggregationQuery[FindQueryProjectionType],
 |     ]

Provide search criteria to the AggregationQuery

Arguments:

  • aggregation_pipeline: list - aggregation pipeline. MongoDB doc: https://docs.mongodb.com/manual/core/aggregation-pipeline/
  • projection_model: Type[BaseModel] - Projection Model
  • session: Optional[ClientSession] - PyMongo session
  • ignore_cache: bool

Returns:

AggregationQuery

FindMany.first_or_none

 | async first_or_none() -> Optional[FindQueryResultType]

Returns the first found element or None if no elements were found

FindOne

class FindOne(FindQuery[FindQueryResultType])

Find One query class

Inherited from:

FindOne.project

 | project(projection_model: Optional[Type[FindQueryProjectionType]] = None) -> Union[
 |         "FindOne[FindQueryResultType]", "FindOne[FindQueryProjectionType]"
 |     ]

Apply projection parameter

Arguments:

  • projection_model: Optional[Type[BaseModel]] - projection model

Returns:

self

FindOne.find_one

 | find_one(*args: Union[Mapping[str, Any], bool], projection_model: Optional[Type[FindQueryProjectionType]] = None, session: Optional[ClientSession] = None, ignore_cache: bool = False, fetch_links: bool = False, **pymongo_kwargs) -> Union[
 |         "FindOne[FindQueryResultType]", "FindOne[FindQueryProjectionType]"
 |     ]

Find one document by criteria

Arguments:

  • args: Mapping[str, Any] - search criteria
  • projection_model: Optional[Type[BaseModel]] - projection model
  • session: Optional[ClientSession] - pymongo session
  • ignore_cache: bool
  • pymongo_kwargs: pymongo native parameters for the find operation (if the Document class contains links, these parameters must fit the respective parameters of the aggregate MongoDB function)

Returns:

FindOne - query instance

FindOne.update_one

 | update_one(*args: Mapping[str, Any], session: Optional[ClientSession] = None, bulk_writer: Optional[BulkWriter] = None, **pymongo_kwargs) -> UpdateOne

Create an UpdateOne query with the given modifications and pass this query's search criteria to it

Arguments:

  • args: *Mapping[str,Any] - the modifications to apply
  • session: Optional[ClientSession] - PyMongo session

Returns:

UpdateOne query

FindOne.delete_one

 | delete_one(session: Optional[ClientSession] = None, bulk_writer: Optional[BulkWriter] = None, **pymongo_kwargs) -> DeleteOne

Provide search criteria to the DeleteOne query

Arguments:

  • session: Optional[ClientSession] - PyMongo session

Returns:

DeleteOne query

FindOne.replace_one

 | async replace_one(document: "DocType", session: Optional[ClientSession] = None, bulk_writer: Optional[BulkWriter] = None) -> Optional[UpdateResult]

Replace the found document with the provided one

Arguments:

  • document: Document - document, which will replace the found one
  • session: Optional[ClientSession] - PyMongo session
  • bulk_writer: Optional[BulkWriter] - Beanie bulk writer

Returns:

UpdateResult

FindOne.__await__

 | __await__() -> Generator[Coroutine, Any, Optional[FindQueryResultType]]

Run the query

Returns:

BaseModel
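
Several query classes in this reference define __await__, which is what lets a query object be built lazily and executed only when it is awaited. The sketch below shows the general Python pattern with a toy class. ToyFindOne and its in-memory matching are hypothetical and written for this example; this is not Beanie's implementation.

```python
import asyncio
from typing import Any, Dict, Generator, List, Optional


class ToyFindOne:
    """Hypothetical lazy query object; not Beanie's FindOne."""

    def __init__(self, documents: List[Dict[str, Any]], **criteria: Any) -> None:
        self._documents = documents
        self._criteria = criteria

    async def _run(self) -> Optional[Dict[str, Any]]:
        await asyncio.sleep(0)  # stands in for the database round trip
        for document in self._documents:
            if all(document.get(k) == v for k, v in self._criteria.items()):
                return document
        return None

    def __await__(self) -> Generator[Any, None, Optional[Dict[str, Any]]]:
        # Delegating to the coroutine's own __await__ makes `await query` work.
        return self._run().__await__()


async def main(name: str) -> Optional[Dict[str, Any]]:
    query = ToyFindOne([{"name": "Ada"}, {"name": "Bob"}], name=name)
    # Nothing has executed yet; awaiting the object runs the query.
    return await query


print(asyncio.run(main("Bob")))
```

Building the object and running it are separate steps, so a query can be passed around or refined before any I/O happens.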