Source code for wxc_sdk.integration

An OAuth integration
import concurrent.futures
import http.server
import json
import logging
import socketserver
import threading
import urllib.parse
import uuid
import webbrowser
from import Callable
from dataclasses import dataclass
from typing import Union, Optional

import requests
import yaml

from import dump_response
from ..tokens import Tokens

log = logging.getLogger(__name__)

__all__ = ['Integration']

[docs] @dataclass(init=False) class Integration: """ An OAuth integration """ #: integration's client id, obtained from client_id: str #: integration's client secret, obtained from client_secret: str #: OAuth scopes of the integration. scopes: list[str] #: redirect URL of the integration redirect_url: str #: URL of the authorization service; used as part of the URL to start an OAuth flow auth_service: str #: base URL of the access token service token_service: str #: callback to initiate auth flow initiate_flow_callback: Callable[[str], None] def __init__(self, *, client_id: str, client_secret: str, scopes: Union[str, list[str]], redirect_url: str, auth_service: str = None, token_service: str = None, initiate_flow_callback: Callable[[str], None] = None): """ :param client_id: integration's client id, obtained from :param client_secret: integration's client secret, obtained from :param scopes: integration's scopes. Can be a list of strings or a string containing a list of space separated scopes :param redirect_url: integration's redirect URL :param auth_service: authorization service to be used in the authorization URL. Default: '' :param token_service: URL of token service to use to obtain tokens from. Default: '' :param initiate_flow_callback: callback to initiate flow. Default: """ self.client_id = client_id self.client_secret = client_secret if isinstance(scopes, list): self.scopes = scopes else: scopes: str self.scopes = scopes.split() self.redirect_url = redirect_url self.auth_service = auth_service or '' self.token_service = token_service or '' self.initiate_flow_callback = initiate_flow_callback or
[docs] def auth_url(self, state: str) -> str: """ Get the URL to start an OAuth flow for the integration :param state: state in redirect URL :type state: str :return: URL to start the OAuth flow :rtype: str """ scopes = self.scopes if isinstance(scopes, list): scopes = ' '.join(scopes) params = { 'client_id': self.client_id, 'response_type': 'code', 'redirect_uri': self.redirect_url, 'scope': scopes, 'state': state } full_url = f'{self.auth_service}?{urllib.parse.urlencode(params)}' return full_url
[docs] def tokens_from_code(self, code: str) -> Tokens: """ Get a new set of tokens from code at end of OAuth flow :param code: code obtained at end of SAML 2.0 OAuth flow :type code: str :return: new tokens :rtype: Tokens """ url = self.token_service data = { 'grant_type': 'authorization_code', 'client_id': self.client_id, 'client_secret': self.client_secret, 'code': code, 'redirect_uri': self.redirect_url } with requests.Session() as session: response =, data=data) dump_response(response, dump_log=log) response.raise_for_status() json_data = response.json() tokens = Tokens.model_validate(json_data) tokens.set_expiration() return tokens
[docs] def refresh(self, tokens: Tokens): """ Try to get a new access token using the refresh token. :param tokens: Tokens. Access token and expirations get updated in place. :raise: :class:`requests.HTTPError`: if request to obtain new access token fails """ data = { 'grant_type': 'refresh_token', 'client_id': self.client_id, 'client_secret': self.client_secret, 'refresh_token': tokens.refresh_token } try: url = self.token_service with requests.Session() as session: with, data=data) as response: dump_response(response=response, dump_log=log) response.raise_for_status() json_data = response.json() except requests.HTTPError: tokens.access_token = None raise else: new_tokens = Tokens.model_validate(json_data) new_tokens: Tokens new_tokens.set_expiration() tokens.update(new_tokens)
[docs] def validate_tokens(self, tokens: Tokens, min_lifetime_seconds: int = 300) -> bool: """ Validate tokens If remaining life time is to small then try to get a new access token using the existing refresh token. If no new access token can be obtained using the refresh token then the access token is set to None and True is returned :param tokens: current OAuth tokens. Get updated if new tokens are created :type tokens: Tokens :param min_lifetime_seconds: minimal remaining lifetime in seconds. Default: 300 seconds :type min_lifetime_seconds: int :return: True -> min lifetime reached and tried to get new access token. :rtype: bool """ if tokens.remaining >= min_lifetime_seconds: return False log.debug(f'Getting new access token, valid until {tokens.expires_at}, remaining {tokens.remaining}') try: self.refresh(tokens=tokens) except requests.HTTPError: # ignore HTTPErrors pass return True
[docs] def get_tokens_from_oauth_flow(self) -> Optional[Tokens]: """ Initiate an OAuth flow to obtain new tokens. start a local webserver on port 6001 o serve the last step in the OAuth flow :param self: Integration to use for the flow :type: :class:`wxc_sdk.integration.Integration` :return: set of new tokens if successful, else None :rtype: :class:`wxc_sdk.tokens.Tokens`` """ def serve_redirect(): """ Temporarily start a web server to serve the redirect URI at http://localhost:6001/redirect' :return: parses query of the GET on the redirect URI """ # mutable to hold the query result oauth_response = dict() class RedirectRequestHandler(http.server.BaseHTTPRequestHandler): # handle the GET request on the redirect URI # noinspection PyPep8Naming def do_GET(self): # serve exactly one GET on the redirect URI and then we are done parsed = urllib.parse.urlparse(self.path) if parsed.path == '/redirect': log.debug('serve_redirect: got GET on /redirect') query = urllib.parse.parse_qs(parsed.query) oauth_response['query'] = query # we are done self.shutdown(self.server) self.send_response(200) self.flush_headers() @staticmethod def shutdown(server: socketserver.BaseServer): log.debug('serve_redirect: shutdown of local web server requested') threading.Thread(target=server.shutdown, daemon=True).start() httpd = http.server.HTTPServer(server_address=('', 6001), RequestHandlerClass=RedirectRequestHandler) log.debug('serve_redirect: starting local web server for redirect URI') httpd.serve_forever() httpd.server_close() log.debug(f'serve_redirect: server terminated, result {oauth_response["query"]}') return oauth_response['query'] state = str(uuid.uuid4()) auth_url = self.auth_url(state=state) with concurrent.futures.ThreadPoolExecutor() as executor: # start web server fut = executor.submit(serve_redirect) # open authentication URL in local webbrowser self.initiate_flow_callback(auth_url) # wait for GET on redirect URI and get the result (parsed query of redirect URI) try: result = fut.result(timeout=120) except concurrent.futures.TimeoutError: # noinspection PyBroadException try: # post a dummy response to the redirect URI to stop the server with requests.Session() as session: session.get(self.redirect_url, params={'code': 'foo'}) except Exception: pass log.warning('Authorization did not finish in time (60 seconds)') return code = result['code'][0] response_state = result['state'][0] assert response_state == state # get access tokens new_tokens = self.tokens_from_code(code=code) if new_tokens is None: log.error('Failed to obtain tokens') return None return new_tokens
[docs] def get_cached_tokens(self, read_from_cache: Callable[[], Optional[Tokens]], write_to_cache: Callable[[Tokens], None], force_new: bool = False) -> Optional[Tokens]: """ Get tokens. Tokens are read from cache and then verified. If needed an OAuth flow is initiated to get a new set of tokens. For this the redirect URL http://localhost:6001/redirect is expected. :param read_from_cache: callback to read tokens from cache. Called without parameters and is expected to return a :class:`wxc_sdk.tokens.Tokens` instance with the cached tokens. If cached tokens cannot be provided then None should be returned. :param write_to_cache: callback to write updated tokens back to cache. The callback is called with a :class:`wxc_sdk.tokens.Tokens` instance as only argument. :param force_new: flag, don't read cached tokens and always get tokens from OAuth flow :type force_new: bool :return: set of tokens or None :rtype: :class:`wxc_sdk.tokens.Tokens` """ # read tokens from cache tokens = not force_new and read_from_cache() or None if tokens: # validate tokens changed = self.validate_tokens(tokens=tokens) if not tokens.access_token: tokens = None elif changed: write_to_cache(tokens) if not tokens: # get new tokens via integration if needed tokens = self.get_tokens_from_oauth_flow() if tokens: write_to_cache(tokens) return tokens
[docs] def get_cached_tokens_from_yml(self, yml_path: str, force_new: bool = False) -> Optional[Tokens]: """ Get tokens. Tokens are read from given YML file and then verified. If needed an OAuth flow is initiated to get a new set of tokens. For this the redirect URL http://localhost:6001/redirect is expected. :param yml_path: path to YML file to be used to cache tokens :param force_new: flag, don't read cached tokens and always get tokens from OAuth flow :type force_new: bool :return: set of tokens or None :rtype: :class:`wxc_sdk.tokens.Tokens` """ def write_tokens(tokens_to_cache: Tokens): with open(yml_path, mode='w') as f: yaml.safe_dump(json.loads(tokens_to_cache.model_dump_json()), f) return def read_tokens() -> Optional[Tokens]: try: with open(yml_path, mode='r') as f: data = yaml.safe_load(f) tokens_read = Tokens.model_validate(data) except Exception as e:'failed to read tokens from file: {e}') tokens_read = None return tokens_read return self.get_cached_tokens(read_from_cache=read_tokens, write_to_cache=write_tokens, force_new=force_new)