# Source code for fastrinterface

# Copyright 2011-2014 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import abstractmethod
from collections import namedtuple, OrderedDict, Mapping
import os
import sys
import re

import fastr
from fastr import exceptions
from fastr.core.interface import Interface, InterfaceResult, InputSpec, OutputSpec
from fastr.core.baseplugin import Plugin
from fastr.core.pluginmanager import PluginSubManager
from fastr.core.serializable import Serializable
from fastr.datatypes import EnumType, TypeGroup, URLType


class CollectorPlugin(Plugin):
    """
    :py:class:`CollectorPlugins <fastr.resources.plugins.interfaceplugins.fastrinterface.CollectorPlugin>`
    are used for finding and collecting the output data of outputs part of a
    :py:class:`FastrInterface <fastr.resources.plugins.interfaceplugins.fastrinterface.FastrInterface>`
    """

    # Signal managers that this plugin should be stored instantiated
    _instantiate = True

    def __init__(self):
        """
        Constructor, the plugin starts without a job attached to it.
        """
        super(CollectorPlugin, self).__init__()
        self.job = None

    @property
    def fullid(self):
        """
        The full id of the plugin, the id of the job is appended when a job
        is set on the plugin.
        """
        if self.job is not None:
            return 'fastr://plugins/collectorplugins/{}/{}'.format(self.id, self.job.jobid)
        else:
            return 'fastr://plugins/collectorplugins/{}'.format(self.id)

    def collect_results(self, interface, output, result):
        """
        Start the collection of the results. This method calls the actual
        implementation from the subclass (_collect_results) and wraps it
        with some convenience functionality.

        :param interface: Interface to collect data for
        :param output: Output to collect data for
        :param result: the result object handed on to the implementation
        """
        self._collect_results(interface, output, result)

    @abstractmethod
    def _collect_results(self, job, output, result):
        """
        Placeholder method for the actual collection of the results. This
        method should be implemented by subclasses.

        :param job: the object passed as ``interface`` to collect_results
        :param output: Output to collect data for
        :param result: the result object passed to collect_results
        """
        pass

    def _regexp_path(self, path):
        """
        Helper function that searches for a regular expression in a path. There
        can be wildcards in any level of the path. Every path component after
        the root is treated as a regular expression that has to match a
        directory entry completely.

        :param path: path with regular expressions
        :return: list of paths that match the path pattern
        :raises ValueError: if the path is not absolute or if a path component
                            is not a valid regular expression
        """
        # Get a clean, absolute path to work with
        path = os.path.expanduser(path)
        path = os.path.abspath(path)

        subpaths = self._full_split(path)
        if subpaths[0] != os.path.sep and not re.match(r'[a-zA-Z]:[/\\]', subpaths[0]):
            raise ValueError('path should always contain an absolute path (subpaths: {})'.format(subpaths))

        basepath = subpaths[0]

        pathlist = [basepath]
        fastr.log.debug('Basepath: {}'.format(basepath))
        for subpath in subpaths[1:]:
            subpath = '^' + subpath + '$'
            # Compile once per level; this also validates the regexp so a more
            # understandable error can be given if it is malformed
            try:
                pattern = re.compile(subpath)
            except re.error as detail:
                raise ValueError('Error parsing regexp "{}": {}'.format(subpath, detail))

            # Scan new level for matches
            newpathlist = []
            for curpath in pathlist:
                # A match on the previous level can be a regular file, which
                # cannot be listed and can never contain the next component
                if not os.path.isdir(curpath):
                    continue

                for option in os.listdir(curpath):
                    if pattern.match(option):
                        newpathlist.append(os.path.join(curpath, option))

            pathlist = newpathlist

        return pathlist

    @staticmethod
    def _full_split(urlpath):
        """
        Split the path of a url completely into parts

        :param urlpath: path part of the url
        :return: a list of parts, the root (if any) is the first element
        """
        parts = []
        while True:
            newpath, tail = os.path.split(urlpath)
            if newpath == urlpath:
                # os.path.split reached a fixed point (root or empty string)
                assert not tail
                if urlpath:
                    parts.append(urlpath)
                break
            parts.append(tail)
            urlpath = newpath
        parts.reverse()
        return parts


