Source code for canarieapi.logparser

# -- Standard lib ------------------------------------------------------------
import re
import sqlite3
from datetime import datetime, timezone
from typing import Dict, Optional, Union

# -- 3rd party ---------------------------------------------------------------
from dateutil.parser import parse as dt_parse

# -- Project specific --------------------------------------------------------
from canarieapi.app_object import APP
from canarieapi.utility_rest import get_db, retry_db_error_after_init

[docs]RouteStatistics = Dict[str, Dict[str, Union[str, int]]]
[docs]def parse_datetime(dt_str: str) -> datetime: """ Parse datetime string from log and return it with TimeZone awareness. """ dt = dt_parse(dt_str) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt
[docs]def parse_log(filename: str, database: Optional[sqlite3.Connection] = None) -> RouteStatistics: # Load config logger = APP.logger logger.info("Loading configuration") config = APP.config srv_stats = {route: config["SERVICES"][route]["stats"] for route in config["SERVICES"]} pf_stats = {route: config["PLATFORMS"][route]["stats"] for route in config["PLATFORMS"]} all_stats = srv_stats all_stats.update(pf_stats) route_stats = {} for route in all_stats: route_regex = all_stats[route] try: route_stats[route] = { "method_regex": re.compile(route_regex["method"]), "route_regex": re.compile(route_regex["route"]), "count": 0, "last_access": None, } except Exception: logger.error("Exception occurs while trying to compile regex of %s", route) raise # get the last entry from the logs in order to not duplicate entries if the same log file is read multiple times with APP.app_context(): db = database or get_db() cur = db.cursor() cur.execute("select last_access from stats order by last_access desc limit 1") records = cur.fetchone() if records: last_access = parse_datetime(records[0]) else: last_access = None # Load access log logger.info("Loading log file : %s", filename) log_regex = re.compile(r".*\[(?P<datetime>.*)\] \"(?P<method>[A-Z]+) (?P<route>/.*) .*") # pylint: disable=C4001 log_records = [] with open(filename, mode="r", encoding="utf-8") as f: for line in f: match = log_regex.match(line) if match: records = match.groupdict() if last_access is None or parse_datetime(records["datetime"]) > last_access: log_records.append(records) # Compile stats logger.info("Compiling stats from %s records", len(log_records)) for record in log_records: for route, value in route_stats.items(): if value["route_regex"].match(record["route"]) and value["method_regex"].match(record["method"]): value["count"] = value["count"] + 1 value["last_access"] = record["datetime"] break return route_stats
@retry_db_error_after_init
[docs]def update_db(route_stats: RouteStatistics, database: Optional[sqlite3.Connection] = None) -> None: # Update stats in database logger = APP.logger logger.info("Updating database") with APP.app_context(): db = database or get_db() cur = db.cursor() for route, value in route_stats.items(): if not value["count"]: continue logger.info("Adding %s invocations to route %s", value["count"], route) query = ( "insert or replace into stats (route, invocations, last_access) values (" "?, " " ifnull((select invocations from stats where route=?),0) + ?, " "?)" ) # sqlite can take the date as a string as long as it is formatted using ISO-8601 cur.execute(query, [route, route, value["count"], value["last_access"]]) cur.execute("insert or replace into cron (job, last_execution) values ('log', CURRENT_TIMESTAMP)") db.commit() db.close()
[docs]def cron_job() -> None: if APP.config.get("PARSE_LOGS", True): logger = APP.logger logger.info("Cron job for parsing server log") access_log_fn = APP.config["DATABASE"]["access_log"] update_db(parse_log(access_log_fn)) logger.info("Done")
if __name__ == "__main__": cron_job()