Source code for fastr.execution.networkrun

# Copyright 2011-2014 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Network module containing Network facilitators and analysers.
"""
import datetime
import itertools
import json
import os
import re
import signal
import sys
import threading
import time
import traceback
import urllib.parse
import uuid
from collections import defaultdict
from tempfile import mkdtemp
from threading import Event, Lock
from typing import Optional

import isodate

from .basenoderun import BaseNodeRun
from .inputoutputrun import OutputRun
from .job import JobState
from .linkrun import LinkRun
from .networkanalyzer import DefaultNetworkAnalyzer
from .networkchunker import DefaultNetworkChunker
from .sinknoderun import SinkNodeRun
from .sourcenoderun import ConstantNodeRun, SourceNodeRun
from .. import exceptions, resources
from ..abc.serializable import Serializable
from ..globals import set_current_run
from ..core import vfs_plugin
from ..core.version import Version
from ..data import url
from ..datatypes import types
from ..helpers import config, iohelpers, log
from ..helpers.events import emit_event, EventType
from ..helpers.lockfile import DirectoryLock

__all__ = ['NetworkRun']


[docs]class NetworkRun(Serializable): """ The Network class represents a workflow. This includes all Nodes (including ConstantNodes, SourceNodes and Sinks) and Links. """ NETWORK_DUMP_FILE_NAME = '__fastr_network__.json' SOURCE_DUMP_FILE_NAME = '__source_data__.pickle.gz' SINK_DUMP_FILE_NAME = '__sink_data__.json'
[docs] def __init__(self, network): """ Create a new, empty Network :param str name: name of the Network :return: newly created Network :raises OSError: if the tmp mount in the config is not a writable directory """ # Parent for when the run is part of another run (e.g. MacroNodes are involved) self.parent = None self.thread = None # Set result self.result = None # Basic information about the network the Run is based on, and a copy of the network state self._network = network.__getstate__() self.network_id = network.id self.network_version = network.version self.network_namespace = network.namespace self.network_filename = network.filename network_scope = network.filename if network_scope is not None: network_scope = os.path.dirname(network_scope) try: network_scope = vfs_plugin.path_to_url(network_scope) except exceptions.FastrMountUnknownError: log.warning('Network scope is not in any known mounts, cannot transfer scope to execution nodes!') network_scope = None log.info(f'Detected network scope: {network_scope}') self.network_scope = network_scope self.description = network.description # Info about the execution run self.tracking_id: Optional[str] = None self.timestamp: datetime.datetime = datetime.datetime.now() self.uuid: uuid.UUID = uuid.uuid1() # Preferred types self.preferred_types = list(network.preferred_types) # Recreate nodes self.nodelist = {k: BaseNodeRun.NODE_RUN_MAP.get(type(v).__name__)(v, parent=self) for k, v in network.nodelist.items()} # Set self as parent for node in self.nodelist.values(): node.parent = self # Sink results self.sink_results = {} # Create the link list, make sure all Nodes are in place first self.linklist = {} for link_id, link in list(network.linklist.items()): state = link.__getstate__() # Change the link state to target the contents of this NetworkRun # instead of the Network state['source'] = state['source'].replace(network.fullid, self.fullid) state['target'] = state['target'].replace(network.fullid, self.fullid) # Recreate the link inside the NetworkRun self.linklist[link_id] = LinkRun.createobj(state, self) # Copy step ids self.stepids = {} for key, nodes in network.stepids.items(): self.stepids[key] = [self.nodelist[x.id] for x in nodes] self.callback_edit_lock = Lock() self.executing = False self.execution_lock = Event() self.execution_lock.set() # Check if temp dir exists, if not try to create it if not os.path.exists(config.mounts['tmp']): log.info("fast temporary directory does not exists, creating it...") try: os.mkdir(config.mounts['tmp']) except OSError: message = "Could not create fastr temporary directory ({})".format(config.mounts['tmp']) log.critical(message) raise exceptions.FastrOSError(message) self.tmpdir = None self.tmpurl = None self.tmpdir_lock = None
def _get(self, path): value = self path = path.split('/') for part in path: if hasattr(value, part): value = getattr(value, part) elif hasattr(value, '__getitem__'): if isinstance(value, (list, tuple, OutputRun)): value = value[int(part)] else: value = value[part] else: raise exceptions.FastrLookupError('Could not find {} in {}'.format(part, value)) return value
[docs] def __repr__(self): return '<NetworkRun {})>'.format(self.id)
[docs] def __bool__(self): """ A network run is True if it finish running successfully and False otherwise """ return bool(self.result)
[docs] def __eq__(self, other): """ Compare two Networks and see if they are equal. :param other: :type other: :py:class:`Network <fastr.planning.network.Network>` :return: flag indicating that the Networks are the same :rtype: bool """ if not isinstance(other, NetworkRun): return NotImplemented dict_self = dict(vars(self)) del dict_self['callback_edit_lock'] del dict_self['execution_lock'] del dict_self['_job_finished_callback'] del dict_self['_job_status_callback'] del dict_self['sink_results'] dict_other = dict(vars(other)) del dict_other['callback_edit_lock'] del dict_other['execution_lock'] del dict_other['_job_finished_callback'] del dict_other['_job_status_callback'] del dict_other['sink_results'] return dict_self == dict_other
[docs] def __ne__(self, other): """ Tests for non-equality, this is the negated version __eq__ """ return not (self.__eq__(other))
# Retrieve a Node/Link/Input/Output in the network based on the fullid
[docs] def __getitem__(self, item): """ Get an item by its fullid. The fullid can point to a link, node, input, output or even subinput/suboutput. :param str,unicode item: fullid of the item to retrieve :return: the requested item """ if not isinstance(item, str): raise exceptions.FastrTypeError('Key should be a fullid string, found a {}'.format(type(item).__name__)) parsed = urllib.parse.urlparse(item) if parsed.scheme != 'fastr': raise exceptions.FastrValueError('Item should be an URL with the fastr:// scheme (Found {} in {})'.format(parsed.scheme, item)) path = parsed.path.split('/')[1:] if (len(path) < 2 or path[0] != 'networks' or path[1] != self.network_id or path[2] != str(self.network_version) or path[3] != 'runs' or path[4] != self.id): raise exceptions.FastrValueError( 'URL {} does not point to anything in this network run, {}'.format(item, path) ) value = self for part in path[5:]: tb_string = '' if hasattr(value, '__getitem__'): try: if isinstance(value, (list, tuple, OutputRun)): value = value[int(part)] else: value = value[part] except (KeyError, IndexError, TypeError, ValueError): _, _, exc_traceback = sys.exc_info() tb_string = traceback.format_tb(exc_traceback) else: continue if hasattr(value, part): value = getattr(value, part) else: message = f'Could not find {part} in {value} (path {path})' log.critical(message) if tb_string: log.critical(f'Original __getitem__ traceback:') log.critical('\n'.join(tb_string)) else: log.critical('No __getitem__ available') raise exceptions.FastrLookupError(message) return value
[docs] def __getstate__(self): """ Retrieve the state of the Network :return: the state of the object :rtype dict: """ state = { 'timestamp': datetime.datetime.strftime(self.timestamp, '%Y-%m-%dT%H:%M:%S.%f'), 'result': self.result, 'uuid': str(self.uuid), 'network_id': self.network_id, 'network_version': str(self.network_version), 'network_filename': self.network_filename, 'nodelist': [x.__getstate__() for x in self.nodelist.values()], 'linklist': [x.__getstate__() for x in self.linklist.values()], 'preferred_types': [x.id for x in self.preferred_types], 'tmpdir': self.tmpdir, 'tmpurl': self.tmpurl, 'network': self._network, } return state
[docs] def __setstate__(self, state): """ Set the state of the Network by the given state. This completely overwrites the old state! :param dict state: The state to populate the object with :return: None """ # Basic info self.network = state.pop('_network', None) self.network_id = state.pop('network_id') self.network_version = Version(state.pop('network_version')) self.network_filename = state.pop('network_filename') self.parent = None # Info about the execution run (should be moved to NetworkRun in the future) self.timestamp = datetime.datetime.strptime(state.pop('timestamp'), '%Y-%m-%dT%H:%M:%S.%f') self.uuid = uuid.UUID(state.pop('uuid')) self.tmpdir = state.pop('tmpdir', None) self.tmpurl = state.pop('tmpurl', None) # Initialize nodelists empty to avoid errors further on self.nodelist = {} self.linklist = {} self.preferred_types = [] # Create required locks self.callback_edit_lock = Lock() self.execution_lock = Event() self.execution_lock.set() self.executing = False # These should not be shared between Networks state.pop('callback_edit_lock', None) state.pop('execution_lock', None) # Recreate nodes if 'nodelist' in state: for node_state in state['nodelist']: # Get the node class node_class = node_state.get('class', 'NodeRun') node_class = BaseNodeRun.NODE_RUN_MAP.get(node_class) node = node_class.deserialize(node_state, self) log.debug('Adding node: {}'.format(node)) self.nodelist[node.id] = node node.parent = self del state['nodelist'] # Add preferred types state['preferred_types'] = [types[x] for x in state['preferred_types']] # Insert empty link_list statelinklist = state['linklist'] state['linklist'] = {} # Update the objects dict self.__dict__.update(state) # Create the link list, make sure all Nodes are in place first for link in statelinklist: self.linklist[link['id']] = LinkRun.createobj(link, self)
[docs] def check_id(self, id_): """ Check if an id for an object is valid and unused in the Network. The method will always returns True if it does not raise an exception. :param str id_: the id to check :return: True :raises FastrValueError: if the id is not correctly formatted :raises FastrValueError: if the id is already in use """ regex = r'^\w[\w\d_]*$' if re.match(regex, id_) is None: raise exceptions.FastrValueError('An id in Fastr should follow' ' the following pattern {}' ' (found {})'.format(regex, id_)) if id_ in self.nodelist or id_ in self.linklist: raise exceptions.FastrValueError('The id {} is already in use in {}!'.format(id_, self.id)) return True
@property def id(self): """ The id of the Network. This is a read only property. """ return '{}_{}'.format(self.network_id, datetime.datetime.strftime(self.timestamp, '%Y-%m-%dT%H-%M-%S')) @property def fullid(self): """ The fullid of the Network """ return 'fastr:///networks/{}/{}/runs/{}'.format(self.network_id, self.network_version, self.id) @property def global_id(self): """ The global id of the Network, this is different for networks used in macronodes, as they still have parents. """ if self.parent is None: return self.fullid else: return '{}/network_run'.format(self.parent.global_id) @property def nodegroups(self): """ Give an overview of the nodegroups in the network """ nodegroups = defaultdict(list) for node in self.nodelist.values(): if node.nodegroup is not None: nodegroups[node.nodegroup].append(node) return nodegroups @property def long_id(self): if self.parent is None: return self.network_id else: return '{}_{}_{}'.format(self.parent.parent.network_id, self.parent.id, self.network_id) @property def network(self): return self._network @property def sourcelist(self): return {k: v for k, v in self.nodelist.items() if isinstance(v, SourceNodeRun) and not isinstance(v, ConstantNodeRun)} @property def constantlist(self): return {k: v for k, v in self.nodelist.items() if isinstance(v, ConstantNodeRun)} @property def sinklist(self): return {k: v for k, v in self.nodelist.items() if isinstance(v, SinkNodeRun)}
[docs] def set_data(self, sourcedata, sinkdata): # Set the source and sink data for id_, source in self.sourcelist.items(): if isinstance(source, ConstantNodeRun): source.set_data() elif id_ in sourcedata: source.set_data(sourcedata[id_]) else: raise exceptions.FastrKeyError('Could not find source data for SourceNode {}!'.format(id_)) source.update() for id_, sink in self.sinklist.items(): if isinstance(sinkdata, str): sink.set_data(sinkdata) elif isinstance(sinkdata, dict): if id_ not in sinkdata: raise exceptions.FastrKeyError('Could not find sink data for SinkNode {}!'.format(id_)) sink.set_data(sinkdata[id_]) else: raise exceptions.FastrTypeError('Sinkdata should be a dict or str, found a {}!'.format( type(sinkdata).__name__ ))
[docs] def generate_jobs(self): log.debug('=== GENERATE JOBS FOR {} ==='.format(self.id)) # Create execution objects chuncker = DefaultNetworkChunker() analyzer = DefaultNetworkAnalyzer() # Create a network chuncker to Chunk the Network in executable blocks chunks = chuncker.chunck_network(self) for chunk in chunks: if not self.executing: return # Create a network analyzer to create the optimal execution order executionlist = analyzer.analyze_network(self, chunk) # Do not create sink jobs for a macro node if self.parent is not None: executionlist = [x for x in executionlist if not isinstance(x, SinkNodeRun)] # log.warning(f"===> {executionlist=}") # test1 = [x.execute() for x in executionlist] # log.warning(f"===> {test1=}") # test2 = list(itertools.zip_longest(*test1)) # log.warning(f"===> {test2=}") for job_lists in itertools.zip_longest(*[x.execute() for x in executionlist]): if not self.executing: return # Flatten the array of arrays while dropping the Nones (drained iterators) job_list = (x for y in job_lists if y is not None for x in y) # Only try to process chunks that actually have jobs... yield job_list log.debug('=== END GENERATE JOBS FOR {} ==='.format(self.id))
[docs] def execute(self, sourcedata, sinkdata, execution_plugin=None, tmpdir=None, cluster_queue=None, timestamp=None, tracking_id: Optional[str]=None): """ Execute the Network with the given data. This will analyze the Network, create jobs and send them to the execution backend of the system. :param dict sourcedata: dictionary containing all data for the sources :param dict sinkdata: dictionary containing directives for the sinks :param str execution_plugin: the execution plugin to use (None will use the config value) :raises FastrKeyError: if a source has not corresponding key in sourcedata :raises FastrKeyError: if a sink has not corresponding key in sinkdata """ log.debug('Acquiring execution lock...') self.execution_lock.wait() self.execution_lock.clear() self.executing = True set_current_run(self) self.register_signals() if timestamp: if isinstance(timestamp, str): timestamp = isodate.parse_datetime(timestamp) self.timestamp = timestamp if tracking_id: self.tracking_id = tracking_id else: self.tracking_id = None log.info("####################################") log.info("# network execution STARTED #") log.info("####################################") # Display the main file the Network is created from, if known it # is the network.filename, otherwise try to estimate the entry # point for the script used entry_file = self.network_filename if entry_file is None: for x in sys.argv[:2]: if os.path.isfile(x): entry_file = x break # If there is no entry_file or the first argument is ipython or # bpython we assume that is not a network but via prompt if entry_file is not None and not entry_file.endswith((os.path.sep + "ipython", os.path.sep + "bpython")): entry_file = os.path.abspath(entry_file) entry_file_date = time.ctime(os.path.getmtime(entry_file)) message = 'Running network via {} (last modified {})'.format(entry_file, entry_file_date) else: if entry_file is not None: entry_file = os.path.basename(entry_file) message = 'Running network via {} session'.format(entry_file or 'python') log.info(message) log.info('FASTR loaded from {}'.format(config.systemdir)) if tmpdir is None: tmpdir = os.path.normpath(mkdtemp(prefix='fastr_{}_'.format(self.id), dir=config.mounts['tmp'])) else: if url.isurl(tmpdir): if not tmpdir.startswith('vfs://'): raise exceptions.FastrValueError('The tmpdir keyword argument should be a path or vfs:// url!') tmpdir = vfs_plugin.url_to_path(tmpdir) if not os.path.exists(tmpdir): os.makedirs(tmpdir) self.tmpdir = tmpdir self.tmpurl = vfs_plugin.path_to_url(self.tmpdir) log.info('Network run tmpdir: {}'.format(self.tmpdir)) self.tmpdir = tmpdir self.tmpdir_lock = DirectoryLock(self.tmpdir) if not self.tmpdir_lock.acquire(): log.critical('Cannot lock temporary directory {}, aborting!'.format(self.tmpdir)) self.abort() return False network_file = os.path.join(self.tmpdir, self.NETWORK_DUMP_FILE_NAME) # Save a copy of the Network iohelpers.save_json(network_file, self.network, indent=2) iohelpers.save_gpickle(os.path.join(self.tmpdir, '__source_run_{}__.pickle.gz'.format( datetime.datetime.strftime(self.timestamp, '%Y-%m-%dT%H-%M-%S') )), sourcedata) # Make sure all reporting plugins are loaded for plugin_name in config.reporting_plugins: resources.plugins[plugin_name].activate() # Signal event that run is started emit_event(EventType.run_started, self) try: self.set_data(sourcedata, sinkdata) # Select desired server execution plugin and instantiate log.info('Selecting {} as executor plugin'.format(config.execution_plugin)) # Checlk what execution plugin to use if execution_plugin is None: execution_plugin = config.execution_plugin if execution_plugin not in resources.execution_plugins: message = 'Selected non-existing execution plugin ({}), available plugins: {}'.format(execution_plugin, list(resources.execution_plugins.keys())) log.error(message) raise exceptions.FastrValueError(message) log.debug('Retrieving execution plugin ({})'.format(execution_plugin)) execution_interface_type = resources.execution_plugins[execution_plugin] log.debug('Creating execution interface') with execution_interface_type(self.job_finished, self.job_finished) as execution_interface: if cluster_queue is not None: execution_interface.default_queue = cluster_queue for job_list in self.generate_jobs(): # Queue jobs but allow cancelling in between for job in job_list: if not self.executing: return log.info(f"Queueing job {job.id}") execution_interface.queue_job(job) log.info('Waiting for execution to finish...') # Wait in chuncks of two seconds (hopefully this will allow ipython to update more regularly) last_print = time.time() while len(execution_interface.job_dict) > 0: sys.stdout.flush() if not self.executing: return # Print out one which jobs we are still waiting if time.time() - last_print > 600: jobs = execution_interface.job_dict.values() log.info(f'Waiting for {len(jobs)} jobs:') if len(jobs) <= 15: for job in jobs: log.info(f'{job.id}: {job.status}') else: # Only show first and last 5, hide rest jobs = list(jobs) for job in jobs[:5]: log.info(f'{job.id}: {job.status}') log.info('---- {} JOBS HIDDEN ----'.format(len(jobs) - 10)) for job in jobs[-5:]: log.info(f'{job.id}: {job.status}') last_print = time.time() time.sleep(1.0) if not self.executing: return log.info('Chunk execution finished!') log.info("####################################") log.info("# network execution FINISHED #") log.info("####################################") finally: if not self.executing: log.info("####################################") log.info("# network execution ABORTED #") log.info("####################################") self.execution_finished() sink_data_json = { sink_node: { str(sample_id): { "status": str(job.status), "sample_status": status.name, "errors": [list(str(e) for e in x) for x in failed_annotations], } for sample_id, (job, status, failed_annotations) in sink_data.items() } for sink_node, sink_data in self.sink_results.items() } sink_result_file = os.path.join(self.tmpdir, self.SINK_DUMP_FILE_NAME) with open(sink_result_file, 'w') as fh_out: json.dump(sink_data_json, fh_out, indent=2) return self.result
[docs] def execution_finished(self): # Make sure to unlock the Network log.debug('Releasing execution lock') self.executing = False self.execution_lock.set() self.unregister_signals() # FIXME: This has be to move to AFTER the last write in the tmpdir # has finished, but inside a safety such as a with or finally. if self.tmpdir_lock: self.tmpdir_lock.release() # Check if everything was successful result = not any(x[1].name == 'FAILED' for sink_data in self.sink_results.values() for x in sink_data.values()) self.result = result # Signal event that run finished emit_event(EventType.run_finished, self)
[docs] def abort(self, signal_code=None, current_frame=None): if signal_code is not None: signal_obj = signal.Signals(signal_code) log.info('Recieved a {!s} signal, aborting execution of NetworkRun {}'.format(signal_obj, self.id)) else: log.info('Aborting execution of NetworkRun {}'.format(self.id)) self.executing = False self.unregister_signals()
[docs] def register_signals(self): """ Register handles to handle SIGINT and SIGTERM handlers to gracefully shut down the execution :return: """ if threading.current_thread() is not threading.main_thread(): log.warning('Cannot change signals from outside of main thread!') return signal.signal(signal.SIGINT, self.abort)
[docs] def unregister_signals(self): """ Unregister the signal handlers (set to default). Sending these signals twice will result that the second time the default handler is used. """ if threading.current_thread() is not threading.main_thread(): log.warning('Cannot change signals from outside of main thread!') return signal.signal(signal.SIGINT, signal.default_int_handler)
[docs] def job_finished(self, job): """ Call-back handler for when a job is finished. Will collect the results and handle blocking jobs. This function is automatically called when the execution plugin finished a job. :param job: the job that finished :type job: :py:class:`Job <fastr.execution.job.Job>` """ # Find the Node for this job node = self[job.node_global_id] # Collect earlier failed annotations failed_job_annotation = set() if not isinstance(node, SourceNodeRun): for input_id, input_argument in job.input_arguments.items(): # Check if input is hidden and system-generated (it will # not be part of the provenance) input_description = node.tool.inputs[input_id] if input_description.hidden: log.debug('Skipping hidden system input: {}'.format(input_id)) continue if not isinstance(input_argument, list): input_argument = [input_argument] for input_argument_element in input_argument: if str(input_argument_element.id) == '__EMPTY__': continue # Get the annotation from the input log.debug('Getting annotation from {} --> {}'.format(node.inputs[input_id].get_sourced_outputs(), node.inputs[input_id][input_argument_element.index].failed_annotations)) item = node.inputs[input_id][input_argument_element.index] failed_job_annotation.update(item.failed_annotations) # Log the status and add annotations if needed status_message = "Finished job {} with status {}".format(job.id, job.status) log.info(status_message) if job.status not in (JobState.finished, JobState.cancelled): if len(job.errors) > 0: error = job.errors[0] status_message = 'Encountered error: [{e[0]}] {e[1]} ({e[2]}:{e[3]})'.format(e=error) else: status_message = 'Encountered error in job {}'.format(job.id) job_relative_path = job.logurl.replace(self.tmpurl, '.') failed_job_annotation.add((job.id, str(job.status), status_message, job_relative_path)) # Lock Network for editing, make sure not multiple threads will edit network data at the same time with self.callback_edit_lock: # Data is stored here, need to place it back in the Nodes node.set_result(job, failed_job_annotation) sys.stdout.flush()