# Copyright 2011-2014 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from collections import OrderedDict
import sympy
from .flownoderun import FlowNodeRun
from .inputoutputrun import SourceOutputRun
from .job import SourceJob, JobState
from .. import exceptions
from ..core import vfs_plugin as vfs
from ..core.samples import SampleId, SampleItem, SampleIndex
from ..data import url
from ..datatypes import DataType, types
from ..helpers import log
__all__ = ['SourceNodeRun', 'ConstantNodeRun']
[docs]class SourceNodeRun(FlowNodeRun):
"""
Class providing a connection to data resources. This can be any kind of
file, stream, database, etc from which data can be received.
"""
__dataschemafile__ = 'SourceNodeRun.schema.json'
_OutputType = SourceOutputRun
_JobType = SourceJob
[docs] def __init__(self, node, parent):
""" Instantiation of the SourceNodeRun.
:param fastr.planning.node.Node node: The Node that this Run is based on.
:param NetworkRun parent: The NetworkRun that this NodeRun belongs to
:return: newly created sink node run
"""
super(SourceNodeRun, self).__init__(node, parent)
# Set the DataType
self._input_data = None
self._outputsize = None
self.outputsize = 'N_{}'.format(self.id)
[docs] def __eq__(self, other):
"""Compare two Node instances with each other. This function ignores
the parent and update status, but tests rest of the dict for equality.
equality
:param other: the other instances to compare to
:type other: NodeRun
:returns: True if equal, False otherwise
"""
if not isinstance(other, SourceNodeRun):
return NotImplemented
dict_self = dict(vars(self))
del dict_self['_parent']
del dict_self['_status']
del dict_self['_input_groups']
del dict_self['_input_data']
del dict_self['_outputsize']
del dict_self['input_group_combiner']
dict_other = dict(vars(other))
del dict_other['_parent']
del dict_other['_status']
del dict_other['_input_groups']
del dict_other['_input_data']
del dict_other['_outputsize']
del dict_other['input_group_combiner']
return dict_self == dict_other
[docs] def __getstate__(self):
"""
Retrieve the state of the SourceNodeRun
:return: the state of the object
:rtype dict:
"""
state = super(SourceNodeRun, self).__getstate__()
return state
[docs] def __setstate__(self, state):
"""
Set the state of the SourceNodeRun by the given state.
:param dict state: The state to populate the object with
:return: None
"""
super(SourceNodeRun, self).__setstate__(state)
self._input_data = None
self._outputsize = None
self.outputsize = 'N_{}'.format(self.id)
@property
def datatype(self):
"""
The datatype of the data this source supplies.
"""
return self.outputs['output'].datatype
@property
def sourcegroup(self):
log.warning('[DEPRECATED] The sourcegroup property of the'
' SourceNodeRun is deprecated and replaced by the'
' nodegroup property of the NodeRun. Please use that'
' property instead, it will have the same'
' functionality')
return self.nodegroup
@property
def dimnames(self):
"""
Names of the dimensions in the SourceNodeRun output. These will be reflected
in the SampleIdLists.
"""
if self.nodegroup is not None:
return self.nodegroup,
else:
return self.id,
@property
def output(self):
"""
Shorthand for ``self.outputs['output']``
"""
return self.outputs['output']
@property
def outputsize(self):
"""
The size of output of this SourceNodeRun
"""
return self._outputsize
@outputsize.setter
def outputsize(self, value):
# it seems pylint does not realize this is part of a property
# pylint: disable=arguments-differ
if isinstance(value, str):
self._outputsize = (sympy.symbols(value),)
elif isinstance(value, int):
self._outputsize = (value,)
else:
try:
self._outputsize = [x if isinstance(x, int) else sympy.symbols(x.replace(' ', '_')) for x in value]
except TypeError:
raise exceptions.FastrTypeError('Not a valid input type')
@property
def valid(self):
"""
This does nothing. It only overloads the valid method of NodeRun().
The original is intended to check if the inputs are connected to
some output. Since this class does not implement inputs, it is skipped.
"""
return True
[docs] def set_data(self, data, ids=None):
"""
Set the data of this source node.
:param data: the data to use
:type data: dict, OrderedDict or list of urls
:param ids: if data is a list, a list of accompanying ids
"""
self._input_data = OrderedDict()
# Check if data has key or generate keys
log.debug('Storing {} (ids {}) in {}'.format(data, ids, self.fullid))
if isinstance(data, OrderedDict):
ids = [SampleId(x) for x in data.keys()]
data = list(data.values())
elif isinstance(data, dict):
# Have data sorted on ids
ids, data = list(zip(*sorted(data.items())))
ids = [SampleId(x) for x in ids]
elif isinstance(data, list):
if ids is None:
ids = [SampleId('id_{}'.format(k)) for k in range(len(data))]
elif not isinstance(ids, list):
raise exceptions.FastrTypeError('Invalid type! The ids argument should be a list that matches the data samples!')
elif isinstance(data, tuple):
# A single sample with cardinality
ids = [SampleId('id_0')]
data = [data]
else:
if isinstance(data, set):
log.warning('Source data for {} is given as a set,'.format(self.fullid) +
' this is most probably a mistake and a list or dict should'
' be used instead')
ids = [SampleId('id_0')]
data = [data]
log.debug('Set data in {} with {} (Type {})'.format(self.id, data, self.datatype))
for key, value in zip(ids, data):
if isinstance(value, tuple):
self._input_data[key] = tuple(x if self.datatype.isinstance(x) else str(x) for x in value)
else:
self._input_data[key] = (value if self.datatype.isinstance(value) else str(value)),
log.debug('Result {}: {} (Type {})'.format(key, self._input_data[key], type(self._input_data[key]).__name__))
self.outputsize = len(self._input_data),
[docs] def execute(self):
"""
Execute the source node and create the jobs that need to run
:return: list of jobs to run
:rtype: list of :py:class:`Jobs <fastr.execution.job.Job>`
"""
self.update(False, False)
if not self.valid or self._input_data is None:
msg = 'Cannot executed a SourceNodeRun that is not valid! Messages:\n{}'.format('\n'.join(self.messages))
log.error(msg)
raise exceptions.FastrValueError(msg)
joblist = []
for index, (sample_id, value) in enumerate(self._input_data.items()):
sample_index = SampleIndex(index)
if all(not url.isurl(x) for x in value):
# A simple string should not be send to IOPlugin for procesing
log.debug('No job needed for sample {} at {}'.format(sample_id, self.fullid))
self.jobs[sample_id] = None
output_value = []
for subvalue in value:
# it appears pylint does not realize that self.datatype is a class
# pylint: disable=not-callable
if self.datatype.isinstance(subvalue):
output_value.append(subvalue)
else:
output_value.append(self.datatype(subvalue))
self.outputs['output'][sample_id, sample_index + (0,)] = SampleItem(sample_index + (0,),
sample_id,
{0: tuple(output_value)},
set())
# Broadcast fake job update?
job = self.create_job(sample_id,
sample_index,
job_data={'input': value},
job_dependencies=None)
job.status = JobState.finished
else:
# We found an URL, should be
log.debug('Spawning job for sample {} at {}'.format(sample_id, self.fullid))
joblist.append(self.create_job(sample_id, sample_index, {'input': value}, []))
self.drained = True
yield joblist
[docs] def create_job(self, sample_id, sample_index, job_data, job_dependencies, **kwargs):
job = super(SourceNodeRun, self).create_job(sample_id, sample_index,
job_data, job_dependencies,
**kwargs)
job._datatype = self.datatype.id
return job
def _wrap_arguments(self, job_data, sample_id, sample_index):
"""
Wrap arguments into a list of tuples that the execution script can parse
:param dict job_data: dictionary containing all input data for the job
:param sample_id: the id of the corresponding sample
:type sample_id: :py:class:`SampleId <fastr.core.sampleidlist.SampleId>`
:return: the wrapped arguments in a tuple with the form ``(inputs, outputs)``
.. note::
For a SourceNodeRun this function adds a few default (hidden) arguments
"""
log.debug('Wrapping SourceNodeRun with {}'.format(job_data))
arguments = super(SourceNodeRun, self)._wrap_arguments(job_data, sample_id, sample_index)
arguments[0]['input'] = job_data['input']
arguments[0]['datatype'] = types['String'](self.datatype.id),
arguments[0]['sample_id'] = types['String'](str(sample_id)),
outputurl = url.join(self.parent.tmpurl, self.id, str(sample_id), 'result')
outputpath = vfs.url_to_path(outputurl)
if not os.path.exists(outputpath):
os.makedirs(outputpath)
arguments[0]['targetdir'] = types['Directory'](outputurl),
return arguments
def _update(self, key, forward=True, backward=False):
"""
Update the NodeRun information and validity of the NodeRun and propagate
the update downstream. Updates inputs, input_groups, outputsize and outputs.
A NodeRun is valid if:
* All Inputs are valid (see :py:meth:`Input.update <fastr.planning.inputoutput.Input.update>`)
* All InputGroups are non-zero sized
"""
# Make sure the Inputs and input groups are up to date
# log.debug('Update {} passing {} {}'.format(key, type(self).__name__, self.id))
for input_ in self.inputs.values():
input_.update(key)
# Update own status
valid = True
messages = []
for id_, input_ in self.inputs.items():
if not input_.valid:
valid = False
for message in input_.messages:
messages.append('Input {} is not valid: {}'.format(id_, message))
for input_group in self.input_groups.values():
if input_group.empty:
valid = False
messages.append('InputGroup {} is empty'.format(input_group.id))
for id_, output in self.outputs.items():
if output.resulting_datatype is not None and not issubclass(output.resulting_datatype, DataType):
valid = False
messages.append(
'Output {} cannot determine the Output DataType (got {}), '
'please specify a valid DataType or add casts to the Links'.format(
id_, output.resulting_datatype))
self._status['valid'] = valid
self._status['messages'] = messages
# Update all downstream listeners
if forward:
for listener in self.listeners:
listener.update(key, forward, backward)
[docs]class ConstantNodeRun(SourceNodeRun):
"""
Class encapsulating one output for which a value can be set. For example
used to set a scalar value to the input of a node.
"""
__dataschemafile__ = 'ConstantNodeRun.schema.json'
[docs] def __init__(self, node, parent):
"""
Instantiation of the ConstantNodeRun.
:param datatype: The datatype of the output.
:param data: the prefilled data to use.
:param id_: The url pattern.
This class should never be instantiated directly (unless you know what
you are doing). Instead create a constant using the network class like
shown in the usage example below.
usage example:
.. code-block:: python
>>> import fastr
>>> network = fastr.Network()
>>> source = network.create_source(datatype=types['ITKImageFile'], id_='sourceN')
or alternatively create a constant node by assigning data to an item in an InputDict:
.. code-block:: python
>>> node_a.inputs['in'] = ['some', 'data']
which automatically creates and links a ConstantNodeRun to the specified Input
"""
super(ConstantNodeRun, self).__init__(node, parent)
self.set_data(node.data)
self._data = node.data
[docs] def __getstate__(self):
"""
Retrieve the state of the ConstantNodeRun
:return: the state of the object
:rtype dict:
"""
state = super(ConstantNodeRun, self).__getstate__()
state['data'] = list(self._data.items())
return state
[docs] def __setstate__(self, state):
"""
Set the state of the ConstantNodeRun by the given state.
:param dict state: The state to populate the object with
:return: None
"""
super(ConstantNodeRun, self).__setstate__(state)
self._data = OrderedDict((SampleId(str(x) for x in key), tuple(str(x) for x in value)) for key, value in state['data'])
self.set_data() # Make sure that the output size etc gets set
[docs] def set_data(self, data=None, ids=None):
"""
Set the data of this constant node in the correct way. This is mainly
for compatibility with the parent class SourceNodeRun
:param data: the data to use
:type data: dict or list of urls
:param ids: if data is a list, a list of accompanying ids
"""
# We have to arguments to match the superclas
# pylint: disable=unused-argument
if data is None and self.data is not None:
self._input_data = self.data
else:
super(ConstantNodeRun, self).set_data(data, ids)
@property
def data(self):
"""
The data stored in this constant node
"""
return self._data
[docs] def execute(self):
"""
Execute the constant node and create the jobs that need to run
:return: list of jobs to run
:rtype: list of :py:class:`Jobs <fastr.execution.job.Job>`
"""
# Make sure the data is set
self.set_data()
# Run as a normal SourceNode
for joblist in super(ConstantNodeRun, self).execute():
yield joblist