OIDCConnector

OIDC connector for interacting with an OpenID Connect identity provider via OIDC/OAuth2 protocols.

Parameters:
  • token_url (str) –

    Token endpoint URL or relative path.

  • userinfo_url (str | None, default: None ) –

    Optional userinfo endpoint URL or relative path.

  • introspection_url (str | None, default: None ) –

    Optional token introspection endpoint URL or relative path.

  • client_id (str | None, default: None ) –

    OAuth2 client identifier used for token requests.

  • client_secret (str | None, default: None ) –

    OAuth2 client secret used for token requests.

  • scope (str | list[str] | None, default: None ) –

    Space-delimited OAuth2 scopes for standard token requests.

  • verify_tls (bool, default: True ) –

    Whether to verify TLS certificates for HTTP requests.

  • timeout_seconds (int, default: 10 ) –

    Request timeout in seconds.

  • use_basic_auth (bool, default: True ) –

    Whether to send client credentials via HTTP Basic Auth.

  • base_url (str | None, default: None ) –

    Optional base URL to resolve relative endpoints.

  • user_lookup_url_template (str | None, default: None ) –

    Template URL used to look up a user by subject/username.

  • admin_client_id (str | None, default: None ) –

    Optional client identifier for admin token requests.

  • admin_client_secret (str | None, default: None ) –

    Optional client secret for admin token requests.

  • admin_scope (str | list[str] | None, default: None ) –

    Optional scope override for admin token requests.

