# Copyright 2011-2014 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import OrderedDict
import sympy
import fastr
import fastr.exceptions as exceptions
from fastr.core.samples import SampleId, SampleItem, SampleIndex
from fastr.execution import node_run_mapping
from fastr.execution.inputoutputrun import AdvancedFlowOutputRun, OutputRun
from fastr.execution.job import InlineJob, JobState
from fastr.execution.noderun import NodeRun
# Public names exported by this module: the flow NodeRun classes defined below
__all__ = ['FlowNodeRun', 'AdvancedFlowNodeRun']
class FlowNodeRun(NodeRun):
    """
    A Flow NodeRun is a special subclass of Nodes in which the amount of samples
    can vary per Output. This allows non-default data flows.
    """

    _OutputType = OutputRun

    @property
    def blocking(self):
        """
        A FlowNodeRun is (for the moment) always considered blocking.

        :return: True
        """
        return True

    @property
    def outputsize(self):
        """
        Size of the outputs in this NodeRun.

        The size is the concatenation of the sizes of all input groups,
        followed by one extra symbolic dimension ``N_<id>`` that holds the
        variable number of samples produced by the flow.

        :return: tuple with the output size, or None if the size of any input
                 group is not (yet) known
        """
        output_size = []
        for input_group in self.input_groups.values():
            # An unknown input group size makes the whole output size unknown
            if input_group.size is None:
                return None
            output_size.extend(input_group.size)

        # Extra dimension for the samples generated by the flow itself
        output_size.append(sympy.symbols('N_{}'.format(self.id)))
        return tuple(output_size)

    @property
    def dimnames(self):
        """
        Names of the dimensions in the NodeRun output. These will be reflected
        in the SampleIdList of this NodeRun.

        The extra flow dimension is named after the nodegroup when one is set,
        otherwise after the id of this NodeRun.
        """
        if self.nodegroup is not None:
            extra_dim = self.nodegroup
        else:
            extra_dim = self.id

        return super(FlowNodeRun, self).dimnames + (extra_dim,)

    def set_result(self, job, failed_annotation):
        """
        Incorporate result of a job into the FlowNodeRun.

        Each output of the job may yield a variable number of samples. Every
        sample is stored under its SampleId, which is prefixed with the sample
        id of the job when it has too few dimensions. It is also stored under
        a sample index made of the job's sample index plus the position of the
        sample in the output data, which is the extra flow dimension.

        :param job: job of which the result to store
        :param failed_annotation: value stored with every resulting SampleItem;
                                  when truthy, the job output data is not read
                                  and a single sample with the job's sample id
                                  and the first deferred value is stored per
                                  output instead
        :raises FastrValueError: if the output data is not a dict or a list of
                                 2-item entries, or if a sample id has the
                                 wrong dimensionality
        :raises KeyError: if ``failed_annotation`` is falsy and the job has no
                          output data for one of the outputs
        """
        fastr.log.debug('Job output data: {}'.format(job.output_data))

        # Get the main sample index from the Job
        sample_index = job.sample_index

        for output in self.outputs.values():
            if output.id not in job.output_data:
                # Processing deliberately continues after this error. For a
                # failed job a placeholder sample is stored below. Otherwise
                # the output_data lookup below raises a KeyError.
                fastr.log.error('Could not find expected data for {} in {}!'.format(output.fullid, job.output_data))

            if failed_annotation:
                data = [(job.sample_id, (job.get_deferred(output.id, 0),))]
            else:
                data = job.output_data[output.id]

            fastr.log.debug('output_data = {}'.format(data))

            # Make sure dictionary is sorted, can also be list of items
            # which will be kept ordered
            if isinstance(data, dict):
                data = sorted(data.items())

            if not all(isinstance(x, (list, tuple)) and len(x) == 2 for x in data):
                raise exceptions.FastrValueError('The output data for a FlowNodeRun should be a dictionary or a list of items (length 2 per entry)')

            for sample_nr, (sample_id, sample_data) in enumerate(data):
                orig_sample_id = sample_id

                # Ensure we have a SampleId (cast if need be)
                if not isinstance(sample_id, SampleId):
                    # Make sure sample_id is built from a tuple of str
                    if isinstance(sample_id, (str, unicode)):
                        sample_id = (str(sample_id),)
                    else:
                        sample_id = tuple(str(x) for x in sample_id)

                    sample_id = SampleId(sample_id)
                    fastr.log.debug('Change sample_id from {} ({}) to {} ({})'.format(orig_sample_id,
                                                                                     type(orig_sample_id).__name__,
                                                                                     sample_id,
                                                                                     type(sample_id).__name__))

                # A sample id that only covers the flow dimension is prefixed
                # with the sample id of the job that produced it
                if len(sample_id) != output.ndims:
                    sample_id = job.sample_id + sample_id
                    fastr.log.debug('Updated sample_id to {}'.format(sample_id))

                    if len(sample_id) != output.ndims:
                        raise exceptions.FastrValueError('Sample ID {} has the wrong dimensionality!'.format(sample_id))

                fastr.log.debug('Setting data for blocking node: {} sample: {}'.format(output.fullid, sample_id))

                # Index of this sample: the job's sample index extended with the
                # sample number in the extra (flow) dimension
                item_index = sample_index + sample_nr

                # One deferred value per cardinality entry of the sample
                output_values = tuple(job.get_deferred(output.id, cardinality_nr, orig_sample_id)
                                      for cardinality_nr, _ in enumerate(sample_data))

                fastr.log.debug('Setting collected for {} sample_id {} sample_index {!r} data: {}'.format(output.fullid,
                                                                                                         sample_id,
                                                                                                         item_index,
                                                                                                         output_values))

                # Save with sample_index and sample nr in the extra dimension
                output[sample_id, item_index] = SampleItem(item_index,
                                                           sample_id,
                                                           OrderedDict({0: output_values}),
                                                           {job},
                                                           failed_annotation)

                # Register the samples parent job
                self.jobs[sample_id] = job
