Skip to content

beanie.odm.interfaces.detector

beanie.odm.interfaces.update

UpdateMethods

class UpdateMethods()

Update methods

UpdateMethods.set

 | set(expression: Dict[Union[ExpressionField, str], Any], session: Optional[ClientSession] = None, bulk_writer: Optional[BulkWriter] = None, **kwargs)

Set values

Example:

class Sample(Document):
    one: int

await Document.find(Sample.one == 1).set({Sample.one: 100})

Uses Set operator

Arguments:

  • expression: Dict[Union[ExpressionField, str], Any] - keys and values to set
  • session: Optional[ClientSession] - pymongo session
  • bulk_writer: Optional[BulkWriter] - bulk writer

Returns:

self

UpdateMethods.current_date

 | current_date(expression: Dict[Union[ExpressionField, str], Any], session: Optional[ClientSession] = None, bulk_writer: Optional[BulkWriter] = None, **kwargs)

Set current date

Uses CurrentDate operator

Arguments:

  • expression: Dict[Union[ExpressionField, str], Any]
  • session: Optional[ClientSession] - pymongo session
  • bulk_writer: Optional[BulkWriter] - bulk writer

Returns:

self

UpdateMethods.inc

 | inc(expression: Dict[Union[ExpressionField, str], Any], session: Optional[ClientSession] = None, bulk_writer: Optional[BulkWriter] = None, **kwargs)

Increment

Example:

class Sample(Document):
    one: int

await Document.find(Sample.one == 1).inc({Sample.one: 100})

Uses Inc operator

Arguments:

  • expression: Dict[Union[ExpressionField, str], Any]
  • session: Optional[ClientSession] - pymongo session
  • bulk_writer: Optional[BulkWriter] - bulk writer

Returns:

self

beanie.odm.interfaces.getters

beanie.odm.interfaces.aggregation_methods

AggregateMethods

class AggregateMethods()

Aggregate methods

AggregateMethods.sum

 | async sum(field: Union[str, ExpressionField], session: Optional[ClientSession] = None, ignore_cache: bool = False) -> Optional[float]

Sum of values of the given field

Example:

class Sample(Document):
    price: int
    count: int

sum_count = await Document.find(Sample.price <= 100).sum(Sample.count)

Arguments:

  • field: Union[str, ExpressionField]
  • session: Optional[ClientSession] - pymongo session
  • ignore_cache: bool

Returns:

Optional[float] - sum. None if there are no items.

AggregateMethods.avg

 | async avg(field, session: Optional[ClientSession] = None, ignore_cache: bool = False) -> Optional[float]

Average of values of the given field

Example:

class Sample(Document):
    price: int
    count: int

avg_count = await Document.find(Sample.price <= 100).avg(Sample.count)

Arguments:

  • field: Union[str, ExpressionField]
  • session: Optional[ClientSession] - pymongo session
  • ignore_cache: bool

Returns:

Optional[float] - avg. None if there are no items.

AggregateMethods.max

 | async max(field: Union[str, ExpressionField], session: Optional[ClientSession] = None, ignore_cache: bool = False) -> Optional[float]

Max of the values of the given field

Example:

class Sample(Document):
    price: int
    count: int

max_count = await Document.find(Sample.price <= 100).max(Sample.count)

Arguments:

  • field: Union[str, ExpressionField]
  • session: Optional[ClientSession] - pymongo session
  • ignore_cache: bool

Returns:

Optional[float] - max. None if there are no items.

AggregateMethods.min

 | async min(field: Union[str, ExpressionField], session: Optional[ClientSession] = None, ignore_cache: bool = False) -> Optional[float]

Min of the values of the given field

Example:

class Sample(Document):
    price: int
    count: int

min_count = await Document.find(Sample.price <= 100).min(Sample.count)

Arguments:

  • field: Union[str, ExpressionField]
  • session: Optional[ClientSession] - pymongo session
  • ignore_cache: bool

Returns:

Optional[float] - min. None if there are no items.

beanie.odm.interfaces.session

SessionMethods

class SessionMethods()

Session methods

SessionMethods.set_session

 | set_session(session: Optional[ClientSession] = None)

Set pymongo session

Arguments:

  • session: Optional[ClientSession] - pymongo session

Returns:

beanie.odm.interfaces.aggregate

AggregateInterface

class AggregateInterface()

AggregateInterface.aggregate

 | @classmethod
 | aggregate(cls: Type[DocType], aggregation_pipeline: list, projection_model: Optional[Type[DocumentProjectionType]] = None, session: Optional[ClientSession] = None, ignore_cache: bool = False, **pymongo_kwargs) -> Union[
 |         AggregationQuery[Dict[str, Any]],
 |         AggregationQuery[DocumentProjectionType],
 |     ]

Aggregate over collection. Returns AggregationQuery query object

