DatabaseProvider

Bases: JWTProviderMixin

Database Identity Provider implementation.

Source code in src/alpha/providers/database_provider.py
class DatabaseProvider(JWTProviderMixin):
    """Database Identity Provider implementation."""

    protocol = "database"
    token_factory: TokenFactory | None = None

    def __init__(
        self,
        uow: UnitOfWork,
        token_factory: TokenFactory | None = None,
        password_factory: PasswordFactory | None = None,
        user_name_attribute: str = "username",
        users_repository_name: str = "users",
    ) -> None:
        """Database Identity Provider implementation for user authentication
        and management. This provider uses a database to store user information
        and credentials, and provides methods for authenticating users,
        retrieving user information, and changing passwords.

        Parameters
        ----------
        uow
            Unit of work instance to manage database transactions
        token_factory
            Token factory instance to generate and validate tokens
        password_factory
            Password factory instance to handle password hashing and
            verification, by default None. If None, a default PasswordFactory
            will be used.
        user_name_attribute
            Attribute name to identify the user, by default "username"
        users_repository_name
            Repository name for user entities, by default "users"
        """
        self.uow = uow
        self.token_factory = token_factory
        self._password_factory = password_factory or PasswordFactory()
        self._user_name_attribute = user_name_attribute
        self._users_repository_name = users_repository_name

    def authenticate(self, credentials: PasswordCredentials) -> Identity:
        """Authenticate a user using their credentials.

        Parameters
        ----------
        credentials
            Password credentials for the user

        Returns
        -------
            Identity instance representing the authenticated user
        """
        with self.uow:
            users: SqlRepository[User] = getattr(
                self.uow, self._users_repository_name
            )
            user = self._verify_password(
                credentials=credentials, user_repository=users
            )

            return Identity.from_user(user)

    def get_user(self, subject: str) -> Identity:
        """Retrieve a user by their subject identifier.

        Parameters
        ----------
        subject
            The subject identifier of the user

        Returns
        -------
            Identity instance representing the user
        """
        with self.uow:
            users: SqlRepository[User] = getattr(
                self.uow, self._users_repository_name
            )
            user = self._get_user(
                username=subject, user_repository=users, attribute_name="id"
            )

            return Identity.from_user(user)

    def change_password(
        self, credentials: PasswordCredentials, new_password: str
    ) -> None:
        """Change the password for a user.

        Parameters
        ----------
        credentials
            Password credentials for the user
        new_password
            The new password to set for the user
        """
        with self.uow:
            users: SqlRepository[User] = getattr(
                self.uow, self._users_repository_name
            )
            user = self._verify_password(
                credentials=credentials, user_repository=users
            )

            self._update_user_password(user, new_password)
            self.uow.commit()

    def _get_user(
        self,
        username: str,
        user_repository: SqlRepository[User],
        attribute_name: str | None = None,
    ) -> User:
        """Retrieve a user by their username.

        Parameters
        ----------
        username
            The username of the user
        user_repository
            The repository to query for the user
        attribute_name
            The attribute name to use for querying the user, by default None.
            If None, the provider's configured user_name_attribute will be
            used.

        Returns
        -------
            User instance representing the retrieved user

        Raises
        ------
        exceptions.UserNotFoundException
            If the user does not exist
        """
        user = user_repository.get_one_or_none(
            attr=attribute_name or self._user_name_attribute,
            value=username,
        )

        if not user:
            msg = (
                f"User with '{attribute_name or self._user_name_attribute}'="
                f"'{username}' does not exist"
            )
            logging.debug(msg)
            # Disabled lines below for future implementation of logging and
            # unit of work commit
            # self.logger(msg=msg, level=LogLevel.DEBUG)
            # self.uow.commit()
            raise exceptions.UserNotFoundException(msg)

        return user

    def _verify_password(
        self,
        credentials: PasswordCredentials,
        user_repository: SqlRepository[User],
    ) -> User:
        """Verify the password for a user.

        Parameters
        ----------
        credentials
            Password credentials for the user
        user_repository
            The repository to query for the user

        Returns
        -------
            User instance representing the authenticated user

        Raises
        ------
        exceptions.InvalidCredentialsException
            If the provided credentials are invalid
        exceptions.MissingPasswordException
            If the user does not have a password set
        """

        user = self._get_user(credentials.username, user_repository)

        try:
            if not self._password_factory.verify_password(
                credentials.password, user.password
            ):
                msg = (
                    f"The provided password for user "
                    f"'{getattr(user, self._user_name_attribute)}' is "
                    "incorrect"
                )
                logging.debug(msg)
                # Disabled lines below for future implementation of logging and
                # unit of work commit
                # self.logger(msg=msg, level=LogLevel.DEBUG)
                # self.uow.commit()
                raise exceptions.InvalidCredentialsException(msg)
        except exceptions.MissingPasswordException as exc:
            msg = (
                f"No password value to compare for "
                f"'{getattr(user, self._user_name_attribute)}'"
            )
            logging.error(msg)
            # Disabled lines below for future implementation of logging and
            # unit of work commit
            # self.logger(msg=msg, level=LogLevel.ERROR)
            # self.uow.commit()
            raise exceptions.MissingPasswordException(msg) from exc

        return user

    def _update_user_password(
        self,
        user: User,
        new_password: str,
    ) -> None:
        """Change the password for a user.

        Parameters
        ----------
        user
            User instance representing the user to update the password for
        new_password
            The new password to set for the user
        user_repository
            The repository to query for the user
        """
        user.password = self._password_factory.hash_password(new_password)

