Source code for fastr.execution.job

# Copyright 2011-2014 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains the Job class and some related classes.
"""

from collections import OrderedDict
import datetime
import gzip
import os
try:
    import cPickle as pickle
except ImportError:
    import pickle
import re
import urlparse

from enum import Enum

import fastr
from fastr.core.provenance import Provenance
from fastr.core.samples import SampleItem
from fastr.core.serializable import Serializable
from fastr.data import url
from fastr.datatypes import URLType, DataType, Deferred
import fastr.exceptions as exceptions


try:
    from fastr.execution.environmentmodules import EnvironmentModules
    ENVIRONMENT_MODULES = EnvironmentModules(fastr.config.protected_modules)
    ENVIRONMENT_MODULES_LOADED = True
except exceptions.FastrValueError:
    ENVIRONMENT_MODULES_LOADED = False


class JobState(Enum):
    """
    The possible states a Job can be in. An overview of the states and the
    advised transitions are depicted in the following figure:

    .. graphviz::

       digraph jobstate {
           nonexistent [shape=box];
           created [shape=box];
           queued [shape=box];
           hold [shape=box];
           running [shape=box];
           execution_done [shape=box];
           execution_failed [shape=box];
           processing_callback [shape=box];
           finished [shape=box];
           failed [shape=box];
           cancelled [shape=box];

           nonexistent -> created;
           created -> queued;
           created -> hold;
           hold -> queued;
           queued -> running;
           running -> execution_done;
           running -> execution_failed;
           execution_done -> processing_callback;
           execution_failed -> processing_callback;
           processing_callback -> finished;
           processing_callback -> failed;
           running -> cancelled;
           queued -> cancelled;
           hold -> cancelled;
       }
    """
    nonexistent = ('nonexistent', 'idle', False)
    created = ('created', 'idle', False)
    queued = ('queued', 'idle', False)
    hold = ('hold', 'idle', False)
    running = ('running', 'in_progress', False)
    execution_done = ('execution_done', 'in_progress', False)
    execution_failed = ('execution_failed', 'in_progress', True)
    processing_callback = ('processing_callback', 'in_progress', False)
    finished = ('finished', 'done', False)
    failed = ('failed', 'done', True)
    cancelled = ('cancelled', 'done', True)
    def __init__(self, _, stage, error):
        self.stage = stage
        self.error = error
    @property
    def done(self):
        return self.stage == 'done'

    @property
    def in_progress(self):
        return self.stage == 'in_progress'

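# Illustrative sketch (not part of the original module): how the stage/error
# metadata defined above is typically queried. The results follow directly
# from the enum value definitions.
#
#     >>> JobState.running.in_progress
#     True
#     >>> JobState.finished.done, JobState.finished.error
#     (True, False)
#     >>> JobState.failed.done, JobState.failed.error
#     (True, True)
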

class Job(Serializable):
    """
    Class describing a job.

    Arguments:
    tool_name - the name of the tool (str)
    tool_version - the version of the tool (Version)
    argument - the arguments used when calling the tool (list)
    tmpdir - temporary directory to use to store output data
    hold_jobs - list of jobs that need to be finished before this job can run (list)
    """

    # Constants for file names related to jobs
    COMMAND_DUMP = '__fastr_command__.pickle.gz'
    RESULT_DUMP = '__fastr_result__.pickle.gz'
    STDOUT_DUMP = '__fastr_stdout__.txt'
    STDERR_DUMP = '__fastr_stderr__.txt'
    def __init__(self, jobid, tool_name, tool_version, nodeid, sample_id,
                 sample_index, input_arguments, output_arguments, tmpdir,
                 hold_jobs=None, timestamp=None, cores=None, memory=None,
                 walltime=None, status_callback=None, preferred_types=None):
        """
        Create a job

        :param str jobid: the job id
        :param str tool_name: the id of the tool
        :param fastr.core.version.Version tool_version: the version of the tool
        :param str nodeid: the id of the creating node
        :param fastr.core.samples.SampleId sample_id: the id of the sample
        :param sample_index: the index of the sample
        :param list[dict] input_arguments: the input argument list
        :param list[dict] output_arguments: the output argument list
        :param str tmpdir: the workdir for this job
        :param list[str] hold_jobs: the jobs on which this job depends
        :param datetime.datetime timestamp: the time this job was spawned
        :param int cores: the number of cores this job is allowed to consume
        :param str memory: the maximum amount of memory this job is allowed to consume
        :param str walltime: the maximum amount of time this job is allowed to run
        """
        self.jobid = jobid
        self.tool_name = tool_name
        self.nodeid = nodeid
        self.tool_version = str(tool_version)
        self.input_arguments = input_arguments
        self.output_arguments = output_arguments
        self.tmpdir = tmpdir
        self.sample_id = sample_id
        self.sample_index = sample_index

        self._required_cores = None
        self._required_memory = None
        self._required_time = None
        self.required_cores = cores
        self.required_memory = memory
        self.required_time = walltime

        self.translated_values = {}
        self.status_callback = status_callback
        self.preferred_types = preferred_types if preferred_types else {}

        # Some fields for provenance
        self.job_activity = None
        self.tool_agent = None
        self.node_agent = None
        self.network_agent = None

        if isinstance(hold_jobs, (set, list, tuple)):
            self.hold_jobs = list(hold_jobs)
        elif isinstance(hold_jobs, str):
            self.hold_jobs = [hold_jobs]
        elif hold_jobs is None:
            self.hold_jobs = []
        else:
            raise exceptions.FastrTypeError('Cannot create job: hold_jobs has an invalid type!')

        if timestamp is not None:
            self.timestamp = timestamp
        else:
            self.timestamp = datetime.datetime.now()

        self.info_store = {'jobid': self.jobid,
                           'dependencies': self.hold_jobs,
                           'client_tool': {'id': self.tool_name,
                                           'version': str(self.tool_version)},
                           'status': [JobState.nonexistent],
                           'errors': [],
                           'output_hash': {}}

        self.status = JobState.created
        self.provenance = Provenance(self)
        self._init_provenance()

        # Dictionary where the output data will be stored
        self.output_data = {}

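    # Illustrative sketch (not part of the original module): constructing a
    # Job by hand. All ids, argument dicts and the tmpdir url are hypothetical,
    # and this assumes the tool is registered in fastr.toollist and a network
    # is executing (Job._init_provenance consults fastr.current_network).
    #
    #     >>> job = Job(jobid='add_ints___sample_1',
    #     ...           tool_name='AddInt', tool_version='1.0',
    #     ...           nodeid='add_ints', sample_id=sample_id,
    #     ...           sample_index=sample_index,
    #     ...           input_arguments=input_arguments,
    #     ...           output_arguments=output_arguments,
    #     ...           tmpdir='vfs://tmp/add_ints/sample_1',
    #     ...           cores=1, memory='1G', walltime='00:10:00')
    #     >>> job.status
    #     <JobState.created: ('created', 'idle', False)>
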
    def __getstate__(self):
        """
        Get the state of the job

        :return: job state
        :rtype: dict
        """
        state = {k: v for k, v in self.__dict__.items()}
        del state['status_callback']
        return state
    def __setstate__(self, state):
        """
        Set the state of the job

        :param dict state: the state to restore
        """
        self.status_callback = None
        self.__dict__.update(state)
    def _init_provenance(self):
        """
        Create the initial provenance document
        """
        self.job_activity = self.provenance.activity(self.provenance.job[self.jobid])
        self.tool_agent = self.provenance.agent(
            self.provenance.tool["{}/{}".format(self.tool_name, self.tool_version)],
            {
                'fastrinfo:tool_dump': fastr.toollist[self.tool_name, self.tool_version].dumps()
            }
        )
        self.node_agent = self.provenance.agent(self.provenance.node[self.nodeid])

        # FIXME: Find a better way to collect the current network id
        self.network_agent = self.provenance.agent(
            self.provenance.network["{}/{}".format(fastr.current_network.id, fastr.current_network.version)],
            {
                'fastrinfo:network_dump': fastr.current_network.dumps()
            }
        )

        self.provenance.document.actedOnBehalfOf(self.network_agent, self.provenance.fastr_agent)
        self.provenance.document.actedOnBehalfOf(self.node_agent, self.tool_agent)
        self.provenance.document.actedOnBehalfOf(self.node_agent, self.network_agent)
        self.provenance.document.wasAssociatedWith(self.job_activity, self.node_agent)

    def _collect_provenance(self):
        """
        Collect the provenance for this job
        """
        tool = self.tool
        for input_argument_key, input_argument_value in self.input_arguments.items():
            input_description = tool.inputs[input_argument_key]

            if input_description.hidden:
                # Skip hidden inputs (they are used by the system to feed
                # additional data to sources/sinks etc) and are not
                # interesting for provenance
                continue

            for cardinality, value in enumerate(input_argument_value.data.iterelements()):
                url = value.data_uri
                parent_provenance = self._get_parent_provenance(value)

                if isinstance(parent_provenance, Provenance):
                    self.provenance.document.update(parent_provenance.document)
                    data_entity = self.provenance.entity(self.provenance.data[url], {
                        'fastrinfo:value': value.value
                    })
                    self.provenance.document.wasGeneratedBy(data_entity, parent_provenance.parent.job_activity)
                else:
                    data_entity = self.provenance.entity(self.provenance.data[url])

                self.provenance.document.used(self.job_activity, data_entity)

        self.provenance.document._records = list(set(self.provenance.document._records))

    @staticmethod
    def _get_parent_provenance(value_url):
        """
        Find the provenance of the parent job

        :param str value_url: url of the value for which to find the job
        :return: the provenance of the job that created the value
        """
        if isinstance(value_url, Deferred):
            return value_url.provenance
        else:
            return None
    def get_result(self):
        """
        Get the result of the job if it is available. Load the output file if
        found and check if the job matches the current object. If so, load
        and return the result.

        :returns: Job after execution or None if not available
        :rtype: Job | None
        """
        if not os.path.exists(self.logfile):
            return None

        try:
            with gzip.open(self.logfile) as fin:
                result = pickle.load(fin)
        except (IOError, EOFError):
            # Errors loading the pickle or gzip stream
            return None

        if not isinstance(result, Job):
            fastr.log.debug('Result is not a valid Job! (found {})'.format(type(result).__name__))
            return None

        if result.status != JobState.execution_done:
            fastr.log.debug('Result status is wrong ({})'.format(result.status))
            return None

        if result.jobid != self.jobid:
            fastr.log.debug('Result job id is wrong ({})'.format(result.jobid))
            return None

        if result.tool_name != self.tool_name:
            fastr.log.debug('Result tool name is wrong ({})'.format(result.tool_name))
            return None

        if result.tool_version != self.tool_version:
            fastr.log.debug('Result tool version is wrong ({})'.format(result.tool_version))
            return None

        if result.sample_id != self.sample_id:
            fastr.log.debug('Result sample id is wrong ({})'.format(result.sample_id))
            return None

        result_payload = result.create_payload()
        if result_payload != self.create_payload():
            fastr.log.debug('Result payload is wrong ({})'.format(result_payload))
            return None

        return result
    def __repr__(self):
        """
        String representation of the Job
        """
        return '<Job\n id={job.jobid}\n tool={job.tool_name} {job.tool_version}\n tmpdir={job.tmpdir}/>'.format(job=self)
    @property
    def status(self):
        """
        The status of the job
        """
        return self.info_store['status'][-1]

    @status.setter
    def status(self, status):
        """
        Set the status of a job

        :param status: the new status
        """
        if not isinstance(status, JobState):
            raise exceptions.FastrTypeError('Job status should be of class JobState, found [{}] {}'.format(type(status).__name__, status))

        if self.status != status:
            self.info_store['status'].append(status)

            if self.status_callback is not None:
                self.status_callback(self)

    @property
    def id(self):
        """
        The id of this job
        """
        return self.jobid

    @property
    def fullid(self):
        """
        The full id of the job
        """
        return self.jobid

    @property
    def commandurl(self):
        """
        The url of the command pickle
        """
        return url.join(self.tmpdir, self.COMMAND_DUMP)

    @property
    def logurl(self):
        """
        The url of the result pickle
        """
        return url.join(self.tmpdir, self.RESULT_DUMP)

    @property
    def stdouturl(self):
        """
        The url where the stdout text is saved
        """
        return url.join(self.tmpdir, self.STDOUT_DUMP)

    @property
    def stderrurl(self):
        """
        The url where the stderr text is saved
        """
        return url.join(self.tmpdir, self.STDERR_DUMP)

    @property
    def commandfile(self):
        """
        The path of the command pickle
        """
        return fastr.vfs.url_to_path(self.commandurl)

    @property
    def logfile(self):
        """
        The path of the result pickle
        """
        return fastr.vfs.url_to_path(self.logurl)

    @property
    def stdoutfile(self):
        """
        The path where the stdout text is saved
        """
        return fastr.vfs.url_to_path(self.stdouturl)

    @property
    def stderrfile(self):
        """
        The path where the stderr text is saved
        """
        return fastr.vfs.url_to_path(self.stderrurl)

    @property
    def required_cores(self):
        """
        The number of required cores
        """
        return self._required_cores

    @required_cores.setter
    def required_cores(self, value):
        """
        Set the number of required cores
        """
        if value is None:
            self._required_cores = value
        else:
            if not isinstance(value, int):
                raise TypeError('Required number of cores should be an integer or None')

            if value < 1:
                raise ValueError('Required number of cores should be above zero ({} < 1)'.format(value))

            self._required_cores = value

    @property
    def required_memory(self):
        """
        The amount of required memory
        """
        return self._required_memory

    @required_memory.setter
    def required_memory(self, value):
        """
        Set the amount of required memory
        """
        if value is None:
            self._required_memory = value
        else:
            if isinstance(value, unicode):
                value = str(value)

            if not isinstance(value, str):
                raise TypeError('Required memory should be a str or None (found: {} [{}])'.format(value, type(value).__name__))

            if re.match(r'\d+[mMgG]', value) is None:
                raise ValueError('Required memory should be in the form \\d+[mMgG] (found {})'.format(value))

            self._required_memory = value

    @property
    def required_time(self):
        """
        The amount of required runtime
        """
        return self._required_time

    @required_time.setter
    def required_time(self, value):
        """
        Set the amount of required runtime
        """
        if value is None:
            self._required_time = value
        else:
            if isinstance(value, unicode):
                value = str(value)

            if not isinstance(value, str):
                raise TypeError('Required time should be a str or None')

            if re.match(r'^(\d*:\d*:\d*|\d+)$', value) is None:
                raise ValueError('Required time should be in the form HH:MM:SS or a plain number (found {})'.format(value))

            self._required_time = value

    @property
    def tool(self):
        return fastr.toollist[self.tool_name, self.tool_version]

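    # Illustrative sketch (not part of the original module): values accepted
    # by the resource requirement setters above, assuming ``job`` is a Job
    # instance.
    #
    #     >>> job.required_cores = 4            # int, must be >= 1
    #     >>> job.required_memory = '2G'        # str matching \d+[mMgG]
    #     >>> job.required_time = '01:30:00'    # str matching \d*:\d*:\d* or \d+
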
    def create_payload(self):
        """
        Create the payload for this object based on all the input/output arguments

        :return: the payload
        :rtype: dict
        """
        tool = self.tool
        payload = {'inputs': {}, 'outputs': {}}

        # Fill the payload with the values to use (these should be translated
        # to paths/strings/ints etc). Translate all inputs to the correct form.
        for id_, value in self.input_arguments.items():
            argument = tool.inputs[id_]

            if isinstance(value, SampleItem):
                if len(value.data.mapping_part()) == 0:
                    value = value.data.sequence_part()
                elif len(value.data.sequence_part()) == 0:
                    value = value.data.mapping_part()
                else:
                    raise ValueError('Fastr does not (yet) accept mixed sequence/mapping input!')

            if not argument.hidden:
                if isinstance(value, tuple):
                    payload['inputs'][id_] = tuple(self.translate_argument(x) for x in value)
                else:
                    # Should be an OrderedDict
                    # FIXME: v is actually a tuple that needs fixing
                    payload['inputs'][id_] = OrderedDict((k, tuple(self.translate_argument(x) for x in v)) for k, v in value.items())
            else:
                if issubclass(fastr.typelist[argument.datatype], URLType):
                    payload['inputs'][id_] = tuple(self.translate_argument(x) for x in value)
                else:
                    payload['inputs'][id_] = value

            if len(payload['inputs'][id_]) == 0 and argument.default is not None:
                payload['inputs'][id_] = (argument.default,)

        # Create output arguments automatically
        for id_, spec in self.output_arguments.items():
            if isinstance(spec['cardinality'], int):
                cardinality = spec['cardinality']
            else:
                cardinality = self.calc_cardinality(spec['cardinality'], payload)

            payload['outputs'][id_] = self.fill_output_argument(tool.outputs[id_], cardinality, spec['datatype'])

        return payload
    @staticmethod
    def calc_cardinality(desc, payload):
        if desc[0] == 'int':
            return desc[1]
        elif desc[0] == 'as':
            if desc[1] in payload['inputs']:
                return len(payload['inputs'][desc[1]])
            elif desc[1] in payload['outputs']:
                return len(payload['outputs'][desc[1]])
            else:
                raise exceptions.FastrValueError('Cannot determine cardinality from {} (payload {})'.format(desc, payload))
        elif desc[0] == 'val':
            if desc[1] in payload['inputs'] and len(payload['inputs'][desc[1]]) == 1:
                return int(str(payload['inputs'][desc[1]][0]))
            elif desc[1] in payload['outputs'] and len(payload['outputs'][desc[1]]) == 1:
                return int(str(payload['outputs'][desc[1]][0]))
            else:
                raise exceptions.FastrValueError('Cannot determine cardinality from {} (payload {})'.format(desc, payload))
        else:
            raise exceptions.FastrValueError('Cannot determine cardinality from {} (payload {})'.format(desc, payload))

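    # Illustrative sketch (not part of the original module): the three
    # cardinality description forms handled by calc_cardinality. The payload
    # shown here is hypothetical.
    #
    #     >>> payload = {'inputs': {'images': ('a.nii', 'b.nii'), 'n': ('3',)},
    #     ...            'outputs': {}}
    #     >>> Job.calc_cardinality(('int', 2), payload)        # fixed number
    #     2
    #     >>> Job.calc_cardinality(('as', 'images'), payload)  # same as input 'images'
    #     2
    #     >>> Job.calc_cardinality(('val', 'n'), payload)      # value of input 'n'
    #     3
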
    def execute(self):
        """
        Execute this job

        :returns: The result of the execution
        :rtype: InterfaceResult
        """
        tool = fastr.toollist[self.tool_name, self.tool_version]

        # Create the payload
        fastr.log.info('Start creating payload')
        start = datetime.datetime.now()
        payload = self.create_payload()
        end = datetime.datetime.now()
        fastr.log.info('Finished creating payload in {} seconds'.format((end - start).total_seconds()))

        # Execute the tool
        fastr.log.info('Start executing tool')
        start = datetime.datetime.now()
        result = tool.execute(payload)
        end = datetime.datetime.now()
        fastr.log.info('Finished executing tool in {} seconds'.format((end - start).total_seconds()))

        self.info_store['process'] = result.log_data

        # Translate the results
        fastr.log.info('Start translating results')
        start = datetime.datetime.now()
        self.output_data = self.translate_results(result.result_data)
        end = datetime.datetime.now()
        fastr.log.info('Finished translating results in {} seconds'.format((end - start).total_seconds()))

        # Collect the provenance for the node
        self._collect_provenance()

        if not self.validate_results(payload):
            raise exceptions.FastrValueError('Output values are not valid!')

        return result
    def translate_argument(self, value):
        """
        Translate an argument from a URL to an actual path.

        :param value: the value to translate
        :return: the translated value
        """
        return self.get_value(value=value)
    def get_output_datatype(self, output_id):
        """
        Get the datatype for a specific output

        :param str output_id: the id of the output to get the datatype for
        :return: the requested datatype
        :rtype: BaseDataType
        """
        output = self.tool.outputs[output_id]
        datatype = fastr.typelist[output.datatype]

        # If there are preferred types, match with them if possible
        if output_id in self.preferred_types and len(self.preferred_types[output_id]) > 0:
            new_datatype = fastr.typelist.match_types(datatype, preferred=self.preferred_types[output_id])

            if new_datatype is not None:
                fastr.log.info('Found new type (after using preferred): {} -> {}'.format(datatype.id, new_datatype.id))
                datatype = new_datatype

        return datatype
    def translate_results(self, result):
        """
        Translate the results of an interface (using paths etc) to the proper
        form using URIs instead.

        :param dict result: the result data of an interface
        :return: the translated result
        :rtype: dict
        """
        for key, value in result.items():
            datatype = self.get_output_datatype(key)

            if isinstance(value, dict):
                for subkey, subvalue in value.items():
                    new_subvalue = []
                    for item in subvalue:
                        if not isinstance(item, DataType):
                            item = datatype(str(item))
                        if isinstance(item, URLType):
                            item.value = fastr.vfs.path_to_url(item.value)
                        new_subvalue.append(item)
                    value[subkey] = tuple(new_subvalue)
            else:
                new_value = []
                for item in value:
                    if not isinstance(item, DataType):
                        item = datatype(str(item))
                    if isinstance(item, URLType):
                        item.value = fastr.vfs.path_to_url(item.value)
                    new_value.append(item)
                result[key] = new_value

        return result
    def fill_output_argument(self, output_spec, cardinality, desired_type):
        """
        Create the values for a (non-automatic) output argument by generating
        an output URL for each cardinality entry and wrapping it in the
        desired datatype.

        :param output_spec: the specification of the output to fill
        :param int cardinality: the cardinality for this output (can be None for automatic outputs)
        :param DataType desired_type: the desired datatype for this output
        :return: the values for this output
        :rtype: tuple
        """
        values = []
        if not output_spec.automatic:
            datatype = fastr.typelist[desired_type]

            for cardinality_nr in range(cardinality):
                if datatype.extension is not None:
                    output_url = '{}/{}_{}.{}'.format(self.tmpdir, output_spec.id, cardinality_nr, datatype.extension)
                else:
                    output_url = '{}/{}_{}'.format(self.tmpdir, output_spec.id, cardinality_nr)

                # Wrap the output url in the correct DataType
                fastr.log.info('Wrapping {} in a {}'.format(output_url, datatype))
                output_value = datatype(output_url)
                fastr.log.info('Got {}'.format(output_value))

                # Translate to a path and use
                values.append(self.translate_argument(output_value))

        return tuple(values)
    @classmethod
    def get_value(cls, value):
        """
        Get the value behind an argument, translating urls to paths where needed.

        :param value: the value (or url of the value) to retrieve
        :return: the retrieved value
        """
        if isinstance(value, Deferred):
            value = value.target

        # If the value already has a valid datatype, use that and don't guess from scratch
        if isinstance(value, URLType):
            datatype = type(value)

            if url.isurl(str(value)):
                value = fastr.vfs.url_to_path(value)
            else:
                if not os.path.exists(value):
                    raise exceptions.FastrValueError('Found a non-url path ({}) that does not exist!'.format(value))

            return datatype(value)
        elif isinstance(value, DataType):
            return value
        else:
            raise exceptions.FastrTypeError('Arguments should be a DataType, found {}'.format(type(value).__name__))

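    # Illustrative sketch (not part of the original module): what get_value()
    # does for the different kinds of arguments. The value names and paths
    # below are hypothetical.
    #
    #     >>> Job.get_value(an_int_value)     # a plain (non-URL) DataType:
    #     ...                                 # returned unchanged
    #     >>> Job.get_value(a_nifti_value)    # a URLType holding a vfs:// url:
    #     ...                                 # rewrapped with the url translated
    #     ...                                 # to a local path via fastr.vfs
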
    def hash_results(self):
        """
        Create hashes of all output values and store them in the info store
        """
        for output in self.output_arguments.values():
            id_ = output['id']
            output_value = self.output_data[id_]
            output_datatype = fastr.typelist[output['datatype']]

            if isinstance(output_value, list):
                self.info_store['output_hash'][id_] = [output_datatype(x).checksum() for x in output_value]
            elif isinstance(output_value, dict):
                self.info_store['output_hash'][id_] = {}
                for sample_id, output_val in output_value.items():
                    self.info_store['output_hash'][id_][sample_id] = [output_datatype(x).checksum() for x in output_val]
    def validate_results(self, payload):
        """
        Validate the results of the Job

        :return: flag indicating the results are complete and valid
        """
        valid = True
        for output in self.output_arguments.values():
            id_ = output['id']
            output_value = self.output_data[id_]

            if isinstance(output_value, (list, tuple)):
                if not self._validate_result(output, output_value, payload):
                    message = 'The output "{}" is invalid!'.format(id_)
                    self.info_store['errors'].append(exceptions.FastrOutputValidationError(message).excerpt())
                    fastr.log.warning(message)
                    valid = False
            elif isinstance(output_value, dict):
                for sample_id, output_val in output_value.items():
                    if not self._validate_result(output, output_val, payload):
                        message = 'The output "{}" for sample "{}" is invalid!'.format(id_, sample_id)
                        self.info_store['errors'].append(exceptions.FastrOutputValidationError(message).excerpt())
                        fastr.log.warning(message)
                        valid = False
            else:
                raise exceptions.FastrTypeError('Output data is not of correct type (expected a list/dict)')

        return valid
    def write(self):
        with gzip.open(self.logfile, 'wb') as result_file:
            result_file.write(pickle.dumps(self))
    def _validate_result(self, output, output_value, payload):
        """
        Validate the result for a specific output/sample

        :param output: the output for which to check
        :param output_value: the value for the output
        :return: flag indicating if the result is valid
        """
        valid = True
        if output['cardinality'] is not None:
            if ((isinstance(output['cardinality'], int) and output['cardinality'] != len(output_value)) or
                    (isinstance(output['cardinality'], tuple) and self.calc_cardinality(output['cardinality'], payload) != len(output_value))):
                message = 'Cardinality mismatch for {} (found {}, expected {})'.format(output['id'], len(output_value), output['cardinality'])
                self.info_store['errors'].append(exceptions.FastrOutputValidationError(message).excerpt())
                fastr.log.warning(message)
                valid = False

        for value in output_value:
            output_datatype = fastr.typelist[output['datatype']]
            typed_value = output_datatype(value)

            if not typed_value.valid:
                message = 'Output value "{}" not valid for datatype "{}"'.format(value, output['datatype'])
                self.info_store['errors'].append(exceptions.FastrOutputValidationError(message).excerpt())
                fastr.log.warning(message)
                valid = False

        return valid


class SinkJob(Job):
    """
    Special SinkJob for the Sink
    """
    def __init__(self, jobid, tool_name, tool_version, nodeid, sample_id,
                 sample_index, input_arguments, output_arguments, tmpdir,
                 hold_jobs=None, timestamp=None, cores=None, memory=None,
                 walltime=None, substitutions=None, status_callback=None,
                 preferred_types=None):
        super(SinkJob, self).__init__(jobid=jobid,
                                      tool_name=tool_name,
                                      tool_version=tool_version,
                                      nodeid=nodeid,
                                      sample_id=sample_id,
                                      sample_index=sample_index,
                                      input_arguments=input_arguments,
                                      output_arguments=output_arguments,
                                      tmpdir=tmpdir,
                                      hold_jobs=hold_jobs,
                                      timestamp=timestamp,
                                      cores=cores,
                                      memory=memory,
                                      walltime=walltime,
                                      status_callback=status_callback,
                                      preferred_types=preferred_types)
        self._substitutions = substitutions if substitutions else {}
    def __repr__(self):
        """
        String representation for the SinkJob
        """
        return '<SinkJob\n id={job.jobid}\n tmpdir={job.tmpdir}/>'.format(job=self)
    def get_result(self):
        """
        Get the result of the job if it is available. A SinkJob result is
        never reused, so this always returns None.

        :returns: None
        """
        return None
    def validate_results(self, payload):
        """
        Validate the results of the SinkJob

        :return: flag indicating the results are complete and valid
        """
        if self.info_store['process']['stderr'] != '':
            message = 'SinkJob should have an empty stderr, found error messages!\n{}'.format(self.info_store['process']['stderr'])
            fastr.log.warning(message)
            self.info_store['errors'].append(exceptions.FastrOutputValidationError(message).excerpt())
            return False
        else:
            return True
    def create_payload(self):
        """
        Create the payload for this object based on all the input/output arguments

        :return: the payload
        :rtype: dict
        """
        payload = super(SinkJob, self).create_payload()
        fastr.log.info('Temp payload: {}'.format(payload))
        payload['inputs']['output'] = (self.substitute(payload['inputs']['output'][0],
                                                       datatype=type(payload['inputs']['input'][0])),)
        return payload
    def substitute(self, value, datatype=None):
        """
        Substitute the special fields that can be used in a SinkJob.

        :param str value: the value to substitute fields in
        :param BaseDataType datatype: the datatype for the value
        :return: string with substitutions performed
        :rtype: str
        """
        if datatype is None:
            datatype = type(value)

        subs = dict(self._substitutions)
        subs['ext'] = '.{}'.format(datatype.extension) if datatype.extension is not None else ''

        return str(value).format(**subs)

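    # Illustrative sketch (not part of the original module): how substitute()
    # expands the special fields. The substitution keys and the datatype used
    # here are hypothetical.
    #
    #     >>> sink_job._substitutions = {'sample_id': 'patient_001'}
    #     >>> sink_job.substitute('/out/{sample_id}/img{ext}', datatype=NiftiType)
    #     '/out/patient_001/img.nii.gz'       # if NiftiType.extension == 'nii.gz'
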

class SourceJob(Job):
    """
    Special SourceJob for the Source
    """
    def __repr__(self):
        """
        String representation for the SourceJob
        """
        return '<SourceJob\n id={job.jobid}\n tmpdir={job.tmpdir}/>'.format(job=self)
    def validate_results(self, payload):
        """
        Validate the results of the Job

        :return: flag indicating the results are complete and valid
        """
        if self.info_store['process']['stderr'] != '':
            message = 'SourceJob should have an empty stderr, found error messages!'
            fastr.log.warning(message)
            self.info_store['errors'].append(exceptions.FastrOutputValidationError(message).excerpt())
            return False
        else:
            return True
    def get_output_datatype(self, output_id):
        """
        Get the datatype for a specific output

        :param str output_id: the id of the output to get the datatype for
        :return: the requested datatype
        :rtype: BaseDataType
        """
        return fastr.typelist[self._datatype]


class InlineJob(Job):
    def __init__(self, *args, **kwargs):
        super(InlineJob, self).__init__(*args, **kwargs)
        self._collect_provenance()
    def get_result(self):
        if not os.path.exists(self.logfile):
            return None

        try:
            with gzip.open(self.logfile) as fin:
                result = pickle.load(fin)
        except (IOError, EOFError):
            # Errors loading the pickle or gzip stream
            return None

        if not isinstance(result, Job):
            fastr.log.debug('Result is not a valid Job! (found {})'.format(type(result).__name__))
            return None

        return result
    def _collect_provenance(self):
        """
        Collect the provenance for this job
        """
        tool = self.tool
        for input_argument_key, input_argument_value in self.input_arguments.items():
            input_description = tool.inputs[input_argument_key]

            if input_description.hidden:
                # Skip hidden inputs (they are used by the system to feed
                # additional data to sources/sinks etc) and are not
                # interesting for provenance
                continue

            for sample in input_argument_value:
                for cardinality, value in enumerate(sample.data.iterelements()):
                    url = value.data_uri
                    parent_provenance = self._get_parent_provenance(value)

                    if isinstance(parent_provenance, Provenance):
                        self.provenance.document.update(parent_provenance.document)
                        data_entity = self.provenance.entity(self.provenance.data[url], {
                            'fastrinfo:value': value.value
                        })
                        self.provenance.document.wasGeneratedBy(data_entity, parent_provenance.parent.job_activity)
                    else:
                        data_entity = self.provenance.entity(self.provenance.data[url])

                    self.provenance.document.used(self.job_activity, data_entity)

        self.provenance.document._records = list(set(self.provenance.document._records))