Arguments:

  • aggregation_pipeline: list - aggregation pipeline
  • projection_model: Type[BaseModel]
  • session: Optional[ClientSession]
  • ignore_cache: bool
  • **pymongo_kwargs: pymongo native parameters for aggregate operation

Returns:

AggregationQuery

beanie.odm.interfaces.find

FindInterface

class FindInterface()

FindInterface.find_one

 | @classmethod
 | find_one(cls: Type["DocType"], *args: Union[Mapping[str, Any], bool], projection_model: Optional[Type["DocumentProjectionType"]] = None, session: Optional[ClientSession] = None, ignore_cache: bool = False, fetch_links: bool = False, **pymongo_kwargs) -> Union[FindOne["DocType"], FindOne["DocumentProjectionType"]]

Find one document by criteria. Returns FindOne query object. When awaited this will either return a document or None if no document exists for the search criteria.

Arguments:

  • args: Mapping[str, Any] - search criteria
  • projection_model: Optional[Type[BaseModel]] - projection model
  • session: Optional[ClientSession] - pymongo session instance
  • ignore_cache: bool
  • **pymongo_kwargs: pymongo native parameters for find operation (if Document class contains links, this parameter must fit the respective parameter of the aggregate MongoDB function)

Returns:

FindOne - find query instance

FindInterface.find_many

 | @classmethod
 | find_many(cls: Type["DocType"], *args: Union[Mapping[str, Any], bool], projection_model: Optional[Type["DocumentProjectionType"]] = None, skip: Optional[int] = None, limit: Optional[int] = None, sort: Union[None, str, List[Tuple[str, SortDirection]]] = None, session: Optional[ClientSession] = None, ignore_cache: bool = False, fetch_links: bool = False, **pymongo_kwargs) -> Union[FindMany["DocType"], FindMany["DocumentProjectionType"]]

Find many documents by criteria. Returns FindMany query object

Arguments:

  • args: Mapping[str, Any] - search criteria
  • skip: Optional[int] - The number of documents to omit.
  • limit: Optional[int] - The maximum number of results to return.
  • sort: Union[None, str, List[Tuple[str, SortDirection]]] - A key or a list of (key, direction) pairs specifying the sort order for this query.
  • projection_model: Optional[Type[BaseModel]] - projection model
  • session: Optional[ClientSession] - pymongo session
  • ignore_cache: bool
  • **pymongo_kwargs: pymongo native parameters for find operation (if Document class contains links, this parameter must fit the respective parameter of the aggregate MongoDB function)

Returns:

FindMany - query instance

FindInterface.find

 | @classmethod
 | find(cls: Type["DocType"], *args: Union[Mapping[str, Any], bool], projection_model: Optional[Type["DocumentProjectionType"]] = None, skip: Optional[int] = None, limit: Optional[int] = None, sort: Union[None, str, List[Tuple[str, SortDirection]]] = None, session: Optional[ClientSession] = None, ignore_cache: bool = False, fetch_links: bool = False, **pymongo_kwargs) -> Union[FindMany["DocType"], FindMany["DocumentProjectionType"]]

The same as find_many

FindInterface.find_all

 | @classmethod
 | find_all(cls: Type["DocType"], skip: Optional[int] = None, limit: Optional[int] = None, sort: Union[None, str, List[Tuple[str, SortDirection]]] = None, projection_model: Optional[Type["DocumentProjectionType"]] = None, session: Optional[ClientSession] = None, ignore_cache: bool = False, **pymongo_kwargs) -> Union[FindMany["DocType"], FindMany["DocumentProjectionType"]]

Get all the documents

Arguments:

  • skip: Optional[int] - The number of documents to omit.
  • limit: Optional[int] - The maximum number of results to return.
  • sort: Union[None, str, List[Tuple[str, SortDirection]]] - A key or a list of (key, direction) pairs specifying the sort order for this query.
  • projection_model: Optional[Type[BaseModel]] - projection model
  • session: Optional[ClientSession] - pymongo session
  • **pymongo_kwargs: pymongo native parameters for find operation (if Document class contains links, this parameter must fit the respective parameter of the aggregate MongoDB function)

Returns:

FindMany - query instance

FindInterface.all

 | @classmethod
 | all(cls: Type["DocType"], projection_model: Optional[Type["DocumentProjectionType"]] = None, skip: Optional[int] = None, limit: Optional[int] = None, sort: Union[None, str, List[Tuple[str, SortDirection]]] = None, session: Optional[ClientSession] = None, ignore_cache: bool = False, **pymongo_kwargs) -> Union[FindMany["DocType"], FindMany["DocumentProjectionType"]]

The same as find_all

FindInterface.count

 | @classmethod
 | async count(cls) -> int

Number of documents in the collection. The same as find_all().count().

Returns:

int