This module defines the Fastr Config class for managing the configuration of
Fastr. The config object is stored directly in the fastr top-level module.

import builtins
import collections
import importlib
import inspect
import json
import logging
from logging import StreamHandler, NullHandler, Formatter
from logging.handlers import RotatingFileHandler
import os
import pkg_resources
import sys
import tempfile

from .events import FastrLogEventHandler
from ..globals import get_current_run


class FastrLogRecordFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
        current_run = get_current_run()
        if current_run is not None:
            record.run_id =
            record.tracking_id = current_run.tracking_id or ''
        else:
            record.run_id = ''
            record.tracking_id = ''
        return True
class EmptyDefault(object):
    """
    Empty defaultdict.
    """
def __init__(self, data=None):
        self._list_data = [NotImplemented]
        self._dict_data = collections.defaultdict(EmptyDefault)

        if isinstance(data, collections.Mapping):
            self._dict_data.update(data)
        elif isinstance(data, collections.Sequence):
            self._list_data = list(data)
# Any object be added in place (just replaces current value)
def __iadd__(self, right):
        self._list_data += right
        return self
def __add__(self, right):
        return EmptyDefault(self._list_data + right)
def __radd__(self, other):
        return EmptyDefault(other + self._list_data)
def append(self, value):
        self._list_data.append(value)
def prepend(self, value):
        self._list_data = [value] + self._list_data
def extend(self, other):
        self._list_data.extend(other)
# Make this act like a dict updated by another dict
def update(self, other):
        self._dict_data.update(other)
def merge_default(self, field_spec):
        """
        Merge the default into this EmptyDefault given the field spec

        :param field_spec: Field specification
        :return: Merged value
        """
        if field_spec[0] is dict:
            new_value = dict(field_spec[1])
            new_value.update(self.asdict())
            value = new_value
        elif field_spec[0] is list:
            new_value = []
            for x in self.aslist():
                if x is not NotImplemented:
                    new_value.append(x)
                else:
                    new_value.extend(field_spec[1])
            value = new_value
        else:
            raise TypeError('Config value and type do not match!')

        return value
def __getitem__(self, item):
        return self._dict_data[item]
def __setitem__(self, key, value):
        self._dict_data[key] = value
def __delitem__(self, key):
        del self._dict_data[key]
# Access to data
def aslist(self):
        return list(self._list_data)
def asdict(self):
        return dict(self._dict_data)
