SqlAlchemyUnitOfWork

Unit of Work implementation for SQLAlchemy databases.

This class manages the lifecycle of a SQLAlchemy session and provides access to configured repositories for database interactions. It supports transactional operations such as commit, flush, rollback, and refresh.

Source code in src/alpha/adapters/sqla_unit_of_work.py
class SqlAlchemyUnitOfWork:
    """Unit of Work implementation for SQLAlchemy databases.

    This class manages the lifecycle of a SQLAlchemy session and provides
    access to configured repositories for database interactions. It supports
    transactional operations such as commit, flush, rollback, and refresh.

    Repositories are built when the context is entered and are exposed as
    attributes named after each repository model's ``name``.
    """

    def __init__(
        self,
        db: SqlDatabase,
        repos: list[RepositoryModel[Any]],
    ) -> None:
        """Initialize the Unit of Work with a database and repositories.

        Parameters
        ----------
        db
            The database instance to use.
        repos
            The list of repository models to use.

        Raises
        ------
        TypeError
            If the provided database is not a valid SqlDatabase instance.
        """
        if not isinstance(db, SqlDatabase):  # type: ignore
            raise TypeError("No valid database provided")

        self._db = db
        self._repositories = repos
        # No session exists until the context is entered.
        self._session: Session | None = None

    def __enter__(self: UOW) -> UOW:
        """Open a session and attach the configured repositories.

        Every repository model is instantiated with the new session and
        stored on this instance under the model's ``name``.

        Returns
        -------
        UOW
            The Unit of Work instance.

        Raises
        ------
        TypeError
            If a repository model declares an interface and the repository
            built from it is not an instance of that interface.
        """
        session = self._db.get_session()
        self._session = session

        try:
            for repo in self._repositories:
                name: str = repo.name
                interface: Any = repo.interface

                repository = repo.repository(
                    session=session, default_model=repo.default_model
                )
                setattr(self, name, repository)

                if interface and not isinstance(repository, interface):
                    raise TypeError(
                        f"Repository for {name} does not implement its "
                        "interface"
                    )
        except Exception:
            # __exit__ is never invoked when __enter__ raises, so the
            # session has to be released here to avoid leaking it.
            session.close()
            self._session = None
            raise

        return self

    def __exit__(self, *args: Any) -> None:
        """Finalize the Unit of Work context.

        The session is rolled back and then closed. The exception details
        passed by the ``with`` statement are ignored, and ``None`` is
        returned so an in-flight exception keeps propagating.

        Raises
        ------
        exceptions.DatabaseSessionError
            If no session is active.
        """
        session = self._require_session()
        # Detach first so the unit of work never keeps pointing at a
        # session that is being torn down, even if rollback/close raises.
        self._session = None
        try:
            session.rollback()
        finally:
            session.close()

    def _require_session(self) -> Session:
        """Return the active session or fail when there is none.

        Raises
        ------
        exceptions.DatabaseSessionError
            If no session is active.
        """
        if self._session is None:
            raise exceptions.DatabaseSessionError(
                "No active database session is defined"
            )
        return self._session

    def commit(self) -> None:
        """Commit the current transaction.

        Raises
        ------
        exceptions.DatabaseSessionError
            If no session is active.
        """
        self._require_session().commit()

    def flush(self) -> None:
        """Flush the current transaction.

        Raises
        ------
        exceptions.DatabaseSessionError
            If no session is active.
        """
        self._require_session().flush()

    def rollback(self) -> None:
        """Rollback the current transaction.

        Raises
        ------
        exceptions.DatabaseSessionError
            If no session is active.
        """
        self._require_session().rollback()

    def refresh(self, obj: object) -> None:
        """Refresh the state of a given object.

        Parameters
        ----------
        obj
            The object to refresh.

        Raises
        ------
        exceptions.DatabaseSessionError
            If no session is active.
        """
        self._require_session().refresh(obj)

    @property
    def session(self) -> Session | None:
        """Get the current database session.

        Returns
        -------
        Session | None
            The active session, or ``None`` when the context is not
            entered.
        """
        return self._session

session property

session

Get the current database session.

Returns:
  • Session | None

    The current database session.

Methods:

__init__

__init__(db, repos)

Initialize the Unit of Work with a database and repositories.

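Parameters:
  • db (SqlDatabase) –

    The database instance to use.
  • repos (list[RepositoryModel[Any]]) –

    The list of repository models to use.
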
Raises:
  • TypeError

    If the provided database is not a valid SqlDatabase instance.

Source code in src/alpha/adapters/sqla_unit_of_work.py
def __init__(
    self,
    db: SqlDatabase,
    repos: list[RepositoryModel[Any]],
) -> None:
    """Set up the Unit of Work with its database and repository models.

    Parameters
    ----------
    db
        The database instance to use.
    repos
        The list of repository models to use.

    Raises
    ------
    TypeError
        If the provided database is not a valid SqlDatabase instance.
    """
    is_sql_database = isinstance(db, SqlDatabase)  # type: ignore
    if not is_sql_database:
        raise TypeError("No valid database provided")

    # The session starts out unset; the other two are stored as given.
    self._session: Session | None = None
    self._repositories = repos
    self._db = db

__enter__

__enter__()

Initialize the Unit of Work context.

Returns:
  • UOW

    The Unit of Work instance.

Raises:
  • TypeError

    If a repository built from a repository model is not an instance of the interface declared by that model.

Source code in src/alpha/adapters/sqla_unit_of_work.py
def __enter__(self: UOW) -> UOW:
    """Open a session and attach the configured repositories.

    Every repository model is instantiated with the new session and stored
    on this instance under the model's ``name``.

    Returns
    -------
    UOW
        The Unit of Work instance.

    Raises
    ------
    TypeError
        If a repository model declares an interface and the repository
        built from it is not an instance of that interface.
    """
    session = self._db.get_session()
    self._session = session

    try:
        for repo in self._repositories:
            name: str = repo.name
            interface: Any = repo.interface

            repository = repo.repository(
                session=session, default_model=repo.default_model
            )
            setattr(self, name, repository)

            if interface and not isinstance(repository, interface):
                raise TypeError(
                    f"Repository for {name} does not implement its "
                    "interface"
                )
    except Exception:
        # __exit__ is never invoked when __enter__ raises, so the session
        # has to be released here to avoid leaking it.
        session.close()
        self._session = None
        raise

    return self

__exit__

__exit__(*args)

Finalize the Unit of Work context.

Source code in src/alpha/adapters/sqla_unit_of_work.py
def __exit__(self, *args: Any) -> None:
    """Finalize the Unit of Work context.

    The session is rolled back and then closed. The exception details
    passed by the ``with`` statement are ignored, and ``None`` is returned
    so an in-flight exception keeps propagating.

    Raises
    ------
    exceptions.DatabaseSessionError
        If no session is active.
    """
    session = self._session
    if not session:
        raise exceptions.DatabaseSessionError(
            "No active database session is defined"
        )
    # Detach first so the unit of work never keeps pointing at a session
    # that is being torn down, even if rollback/close raises.
    self._session = None
    try:
        session.rollback()
    finally:
        session.close()

commit

commit()

Commit the current transaction.

Source code in src/alpha/adapters/sqla_unit_of_work.py
def commit(self) -> None:
    """Commit the work pending on the active session.

    Raises
    ------
    exceptions.DatabaseSessionError
        If no session is active.
    """
    active = self._session
    if active:
        active.commit()
        return
    raise exceptions.DatabaseSessionError(
        "No active database session is defined"
    )

flush

flush()

Flush the current transaction.

Source code in src/alpha/adapters/sqla_unit_of_work.py
def flush(self) -> None:
    """Flush the work pending on the active session.

    Raises
    ------
    exceptions.DatabaseSessionError
        If no session is active.
    """
    active = self._session
    if active:
        active.flush()
        return
    raise exceptions.DatabaseSessionError(
        "No active database session is defined"
    )

rollback

rollback()

Rollback the current transaction.

Source code in src/alpha/adapters/sqla_unit_of_work.py
def rollback(self) -> None:
    """Roll back the work pending on the active session.

    Raises
    ------
    exceptions.DatabaseSessionError
        If no session is active.
    """
    active = self._session
    if active:
        active.rollback()
        return
    raise exceptions.DatabaseSessionError(
        "No active database session is defined"
    )

refresh

refresh(obj)

Refresh the state of a given object.

Parameters:
  • obj (object) –

    The object to refresh.

Source code in src/alpha/adapters/sqla_unit_of_work.py
def refresh(self, obj: object) -> None:
    """Ask the active session to refresh an object.

    Parameters
    ----------
    obj
        The object to refresh.

    Raises
    ------
    exceptions.DatabaseSessionError
        If no session is active.
    """
    active = self._session
    if active:
        active.refresh(obj)
        return
    raise exceptions.DatabaseSessionError(
        "No active database session is defined"
    )