263 lines
8.6 KiB
Python
263 lines
8.6 KiB
Python
"""
|
|
This module contains utilities for allowing dataclasses to be populated by
|
|
user-provided configuration (e.g. from a Yaml file).
|
|
|
|
.. note::
|
|
On naming conventions: this module converts hyphens in key names to
|
|
underscores as a matter of course.
|
|
"""
|
|
|
|
import dataclasses
|
|
import re
|
|
from typing import Optional, Set, Type, Union
|
|
|
|
from asn1crypto.core import BitString, ObjectIdentifier
|
|
|
|
from .errors import ConfigurationError
|
|
|
|
__all__ = [
|
|
'ConfigurableMixin',
|
|
'check_config_keys',
|
|
'OID_REGEX',
|
|
'process_oid',
|
|
'process_oids',
|
|
'process_bit_string_flags',
|
|
]
|
|
|
|
_noneType = type(None)
|
|
|
|
|
|
def _unwrap_type_annot(thing) -> Optional[type]:
|
|
if isinstance(thing, type):
|
|
the_type = thing
|
|
else:
|
|
from typing import get_args, get_origin
|
|
|
|
# is it an optional? (i.e. Union[X, None])
|
|
# if so, retrieve the wrapped type
|
|
if get_origin(thing) is not Union:
|
|
return None
|
|
try:
|
|
type1, type2 = get_args(thing)
|
|
if type2 is not _noneType:
|
|
return None
|
|
except (ValueError, TypeError):
|
|
return None
|
|
the_type = type1
|
|
return the_type if isinstance(the_type, type) else None
|
|
|
|
|
|
def _has_default(f: dataclasses.Field):
|
|
return (
|
|
f.default_factory is not dataclasses.MISSING
|
|
or f.default is not dataclasses.MISSING
|
|
)
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
|
|
class ConfigurableMixin:
|
|
"""General configuration mixin for dataclasses"""
|
|
|
|
@classmethod
|
|
def process_entries(cls, config_dict):
|
|
"""
|
|
Hook method that can modify the configuration dictionary
|
|
to overwrite or tweak some of their values (e.g. to convert string
|
|
parameters into more complex Python objects)
|
|
|
|
Subclasses that override this method should call
|
|
``super().process_entries()``, and leave keys that they do not
|
|
recognise untouched.
|
|
|
|
:param config_dict:
|
|
A dictionary containing configuration values.
|
|
:raises ConfigurationError:
|
|
when there is a problem processing a relevant entry.
|
|
"""
|
|
pass
|
|
|
|
@classmethod
|
|
def _process_configurable_fields(cls, config_dict):
|
|
# automatically parse values for configurable fields
|
|
for f in dataclasses.fields(cls):
|
|
field_type = _unwrap_type_annot(f.type)
|
|
if field_type is None or not issubclass(
|
|
field_type, ConfigurableMixin
|
|
):
|
|
continue
|
|
# if the field has a value in the config dict, attempt to parse it
|
|
try:
|
|
field_config_dict = config_dict[f.name]
|
|
except KeyError:
|
|
continue
|
|
try:
|
|
field_value = field_type.from_config(field_config_dict)
|
|
except ConfigurationError as e:
|
|
raise ConfigurationError(
|
|
f"Error while processing configurable field '{f.name}': "
|
|
f"{repr(e)}"
|
|
) from e
|
|
config_dict[f.name] = field_value
|
|
|
|
@classmethod
|
|
def check_config_keys(cls, keys_supplied: Set[str]):
|
|
"""
|
|
Check whether all supplied keys are meaningful.
|
|
|
|
:param keys_supplied:
|
|
The keys supplied in the configuration.
|
|
:raises ConfigurationError: if at least one key does not make sense.
|
|
"""
|
|
|
|
check_config_keys(
|
|
cls.__name__,
|
|
{f.name for f in dataclasses.fields(cls)},
|
|
keys_supplied,
|
|
)
|
|
|
|
@classmethod
|
|
def from_config(cls, config_dict):
|
|
"""
|
|
Attempt to instantiate an object of the class on which it is called,
|
|
by means of the configuration settings passed in.
|
|
|
|
First, we check that the keys supplied in the dictionary correspond
|
|
to data fields on the current class.
|
|
Then, the dictionary is processed using the :meth:`process_entries`
|
|
method. The resulting dictionary is passed to the initialiser
|
|
of the current class as a kwargs dict.
|
|
|
|
:param config_dict:
|
|
A dictionary containing configuration values.
|
|
:return:
|
|
An instance of the class on which it is called.
|
|
:raises ConfigurationError:
|
|
when an unexpected configuration key is encountered or left
|
|
unfilled, or when there is a problem processing one of the config
|
|
values.
|
|
"""
|
|
if not isinstance(config_dict, dict):
|
|
raise ConfigurationError(
|
|
f"{cls.__name__} requires a dictionary to initialise."
|
|
)
|
|
cls.check_config_keys(set(config_dict.keys()))
|
|
# in Python we need underscores
|
|
config_dict = {
|
|
key.replace('-', '_'): v for key, v in config_dict.items()
|
|
}
|
|
|
|
cls.process_entries(config_dict)
|
|
|
|
cls._process_configurable_fields(config_dict)
|
|
|
|
enforce_required_keys(
|
|
cls.__name__,
|
|
{f.name for f in dataclasses.fields(cls) if not _has_default(f)},
|
|
config_dict,
|
|
)
|
|
try:
|
|
# noinspection PyArgumentList
|
|
return cls(**config_dict)
|
|
except TypeError as e: # pragma: nocover
|
|
raise ConfigurationError("Failed to instantiate from config") from e
|
|
|
|
|
|
def check_config_keys(config_name, expected_keys, supplied_keys):
|
|
# wrapper function to provide user-friendly errors
|
|
# (mainly intended for the CLI)
|
|
# This does not check whether all required keys are present, that happens
|
|
# later
|
|
# TODO What about type checking?
|
|
unexpected_keys = _check_subset(supplied_keys, expected_keys)
|
|
if unexpected_keys:
|
|
# this is easier to present to the user than a TypeError
|
|
raise ConfigurationError(
|
|
f"Unexpected {'key' if len(unexpected_keys) == 1 else 'keys'} "
|
|
f"in configuration for {config_name}: "
|
|
f"{', '.join(unexpected_keys)}."
|
|
)
|
|
|
|
|
|
def _check_subset(expected_sub, expected_sup):
|
|
# standardise on dashes for the yaml interface
|
|
expected_sub = {key.replace('_', '-') for key in expected_sub}
|
|
expected_sup = {key.replace('_', '-') for key in expected_sup}
|
|
return expected_sub - expected_sup
|
|
|
|
|
|
def enforce_required_keys(config_name, required_keys, config_dict):
|
|
missing_keys = _check_subset(required_keys, config_dict.keys())
|
|
if missing_keys:
|
|
# this is easier to present to the user than a TypeError
|
|
raise ConfigurationError(
|
|
f"Missing required {'key' if len(missing_keys) == 1 else 'keys'} "
|
|
f"in configuration for {config_name}: "
|
|
f"{', '.join(missing_keys)}."
|
|
)
|
|
|
|
|
|
OID_REGEX = re.compile(r'\d(\.\d+)+')
|
|
|
|
|
|
def process_oid(
|
|
asn1crypto_class: Type[ObjectIdentifier], id_string, param_name
|
|
):
|
|
if not isinstance(id_string, str):
|
|
raise ConfigurationError(
|
|
f"Identifier '{repr(id_string)}' in '{param_name}' is not a string."
|
|
)
|
|
if OID_REGEX.fullmatch(id_string):
|
|
# Attempt to translate OID strings to human-readable form
|
|
# whenever possible
|
|
return asn1crypto_class.map(id_string)
|
|
else:
|
|
try:
|
|
# try to see if the usage string maps to an OID before
|
|
# proceeding
|
|
asn1crypto_class.unmap(id_string)
|
|
except ValueError:
|
|
raise ConfigurationError(
|
|
f"'{id_string}' in '{param_name}' is not a valid "
|
|
f"{asn1crypto_class.__name__}"
|
|
)
|
|
return id_string
|
|
|
|
|
|
def _ensure_strings(strings, param_name):
|
|
err_msg = (
|
|
f"'{param_name}' must be specified as a "
|
|
"list of strings, or a string."
|
|
)
|
|
if isinstance(strings, str):
|
|
strings = (strings,)
|
|
elif not isinstance(strings, (list, tuple)):
|
|
raise ConfigurationError(err_msg)
|
|
|
|
return strings
|
|
|
|
|
|
def process_oids(asn1crypto_class: Type[ObjectIdentifier], strings, param_name):
|
|
strings = _ensure_strings(strings, param_name)
|
|
for usage_string in strings:
|
|
yield process_oid(asn1crypto_class, usage_string, param_name)
|
|
|
|
|
|
def process_bit_string_flags(
|
|
asn1crypto_class: Type[BitString], strings, param_name
|
|
):
|
|
strings = _ensure_strings(strings, param_name)
|
|
valid_values = asn1crypto_class._map.values()
|
|
for flag_string in strings:
|
|
if not isinstance(flag_string, str):
|
|
raise ConfigurationError(
|
|
f"Flag identifier '{repr(flag_string)}' is not a string."
|
|
)
|
|
elif flag_string not in valid_values:
|
|
raise ConfigurationError(
|
|
f"'{repr(flag_string)}' is not a valid "
|
|
f"{asn1crypto_class.__name__} flag name."
|
|
)
|
|
# Use yield to keep the API consistent with process_oids, we don't
|
|
# change anything
|
|
yield flag_string
|