class CollectorPluginManager(PluginSubManager):
    """
    Sub manager holding all the CollectorPlugins, the plugins are addressed
    by the id of the plugin.
    """

    def __init__(self):
        """
        Create the manager as a child of the main fastr plugin manager, using
        ``plugin_class`` as the type of plugin to hold.
        """
        super(CollectorPluginManager, self).__init__(plugin_class=self.plugin_class,
                                                     parent=fastr.plugin_manager)
        # Cache translating plugin ids into the keys used in self.data
        self._key_map = {}

    @property
    def plugin_class(self):
        """
        The class of the Plugins in the collection
        """
        return CollectorPlugin

    @property
    def _instantiate(self):
        """
        Indicate that the plugins should be instantiated before being stored
        """
        return True

    def __keytransform__(self, key):
        """
        Translate the id of a plugin into the key used in the underlying data.

        :param key: id of the plugin
        :return: the matching key in ``self.data``
        :raises KeyError: if no stored plugin has the given id
        """
        if key not in self._key_map:
            # The cache might be out of date, rebuild it from the stored plugins
            self._key_map.clear()
            for internal_key, plugin in self.data.items():
                self._key_map[plugin.id] = internal_key

        return self._key_map[key]

    def __iter__(self):
        """
        Iterate over the ids of the stored plugins
        """
        for plugin in self.data.values():
            yield plugin.id


class HiddenFieldMap(Mapping):
    """
    Read-only ordered mapping that leaves out hidden entries when iterating.

    An entry counts as hidden if its value has a truthy ``hidden`` attribute.
    Hidden entries can still be retrieved by key and are included in the
    length of the mapping.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the map, all arguments are passed on to ``OrderedDict``.
        """
        self._data = OrderedDict(*args, **kwargs)

    def __getitem__(self, item):
        """
        Get the value stored for a key, this works for hidden entries as well.
        """
        return self._data[item]

    def __len__(self):
        """
        The number of stored entries, including the hidden ones.
        """
        return len(self._data)

    def __iter__(self):
        """
        Iterate over the keys of all entries that are not hidden.
        """
        for key, entry in self._data.items():
            if getattr(entry, 'hidden', False):
                continue
            yield key


