Source code for fastr.execution.flownoderun

# Copyright 2011-2014 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from collections import OrderedDict

import sympy

import fastr
import fastr.exceptions as exceptions
from fastr.core.samples import SampleId, SampleItem, SampleIndex
from fastr.execution import node_run_mapping
from fastr.execution.inputoutputrun import AdvancedFlowOutputRun, OutputRun
from fastr.execution.job import InlineJob, JobState
from fastr.execution.noderun import NodeRun

__all__ = ['FlowNodeRun', 'AdvancedFlowNodeRun']


class FlowNodeRun(NodeRun):
    """
    A Flow NodeRun is a special subclass of Nodes in which the amount of
    samples can vary per Output. This allows non-default data flows.

    Every output of a FlowNodeRun gets one dimension more than the
    combined input groups; the samples produced by a job are stored along
    that extra dimension.
    """
    _OutputType = OutputRun

    @property
    def blocking(self):
        """
        A FlowNodeRun is (for the moment) always considered blocking.

        :return: True
        """
        return True

    @property
    def outputsize(self):
        """
        Size of the outputs in this NodeRun

        :return: the sizes of all input groups concatenated, followed by a
                 sympy symbol ``N_<id>`` for the extra dimension, or
                 ``None`` as soon as one input group has no size
        :rtype: tuple or None
        """
        # Get sizes of all input groups
        output_size = []
        for input_group in self.input_groups.values():
            if input_group.size is None:
                return None
            output_size.extend(input_group.size)

        # The extra dimension is represented by a symbol named after this node
        output_size.append(sympy.symbols('N_{}'.format(self.id)))
        return tuple(output_size)

    @property
    def dimnames(self):
        """
        Names of the dimensions in the NodeRun output. These will be reflected
        in the SampleIdList of this NodeRun.

        :return: the dimension names of the parent class followed by one
                 extra name: the nodegroup if it is set, the node id otherwise
        :rtype: tuple
        """
        if self.nodegroup is not None:
            extra_dim = self.nodegroup
        else:
            extra_dim = self.id

        return super(FlowNodeRun, self).dimnames + (extra_dim,)

    def set_result(self, job, failed_annotation):
        """
        Incorporate result of a job into the FlowNodeRun.

        :param Type job: job of which the result to store
        :param failed_annotation: annotation stored on every created
            SampleItem; when truthy the job output data is not read and a
            single deferred placeholder sample is stored per output instead
        :raises FastrValueError: if the output data is not a dictionary or a
            list of 2-item entries, or if a sample id does not match the
            number of dimensions of the output
        :raises KeyError: presumably, when the job did not fail but has no
            data for one of the outputs (the lookup in ``job.output_data``
            is not guarded)
        """
        fastr.log.debug('Job output data: {}'.format(job.output_data))

        # ``unicode`` only exists on Python 2. ``type(u'')`` is ``unicode``
        # there and ``str`` on Python 3, so this check works on both without
        # raising a NameError.
        string_types = (str, type(u''))

        # Get the main sample index from the Job
        sample_index = job.sample_index

        for output in self.outputs.values():
            if output.id not in job.output_data:
                # No ``continue`` here: for a failed job the placeholder
                # below is still stored, for a successful job the lookup
                # below fails.
                fastr.log.error('Could not find expected data for {} in {}!'.format(output.fullid, job.output_data))

            if failed_annotation:
                data = [(job.sample_id, (job.get_deferred(output.id, 0),))]
            else:
                data = job.output_data[output.id]

            fastr.log.debug('output_data = {}'.format(data))

            # Make sure dictionary is sorted, can also be list of items
            # which will be kept ordered
            if isinstance(data, dict):
                data = sorted(data.items())

            if not all(isinstance(x, (list, tuple)) and len(x) == 2 for x in data):
                raise exceptions.FastrValueError('The output data for a FlowNodeRun should be a dictionary or a list of items (length 2 per entry)')

            for sample_nr, (sample_id, sample_data) in enumerate(data):
                # The id as delivered by the job is needed later to look up
                # the deferred values
                orig_sample_id = sample_id

                # Ensure we have a SampleId (cast if need be)
                if not isinstance(sample_id, SampleId):
                    # Make sure sample_id is built from a tuple of str
                    if isinstance(sample_id, string_types):
                        sample_id = (str(sample_id),)
                    else:
                        sample_id = tuple(str(x) for x in sample_id)

                    sample_id = SampleId(sample_id)
                    fastr.log.debug('Change sample_id from {} ({}) to {} ({})'.format(orig_sample_id,
                                                                                     type(orig_sample_id).__name__,
                                                                                     sample_id,
                                                                                     type(sample_id).__name__))

                # An id that only names the extra dimension gets prefixed
                # with the sample id of the job
                if len(sample_id) != output.ndims:
                    sample_id = job.sample_id + sample_id
                    fastr.log.debug('Updated sample_id to {}'.format(sample_id))

                if len(sample_id) != output.ndims:
                    raise exceptions.FastrValueError('Sample ID {} has the wrong dimensionality!'.format(sample_id))

                fastr.log.debug('Setting data for blocking node: {} sample: {}'.format(output.fullid, sample_id))
                output_values = tuple(job.get_deferred(output.id, c, orig_sample_id) for c, _ in enumerate(sample_data))

                # NOTE(review): the original wrote ``sample_index + (sample_nr)``,
                # which adds a plain int (the parentheses do not create a
                # tuple). This relies on SampleIndex accepting a bare int
                # in ``+`` -- confirm.
                item_index = sample_index + sample_nr

                fastr.log.debug('Setting collected for {} sample_id {} sample_index {!r} data: {}'.format(output.fullid,
                                                                                                          sample_id,
                                                                                                          item_index,
                                                                                                          output_values))

                # Save with sample_index and sample nr in the extra dimension
                output[sample_id, item_index] = SampleItem(item_index,
                                                           sample_id,
                                                           OrderedDict({0: output_values}),
                                                           {job},
                                                           failed_annotation)

                # Register the samples parent job
                self.jobs[sample_id] = job
