Source code for canarieapi.monitoring

# -- Standard lib ------------------------------------------------------------
import re
import sqlite3
from typing import Dict, Optional, Tuple, Union
from typing_extensions import Literal, NotRequired, Required, TypedDict

# -- 3rd party modules -------------------------------------------------------
import requests
from requests.exceptions import ConnectionError, Timeout  # pylint: disable=W0622

# -- Project specific --------------------------------------------------------
from canarieapi.app_object import APP
from canarieapi.status import Status
from canarieapi.utility_rest import JSON, get_db, retry_db_error_after_init

# Either kind of real number; used below for timeout values.
Number = Union[float, int]

# Shape of the "request" section of a monitoring entry. ``check_service`` layers it over
# its defaults and unpacks the result as keyword arguments of ``requests.request``.
# Only "url" is mandatory.
RequestConfig = TypedDict(
    "RequestConfig",
    {
        "url": Required[str],
        # Both upper-case and lower-case spellings of the HTTP verbs are accepted.
        "method": NotRequired[
            Literal[
                "GET",
                "OPTIONS",
                "HEAD",
                "POST",
                "PUT",
                "PATCH",
                "DELETE",
                "get",
                "options",
                "head",
                "post",
                "put",
                "patch",
                "delete",
            ]
        ],
        "params": NotRequired[Optional[Union[str, Dict[str, str]]]],
        "headers": NotRequired[Dict[str, str]],
        "cookies": NotRequired[Dict[str, str]],
        "json": NotRequired[Optional[JSON]],
        "data": NotRequired[Optional[str]],
        # A single number, or a pair of numbers.
        "timeout": NotRequired[Optional[Union[Number, Tuple[Number, Number]]]],
        "proxies": NotRequired[Dict[str, str]],
        "allow_redirects": NotRequired[bool],
        "stream": NotRequired[bool],
        "verify": NotRequired[bool],
        "cert": NotRequired[Optional[Union[str, Tuple[str, str]]]],
    },
    total=False,
)

# Shape of the optional "response" section of a monitoring entry: what ``check_service``
# expects back. "text" is compiled as a regular expression and matched against the body.
ResponseConfig = TypedDict(
    "ResponseConfig",
    {
        "status_code": NotRequired[Optional[int]],
        "text": NotRequired[Optional[str]],
    },
    total=True,
)
@retry_db_error_after_init
def monitor(*, update_db: bool = True, database: Optional[sqlite3.Connection] = None) -> None:
    """
    Check every monitored route and optionally record the outcome in the database.

    The ``monitoring`` section of each entry in the ``SERVICES`` and ``PLATFORMS``
    application settings is collected, and each check it defines is passed to
    :func:`check_service`. The resulting status is logged and, when ``update_db`` is set,
    written to the ``status`` table. Once every check has run, the ``cron`` table entry of
    the ``status`` job is refreshed, the transaction is committed and the connection closed.

    :param update_db: When ``False``, statuses are only logged and no database is touched.
    :param database: Connection to write to; when omitted, ``get_db()`` provides one.
        The connection is closed before returning, including when supplied by the caller.
    :raises Exception: Whatever :func:`check_service` raises is logged and re-raised, which
        stops the run before anything is committed.
    """
    log = APP.logger
    settings = APP.config
    log.info("Loading configuration")

    # Platforms are merged last: for a route name present in both sections, the platform
    # monitoring definition is the one that gets used.
    monitored = {
        **{name: settings["SERVICES"][name]["monitoring"] for name in settings["SERVICES"]},
        **{name: settings["PLATFORMS"][name]["monitoring"] for name in settings["PLATFORMS"]},
    }

    log.info("Checking status of routes...")

    insert_status = "insert or replace into status (route, service, status, message) values (?, ?, ?, ?)"

    with APP.app_context():
        if update_db:
            conn = database or get_db()
            cursor = conn.cursor()

        for route, checks in monitored.items():
            for service, check in checks.items():
                try:
                    status, message = check_service(
                        request=check["request"],
                        response=check.get("response", {}),
                    )
                except Exception:
                    log.error("Exception occurs while trying to check status of %s.%s", route, service)
                    raise

                log.info("%s.%s : %s", route, service, Status.pretty_msg(status))
                if not update_db:
                    continue

                # Messages longer than 256 characters are cut to 253 plus an ellipsis.
                if len(message) > 256:
                    stored_message = message[0:253] + "..."
                else:
                    stored_message = message
                cursor.execute(insert_status, [route, service, status, stored_message])

        if update_db:
            # Record when the status job last ran, then persist everything at once.
            cursor.execute("insert or replace into cron (job, last_execution) values ('status', CURRENT_TIMESTAMP)")
            conn.commit()
            conn.close()
def check_service(request: RequestConfig, response: ResponseConfig) -> Tuple[Status, str]:
    """
    Send one monitoring request and grade the reply against the expected response.

    ``request`` is layered over a set of defaults (``get`` method, 5 second timeout, no
    parameters, headers or body) and the result is passed as keyword arguments to
    :func:`requests.request`. ``response`` is layered over the default expectation of a
    ``200`` status code with no constraint on the body.

    :param request: Request definition; its keys override the defaults.
    :param response: Expected ``status_code`` and, optionally, a ``text`` regular expression
        that the response body must match from its beginning (``re.match`` semantics).
    :returns: A ``(status, message)`` pair:
        ``Status.down`` when the request fails with a connection error or a timeout,
        ``Status.bad`` when the status code or the body differs from what is expected,
        ``Status.ok`` with an empty message otherwise.
    :raises Exception: Any error from :func:`requests.request` other than the
        connection/timeout errors handled here is propagated to the caller.
    """
    default_request: RequestConfig = {
        "timeout": 5,
        "headers": {},
        "params": None,
        "data": None,
        "json": None,
        "method": "get",
        "url": "http://google.com",
    }
    default_request.update(request)
    default_response: ResponseConfig = {
        "status_code": 200,
        "text": None,
    }
    default_response.update(response)

    logger = APP.logger
    url = default_request["url"]

    try:
        resp = requests.request(**default_request)
    except (ConnectionError, Timeout) as exc:
        # Unreachable or too slow to answer: the service is considered down.
        message = f"Cannot reach {url} : {exc!s}"
        logger.warning(message)
        return Status.down, message

    expected_code = default_response["status_code"]
    if resp.status_code != expected_code:
        # The closing parenthesis was missing from this message.
        message = f"Bad return code from {url} (Expecting {expected_code}, Got {resp.status_code})"
        logger.warning(message)
        return Status.bad, message

    expected_text = default_response["text"]
    if expected_text:
        text_regex = re.compile(expected_text)
        # An empty body can never satisfy a configured pattern.
        if not resp.text or not text_regex.match(resp.text):
            # The closing parenthesis was missing from this message as well.
            message = f"Bad response content from {url} (Expecting : \n{expected_text}\nGot : \n{resp.text})"
            logger.warning(message)
            return Status.bad, message

    return Status.ok, ""
def cron_job() -> None:
    """Run one monitoring pass with default arguments, logging its start and its end."""
    log = APP.logger
    log.info("Cron job for monitoring routes status")
    monitor()
    log.info("Done")
# Script entry point: run the monitoring job once when this module is executed directly.
if __name__ == "__main__": cron_job()