class FastrInterface(Interface):
    """
    The default Interface for fastr. For the command-line Tools as used by
    fastr.
    """

    __dataschemafile__ = 'FastrInterface.schema.json'

    collectors = CollectorPluginManager()
    collector_plugin_type = CollectorPlugin

    def __init__(self, id_, document):
        """
        Create a FastrInterface

        :param id_: the id of the interface
        :param document: description of the interface, either a dict or the
                         path of a file to load the description from
        """
        super(FastrInterface, self).__init__()

        # Load from file if it is not a dict
        if not isinstance(document, dict):
            fastr.log.debug('Trying to load file: {}'.format(document))
            filename = os.path.expanduser(document)
            filename = os.path.abspath(filename)
            document = self._loadf(filename)
        else:
            document = self.get_serializer().instantiate(document)

        #: The ID of the interface
        self.id = id_

        #: List of input parameter descriptions
        self.input_map = OrderedDict()

        #: List of output parameter descriptions
        self.output_map = OrderedDict()

        # Parse input and output fields into parameter_description objects
        for n_order, input_ in enumerate(document['inputs']):
            self.input_map[input_['id']] = InputParameterDescription(self, input_, n_order)

        # Outputs are numbered after the inputs, so by default they follow
        # the inputs on the command line
        n_inputs = len(self.input_map)
        for n_order, output in enumerate(document['outputs']):
            self.output_map[output['id']] = OutputParameterDescription(self, output, n_order + n_inputs)

        self._create_specs()

    def _create_specs(self):
        """
        Create the inputs/outputs spec to expose to the rest of the system
        based on the current input_map and output_map.
        """
        self._inputs = HiddenFieldMap(
            (key, InputSpec(id_=description.id,
                            cardinality=description.cardinality,
                            datatype=description.datatype,
                            required=description.required,
                            description=description.description,
                            default=description.default,
                            hidden=description.hidden))
            for key, description in self.input_map.items()
        )

        self._outputs = HiddenFieldMap(
            (key, OutputSpec(id_=description.id,
                             cardinality=description.cardinality,
                             datatype=description.datatype,
                             automatic=description.automatic,
                             required=description.required,
                             description=description.description,
                             hidden=description.hidden))
            for key, description in self.output_map.items()
        )

    def __eq__(self, other):
        """
        Compare two interfaces based on all their attributes

        :param other: the object to compare with
        :return: True if equal, False if not, NotImplemented if other is not
                 a FastrInterface
        """
        if not isinstance(other, FastrInterface):
            return NotImplemented

        return vars(self) == vars(other)

    def __getstate__(self):
        """
        Get the state of the FastrInterface object.

        :return: state of interface
        :rtype: dict
        """
        state = {
            'id': self.id,
            'class': type(self).__name__,
            'inputs': [x.__getstate__() for x in self.input_map.values()],
            'outputs': [x.__getstate__() for x in self.output_map.values()],
        }

        return state

    def __setstate__(self, state):
        """
        Set the state of the Interface

        :param dict state: the state to restore, as created by __getstate__
        """
        self.id = state['id']

        self.input_map = OrderedDict()
        self.output_map = OrderedDict()

        # Work on locals so the state of the caller is not modified; an
        # empty/None entry is treated as having no parameters
        inputs = state['inputs'] or []
        for order, x in enumerate(inputs):
            self.input_map[x['id']] = InputParameterDescription(self, x, order)

        # NOTE(review): unlike __init__ the fallback order of the outputs is
        # not offset by the number of inputs. This only matters for states
        # without an 'order' field -- confirm if that can happen.
        outputs = state['outputs'] or []
        for order, x in enumerate(outputs):
            self.output_map[x['id']] = OutputParameterDescription(self, x, order)

        self._create_specs()

    @property
    def inputs(self):
        """
        The input specs of the interface (hidden inputs are skipped on
        iteration)
        """
        return self._inputs

    @property
    def outputs(self):
        """
        The output specs of the interface (hidden outputs are skipped on
        iteration)
        """
        return self._outputs

    def expanding(self):
        """
        Always returns 0 for this interface
        """
        return 0

    def execute(self, target, payload):
        """
        Execute the interface using a specific target and payload (containing
        a set of values for the arguments)

        :param target: the target to use
        :type target: :py:class:`Target <fastr.core.target.Target>`
        :param dict payload: the values for the arguments
        :return: result of the execution
        :rtype: InterfaceResult
        :raises FastrValueError: if a required, non-automatic output is not
                                 in the payload
        """
        fastr.log.info('Execution payload: {}'.format(payload))
        command = [target.binary] + self.get_arguments(payload)
        log_data = target.run_command(command)
        result = InterfaceResult(result_data={}, log_data=log_data, payload=payload)

        # TODO: add the collection of results and log store here
        fastr.log.info('Collecting results')

        # Non-automatic outputs are copied straight from the payload. This
        # iterates self.outputs, so outputs flagged as hidden are skipped.
        for output in self.outputs.values():
            if not output.automatic:
                if output.id in payload['outputs']:
                    result.result_data[output.id] = payload['outputs'][output.id]
                elif output.required:
                    raise exceptions.FastrValueError('Required output {} not in payload!'.format(output.id))

        try:
            self.collect_results(result)
        except exceptions.FastrError:
            fastr.log.error('Encountered an error when collecting the results')
            raise

        return result

    def _check_id(self, id_, used_ids):
        """
        Shared implementation of check_input_id and check_output_id

        :param str id_: the id to check
        :param used_ids: container with the ids that are already in use
        :return: True
        :raises FastrValueError: if the id is not correctly formatted
        :raises FastrValueError: if the id is already in use
        """
        regex = r'^\w[\w\d_]*$'
        if re.match(regex, id_) is None:
            raise exceptions.FastrValueError('An id in Fastr should follow'
                                             ' the following pattern {}'
                                             ' (found {})'.format(regex, id_))

        if id_ in used_ids:
            raise exceptions.FastrValueError('The id {} is already in use in {}!'.format(id_, self.id))

        return True

    def check_input_id(self, id_):
        """
        Check if an id for an object is valid and unused as input in the
        Tool. The method will always return True if it does not raise an
        exception.

        :param str id_: the id to check
        :return: True
        :raises FastrValueError: if the id is not correctly formatted
        :raises FastrValueError: if the id is already in use
        """
        return self._check_id(id_, self.input_map)

    def check_output_id(self, id_):
        """
        Check if an id for an object is valid and unused as output in the
        Tool. The method will always return True if it does not raise an
        exception.

        :param str id_: the id to check
        :return: True
        :raises FastrValueError: if the id is not correctly formatted
        :raises FastrValueError: if the id is already in use
        """
        return self._check_id(id_, self.output_map)

    def get_arguments(self, values):
        """
        Get the argument list for this interface

        :param dict values: the values for the arguments, grouped under the
                            keys ``inputs`` and ``outputs``
        :return: return list of arguments
        :raises FastrValueError: if a required argument has no value and no
                                 default
        """
        # Get the argument list. The values are copied into lists first, as
        # dict views (Python 3) cannot be concatenated with +
        arguments = list(self.input_map.values()) + list(self.output_map.values())

        # Arguments with a negative order are sorted after all the others
        arguments = sorted(arguments, key=lambda x: x.order if x.order >= 0 else sys.maxsize - x.order)

        argument_list = []
        for argument in arguments:
            id_ = argument.id
            try:
                if isinstance(argument, InputParameterDescription):
                    value = values['inputs'][id_]
                else:
                    value = values['outputs'][id_]

                # TODO: check if the number of values matches the cardinality
                #       of the argument (this check was disabled)
            except KeyError:
                # No value given, fall back to the default if there is one
                if argument.default is not None:
                    if isinstance(argument.default, list):
                        value = tuple(argument.default)
                    else:
                        value = (argument.default,)
                elif argument.required and not (isinstance(argument, OutputParameterDescription) and argument.automatic):
                    raise exceptions.FastrValueError('Required argument "{}" has no value!'.format(argument.id))
                else:
                    value = None

            if value is not None:
                argument_list.extend(argument.get_commandline_argument(value))

        return argument_list

    def collect_results(self, result):
        """
        Collect all results of the interface. For every automatic output the
        collector registered under the method of the output is used.

        :param result: the result object that is handed to the collectors
        """
        for output in self.output_map.values():
            if output.automatic:
                collector = self.collectors[output.method]
                collector.collect_results(self, output, result)
                fastr.log.debug('--= Collected automatic result for {} =--'.format(output.id))
                fastr.log.debug(result.result_data[output.id])

    def get_specials(self, payload, output, cardinality_nr):
        """
        Get special attributes. Returns tuples for specials, inputs and
        outputs that are used for formatting substitutions.

        :param dict payload: the values for the arguments, grouped under the
                             keys ``inputs`` and ``outputs``
        :param output: Output for which to get the specials (can be None)
        :param int cardinality_nr: the cardinality number
        :return: tuple of (specials, inputs, outputs, inputs_parts,
                 outputs_parts)
        """
        if output is not None:
            datatype = fastr.typelist[output.datatype]

            # Get extension match
            if issubclass(datatype, URLType):
                extension = datatype.extension
            elif issubclass(datatype, TypeGroup):
                extensions = set()
                for member in datatype.members:
                    extensions.add(member.extension if member.extension is not None else '')
                # Sorted to make the resulting pattern reproducible
                extension = '({})'.format('|'.join(sorted(extensions)))
            else:
                extension = ''
        else:
            extension = ''

        specials_tuple_type = namedtuple('Specials', ['cardinality', 'extension'])
        specials = specials_tuple_type(cardinality_nr, extension)

        parts_tuple_type = namedtuple('Parts', ['directory', 'basename', 'extension'])

        def split_parts(value):
            """
            Split a value into the directory, basename and extension
            """
            if not isinstance(value, str):
                value = str(value)

            try:
                directory = os.path.dirname(value)
                basename, ext = os.path.splitext(os.path.basename(value))
                if ext == '.gz':
                    # Split .gz extension further
                    basename, ext = os.path.splitext(basename)
                    ext += '.gz'
            except (ValueError, TypeError):
                directory, basename, ext = None, None, None

            return parts_tuple_type(directory=directory, basename=basename, extension=ext)

        if self.input_map:
            inputs_tuple_type = namedtuple('Inputs', [x.id for x in self.input_map.values()])
            # Use the default for inputs that are not in the payload
            inputs = [payload['inputs'][x.id] if x.id in payload['inputs'] else [x.default]
                      for x in self.input_map.values()]
            inputs_parts = inputs_tuple_type(*[tuple(split_parts(y) for y in x) for x in inputs])
            inputs = inputs_tuple_type(*inputs)
        else:
            inputs = ()
            inputs_parts = ()

        # The outputs are only filled if there is at least one non-automatic
        # output, but then contain a field for every output
        output_arguments = [x for x in self.output_map.values() if not x.automatic]
        if output_arguments:
            outputs_tuple_type = namedtuple('Outputs', [x.id for x in self.output_map.values()])
            outputs = [payload['outputs'][x.id] if x.id in payload['outputs'] else [x.default]
                       for x in self.output_map.values()]
            outputs_parts = outputs_tuple_type(*[tuple(split_parts(y) for y in x) for x in outputs])
            outputs = outputs_tuple_type(*outputs)
        else:
            outputs = ()
            outputs_parts = ()

        return specials, inputs, outputs, inputs_parts, outputs_parts


