Source code for fastr.execution.noderun

# Copyright 2011-2014 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

A module to maintain a run of a network node.

import weakref
from abc import ABCMeta
from collections import OrderedDict

import fastr
import fastr.exceptions as exceptions
from fastr.core.samples import SampleItem
from fastr.core.inputgroupcombiner import DefaultInputGroupCombiner, MergingInputGroupCombiner
from fastr.core.node import InputGroup
from fastr.core.serializable import Serializable
from fastr.core.tool import Tool
from fastr.core.updateable import Updateable
from fastr.datatypes import DataType
from fastr.execution import node_run_mapping
from fastr.execution.inputoutputrun import BaseInputRun, InputRun, OutputRun
from fastr.execution.job import Job

__all__ = ['NodeRun']

[docs]class NodeRun(Updateable, Serializable): """ The class encapsulating a node in the network. The node is responsible for setting and checking inputs and outputs based on the description provided by a tool instance. """ __metaclass__ = ABCMeta __dataschemafile__ = 'NodeRun.schema.json' _InputType = InputRun _OutputType = OutputRun _JobType = Job
[docs] def __init__(self, node, parent): """ Instantiate a node. :param node: The node to base the noderun on :type node: :py:class:`Tool <fastr.core.node.NodeRun>` :param parent: the parent network of the node :type parent: :py:class:`Network <>` :return: the newly created NodeRun """ super(NodeRun, self).__init__() self._tool = node.tool self._parent = weakref.ref(parent) self._id = self.nodegroup = node.nodegroup self.drained = False inputlist = [] outputlist = [] # Create Input/Output objects for input_ in node.inputs.values(): # It can happen that this has been done by a subclass, check first inputobj = self._InputType(self, input_) inputlist.append((, inputobj)) for output in node.outputs.values(): # It can happen that this has been done by a subclass, check first outputobj = self._OutputType(self, output) outputlist.append((, outputobj)) self.inputs = OrderedDict(inputlist) self.outputs = OrderedDict(outputlist) self._input_groups = OrderedDict() # Set the job requirements self._required_cores = node.required_cores self._required_memory = node.required_memory self._required_time = node.required_time # Set the flow control self.input_group_combiner = None self.merge_dimensions = node.merge_dimensions # Set the input groups self.update_input_groups() self.input_group_combiner.update() = {} # Update Inputs and self (which calls Outputs) self.update()
[docs] def __repr__(self): """ Get a string representation for the NodeRun :return: the string representation :rtype: str """ return str(self)
[docs] def __str__(self): """ Get a string version for the NodeRun :return: the string version :rtype: str """ return "<{}: {}>".format(type(self).__name__,
[docs] def __eq__(self, other): """Compare two Node instances with each other. This function ignores the parent and update status, but tests rest of the dict for equality. equality :param other: the other instances to compare to :type other: NodeRun :returns: True if equal, False otherwise """ if not isinstance(other, NodeRun): return NotImplemented dict_self = {k: v for k, v in self.__dict__.items()} del dict_self['_parent'] del dict_self['_status'] del dict_self['_input_groups'] del dict_self['jobs'] del dict_self['input_group_combiner'] dict_other = {k: v for k, v in other.__dict__.items()} del dict_other['_parent'] del dict_other['_status'] del dict_other['_input_groups'] del dict_other['jobs'] del dict_other['input_group_combiner'] return dict_self == dict_other
[docs] def __getstate__(self): """ Retrieve the state of the NodeRun :return: the state of the object :rtype dict: """ state = super(NodeRun, self).__getstate__() # Make id prettier in the result state['id'] = # Add the class of the NodeRun in question state['class'] = type(self).__name__ # Get all input and output state['inputs'] = [x.__getstate__() for x in self.inputs.values()] state['outputs'] = [x.__getstate__() for x in self.outputs.values()] if self._tool is not None: state['tool'] = [self._tool.ns_id, str(self._tool.command['version'])] else: state['tool'] = None # Add resource requirements state['required_cores'] = self._required_cores state['required_memory'] = self._required_memory state['required_time'] = self._required_time state['merge_dimensions'] = self._merge_dimensions return state
[docs] def __setstate__(self, state): """ Set the state of the NodeRun by the given state. :param dict state: The state to populate the object with :return: None """ # Make sure the NodeRun classes are aligned (and warn if not so) if 'class' in state and state['class'] != type(self).__name__: fastr.log.warning('Attempting to set the state of a {} to {}'.format( state['class'], type(self).__name__ )) = None if not hasattr(self, '_input_groups'): self._input_groups = OrderedDict() if 'id' in state: self._id = state['id'] if 'parent' in state: parent = state['parent'] del state['parent'] else: parent = None if state['tool'] is not None: self._tool = fastr.toollist[tuple(state['tool'])] else: self._tool = None # Create Input/Output objects inputlist = [] for input_ in state['inputs']: if 'node' in input_: # Check if the expected NodeRun id matches our current id if input_['node'] != state['id']: raise exceptions.FastrParentMismatchError('This Input has different parent node!') del input_['node'] # It can happen that this has been done by a subclass, check first if not isinstance(input_, BaseInputRun): description = self.tool.inputs[input_['id']] inputobj = self._InputType(self, description) inputobj._node = self inputobj.__setstate__(input_) else: inputobj = input_ inputlist.append((, inputobj)) outputlist = [] for output in state['outputs']: if '_node' in output: # Check if the expected NodeRun id matches our current id if output['_node'] != state['_id']: raise exceptions.FastrParentMismatchError('This Input has different parent node!') del output['_node'] # It can happen that this has been done by a subclass, check first if not isinstance(output, OutputRun): description = self.tool.outputs[output['id']] outputobj = self._OutputType(self, description) outputobj.__setstate__(output) else: outputobj = output outputlist.append((, outputobj)) self.inputs = OrderedDict(inputlist) self.outputs = OrderedDict(outputlist) super(NodeRun, self).__setstate__(state) self._parent = None if parent is not None: self.parent = parent else: message = 'parent argument is None, need a parent Network to function!' raise exceptions.FastrValueError(message) self._required_cores = state['required_cores'] self._required_memory = state['required_memory'] self._required_time = state['required_time'] self.merge_dimensions = state['merge_dimensions'] self.update()
@property def merge_dimensions(self): return self._merge_dimensions @merge_dimensions.setter def merge_dimensions(self, value): if isinstance(value, (str, unicode)): options = ['all', 'none'] if value not in options: raise exceptions.FastrValueError('Invalid option {} given (valid options: {})'.format(value, options)) self._merge_dimensions = value if value == 'none': self.input_group_combiner = DefaultInputGroupCombiner(self) elif value == 'all': self.input_group_combiner = MergingInputGroupCombiner(self, value) else: self._merge_dimensions = value self.input_group_combiner = MergingInputGroupCombiner(self, tuple(value))
[docs] def update_input_groups(self): """ Update all input groups in this node """ input_groups = OrderedDict() for input_ in self.inputs.values(): if input_.input_group not in input_groups: input_groups[input_.input_group] = InputGroup(self, input_.input_group) input_groups[input_.input_group][] = input_ self._input_groups = input_groups
[docs] @classmethod def createobj(cls, state, network=None): if 'parent' not in state or not isinstance(state['parent'], fastr.Network): if network is not None: fastr.log.debug('Setting network to: {}'.format(network)) state['parent'] = network else: fastr.log.debug('No network given for de-serialization') else: fastr.log.debug('Parent is already defined as: {}'.format(network)) state = dict(state) return super(NodeRun, cls).createobj(state, network)
@property def blocking(self): """ Indicate that the results of this NodeRun cannot be determined without first executing the NodeRun, causing a blockage in the creation of jobs. A blocking Nodes causes the Chunk borders. """ for output in self.outputs.values(): if output.blocking: fastr.log.debug('Blocking because Output {} has cardinality {}'.format(output.fullid, output.cardinality())) return True return False @property def dimnames(self): """ Names of the dimensions in the NodeRun output. These will be reflected in the SampleIdList of this NodeRun. """ if hasattr(self, '_dimnames') and self._dimnames is not None: return self._dimnames else: return self.input_group_combiner.dimnames @dimnames.setter def dimnames(self, value): if isinstance(value, str): value = value, if not isinstance(value, tuple) or not all(isinstance(x, str) for x in value): raise exceptions.FastrTypeError('Dimnames has to be a tuple of str!') fastr.log.warning('You are overriding the dimnames of a NodeRun, beware this is possible but not encouraged and can lead to strange results!') self._dimnames = value @dimnames.deleter def dimnames(self): del self._dimnames @property def fullid(self): """ The full defining ID for the NodeRun inside the network """ return '{}/nodelist/{}'.format(self.parent.fullid, @property def global_id(self): """ The global defining ID for the Node from the main network (goes out of macro nodes to root network) """ return '{}/nodelist/{}'.format(self.parent.global_id, @property def input_groups(self): """ A list of input groups for this NodeRun. An input group is InputGroup object filled according to the NodeRun """ return self._input_groups @property def outputsize(self): """ Size of the outputs in this NodeRun """ # Get sizes of all input groups return self.input_group_combiner.size @property def id(self): """ The id of the NodeRun """ return self._id @property def listeners(self): """ All the listeners requesting output of this node, this means the listeners of all Outputs and SubOutputs """ return {l for output in self.outputs.values() for l in output.listeners} @property def name(self): """ Name of the Tool the NodeRun was based on. In case a Toolless NodeRun was used the class name is given. """ if hasattr(self, '_tool') and isinstance(self._tool, Tool): return else: return self.__class__.__name__ @property def parent(self): """ The parent network of this node. """ return self._parent() @parent.setter def parent(self, value): """ The parent network of this node. (setter) """ if self._parent() is value: return # Setting to same value doesn't do anything if self._parent() is not None: raise exceptions.FastrAttributeError('Cannot reset attribute once set') self._parent = weakref.ref(value) self._parent().add_node(self) @property def required_cores(self): """ Number of cores required for the execution of this NodeRun """ return self._required_cores @property def required_memory(self): """ Amount of memory required for the execution of this NodeRun. Follows the format \\d+[mMgG] so 500M or 4g would be valid ways to specify 500 megabytes or 4 gigabyte of memory. """ return self._required_memory @property def required_time(self): """ Amount of time required for the execution of this NodeRun. Follows the format of a number of second or H:M:S, with H the number of hours, M the number of minutes and S the number of seconds. """ return self._required_time @property def status(self): return self._status @property def tool(self): return self._tool
[docs] def execute(self): """ Execute the node and create the jobs that need to run :return: list of jobs to run :rtype: list of :py:class:`Jobs <fastr.execution.job.Job>` """ self.update(False, False) # Make sure a NodeRun is valid if not self.valid: message = 'NodeRun {} is not valid'.format(self.fullid) fastr.log.error(message) fastr.log.error('Messages:\n{}'.format('\n'.join(self.messages))) raise exceptions.FastrNodeNotValidError(message) input_groups = self.input_groups'Generating jobs for node "{}" with dimensions: {}'.format(, ', '.join('[{}: {}]'.format(name, size) for name, size in zip(self.dimnames, self.outputsize)) )) # Iterate over all combinations of input_groups to create sets of data job_list = [] fastr.log.debug('InputGroups: {}'.format(input_groups.values())) fastr.log.debug('Inputs: {}'.format([x for ig in input_groups.values() for x in ig.values()])) fastr.log.debug('Sources: {}'.format([x.source for ig in input_groups.values() for x in ig.values()])) for sample_index, sample_id, job_data, job_depends, failed_annotations in self.input_group_combiner: fastr.log.debug('----- START -----') fastr.log.debug('INDEX: {}'.format(sample_index)) fastr.log.debug('SAMPLE_ID: {} {}'.format(repr(sample_id), sample_id)) fastr.log.debug('JOBDATA: {}'.format(job_data)) fastr.log.debug('JOBDEPS: {}'.format(job_depends)) fastr.log.debug('FAILS: {}'.format(failed_annotations)) fastr.log.debug('------ END ------') job_list.append(self.create_job(sample_id, sample_index, job_data, job_depends)) fastr.log.debug('joblist: {}'.format(job_list)) fastr.log.debug('===== END execute_node =====') self.drained = True yield job_list
[docs] def set_result(self, job, failed_annotation): """ Incorporate result of a job into the NodeRun. :param Type job: job of which the result to store :param failed_annotation: A set of annotations, None if no errors else containing a tuple describing the errors """ sample_id, sample_index = job.sample_id, job.sample_index # Replace following code by node.set_data(job.output_data) ? or something like it? for output in self.outputs.values(): if not in job.output_data and len(output.listeners) > 0 and len(failed_annotation) == 0: error_message = 'Could not find required data for {} in {}!'.format(output.fullid, job.output_data) fastr.log.error(error_message) for output in self.outputs.values(): # No Errors and No samples in output if not failed_annotation and self.blocking: fastr.log.debug('>>>> >>>> No Errors and No samples in output in sample[{};{}]'.format(sample_id, sample_index)) if not in job.output_data: # There is not data, skip this output, if this was a problem, # a failure should have been detected anyways, but probably it # was a non-required output continue output_data = job.output_data[] fastr.log.debug('Setting data for blocking node: {} sample: {} with annotation: {}'.format(output.fullid, sample_id, failed_annotation)) output_values = tuple(job.get_deferred(, c) for c in range(len(output_data))) fastr.log.debug('Setting collected for {} sample_id {} data: {}'.format(output.fullid, sample_id, output_values)) output[sample_id, sample_index] = SampleItem(sample_index, sample_id, OrderedDict({0: tuple(output_values)}), {job}, failed_annotation) # Errors and no samples elif failed_annotation and self.blocking: output_values = (job.get_deferred(, 0),) fastr.log.debug('Setting data for blocking node: {} sample: {} with annotation: {}'.format(output.fullid, sample_id, failed_annotation)) fastr.log.debug('>>>> >>>> Errors and No samples in output in sample[{};{}]'.format(sample_id, sample_index)) output[sample_id, sample_index] = SampleItem(sample_index, sample_id, OrderedDict({0: tuple(output_values)}), {job}, failed_annotation) fastr.log.debug('$$ new annotation: {}'.format(output[sample_index].failed_annotations)) # Errors and samples elif failed_annotation and not self.blocking: fastr.log.debug('>>>> >>>> Errors and samples in output in sample[{};{}]'.format(sample_id, sample_index)) output[sample_index].failed_annotations.update(failed_annotation) else: fastr.log.debug(">>>> >>>> No errors and samples in output in sample[{};{}]".format(sample_id, sample_index))[sample_id] = job
[docs] def create_job(self, sample_id, sample_index, job_data, job_dependencies, **kwargs): """ Create a job based on the sample id, job data and job dependencies. :param sample_id: the id of the corresponding sample :type sample_id: :py:class:`SampleId <fastr.core.sampleidlist.SampleId>` :param sample_index: the index of the corresponding sample :type sample_index: :py:class:`SampleIndex <fastr.core.sampleidlist.SampleIndex>` :param dict job_data: dictionary containing all input data for the job :param job_dependencies: other jobs that need to finish before this job can run :return: the created job :rtype: :py:class:`Job <fastr.execution.job.Job>` """'Creating job for node {} sample id {!r}, index {!r}'.format(self.global_id, sample_id, sample_index)) fastr.log.debug('Creating job for sample {} with data {}'.format(sample_id, job_data)) # Get the arguments input_arguments, output_arguments = self._wrap_arguments(job_data, sample_id, sample_index) preferred_types = { output.preferred_types for output in self.outputs.values()} job = self._JobType(node=self, sample_id=sample_id, sample_index=sample_index, input_arguments=input_arguments, output_arguments=output_arguments, hold_jobs=job_dependencies, status_callback=self.parent.job_status_callback, preferred_types=preferred_types, **kwargs)[sample_id] = job # Check which outputs are required or connected and set them if not self.blocking: for output in self.outputs.itervalues(): # Not that this always has to happen, as we need the samples # to possibly annotate errors later, even if the output will # not be used later because of a lack of listeners fastr.log.debug('Preparing output {}'.format( fastr.log.debug('Cardinality request: spec: {}, job_data: {}, and index: {}'.format(output.cardinality_spec, job_data, sample_index)) cardinality = output.cardinality(sample_index, job_data) fastr.log.debug('Cardinality for {} is {}'.format(, cardinality)) if not isinstance(cardinality, int): message = 'For execution cardinality should be an int, for output ' \ '{} we found {} (type {})'.format(, cardinality, type(cardinality).__name__) fastr.log.critical(message) raise exceptions.FastrTypeError(message) value = tuple(job.get_deferred(, cardinality_nr) for cardinality_nr in range(cardinality)) output[sample_id] = SampleItem(sample_index, sample_id, {0: value}, {job}) else: fastr.log.debug('Cannot determine blocking node output a priori! Needs to be collected afterwards!') return job
def _wrap_arguments(self, job_data, sample_id, sample_index): """ Wrap arguments into a list of tuples that the execution script can parse :param dict job_data: dictionary containing all input data for the job :param sample_id: the id of the corresponding sample :type sample_id: :py:class:`SampleId <fastr.core.sampleidlist.SampleId>` :return: the wrapped arguments in a tuple with the form ``(inputs, outputs)`` """ arguments = ({}, {}) # format is (input_args, output_args) for key, input_ in self.inputs.items(): # Skip inputs that have no data if job_data[key] is None: if input_.default is not None: # Insert the default data if present job_data[key] = [input_.datatype(input_.default)] elif input_.required: fastr.log.debug('Job data: {}'.format(job_data)) raise exceptions.FastrValueError('NodeRun "{}" is missing data for required Input "{}"'.format(, else: continue arguments[0][key] = job_data[key] for key, output in self.outputs.items(): if not output.automatic: cardinality = output.cardinality(key=sample_index, job_data=job_data) else: cardinality = None if output.required or len(output.listeners) > 0: requested = True else: requested = False fastr.log.debug('Cardinality to be used: {}'.format(cardinality)) arguments[1][key] = {'id': key, 'cardinality': cardinality if cardinality is not None else output._output_cardinality, 'datatype':, 'requested': requested} return arguments
[docs] def get_sourced_nodes(self): """ A list of all Nodes connected as sources to this NodeRun :return: list of all nodes that are connected to an input of this node """ return list(set([n for input_ in self.inputs.values() for n in input_.get_sourced_nodes()]))
[docs] def find_source_index(self, target_index, target, source): # If there are multiple input groups, select only part of index from # the inputgroup which source belongs to if len(self.input_groups) > 1: input_groups = self.input_groups mask = [True if == source.input_group else False for ig in input_groups.values() for _ in ig.size] target_index = tuple(k for k, m in zip(target_index, mask) if m) # Delegate to InputGroup to check mixing within InputGroup return self.input_groups[source.input_group].find_source_index(target_size=target.size, target_dimnames=target.dimnames, source_size=source.size, source_dimnames=source.dimnames, target_index=target_index)
def _update(self, key, forward=True, backward=False): """ Update the NodeRun information and validity of the NodeRun and propagate the update downstream. Updates inputs, input groups, outputsize and outputs. A NodeRun is valid if: * All Inputs are valid (see :py:meth:`Input.update <fastr.core.inputoutput.Input.update>`) * All InputGroups are non-zero sized """ # Make sure the Inputs and input groups are up to date # fastr.log.debug('Update {} passing {} {}'.format(key, type(self).__name__, if backward: for sourced_node in self.get_sourced_nodes(): sourced_node.update(key, False, backward) for input_ in self.inputs.values(): input_.update(key, forward, backward) self.update_input_groups() self.input_group_combiner.update() # Update own status valid = True messages = [] for id_, input_ in self.inputs.items(): if not input_.valid: valid = False for message in input_.messages: messages.append('[{}] Input {} is not valid: {}'.format(,, message)) for input_group in self.input_groups.values(): if input_group.empty: valid = False messages.append('[{}] InputGroup {} is empty'.format(, for id_, output in self.outputs.items(): if output.resulting_datatype is not None: if not issubclass(output.resulting_datatype, DataType): valid = False messages.append('[{}] Output {} cannot determine the Output DataType (got {}), please specify a ' 'valid DataType or add casts to the Links'.format(, id_, output.resulting_datatype)) self._status['valid'] = valid self._status['messages'] = messages # Update all outputs for output in self.outputs.values(): output.update(key, forward, backward) # Update all downstream listeners if forward: for listener in self.listeners: listener.update(key, forward, False)
node_run_mapping['Node'] = NodeRun