#!/usr/bin/env python
# coding:utf-8
"""
REST API utilities.
This module is a collection of utility functions used mainly by the rest_route
module and which are placed here to keep the rest_route module as clean as possible.
"""
# -- Standard lib ------------------------------------------------------------
import configparser
import functools
import http.client
import inspect
import os
import re
import sqlite3
from typing import Any, Dict, List, Optional, Tuple, TypeVar, Union
from typing_extensions import Literal, Protocol, TypeAlias
# -- 3rd party ---------------------------------------------------------------
from flask import Response, current_app, g, jsonify, redirect, render_template, request
from flask.typing import ResponseReturnValue
from werkzeug.datastructures import MIMEAccept
from werkzeug.exceptions import BadRequest, HTTPException, NotFound
from werkzeug.routing import BaseConverter, Map
# -- Project specific --------------------------------------------------------
from canarieapi.app_object import APP
[docs]APIType = Literal["platform", "service"]
[docs]_JSON: TypeAlias = "JSON" # pylint: disable=C0103
[docs]JSON = Union[ # pylint: disable=C0103
Dict[
str,
Union[
Dict[str, _JSON],
List[_JSON],
_JSON,
float,
int,
str,
bool,
None
]
],
_JSON
]
[docs]ReturnType = TypeVar("ReturnType") # pylint: disable=C0103
[docs]def request_wants_json() -> bool:
"""
Check if the request type is of type JSON considering both the ``Accept`` header and ``f`` query parameter.
The default Media-Type ``*/*`` will be interpreted as JSON.
Omitting a preferred type entirely will also default to JSON.
"""
# Best will be JSON if it's in accepted mimetypes and has a quality greater or equal to HTML.
# For */* both JSON and HTML will have the same quality so JSON still win.
# Unspecified type is usually the case for scripts (requests, curl, etc.).
# In the case of web browsers, the HTML with low quality is usually provided by default.
# This way, the returned type matches by default most of the expected values in both contexts.
accept = request.accept_mimetypes
fmt = str(request.args.get("f", "")).lower()
if fmt in ["json", "html"]:
return fmt == "json"
choices = ["application/json", "text/html"]
best = accept.best_match(choices)
return accept.accept_json and best == "application/json"
[docs]def set_html_as_default_response() -> None:
"""
Set the default response Media-Type, with fallback to JSON.
By default, if the accepted mimetypes contains */*, JSON format will be used.
By calling this function, the */* mimetype will be changed explicitly into
text/html so that it becomes the mimetype used by default.
This is useful for automatically rendering HTML by web browsers that do not
provide explicitly the desired mimetype.
"""
# Best will be HTML if it's in accept mimetypes and
# has a quality greater or equal to JSON.
# For */* both JSON and HTML will have the same quality so HTML still wins
best = request.accept_mimetypes.best_match(["text/html",
"application/json"])
# Replace any */* by HTML so that JSON isn't picked by default
if best == "text/html":
accept = MIMEAccept([("text/html", request.accept_mimetypes["text/html"])])
request.accept_mimetypes = accept # noqa
[docs]def get_config(route_name: str, api_type: APIType) -> JSON:
"""
Return the config for the particular service/platform associated with the given route name.
:param route_name: Route name of the service/platform coming from the URL e.g. :
['pavics', 'node', 'bias', etc.]
:param api_type: Api type of the route which must be one of platform or service
:raises: Exception if the route is unknown
"""
try:
conf = APP.config[api_type.upper() + "S"]
except KeyError:
raise BadRequest(f"The request has been made for an unknown type: [{api_type}]")
try:
return conf[route_name]
except KeyError:
raise NotFound(f"The request has been made for a {api_type} that is not supported: [{route_name}]")
[docs]def validate_route(route_name: str, api_type: APIType) -> None:
"""
Check if the route name is a value amongst known services/platforms in the configuration.
:param route_name: Route name of the service/platform coming from the URL e.g. :
['pavics', 'node', 'bias', etc.]
:param api_type: Api type of the route which must be one of platform or service
:raises: Exception if the route is unknown
"""
get_config(route_name, api_type)
[docs]def get_api_title(route_name: str, api_type: APIType) -> str:
"""
Get the API title to be shown in rendered html page.
:param route_name: Route name of the service/platform coming from the URL e.g. :
['pavics', 'node', 'bias', etc.]
:param api_type: Api type of the route which must be one of platform or service
:returns: An API title
"""
title = api_type.capitalize()
try:
name = get_config(route_name, api_type)["info"]["name"]
title = f"{title}: {name}"
except (KeyError, HTTPException):
pass
return title
[docs]def get_canarie_api_response(route_name: str, api_type: APIType, api_request: str) -> ResponseReturnValue:
"""
Provide a valid response for the CANARIE API request based on the service route.
:param route_name: Route name of the service/platform coming from the URL e.g. :
['pavics', 'node', 'bias', etc.]
:param api_type: Api type of the route which must be one of platform or service
:param api_request: The request specified in the URL
:returns: A valid HTML response
"""
# Factsheet is not part of the service API, so it's expected that the config will not be found
if api_type == "service" and api_request == "factsheet":
return make_error_response(http_status=404)
try:
cfg_val = get_config(route_name, api_type)["redirect"][api_request]
if cfg_val.find("http") == 0:
return redirect(cfg_val)
except KeyError:
pass
msg = (
f"The {api_type} does not provide in its configuration file a "
f"valid source for the CANARIE request {api_request}"
)
raise configparser.Error(msg)
[docs]def make_error_response(
http_status: Optional[int] = None,
http_status_response: Optional[str] = None,
) -> Tuple[Union[Response, str], int]:
"""
Make an error response based on the request type and given information.
:param http_status: HTTP status
:param http_status_response: Standard message associated with a status
code. Obtained via :py:data:`http.client.responses` if not
provided.
"""
if http_status is None or http_status < 100:
http_status = 500
# If the status response is None use the one provide by http.client
if http_status_response is None:
http_status_response = http.client.responses[http_status]
# Else, check if http_status_response already contains the HTML status code
else:
match = re.search("^([0-9]*):? *(.*)$", http_status_response)
if match and match.group(1) == str(http_status):
# In which case it is removed from the response
http_status_response = match.group(2)
if request_wants_json():
response = {
"status": http_status,
"description": http_status_response
}
return jsonify(response), http_status
html_response_header = f"{http_status} : {http_status_response}"
template = render_template("error.html",
Main_Title="Canarie API",
Title="Error",
html_response=html_response_header)
return template, http_status
[docs]def get_db(allow_cache: bool = True, connect: bool = True) -> sqlite3.Connection:
"""
Get a connection to an existing database.
If the database does not exist, create a connection to local sqlite3 file.
If the local sqlite3 file doesn't exist, initialize it using a schema.
Stores the established connection in the application's global context to reuse it whenever required.
"""
database = getattr(g, "_database", None)
if database is not None and allow_cache:
APP.logger.info("Database found. Reusing cached connection...")
elif connect:
APP.logger.info("Database not defined. Establishing connection...")
database_fn = APP.config["DATABASE"]["filename"]
APP.logger.debug("Using configured filename: [%s]", database_fn)
if not os.path.isabs(database_fn):
database_fn = os.path.join(APP.root_path, database_fn)
database_fn = os.path.abspath(database_fn)
APP.logger.debug("Setup database connection with filename: [%s]", database_fn)
db_exists = os.path.isfile(database_fn) # must resolve before connect otherwise file already created
try:
database = g._database = sqlite3.connect(database_fn)
except Exception as exc:
APP.logger.error(
"Error [%s] occurred during database connection with filename: [%s].",
str(exc), database_fn, exc_info=exc
)
APP.logger.debug("Reraise for error reporting.")
raise
APP.logger.debug("Initialize database with filename: [%s]", database_fn)
if db_exists:
APP.logger.debug("Skipping database initialization: [%s] (already exists)", database_fn)
else:
try:
init_db(database)
except Exception as exc:
APP.logger.error(
"Error [%s] occurred during database initialization with filename: [%s].",
str(exc), database_fn, exc_info=exc
)
APP.logger.debug("Closing database.")
database.close()
APP.logger.debug("Deleting database filename (reset for recreation): [%s].", database_fn)
os.remove(database_fn)
APP.logger.debug("Reraise for error reporting.")
raise
return database
[docs]def init_db(database: sqlite3.Connection) -> None:
"""
Initialize a database from a schema.
"""
APP.logger.debug("Initializing database")
with current_app.app_context():
dbs_fn = "database_schema.sql"
if os.path.isabs(dbs_fn):
schema_fn = dbs_fn
else:
schema_fn = os.path.join(APP.root_path, dbs_fn)
APP.logger.debug("Using schema filename : %s", schema_fn)
with current_app.open_resource(schema_fn, mode="r") as schema_f:
database.cursor().executescript(schema_f.read())
database.commit()
[docs]class DatabaseRetryFunction(Protocol):
[docs] def __call__(self, *args: Any, database: Optional[sqlite3.Connection] = None, **kwargs: Any) -> ReturnType:
...
[docs]def retry_db_error_after_init(func: DatabaseRetryFunction) -> DatabaseRetryFunction:
"""
Decorator that will retry a failing operation if an error related to database initialization occurred.
"""
@functools.wraps(func)
def retry(*args: Any, database: sqlite3.Connection = None, **kwargs: Any) -> ReturnType:
db_param = inspect.signature(func).parameters.get("database")
db = None
with APP.app_context():
if db_param and "sqlite3.Connection" in str(db_param.annotation):
db = database or get_db()
kwargs["database"] = db
try:
return func(*args, **kwargs)
except sqlite3.OperationalError as exc:
mod = getattr(func, "__module__", "")
mod = f"{mod}." if mod else ""
name = f"{mod}.{func.__name__}"
if "no such table" in str(exc):
if not db:
APP.logger.debug(
"Missing database parameter to retry operation [%s] after initialization.",
name,
)
else:
APP.logger.warning(
"Error from database [%s] during [%s] operation. Retrying after initialization.",
name, exc,
)
init_db(db)
return func(*args, **kwargs)
APP.logger.error(
"Error from database: [%s] during [%s] operation. Could not recover.",
name, exc,
)
raise
return retry
[docs]class AnyIntConverter(BaseConverter):
"""
Matches one of the items provided.
Items must be integer and comma separated with a space to avoid confusion
with floating point value in the parser.
For example::
1, 2, 3
And not::
1,2,3
Since it would parse as float 1,2 and 3.
"""
def __init__(self, mapping: Map, *items: Union[int, str]) -> None:
BaseConverter.__init__(self, mapping)
# Start by enforcing that x is an integer then convert it to string
[docs] self.regex = f"(?:{'|'.join([str(int(x)) for x in items])})"