Unit of Work implementation for SQLAlchemy databases.
This class manages the lifecycle of a SQLAlchemy session and provides
access to configured repositories for database interactions. It supports
transactional operations such as commit, flush, rollback, and refresh.
Source code in src/alpha/adapters/sqla_unit_of_work.py
| class SqlAlchemyUnitOfWork:
"""Unit of Work implementation for SQLAlchemy databases.
This class manages the lifecycle of a SQLAlchemy session and provides
access to configured repositories for database interactions. It supports
transactional operations such as commit, flush, rollback, and refresh.
"""
def __init__(
self,
db: SqlDatabase,
repos: list[RepositoryModel[Any]],
) -> None:
"""Initialize the Unit of Work with a database and repositories.
Parameters
----------
db
The database instance to use.
repos
The list of repository models to use.
Raises
------
TypeError
If the provided database is not a valid SqlDatabase instance.
"""
if not isinstance(db, SqlDatabase): # type: ignore
raise TypeError("No valid database provided")
self._db = db
self._repositories = repos
self._session: Session | None = None
def __enter__(self: UOW) -> UOW:
"""Initialize the Unit of Work context.
Returns
-------
UOW
The Unit of Work instance.
Raises
------
TypeError
If the provided repositories list is empty or contains invalid
models.
"""
self._session = self._db.get_session()
for repo in self._repositories:
name: str = repo.name
interface: Any = repo.interface
self.__setattr__(
name,
repo.repository(
session=self._session, default_model=repo.default_model
),
)
if interface:
if not isinstance(getattr(self, name), interface):
raise TypeError(f"Repository for {name} has no interface")
return self
def __exit__(self, *args: Any) -> None:
"""Finalize the Unit of Work context."""
if not self._session:
raise exceptions.DatabaseSessionError(
"No active database session is defined"
)
self._session.close()
self.rollback()
self._session = None # type: ignore
def commit(self) -> None:
"""Commit the current transaction."""
if not self._session:
raise exceptions.DatabaseSessionError(
"No active database session is defined"
)
self._session.commit()
def flush(self) -> None:
"""Flush the current transaction."""
if not self._session:
raise exceptions.DatabaseSessionError(
"No active database session is defined"
)
self._session.flush()
def rollback(self) -> None:
"""Rollback the current transaction."""
if not self._session:
raise exceptions.DatabaseSessionError(
"No active database session is defined"
)
self._session.rollback()
def refresh(self, obj: object) -> None:
"""Refresh the state of a given object.
Parameters
----------
obj
The object to refresh.
"""
if not self._session:
raise exceptions.DatabaseSessionError(
"No active database session is defined"
)
self._session.refresh(obj)
@property
def session(self) -> Session | None:
"""Get the current database session.
Returns
-------
Session | None
The current database session.
"""
return self._session
|
session
property
Get the current database session.
| Returns: |
-
Session | None
–
The current database session.
|
Methods:
__init__
Initialize the Unit of Work with a database and repositories.
| Raises: |
-
TypeError
–
If the provided database is not a valid SqlDatabase instance.
|
Source code in src/alpha/adapters/sqla_unit_of_work.py
| def __init__(
self,
db: SqlDatabase,
repos: list[RepositoryModel[Any]],
) -> None:
"""Initialize the Unit of Work with a database and repositories.
Parameters
----------
db
The database instance to use.
repos
The list of repository models to use.
Raises
------
TypeError
If the provided database is not a valid SqlDatabase instance.
"""
if not isinstance(db, SqlDatabase): # type: ignore
raise TypeError("No valid database provided")
self._db = db
self._repositories = repos
self._session: Session | None = None
|
__enter__
Initialize the Unit of Work context.
| Returns: |
-
UOW
–
The Unit of Work instance.
|
| Raises: |
-
TypeError
–
If the provided repositories list is empty or contains invalid
models.
|
Source code in src/alpha/adapters/sqla_unit_of_work.py
| def __enter__(self: UOW) -> UOW:
"""Initialize the Unit of Work context.
Returns
-------
UOW
The Unit of Work instance.
Raises
------
TypeError
If the provided repositories list is empty or contains invalid
models.
"""
self._session = self._db.get_session()
for repo in self._repositories:
name: str = repo.name
interface: Any = repo.interface
self.__setattr__(
name,
repo.repository(
session=self._session, default_model=repo.default_model
),
)
if interface:
if not isinstance(getattr(self, name), interface):
raise TypeError(f"Repository for {name} has no interface")
return self
|
__exit__
Finalize the Unit of Work context.
Source code in src/alpha/adapters/sqla_unit_of_work.py
| def __exit__(self, *args: Any) -> None:
"""Finalize the Unit of Work context."""
if not self._session:
raise exceptions.DatabaseSessionError(
"No active database session is defined"
)
self._session.close()
self.rollback()
self._session = None # type: ignore
|
commit
Commit the current transaction.
Source code in src/alpha/adapters/sqla_unit_of_work.py
| def commit(self) -> None:
"""Commit the current transaction."""
if not self._session:
raise exceptions.DatabaseSessionError(
"No active database session is defined"
)
self._session.commit()
|
flush
Flush the current transaction.
Source code in src/alpha/adapters/sqla_unit_of_work.py
| def flush(self) -> None:
"""Flush the current transaction."""
if not self._session:
raise exceptions.DatabaseSessionError(
"No active database session is defined"
)
self._session.flush()
|
rollback
Rollback the current transaction.
Source code in src/alpha/adapters/sqla_unit_of_work.py
| def rollback(self) -> None:
"""Rollback the current transaction."""
if not self._session:
raise exceptions.DatabaseSessionError(
"No active database session is defined"
)
self._session.rollback()
|
refresh
Refresh the state of a given object.
Source code in src/alpha/adapters/sqla_unit_of_work.py
| def refresh(self, obj: object) -> None:
"""Refresh the state of a given object.
Parameters
----------
obj
The object to refresh.
"""
if not self._session:
raise exceptions.DatabaseSessionError(
"No active database session is defined"
)
self._session.refresh(obj)
|