Methods:

__init__

__init__(uow, token_factory=None, password_factory=None, user_name_attribute='username', users_repository_name='users')

Database Identity Provider implementation for user authentication and management. This provider uses a database to store user information and credentials, and provides methods for authenticating users, retrieving user information, and changing passwords.

Parameters:
  • uow (UnitOfWork) –

    Unit of work instance to manage database transactions

  • token_factory (TokenFactory | None, default: None ) –

    Token factory instance to generate and validate tokens

  • password_factory (PasswordFactory | None, default: None ) –

    Password factory instance to handle password hashing and verification, by default None. If None, a default PasswordFactory will be used.

  • user_name_attribute (str, default: 'username' ) –

    Attribute name to identify the user, by default "username"

  • users_repository_name (str, default: 'users' ) –

    Repository name for user entities, by default "users"

Source code in src/alpha/providers/database_provider.py
def __init__(
    self,
    uow: UnitOfWork,
    token_factory: TokenFactory | None = None,
    password_factory: PasswordFactory | None = None,
    user_name_attribute: str = "username",
    users_repository_name: str = "users",
) -> None:
    """Database Identity Provider implementation for user authentication
    and management. This provider uses a database to store user information
    and credentials, and provides methods for authenticating users,
    retrieving user information, and changing passwords.

    Parameters
    ----------
    uow
        Unit of work instance to manage database transactions
    token_factory
        Token factory instance to generate and validate tokens
    password_factory
        Password factory instance to handle password hashing and
        verification, by default None. If None, a default PasswordFactory
        will be used.
    user_name_attribute
        Attribute name to identify the user, by default "username"
    users_repository_name
        Repository name for user entities, by default "users"
    """
    self.uow = uow
    self.token_factory = token_factory
    self._password_factory = password_factory or PasswordFactory()
    self._user_name_attribute = user_name_attribute
    self._users_repository_name = users_repository_name

authenticate

authenticate(credentials)

Authenticate a user using their credentials.

Parameters:
Returns:
  • Identity instance representing the authenticated user
Source code in src/alpha/providers/database_provider.py
def authenticate(self, credentials: PasswordCredentials) -> Identity:
    """Authenticate a user using their credentials.

    Parameters
    ----------
    credentials
        Password credentials for the user

    Returns
    -------
        Identity instance representing the authenticated user
    """
    with self.uow:
        users: SqlRepository[User] = getattr(
            self.uow, self._users_repository_name
        )
        user = self._verify_password(
            credentials=credentials, user_repository=users
        )

        return Identity.from_user(user)

get_user

get_user(subject)

Retrieve a user by their subject identifier.

Parameters:
  • subject (str) –

    The subject identifier of the user

Returns:
  • Identity instance representing the user
Source code in src/alpha/providers/database_provider.py
def get_user(self, subject: str) -> Identity:
    """Retrieve a user by their subject identifier.

    Parameters
    ----------
    subject
        The subject identifier of the user

    Returns
    -------
        Identity instance representing the user
    """
    with self.uow:
        users: SqlRepository[User] = getattr(
            self.uow, self._users_repository_name
        )
        user = self._get_user(
            username=subject, user_repository=users, attribute_name="id"
        )

        return Identity.from_user(user)

change_password

change_password(credentials, new_password)

Change the password for a user.

Parameters:
  • credentials (PasswordCredentials) –

    Password credentials for the user

  • new_password (str) –

    The new password to set for the user

Source code in src/alpha/providers/database_provider.py
def change_password(
    self, credentials: PasswordCredentials, new_password: str
) -> None:
    """Change the password for a user.

    Parameters
    ----------
    credentials
        Password credentials for the user
    new_password
        The new password to set for the user
    """
    with self.uow:
        users: SqlRepository[User] = getattr(
            self.uow, self._users_repository_name
        )
        user = self._verify_password(
            credentials=credentials, user_repository=users
        )

        self._update_user_password(user, new_password)
        self.uow.commit()