Source code in src/alpha/infra/connectors/oidc_connector.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
class OIDCConnector:
    """OIDC connector for interacting with an OpenID Connect identity provider
    via OIDC/OAuth2 protocols.

    Parameters
    ----------
    token_url
        Token endpoint URL or relative path.
    userinfo_url
        Optional userinfo endpoint URL or relative path.
    introspection_url
        Optional token introspection endpoint URL or relative path.
    client_id
        OAuth2 client identifier used for token requests.
    client_secret
        OAuth2 client secret used for token requests.
    scope
        Space-delimited OAuth2 scopes for standard token requests.
    verify_tls
        Whether to verify TLS certificates for HTTP requests.
    timeout_seconds
        Request timeout in seconds.
    use_basic_auth
        Whether to send client credentials via HTTP Basic Auth.
    base_url
        Optional base URL to resolve relative endpoints.
    user_lookup_url_template
        Template URL used to look up a user by subject/username.
    admin_client_id
        Optional client identifier for admin token requests.
    admin_client_secret
        Optional client secret for admin token requests.
    admin_scope
        Optional scope override for admin token requests.
    """

    def __init__(
        self,
        token_url: str,
        userinfo_url: str | None = None,
        introspection_url: str | None = None,
        client_id: str | None = None,
        client_secret: str | None = None,
        scope: str | list[str] | None = None,
        verify_tls: bool = True,
        timeout_seconds: int = 10,
        use_basic_auth: bool = True,
        base_url: str | None = None,
        user_lookup_url_template: str | None = None,
        admin_client_id: str | None = None,
        admin_client_secret: str | None = None,
        admin_scope: str | list[str] | None = None,
    ) -> None:
        self._base_url = base_url
        self._token_url = self._build_url(token_url)
        self._userinfo_url = (
            self._build_url(userinfo_url) if userinfo_url else None
        )
        self._introspection_url = (
            self._build_url(introspection_url) if introspection_url else None
        )
        self._user_lookup_url_template = user_lookup_url_template
        self._client_id = client_id
        self._client_secret = client_secret
        self._scope = self._sanitize_scope(scope)
        self._verify_tls = verify_tls
        self._timeout_seconds = timeout_seconds
        self._use_basic_auth = use_basic_auth
        self._admin_client_id = admin_client_id or client_id
        self._admin_client_secret = admin_client_secret or client_secret
        self._admin_scope = self._sanitize_scope(admin_scope) or self._scope

        self._session = requests.Session()

    def close(self) -> None:
        """Close the underlying HTTP session to release resources."""
        self._session.close()

    def __enter__(self) -> "OIDCConnector":
        """Enter the runtime context related to this object."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Exit the runtime context and close the HTTP session."""
        self.close()
    @property
    def userinfo_url(self) -> str | None:
        """Return the configured userinfo endpoint URL.

        Returns
        -------
            The full userinfo endpoint URL or None.
        """
        return self._userinfo_url

    @property
    def introspection_url(self) -> str | None:
        """Return the configured introspection endpoint URL.

        Returns
        -------
            The full introspection endpoint URL or None.
        """
        return self._introspection_url

    @property
    def user_lookup_url_template(self) -> str | None:
        """Return the configured user lookup URL template.

        Returns
        -------
            URL template for user lookup or None.
        """
        return self._user_lookup_url_template

    def request_password_token(
        self, username: str, password: str
    ) -> dict[str, Any]:
        """Request an access token using the password grant.

        Parameters
        ----------
        username
            Resource owner username.
        password
            Resource owner password.

        Returns
        -------
            Token response payload from the identity provider.
        """
        data: dict[str, Any] = {
            "grant_type": "password",
            "username": username,
            "password": password,
        }
        if self._scope:
            data["scope"] = self._scope
        return self._post_token(data)

    def request_client_credentials_token(self) -> dict[str, Any]:
        """Request an access token using the client credentials grant.

        Returns
        -------
            Token response payload from the identity provider.
        """
        data: dict[str, Any] = {"grant_type": "client_credentials"}
        if self._admin_scope:
            data["scope"] = self._admin_scope
        return self._post_token(data, use_admin=True)

    def get_userinfo(self, access_token: str) -> dict[str, Any]:
        """Fetch user profile information for the given access token.

        Parameters
        ----------
        access_token
            Bearer access token.

        Returns
        -------
            Userinfo response payload.

        Raises
        ------
        exceptions.MissingConfigurationException
            If the userinfo endpoint is not configured.
        """
        if not self._userinfo_url:
            raise exceptions.MissingConfigurationException(
                "userinfo_url is not configured"
            )
        return self._request_json(
            "GET",
            self._userinfo_url,
            headers={"Authorization": f"Bearer {access_token}"},
        )

    def introspect_token(self, token: str) -> dict[str, Any]:
        """Introspect a token using the configured endpoint.

        Parameters
        ----------
        token
            Access or refresh token to introspect.

        Returns
        -------
            Introspection response payload.

        Raises
        ------
        exceptions.MissingConfigurationException
            If the introspection endpoint is not configured.
        """
        if not self._introspection_url:
            raise exceptions.MissingConfigurationException(
                "introspection_url is not configured"
            )
        data = {"token": token}
        return self._request_json(
            "POST",
            self._introspection_url,
            data=data,
            auth=self._build_auth(),
        )

    def get_user_by_subject(self, subject: str) -> dict[str, Any]:
        """Look up a user by subject using the admin client.

        Parameters
        ----------
        subject
            Subject or username to look up.

        Returns
        -------
            User representation returned by the provider.

        Raises
        ------
        exceptions.MissingConfigurationException
            If the user lookup URL template is not configured.
        exceptions.IdentityError
            If an admin token cannot be obtained or response is invalid.
        exceptions.UserNotFoundException
            If the user is not found.
        """
        if not self._user_lookup_url_template:
            raise exceptions.MissingConfigurationException(
                "user_lookup_url_template is not configured"
            )

        token_data = self.request_client_credentials_token()
        access_token = token_data.get("access_token")
        if not access_token:
            raise exceptions.IdentityError(
                "Unable to obtain admin access token"
            )

        # URL-encode the subject to prevent format string injection
        url = self._user_lookup_url_template.format(subject=quote(subject, safe=''))
        response: Any = self._request_json(
            "GET",
            url,
            headers={"Authorization": f"Bearer {access_token}"},
        )

        if isinstance(response, list):
            if not response:
                raise exceptions.UserNotFoundException(
                    f"User '{subject}' not found by identity provider"
                )
            first_item = cast(Mapping[str, Any], response[0])
            return dict(first_item)

        if isinstance(response, Mapping):
            return dict(cast(Mapping[str, Any], response))

        raise exceptions.IdentityError(
            "Unexpected response while fetching user"
        )

    def _post_token(
        self, data: dict[str, Any], use_admin: bool = False
    ) -> dict[str, Any]:
        """Request a token at the token endpoint.

        Parameters
        ----------
        data
            Form payload to send to the token endpoint.
        use_admin
            Whether to use admin client credentials, by default False.

        Returns
        -------
            Token response payload from the identity provider.

        Raises
        ------
        exceptions.MissingConfigurationException
            If the required client identifier is missing.
        """
        if use_admin:
            client_id = self._admin_client_id
            client_secret = self._admin_client_secret
        else:
            client_id = self._client_id
            client_secret = self._client_secret

        if not client_id:
            raise exceptions.MissingConfigurationException(
                "client_id is not configured"
            )

        if not self._token_url:
            raise exceptions.MissingConfigurationException(
                "token_url is not configured"
            )

        if not self._use_basic_auth:
            data["client_id"] = client_id
            if client_secret:
                data["client_secret"] = client_secret

        auth = (
            HTTPBasicAuth(client_id, client_secret)
            if self._use_basic_auth and client_secret
            else None
        )

        return self._request_json(
            method="POST",
            url=self._token_url,
            data=data,
            auth=auth,
        )

    def _request_json(
        self,
        method: str,
        url: str,
        **kwargs: Any,
    ) -> Any:
        """Execute an HTTP request and return a JSON response.

        Parameters
        ----------
        method
            HTTP method (GET, POST, etc.).
        url
            Absolute URL for the request.
        **kwargs
            Additional arguments passed to `requests.request()`.

        Returns
        -------
            Parsed JSON response payload.

        Raises
        ------
        exceptions.IdentityError
            If the request fails or returns a non-JSON response.
        exceptions.InvalidCredentialsException
            If the identity provider rejects credentials.
        """
        try:
            response = self._session.request(
                method,
                url,
                timeout=self._timeout_seconds,
                verify=self._verify_tls,
                **kwargs,
            )
        except requests.RequestException as exc:
            raise exceptions.IdentityError(
                f"OAuth2 request failed: {exc}"
            ) from exc

        if response.status_code >= 400:
            message = self._extract_error_message(response)
            if response.status_code in (400, 401, 403):
                raise exceptions.InvalidCredentialsException(message)
            raise exceptions.IdentityError(message)

        try:
            return response.json()
        except ValueError as exc:
            raise exceptions.IdentityError(
                "OAuth2 response did not include JSON payload"
            ) from exc

    @staticmethod
    def _extract_error_message(response: requests.Response) -> str:
        """Extract a provider error message from an HTTP response.

        Parameters
        ----------
        response
            HTTP response returned by the provider.

        Returns
        -------
            Human-readable error message.
        """
        try:
            payload = response.json()
            error = payload.get("error_description") or payload.get("error")
            if error:
                return str(error)
        except ValueError:
            # If the response body is not valid JSON, fall back to a generic message.
            pass
        return f"OAuth2 request failed with status {response.status_code}"

    def _build_url(self, url: str | None) -> str | None:
        """Resolve a relative endpoint against the configured base URL.

        Parameters
        ----------
        url
            Absolute or relative URL.

        Returns
        -------
            Absolute URL when possible, otherwise None.
        """
        if url is None:
            return None
        if self._base_url and not url.startswith("http"):
            return urljoin(self._base_url.rstrip("/") + "/", url)
        return url

    def _build_auth(self) -> HTTPBasicAuth | None:
        """Create HTTP Basic Auth credentials for introspection requests.

        Returns
        -------
            Basic auth instance when configured, otherwise None.
        """
        if not self._client_id or not self._client_secret:
            return None
        if not self._use_basic_auth:
            return None
        return HTTPBasicAuth(self._client_id, self._client_secret)

    def _sanitize_scope(self, scope: str | list[str] | None) -> str | None:
        """Sanitize the scope parameter to be a space-delimited string.

        Parameters
        ----------
        scope
            Scope as a string or list of strings.

        Returns
        -------
            Space-delimited scope string or None.
        """
        if scope is None:
            return None
        if isinstance(scope, list):
            return " ".join(scope)
        return scope