def _find_log_type(): """ Figure out the logtype to use for this fastr session :return: log type to us :rtype: str """ # Hack setting non-standard log methods/destinations before fastr is imported _stack = inspect.stack() for frame in _stack[1:]: if 'FASTR_LOG_TYPE' in frame[0].f_globals: fastr_log_type = frame[0].f_globals['FASTR_LOG_TYPE'] # We only want to last definition of fastr_log_type break else: fastr_log_type = 'default' return fastr_log_type USER_DIR = os.environ.get('FASTRHOME', os.path.expanduser(os.path.join('~', '.fastr'))) FASTR_CONFIG_DIRS = os.environ.get('FASTR_CONFIG_DIRS', '').split(os.pathsep) SYSTEM_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) RESOURCE_DIR = os.path.join(SYSTEM_DIR, 'resources')
class Config(object):
    """
    Class contain the fastr configuration
    """

    DEFAULT_FIELDS = {
        "logging_config": (
            dict,
            {},
            "Python logger config"
        ),
        "extra_config_dirs": (
            list,
            FASTR_CONFIG_DIRS,
            "Extra configuration directories to read"
        ),
        "debug": (
            bool,
            False,
            "Flag to enable/disable debugging"
        ),
        "logtype": (
            str,
            _find_log_type(),
            "Type of logging to use"
        ),
        "log_to_file": (
            bool,
            False,
            "Indicate if default logging settings should log to files or not"
        ),
        "loglevel": (
            int,
            20,
            "The log level to use (as int), INFO is 20, WARNING is 30, etc"
        ),
        "systemdir": (
            str,
            SYSTEM_DIR,
            "Fastr installation directory",
            'Directory of the top-level fastr package'
        ),
        "userdir": (
            str,
            USER_DIR,
            "Fastr user configuration directory",
            "$FASTRHOME or ~/.fastr"
        ),
        "logdir": (
            str,
            os.path.join(USER_DIR, 'logs'),
            "Directory where the fastr logs will be placed",
            "$userdir/logs"
        ),
        "resourcesdir": (
            str,
            RESOURCE_DIR,
            "Directory containing the fastr system resources",
            "$systemdir/resources"
        ),
        "examplesdir": (
            str,
            os.path.join(SYSTEM_DIR, 'examples'),
            "Directory containing the fastr examples",
            "$systemdir/examples"
        ),
        "schemadir": (
            str,
            os.path.join(SYSTEM_DIR, 'resources', 'schemas'),
            "Directory containing the fastr data schemas",
            "$systemdir/schemas"
        ),
        "executionscript": (
            str,
            os.path.join(SYSTEM_DIR, 'execution', ''),
            "Execution script location",
            "$systemdir/execution/"
        ),
        "types_path": (
            list,
            [x for x in [os.path.join(USER_DIR, 'datatypes'), os.path.join(RESOURCE_DIR, 'datatypes')] if
             os.path.exists(x)],
            "Directories to scan for datatypes",
            ['$userdir/datatypes', '$resourcedir/datatypes']
        ),
        "tools_path": (
            list,
            [x for x in [os.path.join(USER_DIR, 'tools'), os.path.join(RESOURCE_DIR, 'tools')] if os.path.exists(x)],
            "Directories to scan for tools",
            ['$userdir/tools', '$resourcedir/tools']
        ),
        "networks_path": (
            list,
            [x for x in [os.path.join(USER_DIR, 'networks'), os.path.join(RESOURCE_DIR, 'networks')] if
             os.path.exists(x)],
            "Directories to scan for networks",
            ['$userdir/networks', '$resourcedir/networks']
        ),
        "plugins_path": (
            list,
            [x for x in [os.path.join(USER_DIR, 'plugins'), os.path.join(RESOURCE_DIR, 'plugins')] if
             os.path.exists(x)],
            "Directories to scan for plugins",
            ['$userdir/plugins', '$resourcedir/plugins']
        ),
        "mounts": (
            dict,
            {
                'tmp': tempfile.gettempdir(),
                'example_data': os.path.join(SYSTEM_DIR, 'examples', 'data'),
                'home': os.path.expanduser('~'),
            },
            "A dictionary containing all mount points in the VFS system",
            {
                'tmp': '$TMPDIR',
                'example_data': '$systemdir/examples/data',
                'home': '~/'
            }
        ),
        "preferred_types": (
            list,
            [],
            "A list indicating the order of the preferred types to use. First item is most preferred."
        ),
        "protected_modules": (
            list,
            [],
            "A list of modules in the environmnet modules that are protected against unloading"
        ),
        "execution_plugin": (
            str,
            'ProcessPoolExecution',
            "The default execution plugin to use"
        ),
        "reporting_plugins": (
            list,
            ['SimpleReport'],
            "The reporting plugins to use, is a list of all plugins to be activated"
        ),
        "web_hostname": (
            str,
            'localhost',
            "The hostname to expose the web app for"
        ),
        "warn_develop": (
            bool,
            True,
            "Warning users on import if this is not a production version of fastr"
        ),
        "source_job_limit": (
            int,
            0,
            "The number of source jobs allowed to run concurrently"
        ),
        "job_cleanup_level": (
            str,
            'no_cleanup',
            "The level of cleanup required, options: all, no_cleanup, non_failed",
            'no_cleanup',
            lambda x: x in ['all', 'no_cleanup', 'non_failed'],
        ),
        "filesynchelper_url": (
            str,
            '',
            'Redis url e.g. redis://localhost:6379'
        ),
        "queue_report_interval": (
            int,
            0,
            "Interval in which to report the number of queued jobs (default is 0, no reporting)"
        )
    }

    # pylint: disable=too-many-instance-attributes
    # The config has many attributes, because its function is to hold
    # this data