class AdvancedFlowNodeRun(FlowNodeRun):
    """
    FlowNodeRun variant that uses :py:class:`AdvancedFlowOutputRun` outputs
    and :py:class:`InlineJob` jobs. The tool interface is called directly
    from :py:meth:`execute`, and the resulting flow data is stored on the
    job for :py:meth:`set_result` to pick up.
    """
    _OutputType = AdvancedFlowOutputRun
    _JobType = InlineJob

    def execute(self):
        """
        Execute the node and create the jobs that need to run

        This method is a generator. It yields exactly one list, containing
        the single job whose status has already been set to
        ``JobState.execution_done``.

        :return: generator yielding one list of jobs
        :rtype: generator of list of :py:class:`Jobs <fastr.execution.job.Job>`
        :raises FastrNodeNotValidError: if the NodeRun is not valid
        """
        self.update(False, False)

        # Refuse to run an invalid NodeRun; log the reasons first
        if not self.valid:
            error_text = 'NodeRun {} is not valid'.format(self.fullid)
            fastr.log.error(error_text)
            fastr.log.error('Messages:\n{}'.format('\n'.join(self.messages)))
            raise exceptions.FastrNodeNotValidError(error_text)

        groups = self.input_groups

        # Log the inputs this run is about to consume
        fastr.log.debug('InputGroups: {}'.format(groups.values()))
        fastr.log.debug('Inputs: {}'.format([inp for group in groups.values() for inp in group.values()]))
        fastr.log.debug('Sources: {}'.format([inp.source for group in groups.values() for inp in group.values()]))

        # Collect the items of every input, keyed by input id
        job_data = {}
        for node_input in self.inputs.values():
            job_data[node_input.id] = node_input.items()

        tool_target = self.tool.target
        flow_job = self.create_job(SampleId('FLOW'),
                                   SampleIndex(0),
                                   job_data=job_data,
                                   job_dependencies=None)

        # Run the tool interface right here, inside the target context
        with tool_target:
            flow_result = self.tool.interface.execute(tool_target, job_data)

        flow_job.flow_data = flow_result.result_data

        # Per output: map the string form of each sample id to its data
        collected = {}
        for output_id, samples in flow_result.result_data.items():
            per_sample = {}
            for _, item in samples.items():
                per_sample[str(item.id)] = item.data.sequence_part()
            collected[output_id] = per_sample

        flow_job.output_data = collected
        flow_job.status = JobState.execution_done
        flow_job.write()

        yield [flow_job]

    def set_result(self, job, failed_annotation):
        """
        Incorporate the flow data of a job into the outputs of this NodeRun.

        :param job: job whose ``flow_data`` (as set by :py:meth:`execute`)
            is stored
        :param failed_annotation: annotation stored on every created
            SampleItem
        """
        for output_id, samples in job.flow_data.items():
            fastr.log.debug('Advanced flow for output: {}'.format(output_id))

            for (sample_index, sample_id), item in samples.items():
                fastr.log.debug('Advanced flow sample {!r} -> {}'.format(sample_index, list(item.data)))

                # One deferred reference per value in the sample
                deferred = tuple(job.get_deferred(output_id, nr, sample_id) for nr, _ in enumerate(item.data))

                fastr.log.debug('Setting collected for {} sample_id {!r} sample_index {!r} data: {}'.format(output_id,
                                                                                                            sample_id,
                                                                                                            sample_index,
                                                                                                            deferred))

                # Store the new item under the sample index from the flow
                # data; index and id of the item itself come from the value
                new_item = SampleItem(item.index,
                                      item.id,
                                      OrderedDict({0: tuple(deferred)}),
                                      {job},
                                      failed_annotation)
                self.outputs[output_id][sample_index] = new_item

                # All samples share the single flow job
                self.jobs['FLOW'] = job
# Register the run classes defined in this module in the shared
# node_run_mapping (presumably keyed by the name of the matching node class).
node_run_mapping['FlowNode'] = FlowNodeRun
node_run_mapping['AdvancedFlowNode'] = AdvancedFlowNodeRun