userinfo_url property

userinfo_url

Return the configured userinfo endpoint URL.

Returns:
  • The full userinfo endpoint URL or None.

introspection_url property

introspection_url

Return the configured introspection endpoint URL.

Returns:
  • The full introspection endpoint URL or None.

user_lookup_url_template property

user_lookup_url_template

Return the configured user lookup URL template.

Returns:
  • URL template for user lookup or None.

Methods:

close

close()

Close the underlying HTTP session to release resources.

Source code in src/alpha/infra/connectors/oidc_connector.py
def close(self) -> None:
    """Close the underlying HTTP session to release resources."""
    self._session.close()

__enter__

__enter__()

Enter the runtime context related to this object.

Source code in src/alpha/infra/connectors/oidc_connector.py
def __enter__(self) -> "OIDCConnector":
    """Enter the runtime context related to this object."""
    return self

__exit__

__exit__(exc_type, exc, tb)

Exit the runtime context and close the HTTP session.

Source code in src/alpha/infra/connectors/oidc_connector.py
def __exit__(self, exc_type, exc, tb) -> None:
    """Exit the runtime context and close the HTTP session."""
    self.close()

request_password_token

request_password_token(username, password)

Request an access token using the password grant.

Parameters:
  • username (str) –

    Resource owner username.

  • password (str) –

    Resource owner password.

Returns:
  • Token response payload from the identity provider.