def __init__(self, *configfiles):
        #: Trace of the config files read by this object
        self.read_config_files = []
        self.log = None

        #: Raw config of current values
        self._fields = collections.OrderedDict(sorted(self.DEFAULT_FIELDS.items()))
        self._create_field_properties(self._fields)

        self._current_config = collections.defaultdict(EmptyDefault)
        self._current_config.update(vars(builtins))
        self._current_config['SYSTEM_DIR'] = SYSTEM_DIR
        self._current_config['USER_DIR'] = USER_DIR

        # Read default config files if found
        if os.path.exists(os.path.join(self.userdir, '')):
            self.read_config(os.path.join(self.userdir, ''))

        extra_configs = [os.path.join(self.userdir, 'config.d')]
        entry_point_dirs = []
        for entrypoint in pkg_resources.iter_entry_points('fastr.config'):
            if entrypoint.startswith('func:'):
                # Import function and execute with no arguments
                func_spec = entrypoint[5:]
                try:
                    module_spec, func_name = func_spec.rsplit('.', 1)
                    mod = importlib.import_module(module_spec)
                    func = getattr(mod, func_name)
                    entry_point_dirs.append(func())
                # Broad exception because this runs untrusted code and we do
                # not want to crash fastr due to other packages fucking up
                except Exception as exception:
                    self.log.error(f'Encountered errors when importing config from entrypoint {entrypoint}: {exception}')
            elif entrypoint.startswith('dir:'):
                entry_point_dirs.append(entrypoint[4:])

        extra_configs.extend(entry_point_dirs)
        extra_configs.extend(self.extra_config_dirs)

        for config_d in extra_configs:
            if os.path.isdir(config_d):
                for potential_config in sorted(os.listdir(config_d)):
                    candidate = os.path.join(config_d, potential_config)
                    if candidate.endswith('.py') and os.path.isfile(candidate):
                        self.read_config(candidate)

        # Read config files as parameters
        for filename in configfiles:
            if os.path.exists(filename):
                self.read_config(filename)
            else:
                self.log.error('Config file {} does not exist!'.format(filename))

        #: The logger used by fastr, set and updated by the Config object
        self._update_logging()
def register_fields(self, fields_spec):
        """
        Register extra fields to the configuration manager.
        """
        if not isinstance(fields_spec, dict):
            raise TypeError("The fields_spec argument should be a dict")

        for name, field_spec in fields_spec.items():
            if name not in self._fields:
                self._fields[name] = field_spec
            else:
                if self._fields[name] != field_spec:
                    raise ValueError('Found a conflicting definition for the field {}'.format(name))

        self._create_field_properties(fields_spec)
def get_field(self, item):
        if item in self._fields:
            # Get the field specification from the field definitions
            field_spec = self._fields[item]

            # Get the currently stored value of the config
            value = self._current_config.get(item, None)

            # Use the default in case the value is not found
            if value is None:
                value = field_spec[1]

            # Catch cases without a valid defautt
            if value is None:
                raise ValueError('No default given for {}'.format(item))

            # In case of dict/list, get data and merge with default
            if isinstance(value, EmptyDefault):
                value = value.merge_default(field_spec)

            # Validate the type against the field specification
            if not isinstance(value, field_spec[0]):
                raise TypeError(
                    'Config value for {} is of wrong type, expected {}, found {}'.format(item, field_spec[0],
                                                                                          type(value).__name__))

            return value
        else:
            raise KeyError('Field not found in config field specification!')
def set_field(self, item, value):
        if item in self._fields:
            # Get the field specification from the field definitions
            field_spec = self._fields[item]

            if not isinstance(value, field_spec[0]):
                raise TypeError('Type mismatch, config field should be a {} found {}'.format(
                    field_spec[0], type(value).__name__
                ))

            # Set the value
            self._current_config[item] = value
            self._update_logging()
        else:
            raise KeyError('Field not found in config field specification!')