class ParameterDescription(Serializable):
    """
    Description of an input or output parameter used by a Tool. This is the
    super class for both input and output, containing the shared parts.
    """
    _IS_INPUT = False

    def __init__(self, parent, element, order=0):
        """
        Instantiate a ParameterDescription

        :param Tool parent: parent tool
        :param dict element: description of the parameter
        :param order: the order in which the parameter was defined in the file
        :raises FastrTypeError: if element is not a dict
        :raises FastrValueError: if element defines neither a datatype nor
                                 an enum
        """
        self.parent = parent

        if not isinstance(element, dict):
            raise exceptions.FastrTypeError('element should be a dict')

        self.id = element['id']
        self.name = element['name']

        if 'datatype' in element:
            self.datatype = element['datatype']
        elif 'enum' in element:
            # Register an enum datatype specific for this parameter
            element['enum'] = [str(x) for x in element['enum']]
            self.datatype = '__{}__{}__Enum__'.format(parent.id, self.id)
            fastr.typelist.create_enumtype(self.datatype, tuple(element['enum']), self.datatype)
        else:
            raise exceptions.FastrValueError('No valid datatype defined for {}'.format(self.id))

        self.prefix = element.get('prefix', None)
        self.repeat_prefix = element.get('repeat_prefix', False)
        self.cardinality = element.get('cardinality', 1)
        self.nospace = element.get('nospace', False)
        self.format = element.get('format', None)
        self.required = element.get('required', True)
        self.default = element.get('default', None)
        self.description = element.get('description', '')
        self.order = element.get('order', order)
        self.hidden = element.get('hidden', False)

    def __eq__(self, other):
        """
        Compare two ParameterDescription instances with each other. The
        parent is ignored, all other attributes are tested for equality.

        :param other: the other instance to compare to
        :returns: True if equal, False otherwise
        """
        if not isinstance(other, ParameterDescription):
            return NotImplemented

        dict_self = dict(self.__dict__)
        del dict_self['parent']

        dict_other = dict(other.__dict__)
        del dict_other['parent']

        return dict_self == dict_other

    def __getstate__(self):
        """
        Retrieve the state of the ParameterDescription

        :return: the state of the object
        :rtype dict:
        """
        state = dict(self.__dict__)

        # Store the id of the parent rather than the parent itself
        state['parent'] = self.parent.id

        # Enum datatypes are stored as their list of options
        datatype = fastr.typelist[self.datatype]
        if issubclass(datatype, EnumType):
            del state['datatype']
            state['enum'] = list(datatype.options)

        return state

    def __setstate__(self, state):
        """
        Set the state of the ParameterDescription by the given state.

        :param dict state: The state to populate the object with
        :raises FastrValueError: if the state defines neither a datatype nor
                                 an enum
        """
        if 'datatype' not in state:
            if 'enum' in state:
                # Recreate the enum datatype that belongs to this parameter
                typename = '__{}__{}__Enum__'.format(state['parent'], state['id'])
                state['datatype'] = typename
                fastr.typelist.create_enumtype(typename, tuple(state['enum']), typename)
            else:
                raise exceptions.FastrValueError('No valid datatype defined for {} in {}'.format(state['id'], state))

        # NOTE(review): for a state created by __getstate__ this sets
        # self.parent to the id of the parent, not the parent object
        self.__dict__.update(state)

    def get_commandline_argument(self, value):
        """
        Get the commandline argument for this Parameter given the values
        assigned to it.

        :param value: the value(s) for this input, either a tuple of values
                      or an OrderedDict mapping a key to a sequence of values
        :return: commandline arguments
        :rtype: list
        :raises FastrTypeError: if value is not a tuple or OrderedDict
        """
        argument = []

        if isinstance(value, tuple):
            for cardinality_nr, item in enumerate(value):
                item = self.format_argument_value(item)
                argument.extend(self.format_prefix(item, cardinality_nr))
        elif isinstance(value, OrderedDict):
            # Every entry is formatted as key=value1,value2,...
            for cardinality_nr, (key, items) in enumerate(value.items()):
                joined = ','.join(self.format_argument_value(x) for x in items if x is not None)
                argument.extend(self.format_prefix('{}={}'.format(key, joined), cardinality_nr))
        else:
            raise exceptions.FastrTypeError('Argument should be tuple or OrderedDict!')

        return argument

    def format_prefix(self, value, cardinality_nr):
        """
        Combine the prefix of the parameter with a formatted value.

        Every ``#`` in the prefix is replaced by the cardinality number. A
        separate prefix argument is only added for the first value, or for
        every value if repeat_prefix is set. In nospace mode the prefix is
        glued to every value instead.

        :param value: the formatted value, None if there is only a prefix
        :param int cardinality_nr: the cardinality number of the value
        :return: the argument(s) for this value
        :rtype: list
        """
        extra_argument = []

        if self.prefix is None:
            # Avoid gluing the text 'None' to the value in nospace mode
            prefix = ''
        else:
            prefix = self.prefix.replace('#', str(cardinality_nr))

        if cardinality_nr == 0 or self.repeat_prefix:
            if self.prefix is not None and self.prefix.strip() != '':
                if not self.nospace or value is None:
                    extra_argument.append(prefix)

        if value is not None:
            if self.nospace:
                extra_argument.append('{}{}'.format(prefix, value))
            else:
                extra_argument.append(value)

        return extra_argument

    def format_argument_value(self, value):
        """
        Convert a value into the string to use on the commandline.

        :param value: the value to format
        :return: the formatted value, or None if the string form of the
                 value starts with ``__FASTR_FLAG__``
        :raises FastrDataTypeValueError: if this is an input and the value
                                         is not valid for the datatype
        """
        datatype = fastr.typelist[self.datatype]
        if not datatype.isinstance(value):
            fastr.log.debug('CREATING DATATYPE {!r} for {!r}!'.format(datatype, value))
            value = datatype(value)

        # Format (and validate if required) the input value
        value.format = self.format
        if self._IS_INPUT and not value.valid:
            fastr.log.debug('SELF TYPE: {}, ID {}'.format(type(self).__name__, self.id))
            raise exceptions.FastrDataTypeValueError('Value for input {} not a valid instance of type {} (value: {} ({} / {!r}) -> {})'.format(self.id, datatype.id, value, type(value).__name__, value.value, value.valid))
        value = str(value)

        # Filter out boolean flags
        if not value.startswith('__FASTR_FLAG__'):
            return value
        else:
            return None