Source code in src/alpha/infra/connectors/oidc_connector.py
def request_password_token(
    self, username: str, password: str
) -> dict[str, Any]:
    """Request an access token using the password grant.

    Parameters
    ----------
    username
        Resource owner username.
    password
        Resource owner password.

    Returns
    -------
        Token response payload from the identity provider.
    """
    data: dict[str, Any] = {
        "grant_type": "password",
        "username": username,
        "password": password,
    }
    if self._scope:
        data["scope"] = self._scope
    return self._post_token(data)

request_client_credentials_token

request_client_credentials_token()

Request an access token using the client credentials grant.

Returns:
  • Token response payload from the identity provider.
Source code in src/alpha/infra/connectors/oidc_connector.py
def request_client_credentials_token(self) -> dict[str, Any]:
    """Request an access token using the client credentials grant.

    Returns
    -------
        Token response payload from the identity provider.
    """
    data: dict[str, Any] = {"grant_type": "client_credentials"}
    if self._admin_scope:
        data["scope"] = self._admin_scope
    return self._post_token(data, use_admin=True)

get_userinfo

get_userinfo(access_token)

Fetch user profile information for the given access token.

Parameters:
  • access_token (str) –

    Bearer access token.

Returns:
  • Userinfo response payload.
Raises:
Source code in src/alpha/infra/connectors/oidc_connector.py
def get_userinfo(self, access_token: str) -> dict[str, Any]:
    """Fetch user profile information for the given access token.

    Parameters
    ----------
    access_token
        Bearer access token.

    Returns
    -------
        Userinfo response payload.

    Raises
    ------
    exceptions.MissingConfigurationException
        If the userinfo endpoint is not configured.
    """
    if not self._userinfo_url:
        raise exceptions.MissingConfigurationException(
            "userinfo_url is not configured"
        )
    return self._request_json(
        "GET",
        self._userinfo_url,
        headers={"Authorization": f"Bearer {access_token}"},
    )

introspect_token

introspect_token(token)

Introspect a token using the configured endpoint.

Parameters:
  • token (str) –

    Access or refresh token to introspect.

Returns:
  • Introspection response payload.
Raises:
Source code in src/alpha/infra/connectors/oidc_connector.py
def introspect_token(self, token: str) -> dict[str, Any]:
    """Introspect a token using the configured endpoint.

    Parameters
    ----------
    token
        Access or refresh token to introspect.

    Returns
    -------
        Introspection response payload.

    Raises
    ------
    exceptions.MissingConfigurationException
        If the introspection endpoint is not configured.
    """
    if not self._introspection_url:
        raise exceptions.MissingConfigurationException(
            "introspection_url is not configured"
        )
    data = {"token": token}
    return self._request_json(
        "POST",
        self._introspection_url,
        data=data,
        auth=self._build_auth(),
    )

get_user_by_subject

get_user_by_subject(subject)

Look up a user by subject using the admin client.

Parameters:
  • subject (str) –

    Subject or username to look up.

Returns:
  • User representation returned by the provider.
Raises:
Source code in src/alpha/infra/connectors/oidc_connector.py
def get_user_by_subject(self, subject: str) -> dict[str, Any]:
    """Look up a user by subject using the admin client.

    Parameters
    ----------
    subject
        Subject or username to look up.

    Returns
    -------
        User representation returned by the provider.

    Raises
    ------
    exceptions.MissingConfigurationException
        If the user lookup URL template is not configured.
    exceptions.IdentityError
        If an admin token cannot be obtained or response is invalid.
    exceptions.UserNotFoundException
        If the user is not found.
    """
    if not self._user_lookup_url_template:
        raise exceptions.MissingConfigurationException(
            "user_lookup_url_template is not configured"
        )

    token_data = self.request_client_credentials_token()
    access_token = token_data.get("access_token")
    if not access_token:
        raise exceptions.IdentityError(
            "Unable to obtain admin access token"
        )

    # URL-encode the subject to prevent format string injection
    url = self._user_lookup_url_template.format(subject=quote(subject, safe=''))
    response: Any = self._request_json(
        "GET",
        url,
        headers={"Authorization": f"Bearer {access_token}"},
    )

    if isinstance(response, list):
        if not response:
            raise exceptions.UserNotFoundException(
                f"User '{subject}' not found by identity provider"
            )
        first_item = cast(Mapping[str, Any], response[0])
        return dict(first_item)

    if isinstance(response, Mapping):
        return dict(cast(Mapping[str, Any], response))

    raise exceptions.IdentityError(
        "Unexpected response while fetching user"
    )