@classmethod def _create_field_properties(cls, fields): """ Create properties for a dictionary of fields :param fields: The mapping of the fields to create """ for field_name, field_value in fields.items(): if not hasattr(cls, field_name): prop = cls._field_property(field_name, field_value[2]) setattr(cls, field_name, prop) @staticmethod def _field_property(field_name, field_doc=None): """ Create a property for a field :param field_name: the name of the field :param field_doc: the docstring for the field :return: property to use """ def getter(obj): return obj.get_field(field_name) def setter(obj, value): obj.set_field(field_name, value) return property(getter, setter, field_doc)
def __repr__(self):
        items = []
        for key, spec in self._fields.items():
            val = getattr(self, key)
            if isinstance(val, (dict, list, str)):
                val = json.dumps(val, indent=2)
            items.append("# [{s[0].__name__}] {s[2]}\n{k} = {v}\n".format(s=spec, k=key, v=val))
        return '\n'.join(items)
def
[docs] def read_config_string(self, value): self.read_config_files.append('from string:\n{}'.format(value)) exec(value, {}, self._current_config) for key, value in self._current_config.items(): if isinstance(value, (list, dict)): self._current_config[key] = EmptyDefault(value) self._update_logging()
[docs] def web_url(self): """ Construct a fqdn from the web['hostname'] and web['port'] settings. :return: FQDN :rtype: str """ return 'http://localhost'
def _update_logging(self): """ Update the logging using the current config settings. """ # Create log dir if needed (make sure it exists before other # operation requiring logging) but after logdir is known if not os.path.exists(self.logdir): os.makedirs(self.logdir) # Set formatters verbose_formatter = Formatter( '%(asctime)s [%(processName)s::%(threadName)s] %(levelname)s: %(module)s:%(lineno)d >> %(message)s' ) console_simple_formatter = Formatter('%(tracking_id)s [%(levelname)s] %(module) 9s:%(lineno)04d >> %(message)s') report_handler = FastrLogEventHandler() report_handler.setLevel('INFO') console_handler = StreamHandler(sys.stdout) console_handler.setLevel('DEBUG' if self.debug else self.loglevel) if self.debug: console_handler.setFormatter(verbose_formatter) else: console_handler.setFormatter(console_simple_formatter) handlers = ['console'] extra_default_handlers = [] if self.log_to_file: info_file_handler = RotatingFileHandler( filename=os.path.join(self.logdir, 'info.log'), maxBytes=10*1024**2, backupCount=20, ) info_file_handler.setLevel('INFO') info_file_handler.setFormatter(verbose_formatter) error_file_handler = RotatingFileHandler( filename=os.path.join(self.logdir, 'error.log'), maxBytes=10*1024**2, backupCount=20, ) error_file_handler.setLevel('ERROR') error_file_handler.setFormatter(verbose_formatter) handlers += ['info_file', 'error_file'] extra_default_handlers += [info_file_handler, error_file_handler] null_handler = NullHandler() null_handler.setLevel('CRITICAL') logging_definition = { 'version': 1, 'loggers': { 'fastr': { 'handlers': handlers, 'propagate': True, 'level': 'DEBUG', }, 'py:warnings': { 'handlers': handlers, 'propagate': True, } }, 'root': { 'handlers': ['null_handler'], 'level': 'DEBUG' } } fastr_log_type_options = { 'default': [console_handler, report_handler] + extra_default_handlers, 'console': [console_handler, report_handler], 'childprocess': [null_handler], 'none': [null_handler] } if self.log is None: self.log = logging.getLogger('fastr') self.log.propagate = False self.log.setLevel('DEBUG' if self.debug else 'INFO') self.log.debug('Setting up the FASTR environment') for handler in fastr_log_type_options[self.logtype]: self.log.addHandler(handler) record_filter = FastrLogRecordFilter() self.log.addFilter(record_filter) warning_logger = logging.getLogger('py.warnings') warning_logger.propagate = False warning_logger.setLevel('WARNING') for handler in fastr_log_type_options[self.logtype]: warning_logger.addHandler(handler) else: self.log.debug('Updated fastr logging') self.log.debug('Log config: {}'.format(logging_definition)) self.log.debug('Log directory: {}'.format(self.logdir)) self.log.debug('Using log type: {} (debug: {})'.format(self.logtype, self.debug)) def _deep_update(self, dict_left, dict_right): for key, value in dict_right.items(): if isinstance(value, dict): result = self._deep_update(dict_left.get(key, {}), value) dict_left[key] = result else: dict_left[key] = dict_right[key] return dict_left