class InputParameterDescription(ParameterDescription):
    """
    Description of an input parameter used by a Tool.
    """
    _IS_INPUT = True

    def __init__(self, parent, element, order=0):
        """
        Instantiate an InputParameterDescription

        :param Tool parent: parent tool
        :param dict element: description of the parameter
        :param order: the order in which the parameter was defined in the file
        :raises FastrTypeError: if element is not a dict
        """
        if not isinstance(element, dict):
            raise exceptions.FastrTypeError('element should be a dict')

        super(InputParameterDescription, self).__init__(parent, element, order)
        self.parent.check_input_id(element['id'])


class OutputParameterDescription(ParameterDescription):
    """
    Description of a output parameter used by a Tool.
    """

    def __init__(self, parent, element, order=0):
        """
        Instantiate an OutputParameterDescription

        :param Tool parent: parent tool
        :param dict element: description of the parameter
        :param order: the order in which the parameter was defined in the file
        :raises FastrTypeError: if element is not a dict
        """
        if not isinstance(element, dict):
            raise exceptions.FastrTypeError('element should be a dict')

        super(OutputParameterDescription, self).__init__(parent, element, order)
        self.parent.check_output_id(element['id'])
        self.automatic = element.get('automatic', False)
        self.action = element.get('action', None)
        self.location = element.get('location', None)
        self.separator = element.get('separator', None)

        # Automatic outputs are collected using the 'path' method by default
        if self.automatic:
            self.method = element.get('method', 'path')
        else:
            self.method = element.get('method', None)

    def __setstate__(self, state):
        """
        Set the state of the OutputParameterDescription by the given state.
        Missing output specific fields are set to their default first.

        :param dict state: The state to populate the object with
        """
        if 'location' not in state:
            state['location'] = None

        if 'separator' not in state:
            state['separator'] = None

        if 'method' not in state:
            if 'automatic' not in state or not state['automatic']:
                state['method'] = None
            else:
                state['method'] = 'path'

        if 'action' not in state:
            state['action'] = None

        super(OutputParameterDescription, self).__setstate__(state)

    def get_commandline_argument(self, values):
        """
        Get the commandline argument for this Parameter given the values
        assigned to it. If the output has a known action, the action is
        called for every value first.

        :param values: the value(s) for this output
        :return: commandline arguments, always empty for automatic outputs
        :rtype: list
        """
        if self.action is not None:
            # Actions that are not in ACTIONS are ignored
            if self.action in self.ACTIONS:
                action_func = self.ACTIONS[self.action]
                for value in values:
                    fastr.log.info('Calling action {} for {}'.format(action_func, value))
                    # ACTIONS holds plain functions, so pass self explicitly
                    action_func(self, value)

        if not self.automatic:
            return super(OutputParameterDescription, self).get_commandline_argument(values)
        else:
            return []

    def mkdirs(self, value):
        """
        Create the directory if it does not yet exist

        :param URLType value: the directory to create
        """
        value = str(value)
        if not os.path.exists(value):
            try:
                os.makedirs(value)
            except OSError:
                # The directory can be created by another process between
                # the check and the creation, that is not an error
                if not os.path.isdir(value):
                    raise

    # Mapping from the name of an action to the function implementing it
    ACTIONS = {'ensure': mkdirs,
               'mkdir': mkdirs}