Source code for fastr.core.tool

# Copyright 2011-2014 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A module to maintain a tool.

Exported classes:

* Tool -- A class encapsulating a tool.
* ParameterDescription -- The base class containing the shared description of
  a parameter (both input and output).
* InputParameterDescription -- A class containing the description of an input
  parameter.
* OutputParameterDescription -- A class containing the description of an
  output parameter.
"""

import itertools
import os
import platform
import re
import shutil
from collections import namedtuple, OrderedDict
from tempfile import mkdtemp

import jsonschema

from . import vfs_plugin
from .cardinality import create_cardinality
from .interface import InterfaceResult
from .version import Version
from .. import exceptions
from ..abc.baseplugin import PluginState
from ..abc.serializable import Serializable
from ..datatypes import types
from ..helpers import log, config
from ..helpers.checksum import hashsum
from ..helpers import iohelpers
from .. import resources


class Tool(Serializable):
    """
    The class encapsulating a tool.
    """

    __dataschemafile__ = 'Tool.schema.json'

    test_spec = namedtuple('TestSpecification', ['input', 'command', 'output'])

    TOOL_REFERENCE_FILE_NAME = '__fastr_tool_ref__.json'
    TOOL_RESULT_FILE_NAME = '__fastr_tool_result.pickle.gz'

    DEFAULT_TARGET_CLASS = {
        'MacroNode': 'MacroTarget'
    }
    def __init__(self, doc=None):
        """
        Create a new Tool

        :param doc: path of toolfile or a dict containing the tool data
        :type doc: str or dict
        """
        # Cache value for target binary
        self._target = None

        if doc is None:
            return

        filename = None
        if not isinstance(doc, dict):
            log.debug('Trying to load file: {}'.format(doc))
            filename = os.path.expanduser(doc)
            filename = os.path.abspath(filename)
            doc = self._loadf(filename)

        # Check if the doc is a valid Tool structure
        try:
            # __unserializer__ is supplied by the serializable meta-class
            # pylint: disable=no-member
            doc = Tool.get_serializer().instantiate(doc)
        except jsonschema.ValidationError:
            log.error('Could not validate Tool data against the schema!')
            raise
        else:
            log.debug('Tool schema validated!')

        # Get attributes from root node
        self.filename = filename

        #: Identifier for the tool
        regex = r'^[A-Z][\w\d_]*$'
        if re.match(regex, doc['id']) is None:
            message = 'A tool id in Fastr should be UpperCamelCase as enforced' \
                      ' by regular expression {} (found {})'.format(regex, doc['id'])
            log.warning(message)

        self._id = doc['id']

        #: The namespace this tool lives in, this will be set by the ToolManager on load
        self.namespace = None

        #: Name of the tool, this should be a descriptive, human readable name.
        self.name = doc.get('name', self._id)

        #: Version of the tool, not of the underlying software
        self.version = Version(str(doc.get('version')))

        #: Class of the Node to use
        self.node_class = doc.get('class', 'Node')

        if self._id is None or self.name is None or self.version is None:
            raise exceptions.FastrValueError('Tool should contain an id, name and version!')

        #: URL to website where this tool can be downloaded from
        self.url = doc.get('url', '')

        #: Description of the tool and its functionality
        self.description = doc.get('description', '')

        #: List of authors of the tool. These people wrapped the executable but
        #: are not responsible for the executable itself.
        self.authors = doc['authors']

        #: List of tags for this tool
        self.tags = doc.get('tags', [])

        # Parse references field and format them into a dictionary
        #: A list of documents and in depth reading about the methods used in this tool
        self.references = doc.get('references', [])

        #: Requirements for this Tool
        #:
        #: .. warning:: Not yet implemented
        self.requirements = None

        #: Tests for this tool. A test should be a collection of inputs, parameters
        #: and outputs to verify the proper functioning of the Tool.
        #:
        #: The format of the tests is a list of namedtuples, that have 3 fields:
        #: - input: a dict of the input data
        #: - command: a list giving the expected command-line arguments
        #: - output: a dict of the output data to validate
        #:
        #: .. warning:: Not yet implemented
        self.tests = doc['tests']

        # Find commands
        command = doc['command']

        #: Command is a dictionary containing information about the command which is
        #: called by this Tool:
        #: command['interpreter'] holds the (possible) interpreter to use
        #: command['targets'] holds a per os/arch dictionary of files that should be executed
        #: command['url'] is the webpage of the command to be called
        #: command['version'] is the version of the command used
        #: command['description'] holds a description of the command
        #: command['authors'] lists the original authors of the command
        self.command = {
            'targets': command['targets'],
            'license': command.get('license', ''),
            'url': command.get('url', ''),
            'version': Version(str(command.get('version'))),
            'description': command.get('description', ''),
            'authors': command.get('authors', [])
        }

        if len(self.command['targets']) == 0:
            raise exceptions.FastrValueError("No targets defined in tool description.")

        #: This holds the citation you should use when publishing something based on this Tool
        self.cite = doc['cite']

        #: Man page for the Tool. Here usage and examples can be described in detail
        self.help = doc['help']

        #: Create the Interface based on the class specified in the tool file
        interface_class = resources.interfaces[doc['interface'].get('class', 'FastrInterface')]
        if interface_class.status != PluginState.loaded:
            raise exceptions.FastrPluginNotLoaded(
                'Required InterfacePlugin {} was not loaded properly (status {})'.format(interface_class.id,
                                                                                         interface_class.status)
            )
        self.interface = interface_class(id_='{}_{}_interface'.format(self.id, self.command['version']),
                                         document=doc['interface'])
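
    # Illustrative sketch (not part of the original module): the kind of tool
    # definition document that __init__ expects. Field names follow the code
    # above; the concrete values, and the inputs/outputs layout of the
    # interface section, are hypothetical:
    #
    #     {
    #         'id': 'ExampleTool',
    #         'name': 'Example Tool',
    #         'version': '1.0',
    #         'authors': ['John Doe'],
    #         'tests': [],
    #         'cite': '',
    #         'help': '',
    #         'command': {
    #             'targets': [{'os': '*', 'arch': '*', 'binary': 'example.sh'}],
    #             'version': '1.0',
    #         },
    #         'interface': {'inputs': [], 'outputs': []},
    #     }
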
    @property
    def hash(self):
        return hashsum(self.__getstate__())

    @property
    def id(self):
        return '{}:{}'.format(self._id, self.command['version'])

    @property
    def ns_id(self):
        """
        The namespace and id of the Tool
        """
        if self.namespace is None:
            return self.id
        else:
            return '{}/{}'.format(self.namespace, self.id)

    @property
    def fullid(self):
        """
        The full id of this tool
        """
        return 'fastr:///tools/{}/{}'.format(self.ns_id, self.version)

    @property
    def inputs(self):
        return self.interface.inputs

    @property
    def outputs(self):
        return self.interface.outputs

    @property
    def target(self):
        """
        The OS and arch matched target definition.
        """
        if self._target is not None:
            return self._target

        # Get platform and os
        arch = platform.architecture()[0].lower()
        os_ = platform.system().lower()

        matching_targets = [x for x in self.command['targets']
                            if x['os'] in [os_, '*'] and x['arch'] in [arch, '*']]

        if len(matching_targets) == 0:
            return None
        elif len(matching_targets) == 1:
            target = matching_targets[0]
        else:
            # This should give the optimal match
            for match in matching_targets:
                match['score'] = 0

                if match['os'] == os_:
                    match['score'] += 2

                if match['arch'] == arch:
                    match['score'] += 1

            matching_targets = sorted(matching_targets, reverse=True, key=lambda x: x['score'])
            log.debug('Sorted matches: {}'.format(matching_targets))
            target = matching_targets[0]

        # Make sure target is a copy
        target = dict(target)

        cls = self.DEFAULT_TARGET_CLASS.get(self.node_class)
        if cls is None:
            cls = target.pop('class', 'LocalBinaryTarget')
        cls = resources.targets[cls]

        # Create target from curdir
        old_curdir = os.path.abspath(os.curdir)
        os.chdir(self.path)

        # Check if the argument binary exists (or use bin as alternative)
        if 'binary' not in target and 'bin' in target:
            target['binary'] = target.pop('bin')

        # Remove needless fields
        target.pop('os', None)
        target.pop('arch', None)

        # Instantiate target
        try:
            self._target = cls(**target)
        except Exception as exception:
            log.error('Could not load tool {} because the following exception was encountered: {}'.format(
                self.ns_id, exception
            ))
            raise

        os.chdir(old_curdir)
        return self._target

    def _prepare_payload(self, payload=None, **kwargs):
        # Allow kwargs to be used instead of payload
        if payload is None:
            payload = {'inputs': {}, 'outputs': {}}

            for key, value in kwargs.items():
                if key in self.inputs and key in self.outputs:
                    raise exceptions.FastrValueError(
                        'Cannot figure out if "{}" is an input or output, please prefix with'
                        ' input_/output_ as needed'.format(key)
                    )
                elif key in self.inputs:
                    payload['inputs'][key] = value
                elif key in self.outputs:
                    payload['outputs'][key] = value
                elif key.startswith('input_') and key[6:] in self.inputs:
                    payload['inputs'][key[6:]] = value
                elif key.startswith('output_') and key[7:] in self.outputs:
                    payload['outputs'][key[7:]] = value
                else:
                    raise exceptions.FastrValueError('Cannot match key "{}" to any input/output!'.format(key))

        # Make sure all values are wrapped in a tuple (for single values)
        for key, value in payload['inputs'].items():
            if not isinstance(value, (tuple, OrderedDict)):
                payload['inputs'][key] = (value,)

        for key, value in payload['outputs'].items():
            if not isinstance(value, (tuple, OrderedDict)):
                payload['outputs'][key] = (value,)

        return payload

    def _validate_payload(self, payload):
        for input_ in self.inputs.values():
            if input_.id not in payload['inputs']:
                if input_.required:
                    raise exceptions.FastrValueError('Could not find data for required input {}'.format(input_.id))
                else:
                    continue

            value = payload['inputs'][input_.id]

            if not isinstance(value, (tuple, OrderedDict)):
                raise exceptions.FastrTypeError('Value needs to be a tuple or OrderedDict')

            cardinality_spec = create_cardinality(input_.cardinality, None)

            if not (not input_.required and len(value) == 0) and \
                    not cardinality_spec.validate(payload, len(value)):
                raise exceptions.FastrValueError(
                    'Input cardinality mismatch for {}: found {} values, which does not'
                    ' match the cardinality specification {!r}'.format(
                        input_.id,
                        len(value),
                        cardinality_spec,
                    )
                )

            datatype = types.get(input_.datatype)

            # If the input has named sub-inputs we need to collapse the structure
            # before checking the datatypes
            if isinstance(value, OrderedDict):
                value = (x for y in value.values() for x in y)

            if datatype is not None and not all(datatype.isinstance(x) for x in value):
                raise exceptions.FastrTypeError('Input {} has an incorrect datatype (found {}, expected {})'.format(
                    input_.id,
                    set(type(x) for x in value),
                    datatype
                ))
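
    # Illustrative sketch (not part of the original module): how the target
    # property above selects among 'targets' entries. The entries below are
    # hypothetical:
    #
    #     targets = [
    #         {'os': 'linux', 'arch': '*', 'binary': 'example'},   # exact os match: score 2
    #         {'os': '*', 'arch': '64bit', 'binary': 'example'},   # exact arch match: score 1
    #     ]
    #
    # On a 64-bit Linux machine both entries match, and the first one wins
    # because an exact 'os' match scores higher (+2) than an exact 'arch'
    # match (+1).
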
    def execute(self, payload=None, **kwargs) -> InterfaceResult:
        """
        Execute a Tool given the payload for a single run

        :param payload: the data to execute the Tool with
        :returns: The result of the execution
        :rtype: InterfaceResult
        """
        payload = self._prepare_payload(payload=payload, **kwargs)

        # Make sure the payload is valid
        self._validate_payload(payload)

        # Set environment variables if not set
        os.environ.setdefault('FASTR_SAMPLE_ID', 'UNKNOWN')
        os.environ.setdefault('FASTR_SAMPLE_INDEX', '0')
        os.environ.setdefault('FASTR_JOB_ID', 'fastr_interactive_0')
        os.environ.setdefault('FASTR_WORK_DIR', 'UNKNOWN')

        target = self.target
        log.info('Target is {}'.format(target))

        if target is None:
            arch = platform.architecture()[0].lower()
            os_ = platform.system().lower()
            raise exceptions.FastrValueError(
                'Cannot find a viable target for {}/{} on {} ({} bit)'.format(self.id, self.version, os_, arch)
            )

        log.info('Using payload: {}'.format(payload))

        with target:
            result = self.interface.execute(target, payload)

        return result
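
    # Illustrative usage sketch (not part of the original module). The input
    # id 'image' and the path are hypothetical; keyword arguments are mapped
    # onto inputs/outputs by _prepare_payload, so the following two calls are
    # equivalent:
    #
    #     result = tool.execute(image='/tmp/input.nii.gz')
    #     result = tool.execute(payload={'inputs': {'image': ('/tmp/input.nii.gz',)},
    #                                    'outputs': {}})
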
    def __str__(self):
        """
        Get a string version for the Tool

        :return: the string version
        :rtype: str
        """
        return '<Tool: {} version: {}>'.format(self.id, self.version)
    def __repr__(self):
        """
        Get a string representation for the Tool. This will show the inputs
        and outputs defined in a table-like structure.

        :return: the string representation
        :rtype: str
        """
        if self.name is not None and len(self.name) > 0:
            name_part = ' ({})'.format(self.name)
        else:
            name_part = ''

        return_list = ["Tool {} v{}{}".format(self.id, str(self.command['version']), name_part)]

        # The "+ [8]" guarantees a minimum of 8 width and avoids empty lists
        width_input_keys = max([len(x.id) for x in self.inputs.values()] + [8])
        width_input_types = max([len(x.datatype) for x in self.inputs.values()] + [8]) + 2
        width_output_keys = max([len(x.id) for x in self.outputs.values()] + [8])
        width_output_types = max([len(x.datatype) for x in self.outputs.values()] + [8]) + 2

        return_list.append('{:^{}} | {:^{}}'.format('Inputs', width_input_types + width_input_keys + 1,
                                                    'Outputs', width_output_types + width_output_keys + 1))
        return_list.append('-' * (width_input_keys + width_input_types + width_output_keys + width_output_types + 7))

        for input_, output in itertools.zip_longest(self.inputs.values(), self.outputs.values()):
            if input_ is None:
                input_id = ''
                input_type = ''
            else:
                input_id = input_.id
                input_type = '({})'.format(input_.datatype)

            if output is None:
                output_id = ''
                output_type = ''
            else:
                output_id = output.id
                output_type = '({})'.format(output.datatype)

            return_list.append('{:{}} {:{}} | {:{}} {:{}}'.format(input_id, width_input_keys,
                                                                  input_type, width_input_types,
                                                                  output_id, width_output_keys,
                                                                  output_type, width_output_types))

        return '\n'.join(return_list)
    def __eq__(self, other):
        """Compare two Tool instances with each other.

        :param other: the other instance to compare to
        :type other: Tool
        :returns: True if equal, False otherwise
        """
        if not isinstance(other, Tool):
            return NotImplemented

        dict_self = dict(self.__dict__)
        del dict_self['_target']
        del dict_self['namespace']
        del dict_self['filename']

        dict_other = dict(other.__dict__)
        del dict_other['_target']
        del dict_other['namespace']
        del dict_other['filename']

        return dict_self == dict_other
    def __getstate__(self):
        """
        Retrieve the state of the Tool

        :return: the state of the object
        :rtype: dict
        """
        state = dict(vars(self))

        state['command'] = {k: v for k, v in self.command.items()}
        state['class'] = state['node_class']
        state['interface'] = self.interface.__getstate__()
        state['command']['version'] = str(self.command['version'])
        state['version'] = str(self.version)
        state['id'] = self._id

        del state['node_class']
        del state['_target']

        return state
    def serialize(self):
        """
        Prepare data for serialization; this removes some fields from the
        state that are not needed when serializing to a file
        """
        state = self.__getstate__()

        # Data stub to be ordered
        data = {
            "id": self._id,
            "name": self.name,
            "version": str(self.version),
            "description": self.description,
            "authors": self.authors,
            "class": state['class'],
            "command": state['command'],
            "interface": state['interface'],
        }

        # Set of items to skip
        skip = {'_id', 'namespace', 'filename'}

        for key, value in state.items():
            if key in skip or key in data:
                continue

            data[key] = value

        return data
    def __setstate__(self, state):
        """
        Set the state of the Tool by the given state.

        :param dict state: The state to populate the object with
        """
        if 'filename' not in state:
            state['filename'] = None

        if 'namespace' not in state:
            state['namespace'] = None

        if 'id' in state:
            state['_id'] = state.pop('id')

        state['version'] = Version(state['version'])
        state['command']['version'] = Version(str(state['command']['version']))

        state['node_class'] = state['class']
        del state['class']

        interface_class = resources.interfaces[state['interface'].get('class', 'FastrInterface')]

        if 'id' not in state['interface']:
            state['interface']['id'] = '{}_{}_interface'.format(state['_id'], state['command']['version'])

        if interface_class.status != PluginState.loaded:
            raise exceptions.FastrPluginNotLoaded(
                'Required InterfacePlugin {} was not loaded properly (status {})'.format(interface_class.id,
                                                                                         interface_class.status)
            )

        state['interface'] = interface_class.deserialize(state['interface'])

        self.__dict__.update(state)
        self._target = None
    @property
    def path(self):
        """
        The path of the directory in which the tool definition file was located.
        """
        return os.path.dirname(self.filename)

    @property
    def command_version(self):
        return self.command['version']
    def create_reference(self, input_data, output_directory, mount_name='__ref_tmp__', copy_input=True):
        """
        Run the tool once on the given input data and store the inputs, the
        executed command and the outputs in output_directory, so they can be
        used later as reference data by test_tool().
        """
        # Make sure the output directory is absolute for later on
        output_directory = os.path.abspath(os.path.expanduser(output_directory))

        # Create output directory
        if not os.path.exists(output_directory):
            os.makedirs(output_directory)

        input_data_dir = os.path.join(output_directory, 'input_data')
        if not os.path.exists(input_data_dir):
            os.mkdir(input_data_dir)

        output_data_dir = os.path.join(output_directory, 'output_data')
        if not os.path.exists(output_data_dir):
            os.mkdir(output_data_dir)

        # Add the temporary mount
        config.read_config_string('mounts["{}"] = "{}"'.format(mount_name, output_directory))

        # If desired, copy input data to data directory in output directory
        if copy_input:
            log.info('Copying input data into reference directory...')
            new_input_data = {}
            stored_input_data = {}
            for input_id, input_value in input_data.items():
                new_input_value = []
                stored_input_value = []

                # Make sure we are working with tuples
                if isinstance(input_value, list):
                    input_value = tuple(input_value)

                if not isinstance(input_value, tuple):
                    input_value = input_value,

                # Datatype for this input
                datatype = types[self.inputs[input_id].datatype]

                # If needed copy the data
                for value in input_value:
                    if isinstance(value, str) and os.path.exists(value):
                        filename = os.path.basename(value)
                        destination = 'vfs://{}/input_data/{}'.format(mount_name, filename)
                        vfs_plugin.push_sink_data(value, destination, datatype=datatype)
                        new_input_value.append(vfs_plugin.url_to_path(destination))
                        stored_input_value.append('$REFDIR/input_data/{}'.format(filename))
                    else:
                        new_input_value.append(value)
                        stored_input_value.append(value)

                new_input_data[input_id] = tuple(new_input_value)
                stored_input_data[input_id] = tuple(stored_input_value)
        else:
            new_input_data = input_data
            stored_input_data = input_data

        test_data = {
            'input_data': stored_input_data,
            'used_input_data': new_input_data,
            'original_input_data': input_data,
            'tool': self.ns_id,
            'version': str(self.command_version),
        }

        # Now replace input_data with the new input data
        input_data = new_input_data

        # Save the test summary to the reference file
        log.info('Saving basic reference information')
        iohelpers.save_json(
            os.path.join(output_directory, self.TOOL_REFERENCE_FILE_NAME),
            test_data
        )

        # Cast input data to the desired datatype
        for input_id, input_value in input_data.items():
            _datatype = self.inputs[input_id].datatype
            input_data[input_id] = tuple(
                types[_datatype](value) for value in input_value
            )

        # Reference payload as a shortcut
        payload = {'inputs': input_data, 'outputs': {}}

        # Create output arguments automatically
        from fastr.execution.job import Job
        log.info('Determining values for outputs')
        for id_, argument in self.outputs.items():
            # Determine cardinality
            if not argument.automatic:
                cardinality = create_cardinality(argument.cardinality, None)
                cardinality = cardinality.calculate_job_cardinality(payload)
            else:
                cardinality = 1

            # Create output arguments automatically
            payload['outputs'][id_] = Job.fill_output_argument(output_spec=argument,
                                                               cardinality=cardinality,
                                                               desired_type=types[argument.datatype],
                                                               requested=True,
                                                               tmpurl='vfs://{}/output_data'.format(mount_name))

        log.info('Resulting test data: {}'.format(test_data))

        log.info('Executing tool...')
        result = self.execute(payload=payload)
        result.command = result.target_result.command

        # Translate command to avoid hard links to output_directory
        log.info('Processing results for use as reference data')

        # Do not substitute for input data
        pattern = r'^{}'.format(output_data_dir)
        result.reference_command = [re.sub(pattern, '$OUTDIR', x, 1) for x in result.target_result.command]
        result.reference_command = ['$INDIR/{}'.format(os.path.basename(x)) if os.path.exists(x) else x
                                    for x in result.reference_command]

        # Translate results back
        output_data = {}
        for key, value in result.result_data.items():
            datatype = types[self.outputs[key].datatype]
            preferred_type = (datatype,)
            output_data[key] = Job.translate_output_results(value, datatypes=preferred_type, mountpoint=mount_name)

        result.output_data = output_data

        config.read_config_string('del mounts["{}"]'.format(mount_name))

        log.info('Saving results for reference...')
        iohelpers.save_gpickle(
            os.path.join(output_directory, self.TOOL_RESULT_FILE_NAME),
            result
        )

        log.info('Finished creating reference data for {}/{}'.format(self.ns_id, self.command_version))
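
    # Illustrative usage sketch (not part of the original module); the input
    # id and paths are hypothetical:
    #
    #     tool.create_reference(
    #         input_data={'image': ('/tmp/input.nii.gz',)},
    #         output_directory='/tmp/exampletool_reference',
    #     )
    #
    # This runs the tool once and writes __fastr_tool_ref__.json plus the
    # pickled result next to copies of the input data, for later use by
    # test_tool().
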
    @staticmethod
    def compare_output_data(current_output_data, reference_output_data, validation_result, output):
        log.info('Current output data: {}'.format(current_output_data))
        log.info('Reference output data: {}'.format(reference_output_data))

        for nr, (current_value, reference_value) in enumerate(zip(current_output_data, reference_output_data)):
            if current_value != reference_value:
                validation_result.append('\n'.join((
                    'Value for {}/{} was not equal! (found "{}", expected "{}")'.format(
                        output, nr, current_value, reference_value,
                    ),
                    'Output: [{}] {!r}'.format(type(current_value).__name__, current_value),
                    'Reference: [{}] {!r}'.format(type(reference_value).__name__, reference_value),
                )))
    @classmethod
    def test_tool(cls, reference_data_dir, tool=None, input_data=None):
        """
        Execute the tool with the input data specified and test the results
        against the reference data. This effectively tests the tool execution.

        :param str reference_data_dir: The path or vfs url of reference data to compare with
        :param tool: The tool to test (defaults to the tool recorded in the reference data)
        :param dict input_data: The input data to use (defaults to the input data recorded in the reference data)
        """
        if not isinstance(reference_data_dir, str):
            raise exceptions.FastrTypeError('reference_data_dir should be a string!')

        if reference_data_dir.startswith('vfs://'):
            reference_data_dir = vfs_plugin.url_to_path(reference_data_dir)

        if not os.path.isdir(reference_data_dir):
            raise exceptions.FastrTypeError('The reference_data_dir should be pointing to an existing directory!'
                                            ' {} does not exist'.format(reference_data_dir))

        test_data = iohelpers.load_json(
            os.path.join(reference_data_dir, cls.TOOL_REFERENCE_FILE_NAME)
        )

        if tool is None:
            tool = resources.tools[test_data['tool'], test_data['version']]

        log.info('Testing tool {}/{} against {}'.format(
            tool.ns_id, tool.command_version, reference_data_dir,
        ))

        if input_data is None:
            input_data = {}
            for key, value in test_data['input_data'].items():
                if not isinstance(value, (tuple, list)):
                    value = value,

                # Set the $REFDIR correctly (to avoid problems with moving the reference dir)
                value = tuple(x.replace('$REFDIR', reference_data_dir) if isinstance(x, str) else x for x in value)
                input_data[key] = value

        temp_results_dir = None
        reference_result = iohelpers.load_gpickle(
            os.path.join(reference_data_dir, cls.TOOL_RESULT_FILE_NAME)
        )
        validation_result = []

        try:
            # Create temporary output directory
            temp_results_dir = os.path.normpath(mkdtemp(
                prefix='fastr_tool_test_{}_'.format(tool.id), dir=config.mounts['tmp']
            ))

            # Create a new reference for comparison
            log.info('Running tool and creating new reference data for comparison...')
            try:
                tool.create_reference(input_data, temp_results_dir, mount_name='__test_tmp__', copy_input=False)
            except Exception as exception:
                log.warning('Encountered exception when trying to run the {}/{} tool!'.format(
                    tool.ns_id, tool.command_version)
                )
                log.warning('Exception: [{}] {}'.format(type(exception).__name__, exception))
                validation_result.append('Encountered {}: {}'.format(type(exception).__name__, exception))
                return validation_result

            current_result = iohelpers.load_gpickle(os.path.join(temp_results_dir, cls.TOOL_RESULT_FILE_NAME))

            log.info('Comparing current run against reference run...')

            # First check the command and return code
            if current_result.reference_command != reference_result.reference_command:
                validation_result.append('\n'.join((
                    'Different command used for execution of tool "{}/{}"'.format(tool.ns_id, tool.command_version),
                    'Current command: {}'.format(current_result.reference_command),
                    'Reference command: {}'.format(reference_result.reference_command),
                )))

            if current_result.target_result.return_code != reference_result.target_result.return_code:
                validation_result.append('\n'.join((
                    'Different return code for execution of tool "{}/{}"'.format(tool.ns_id, tool.command_version),
                    'Current return code: {}'.format(current_result.target_result.return_code),
                    'Reference return code: {}'.format(reference_result.target_result.return_code),
                )))

            # Check if the outputs are the same
            current_outputs = sorted(current_result.output_data.keys())
            reference_outputs = sorted(reference_result.output_data.keys())
            if current_outputs != reference_outputs:
                validation_result.append('\n'.join((
                    'Different outputs found in tool "{}/{}"'.format(tool.ns_id, tool.command_version),
                    'Current outputs: {}'.format(current_outputs),
                    'Reference outputs: {}'.format(reference_outputs),
                )))

            # Add the mounts needed to find the data
            config.read_config_string('mounts["__test_tmp__"] = "{}"'.format(temp_results_dir))
            config.read_config_string('mounts["__ref_tmp__"] = "{}"'.format(reference_data_dir))

            # Check if values for all outputs match
            for output in reference_outputs:
                current_output_data = current_result.output_data[output]
                reference_output_data = reference_result.output_data[output]

                for idx, _ in enumerate(current_output_data):
                    if not isinstance(current_output_data[idx], type(reference_output_data[idx])):
                        validation_result.append('\n'.join((
                            'Different type for output {} found in tool "{}/{}"'.format(
                                output, tool.ns_id, tool.command_version
                            ),
                            'Current type: {}'.format(type(current_output_data[idx])),
                            'Reference type: {}'.format(type(reference_output_data[idx])),
                        )))

                if isinstance(current_output_data, dict):
                    raise exceptions.FastrNotImplementedError(
                        'Output comparison of dict structures is not supported yet!'
                    )
                else:
                    cls.compare_output_data(
                        current_output_data,
                        reference_output_data,
                        validation_result,
                        output
                    )

            # Remove the temporary mounts again
            config.read_config_string('del mounts["__test_tmp__"]')
            config.read_config_string('del mounts["__ref_tmp__"]')

            if len(validation_result) == 0:
                log.info('Run and reference were equal! Test passed!')
            else:
                log.info('Found difference with reference data! Test failed!')
                for line in validation_result:
                    log.info(line)

            return validation_result
        finally:
            # Clean up
            log.info('Removing temp result directory {}'.format(temp_results_dir))
            if temp_results_dir is not None and os.path.isdir(temp_results_dir):
                shutil.rmtree(temp_results_dir, ignore_errors=True)
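
    # Illustrative usage sketch (not part of the original module); the
    # reference directory path is hypothetical:
    #
    #     errors = Tool.test_tool('/tmp/exampletool_reference')
    #     if not errors:
    #         print('Tool output matches the reference data')
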
    def test(self, reference=None):
        """
        Run the tests for this tool
        """
        if reference is None:
            result = []
            tool_dir = os.path.dirname(self.filename)
            for test in self.tests:
                reference_dir = os.path.abspath(os.path.join(tool_dir, test))
                try:
                    result.extend(self.test_tool(reference_data_dir=reference_dir, tool=self))
                except exceptions.FastrTypeError:
                    message = 'Reference data in {} is not valid!'.format(reference_dir)
                    log.warning(message)
                    result.append(message)
        else:
            result = self.test_tool(reference_data_dir=reference, tool=self)

        return result