# Copyright 2011-2014 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module defines the Fastr Config class for managing the configuration of
Fastr. The config object is stored directly in the fastr top-level module.
"""
import __builtin__
import collections
import inspect
import json
import logging
import logging.config
import multiprocessing
import os
import tempfile
logging.captureWarnings(True)
[docs]class EmptyDefault(object):
[docs] def __init__(self, data=None):
self._list_data = [NotImplemented]
self._dict_data = collections.defaultdict(EmptyDefault)
if isinstance(data, collections.Mapping):
self._dict_data.update(data)
elif isinstance(data, collections.Sequence):
self._list_data = list(data)
# Any object be added in place (just replaces current value)
[docs] def __iadd__(self, right):
self._list_data += right
return self
[docs] def __add__(self, right):
return EmptyDefault(self._list_data + right)
[docs] def __radd__(self, other):
return EmptyDefault(other + self._list_data)
[docs] def append(self, value):
self._list_data.append(value)
[docs] def prepend(self, value):
self._list_data = [value] + self._list_data
[docs] def extend(self, other):
self._list_data.extend(other)
# Make this act like a dict updated by another dict
[docs] def update(self, other):
self._dict_data.update(other)
[docs] def __getitem__(self, item):
return self._dict_data[item]
[docs] def __setitem__(self, key, value):
self._dict_data[key] = value
[docs] def __delitem__(self, key):
del self._dict_data[key]
# Access to data
[docs] def aslist(self):
return list(self._list_data)
[docs] def asdict(self):
return dict(self._dict_data)
def _find_log_type():
"""
Figure out the logtype to use for this fastr session
:return: log type to us
:rtype: str
"""
# Use multiprocessing to check if we are actually in the MainProcess
# Subprocesses should not log in a standard way
current_process = multiprocessing.current_process()
# Hack setting non-standard log methods/destinations before fastr is imported
_stack = inspect.stack()
for frame in _stack[1:]:
if 'FASTR_LOG_TYPE' in frame[0].f_globals:
fastr_log_type = frame[0].f_globals['FASTR_LOG_TYPE']
# We only want to last definition of fastr_log_type
break
else:
if current_process.name != 'MainProcess':
#fastr_log_type = 'childprocess'
fastr_log_type = 'default'
else:
fastr_log_type = 'default'
return fastr_log_type
USER_DIR = os.path.expanduser(os.path.join('~', '.fastr'))
SYSTEM_DIR = os.path.abspath(os.path.normpath(os.path.dirname(__file__)))
RESOURCE_DIR = os.path.join(SYSTEM_DIR, 'resources')
[docs]class Config(object):
"""
Class contain the fastr configuration
"""
DEFAULT_FIELDS = {
"debug": (
bool,
False,
"Flag to enable/disable debugging"
),
"logtype": (
str,
_find_log_type(),
"Type of logging to use"
),
"systemdir": (
str,
SYSTEM_DIR,
"Fastr installation directory"
),
"userdir": (
str,
USER_DIR,
"Fastr user configuration directory",
"~/.fastr"
),
"logdir": (
str,
os.path.join(USER_DIR, 'logs'),
"Directory where the fastr logs will be placed",
"$userdir/logs"
),
"resourcesdir": (
str,
RESOURCE_DIR,
"Directory containing the fastr system resources",
"$systemdir/resources"
),
"examplesdir": (
str,
os.path.join(SYSTEM_DIR, 'examples'),
"Directory containing the fastr examples",
"$systemdir/examples"
),
"schemadir": (
str,
os.path.join(SYSTEM_DIR, 'resources', 'schemas'),
"Directory containing the fastr data schemas",
"$systemdir/schemas"
),
"executionscript": (
str,
os.path.join(SYSTEM_DIR, 'execution', 'executionscript.py'),
"Execution script location",
"$systemdir/execution/executionscript.py"
),
"types_path": (
list,
[x for x in [os.path.join(USER_DIR, 'datatypes'), os.path.join(RESOURCE_DIR, 'datatypes')] if os.path.exists(x)],
"Directories to scan for datatypes",
['$userdir/datatypes', '$resourcedir/datatypes']
),
"tools_path": (
list,
[x for x in [os.path.join(USER_DIR, 'tools'), os.path.join(RESOURCE_DIR, 'tools')] if os.path.exists(x)],
"Directories to scan for tools",
['$userdir/tools', '$resourcedir/tools']
),
"networks_path": (
list,
[x for x in [os.path.join(USER_DIR, 'networks'), os.path.join(RESOURCE_DIR, 'networks')] if os.path.exists(x)],
"Directories to scan for networks",
['$userdir/networks', '$resourcedir/networks']
),
"plugins_path": (
list,
[x for x in [os.path.join(USER_DIR, 'plugins'), os.path.join(RESOURCE_DIR, 'plugins')] if os.path.exists(x)],
"Directories to scan for plugins",
['$userdir/plugins', '$resourcedir/plugins']
),
"mounts": (
dict,
{
'tmp': tempfile.gettempdir(),
'example_data': os.path.join(SYSTEM_DIR, 'examples', 'data'),
'home': os.path.expanduser('~/'),
},
"A dictionary containing all mount points in the VFS system",
{
'tmp': '$TMPDIR',
'example_data': '$systemdir/examples/data',
'home': '~/'
}
),
"preferred_types": (
list,
[],
"A list indicating the order of the preferred types to use. First item is most preferred."
),
"protected_modules": (
list,
[],
"A list of modules in the environmnet modules that are protected against unloading"
),
"execution_plugin": (
str,
'ProcessPoolExecution',
"The default execution plugin to use"
),
"web_hostname": (
str,
'localhost',
"The hostname to expose the web app for"
),
"web_port": (
str,
'5000',
"The port to expose the web app on"
),
"web_secret_key": (
str,
'VERYSECRETKEY!',
"The secret key to use for the flask web app"
),
"warn_develop": (
bool,
True,
"Warning users on import if this is not a production version of fastr"
),
"source_job_limit": (
int,
4,
"The number of source jobs allowed to run concurrently"
),
"pim_host": (
str,
'',
"Host of the PIM server to report to"
)
}
# pylint: disable=too-many-instance-attributes
# The config has many attributes, because its function is to hold
# this data
[docs] def __init__(self, *configfiles):
#: Trace of the config files read by this object
self.read_config_files = []
self.log = None
#: Raw config of current values
self._fields = collections.OrderedDict(sorted(self.DEFAULT_FIELDS.items()))
self._create_field_properties(self._fields)
self._current_config = collections.defaultdict(EmptyDefault)
self._current_config.update(vars(__builtin__))
# Read default config files if found
if os.environ.get('FASTRHOME'):
if os.path.exists(os.path.join(os.environ['FASTRHOME'], 'config.py')):
self.read_config(os.path.join(os.environ['FASTRHOME'], 'config.py'))
if os.path.exists(os.path.join(self.userdir, 'config.py')):
self.read_config(os.path.join(self.userdir, 'config.py'))
# Read config files as parameters
for filename in configfiles:
if os.path.exists(filename):
self.read_config(filename)
else:
self.log.error('Config file {} does not exist!'.format(filename))
#: The logger used by fastr, set and updated by the Config object
self._update_logging()
[docs] def register_fields(self, fields_spec):
"""
Register extra fields to the configuration manager.
"""
if not isinstance(fields_spec, dict):
raise TypeError("The fields_spec argument should be a dict")
for name, field_spec in fields_spec.items():
if name not in self._fields:
self._fields[name] = field_spec
else:
if self._fields[name] != field_spec:
raise ValueError('Found a conflicting definition for the field {}'.format(name))
self._create_field_properties(fields_spec)
[docs] def get_field(self, item):
if item in self._fields:
# Get the field specification from the field definitions
field_spec = self._fields[item]
# Get the currently stored value of the config
value = self._current_config.get(item, None)
# Use the default in case the value is not found
if value is None:
value = field_spec[1]
# Catch cases without a valid defautt
if value is None:
raise ValueError('No default given for {}'.format(item))
# In case of dict/list, get data and merge with default
if isinstance(value, EmptyDefault):
if field_spec[0] is dict:
new_value = dict(field_spec[1])
new_value.update(value.asdict())
value = new_value
elif field_spec[0] is list:
new_value = []
for x in value.aslist():
if x is not NotImplemented:
new_value.append(x)
else:
new_value.extend(field_spec[1])
value = new_value
else:
raise TypeError('Config value and type do not match!')
# Validate the type against the field specification
if not isinstance(value, field_spec[0]):
raise TypeError('Config value for {} is of wrong type, expected {}, found {}'.format(item,
field_spec[0],
type(value).__name__))
return value
else:
raise KeyError('Field not found in config field specification!')
[docs] def set_field(self, item, value):
if item in self._fields:
# Get the field specification from the field definitions
field_spec = self._fields[item]
if not isinstance(value, field_spec[0]):
raise TypeError('Type mismatch, config field should be a {} found {}'.format(
field_spec[0],
type(value).__name__
))
# Set the value
self._current_config[item] = value
else:
raise KeyError('Field not found in config field specification!')
@classmethod
def _create_field_properties(cls, fields):
"""
Create properties for a dictionary of fields
:param fields: The mapping of the fields to create
"""
for field_name, field_value in fields.items():
if not hasattr(cls, field_name):
prop = cls._field_property(field_name, field_value[2])
setattr(cls, field_name, prop)
@staticmethod
def _field_property(field_name, field_doc=None):
"""
Create a property for a field
:param field_name: the name of the field
:param field_doc: the docstring for the field
:return: property to use
"""
def getter(obj):
return obj.get_field(field_name)
def setter(obj, value):
obj.set_field(field_name, value)
return property(getter, setter, field_doc)
[docs] def __repr__(self):
items = []
for key, spec in self._fields.items():
val = getattr(self, key)
if isinstance(val, (dict, list, basestring)):
val = json.dumps(val, indent=2)
items.append("# [{s[0].__name__}] {s[2]}\n{k} = {v}\n".format(s=spec, k=key, v=val))
return '\n'.join(items)
[docs] def read_config(self, filename):
"""
Read a configuration and update the configuration object accordingly
:param filename: the configuration file to read
"""
execfile(filename, {}, self._current_config)
for key, value in self._current_config.items():
if isinstance(value, (list, dict)):
self._current_config[key] = EmptyDefault(value)
self.read_config_files.append(filename)
self._update_logging()
[docs] def web_url(self):
""" Construct a fqdn from the web['hostname'] and web['port'] settings.
:return: FQDN
:rtype: str
"""
#if self.web['port'] == 80:
# return 'http://{}' .format(self.web['hostname'])
#elif self.web['port'] == 443:
# return 'https://{}' .format(self.web['hostname'])
#else:
# return 'http://{}:{}'.format(self.web['hostname'], self.web['port'])
return 'http://localhost'
def _update_logging(self):
"""
Update the logging using the current config settings.
"""
# Create log dir if needed (make sure it exists before other
# operation requiring logging) but after logdir is known
if not os.path.exists(self.logdir):
os.makedirs(self.logdir)
logging_definition = {
'version': 1,
'disable_existing_loggers': True,
'formatters': {
'verbose': {
'format': '[%(processName)s::%(threadName)s] %(levelname)s: %(module)s:%(lineno)d >> %(message)s'
},
'console_simple': {
'format': '[%(levelname)s] %(module) 9s:%(lineno)04d >> %(message)s'
},
'console_minimal': {
'format': '%(levelname)s >> %(message)s'
},
},
'handlers': {
'console': {
'level': 'INFO',
'class': 'logging.StreamHandler',
'formatter': 'verbose' if self.debug else 'console_simple',
'stream': 'ext://sys.stdout',
},
'childprocess': {
'level': 'CRITICAL',
'class': 'logging.NullHandler',
},
'error_file': {
'level': 'ERROR',
'class': 'logging.handlers.RotatingFileHandler',
'formatter': 'verbose',
'filename': os.path.join(self.logdir, 'error.log'),
'maxBytes': 10 * 1024 * 1024,
'backupCount': 20,
},
'info_file': {
'level': 'INFO',
'class': 'logging.handlers.RotatingFileHandler',
'formatter': 'verbose',
'filename': os.path.join(self.logdir, 'info.log'),
'maxBytes': 10 * 1024 * 1024,
'backupCount': 20,
},
'server_file': {
'level': 'INFO',
'class': 'logging.handlers.RotatingFileHandler',
'formatter': 'verbose',
'filename': os.path.join(self.logdir, 'server.log'),
'maxBytes': 10 * 1024 * 1024,
'backupCount': 20,
},
'null_handler': {
'level': 'CRITICAL',
'class': 'logging.NullHandler',
}
},
'loggers': {
'fastr': {
'handlers': ['console', 'info_file', 'error_file'],
'propagate': True,
'level': 'DEBUG',
},
'py:warnings': {
'handlers': ['console', 'info_file', 'error_file'],
'propagate': True,
}
},
'root': {
'handlers': ['null_handler'],
'level': 'DEBUG'
}
}
fastr_log_type_options = {
'default': ['console', 'info_file', 'error_file'],
'server': ['server_file', 'console'],
'daemon': ['server_file'],
'console': ['console'],
'childprocess': ['childprocess'],
'worker': ['worker'],
'none': ['null_handler']
}
logging_definition['loggers']['fastr']['handlers'] = fastr_log_type_options[self.logtype]
if self.debug:
logging_definition['handlers']['console']['level'] = 'DEBUG'
logging_definition['handlers']['server_file']['level'] = 'DEBUG'
logging_definition['handlers']['debug_file'] = {
'level': 'DEBUG',
'class': 'logging.handlers.RotatingFileHandler',
'formatter': 'verbose',
'filename': os.path.join(self.logdir, 'debug.log'),
'maxBytes': 10 * 1024 * 1024,
'backupCount': 20,
}
logging_definition['handlers']['childprocess'] = {
'level': 'INFO',
'class': 'logging.StreamHandler',
'formatter': 'verbose',
'stream': 'ext://sys.stdout',
}
logging_definition['loggers']['fastr']['handlers'].append('debug_file')
else:
logging_definition['handlers']['console']['level'] = 'INFO'
logging_definition['handlers']['server_file']['level'] = 'INFO'
try:
del logging_definition['handlers']['debug_file']
except KeyError:
pass
try:
logging_definition['loggers']['fastr']['handlers'].remove('debug_file')
except ValueError:
pass
logging_definition['loggers']['fastr']['level'] = 'INFO'
logging.config.dictConfig(logging_definition)
if self.log is None:
self.log = logging.getLogger('fastr')
self.log.debug('Setting up the FASTR environment')
else:
self.log.debug('Updated fastr logging')
self.log.debug('Log directory: {}'.format(self.logdir))
self.log.debug('Using log type: {} (debug: {})'.format(self.logtype, self.debug))