class AdvancedFlowNodeRun(FlowNodeRun):
    """
    FlowNodeRun variant that runs its tool inline through the tool interface.

    All samples are handled by a single job with SampleId ``'FLOW'``. The
    resulting flow data is stored on that job and later copied into the
    outputs by :py:meth:`set_result`.
    """

    _OutputType = AdvancedFlowOutputRun
    _JobType = InlineJob

    def execute(self):
        """
        Execute the node and create the jobs that need to run

        This is a generator that yields exactly once. The single job in the
        yielded list has already been executed, given its ``flow_data`` and
        ``output_data``, marked ``JobState.execution_done`` and written.

        :return: list of jobs to run
        :rtype: list of :py:class:`Jobs <fastr.execution.job.Job>`
        :raises FastrNodeNotValidError: if the NodeRun is not valid after update
        """
        self.update(False, False)

        # Refuse to run an invalid NodeRun, logging the reasons first
        if not self.valid:
            error_message = 'NodeRun {} is not valid'.format(self.fullid)
            fastr.log.error(error_message)
            fastr.log.error('Messages:\n{}'.format('\n'.join(self.messages)))
            raise exceptions.FastrNodeNotValidError(error_message)

        groups = self.input_groups

        # Prepare the output of the NodeRun
        fastr.log.debug('InputGroups: {}'.format(groups.values()))
        fastr.log.debug('Inputs: {}'.format([inp for grp in groups.values() for inp in grp.values()]))
        fastr.log.debug('Sources: {}'.format([inp.source for grp in groups.values() for inp in grp.values()]))

        job_data = {inp.id: inp.items() for inp in self.inputs.values()}
        target = self.tool.target
        flow_job = self.create_job(SampleId('FLOW'),
                                   SampleIndex(0),
                                   job_data=job_data,
                                   job_dependencies=None)

        with target:
            result = self.tool.interface.execute(target, job_data)

        flow_data = result.result_data
        flow_job.flow_data = flow_data

        # Per output: map the string form of each sample id to its sequence part
        output_data = {}
        for output_id, samples in flow_data.items():
            output_data[output_id] = {str(item.id): item.data.sequence_part()
                                      for _, item in samples.items()}

        flow_job.output_data = output_data
        flow_job.status = JobState.execution_done
        flow_job.write()

        yield [flow_job]

    def set_result(self, job, failed_annotation):
        """
        Copy the flow data stored on ``job`` into the outputs of this NodeRun.

        Every entry in ``job.flow_data`` maps an output id to items keyed by
        ``(sample_index, sample_id)``. Each item becomes a SampleItem of
        deferred values under that sample index of the output.

        :param job: the job created by :py:meth:`execute`
        :param failed_annotation: value passed on to every stored SampleItem
        """
        for output_id, samples in job.flow_data.items():
            fastr.log.debug('Advanced flow for output: {}'.format(output_id))

            for (sample_index, sample_id), item in samples.items():
                fastr.log.debug('Advanced flow sample {!r} -> {}'.format(sample_index, list(item.data)))

                # One deferred value per cardinality entry of the sample
                deferreds = tuple(job.get_deferred(output_id, cardinality_nr, sample_id)
                                  for cardinality_nr, _ in enumerate(item.data))

                fastr.log.debug('Setting collected for {} sample_id {!r} sample_index {!r} data: {}'.format(output_id,
                                                                                                           sample_id,
                                                                                                           sample_index,
                                                                                                           deferreds))

                # Save with sample_index and sample nr in the extra dimension
                self.outputs[output_id][sample_index] = SampleItem(item.index,
                                                                   item.id,
                                                                   OrderedDict({0: deferreds}),
                                                                   {job},
                                                                   failed_annotation)

        self.jobs['FLOW'] = job
# Register the NodeRun classes of this module in node_run_mapping under the
# keys 'FlowNode' and 'AdvancedFlowNode'.
# NOTE(review): presumably used to pick the NodeRun class for a node type at
# execution time; confirm against fastr.execution.node_run_mapping users.
node_run_mapping['FlowNode'] = FlowNodeRun
node_run_mapping['AdvancedFlowNode'] = AdvancedFlowNodeRun