Source code for mattermostautodriver.client

"""
Client for the driver, which holds information about the logged in user
and actually makes the requests to the mattermost server
"""

import logging
import httpx

from .exceptions import (
    InvalidOrMissingParameters,
    NoAccessTokenProvided,
    NotEnoughPermissions,
    ResourceNotFound,
    MethodNotAllowed,
    ContentTooLarge,
    FeatureDisabled,
)

log = logging.getLogger("mattermostautodriver.websocket")
log.setLevel(logging.INFO)


class BaseClient:
    def __init__(self, options):
        self._url = self._make_url(options["scheme"], options["url"], options["port"], options["basepath"])
        self._scheme = options["scheme"]
        self._basepath = options["basepath"]
        self._port = options["port"]
        self._auth = options["auth"]
        if options["debug"]:
            self.activate_verbose_logging()

        self._options = options
        self._token = ""
        self._cookies = None
        self._userid = ""
        self._username = ""
        self._proxies = None
        if options["proxy"]:
            self._proxies = {"all://": options["proxy"]}

    @staticmethod
    def _make_url(scheme, url, port, basepath):
        return f"{scheme:s}://{url:s}:{port:d}{basepath:s}"

    @staticmethod
    def activate_verbose_logging(level=logging.DEBUG):
        # We register handlers for mattermostautodriver which takes care of
        # mattermostautodriver.websocket and mattermostautodriver.api
        #
        # In addition we also add handlers to httpx and httpcore loggers
        # if none are present

        loggers = (
            "mattermostautodriver",
            "httpx",
            "httpcore",
        )

        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(levelname)s [%(asctime)s] %(name)s - %(message)s"))

        for logger in loggers:
            _logger = logging.getLogger(logger)
            _logger.setLevel(level)

            if not _logger.hasHandlers():
                _logger.addHandler(handler)

    @property
    def userid(self):
        """
        :return: The user id of the logged in user
        """
        return self._userid

    @userid.setter
    def userid(self, user_id):
        self._userid = user_id

    @property
    def username(self):
        """
        :return: The username of the logged in user. If none, returns an emtpy string.
        """
        return self._username

    @property
    def request_timeout(self):
        """
        :return: The configured timeout for the requests
        """
        return self._options["request_timeout"]

    @username.setter
    def username(self, username):
        self._username = username

    @property
    def url(self):
        return self._url

    @property
    def cookies(self):
        """
        :return: The cookie given on login
        """
        return self._cookies

    @cookies.setter
    def cookies(self, cookies):
        self._cookies = cookies

    @property
    def token(self):
        """
        :return: The token for the login
        """
        return self._token

    @token.setter
    def token(self, t):
        self._token = t

    def auth_header(self):
        if self._auth:
            return None
        if self._token == "":
            return {}
        return {"Authorization": "Bearer {token:s}".format(token=self._token)}

    def _build_request(self, method, options=None, params=None, data=None, files=None, basepath=None):
        if params is None:
            params = {}
        if data is None:
            data = {}
        if basepath:
            url = self._make_url(self._options["scheme"], self._options["url"], self._options["port"], basepath)
        else:
            url = self.url

        request_params = {"headers": self.auth_header(), "timeout": self.request_timeout}

        if params is not None:
            request_params["params"] = params

        if method in ("post", "put"):
            if options is not None:
                request_params["json"] = options
            if data is not None:
                request_params["data"] = data
            if files is not None:
                request_params["files"] = files

        if self._auth is not None:
            request_params["auth"] = self._auth()

        return self._get_request_method(method, self.client), url, request_params

    @staticmethod
    def _check_response(response):
        try:
            response.raise_for_status()
        except httpx.HTTPStatusError as e:
            try:
                data = e.response.json()
                message = data.get("message", data)
            except ValueError:
                log.debug("Could not convert response to json")
                message = response.text
            log.error(message)

            if e.response.status_code == 400:
                raise InvalidOrMissingParameters(message) from None
            elif e.response.status_code == 401:
                raise NoAccessTokenProvided(message) from None
            elif e.response.status_code == 403:
                raise NotEnoughPermissions(message) from None
            elif e.response.status_code == 404:
                raise ResourceNotFound(message) from None
            elif e.response.status_code == 405:
                raise MethodNotAllowed(message) from None
            elif e.response.status_code == 413:
                raise ContentTooLarge(message) from None
            elif e.response.status_code == 501:
                raise FeatureDisabled(message) from None
            else:
                raise

        log.debug(response)

    @staticmethod
    def _get_request_method(method, client):
        method = method.lower()
        if method == "post":
            return client.post
        elif method == "put":
            return client.put
        elif method == "delete":
            return client.delete
        else:
            return client.get


[docs]class Client(BaseClient): def __init__(self, options): super().__init__(options) self.client = httpx.Client( http2=options.get("http2", False), proxies=self._proxies, verify=options.get("verify", True), ) def make_request(self, method, endpoint, options=None, params=None, data=None, files=None, basepath=None): request, url, request_params = self._build_request(method, options, params, data, files, basepath) response = request(url + endpoint, **request_params) self._check_response(response) return response def __enter__(self): self.client.__enter__() return self def __exit__(self, *exc_info): return self.client.__exit__(*exc_info) def get(self, endpoint, options=None, params=None): response = self.make_request("get", endpoint, options=options, params=params) if response.headers["Content-Type"] != "application/json": log.debug("Response is not application/json, returning raw response") return response try: return response.json() except ValueError: log.debug("Could not convert response to json, returning raw response") return response def post(self, endpoint, options=None, params=None, data=None, files=None): return self.make_request("post", endpoint, options=options, params=params, data=data, files=files).json() def put(self, endpoint, options=None, params=None, data=None): return self.make_request("put", endpoint, options=options, params=params, data=data).json() def delete(self, endpoint, options=None, params=None, data=None): return self.make_request("delete", endpoint, options=options, params=params, data=data).json() def call_webhook(self, hook_id, options=None): return self.make_request("post", "/hooks/" + hook_id, options=options)
class AsyncClient(BaseClient): def __init__(self, options): super().__init__(options) self.client = httpx.AsyncClient( http2=options.get("http2", False), proxies=self._proxies, verify=options.get("verify", True), ) async def __aenter__(self): await self.client.__aenter__() return self async def __aexit__(self, *exc_info): return await self.client.__aexit__(*exc_info) async def make_request(self, method, endpoint, options=None, params=None, data=None, files=None, basepath=None): request, url, request_params = self._build_request(method, options, params, data, files, basepath) response = await request(url + endpoint, **request_params) self._check_response(response) return response async def get(self, endpoint, options=None, params=None): response = await self.make_request("get", endpoint, options=options, params=params) if response.headers["Content-Type"] != "application/json": log.debug("Response is not application/json, returning raw response") return response try: return response.json() except ValueError: log.debug("Could not convert response to json, returning raw response") return response async def post(self, endpoint, options=None, params=None, data=None, files=None): response = await self.make_request("post", endpoint, options=options, params=params, data=data, files=files) return response.json() async def put(self, endpoint, options=None, params=None, data=None): response = await self.make_request("put", endpoint, options=options, params=params, data=data) return response.json() async def delete(self, endpoint, options=None, params=None, data=None): response = await self.make_request("delete", endpoint, options=options, params=params, data=data) return response.json() async def call_webhook(self, hook_id, options=None): response = await self.make_request("post", "/hooks/" + hook_id, options=options) return response.json()