# Copyright 2011-2014 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module holds the ExecutionPluginManager as well as the base-class
for all ExecutionPlugins.
"""
from abc import abstractmethod
import functools
import gzip
import pickle
import sys
import threading
import traceback
import types
from enum import Enum
import fastr
from fastr import exceptions
from fastr.core.baseplugin import Plugin
from fastr.core.pluginmanager import PluginSubManager
from fastr.execution.job import Job, JobState
from fastr.utils import iohelpers
[docs]class JobAction(Enum):
"""
Job actions that can be performed. This is used for checking
if held jobs should be queued, held longer or be cancelled.
"""
hold = 'hold'
queue = 'queue'
cancel = 'cancel'
[docs]class ExecutionPlugin(Plugin):
"""
This class is the base for all Plugins to execute jobs somewhere. There are
many methods already in place for taking care of stuff. Most plugins should
only need to redefine a few abstract methods:
* :py:meth:`__init__ <fastr.execution.executionpluginmanager.ExecutionPlugin.__init__>`
the constructor
* :py:meth:`cleanup <fastr.execution.executionpluginmanager.ExecutionPlugin.__init__>`
a clean up function that frees resources, closes connections, etc
* :py:meth:`_queue_job <fastr.execution.executionpluginmanager.ExecutionPlugin._queue_job>`
the method that queues the job for execution
* :py:meth:`_cancel_job <fastr.execution.executionpluginmanager.ExecutionPlugin._cancel_job>`
cancels a previously queued job
* :py:meth:`_release_job <fastr.execution.executionpluginmanager.ExecutionPlugin._release_job>`
releases a job that is currently held
* :py:meth:`_job_finished <fastr.execution.executionpluginmanager.ExecutionPlugin._job_finished>`
extra callback for when a job finishes
Not all of the functions need to actually do anything for a plugin. There
are examples of plugins that do not really need a ``cleanup``, but for
safety you need to implement it. Just using a ``pass`` for the method could
be fine in such a case.
.. warning::
When overwriting other function, extreme care must be taken not to break
the plugins working.
"""
@abstractmethod
[docs] def __init__(self, finished_callback=None, cancelled_callback=None, status_callback=None):
"""
Setup the ExecutionPlugin
:param finished_callback: the callback to call after a job finished
:param cancelled_callback: the callback to call after a job cancelled
:return: newly created ExecutionPlugin
"""
super(ExecutionPlugin, self).__init__()
# Pylint seems to be unable to figure out the .dict() member
# pylint: disable=no-member
self.job_status = {}
self.job_dict = {}
self.job_archive = {}
self._finished_callback = finished_callback
self._cancelled_callback = cancelled_callback
self._status_callback = status_callback
# Dict indicating the depending jobs for a certain jobs (who is waiting on the key jobid)
self.held_queue = {}
self.held_queue_lock = threading.Lock()
# Flag indicating the plugin is accepting new jobs queued
self.accepting = True
[docs] def __enter__(self):
return self
[docs] def __exit__(self, type_, value, tb):
self.cleanup()
[docs] def __del__(self):
"""
Cleanup if the variable was deleted on purpose
"""
fastr.log.debug('Calling cleanup')
self.cleanup()
@abstractmethod
[docs] def cleanup(self):
"""
Method to call to clean up the ExecutionPlugin. This can be to clear
temporary data, close connections, etc.
:param force: force cleanup (e.g. kill instead of join a process)
"""
# Stop accepting new jobs (close the queue)
self.accepting = False
# Cancel all queued jobs
while len(self.job_dict) > 0:
jobid, job = self.job_dict.popitem()
fastr.log.debug('Cleanup cancelling {}'.format(jobid))
self.cancel_job(job)
[docs] def queue_job(self, job):
"""
Add a job to the execution queue
:param Job job: job to add
"""
if not self.accepting:
return
if isinstance(job, list):
for j in job:
self.queue_job(j)
return
self.job_dict[job.jobid] = job
self.job_status[job.jobid] = job.status = JobState.queued
for hold_id in job.hold_jobs:
# Add job reference to held queue to receive signal when the
# required jobs are finished/failed. Do not subscribe for jobs that
# are already finished.
if hold_id in self.job_status and self.job_status[hold_id].done:
continue
with self.held_queue_lock:
if hold_id not in self.held_queue:
self.held_queue[hold_id] = []
# append to held_queue, because of the managed dict, we need to replace the value (not update)
self.held_queue[hold_id].append(job.jobid)
# Save the job to file before serializing
with gzip.open(job.commandfile, 'wb') as fout:
fastr.log.debug('Writing job pickle.gz to: {}'.format(job.commandfile))
pickle.dump(job, fout)
self._queue_job(job)
[docs] def cancel_job(self, job):
"""
Cancel a job previously queued
:param job: job to cancel
"""
if not isinstance(job, Job):
try:
job = self.job_dict[job]
except KeyError:
fastr.log.warning('Job {} is no longer under processing, cannot cancel!'.format(job))
return
fastr.log.debug('Cancelling {}'.format(job.jobid))
job.status = self.job_status[job.id] = JobState.cancelled
fastr.log.debug('Cancelling children for {}'.format(job.id))
if job.id in self.held_queue:
fastr.log.debug('Found children....')
held_queue = self.held_queue[job.id]
for dependent_job in held_queue:
fastr.log.debug('Checking sub {}'.format(dependent_job))
if dependent_job in self.job_dict and dependent_job in self.job_status and not self.job_status[dependent_job].done:
fastr.log.debug('Cancelling sub {}'.format(dependent_job))
self.cancel_job(dependent_job)
else:
fastr.log.debug('No children....')
self._cancel_job(job)
fastr.log.debug('Removing {} from jobdict'.format(job.jobid))
self.job_archive[job.id] = job
try:
del self.job_dict[job.id]
except KeyError:
pass
fastr.log.debug('Calling cancelled for {}'.format(job.id))
if self._cancelled_callback is not None:
self._cancelled_callback(job)
[docs] def release_job(self, job):
"""
Release a job that has been put on hold
:param jobid: job to release
"""
if not isinstance(job, Job):
if job not in self.job_dict:
fastr.log.warning('Job {} is no longer under processing, cannot release!'.format(job))
return
try:
job = self.job_dict[job]
except KeyError:
fastr.log.warning('Job {} is no longer under processing, cannot release!'.format(job))
return
job.status = JobState.queued
self._release_job(job.id)
[docs] def job_finished(self, job, blocking=False):
"""
The default callback that is called when a Job finishes. This will
create a new thread that handles the actual callback.
:param Job job: the job that finished
:return:
"""
if not blocking:
# The callback has to finish immediately, so create thead to handle callback and return
callback_thread = threading.Thread(target=self._job_finished_body,
name='fastr_jobfinished_callback',
args=(job,))
callback_thread.start()
else:
self._job_finished_body(job)
def _job_finished_body(self, job):
"""
The actual callback that is executed in a separate thread. This
method handles the collection of the result, the release of depending
jobs and calling the user defined callback.
:param Job job: the job that finished
"""
fastr.log.debug('ExecutorInterface._job_finished_callback called')
self.job_status[job.jobid] = JobState.processing_callback
# The Job finished should always log the errors rather than
# crashing the whole execution system
# pylint: disable=bare-except
try:
try:
job = iohelpers.load_gpickle(job.logfile)
except EOFError:
job.info_store['errors'].append(
exceptions.FastrResultFileNotFound(
('Could not read job result file {}, assuming '
'the job crashed during output write.').format(job.logfile)).excerpt())
job.status = JobState.failed
except IOError:
job.info_store['errors'].append(
exceptions.FastrResultFileNotFound(
('Could not find/read job result file {}, assuming '
'the job crashed before it created output.').format(job.logfile)).excerpt())
job.status = JobState.failed
if self._status_callback is not None:
self._status_callback(job)
job.status_callback = self._status_callback
except:
exc_type, _, trace = sys.exc_info()
exc_info = traceback.format_exc()
trace = traceback.extract_tb(trace, 1)[0]
fastr.log.error('Encountered exception ({}) during execution:\n{}'.format(exc_type.__name__, exc_info))
if 'errors' not in job.info_store:
job.info_store['errors'] = []
job.info_store['errors'].append((exc_type.__name__, exc_info, trace[0], trace[1]))
job.status = JobState.execution_failed
result = job
fastr.log.debug('Finished {} with status {}'.format(job.jobid, job.status))
jobid = result.jobid
# Make sure the status is either finished or failed
if result.status == JobState.execution_done:
result.status = JobState.finished
else:
result.status = JobState.failed
# Set the job status so the hold jobs will be release properly
self.job_status[jobid] = result.status
if jobid in self.job_dict:
self.job_dict[jobid].status = JobState.finished
# The ProcessPoolExecutor has to track job dependencies itself, so
# therefor we have to check for jobs depending on the finished job
if jobid in self.held_queue:
fastr.log.debug('Signaling depending jobs {}'.format(self.held_queue[jobid]))
ready_jobs = []
for held_jobid in self.held_queue[jobid]:
action = self.check_job_requirements(held_jobid)
if action == JobAction.queue:
# Re-assign managed dict member
if held_jobid in self.job_dict:
held_job = self.job_dict[held_jobid]
self.job_dict[held_jobid] = held_job
fastr.log.debug('Job {} is now ready to be submitted'.format(held_jobid))
# If ready, flag job for removal from held queue and send
# to pool queue to be executed
ready_jobs.append(held_jobid)
self.release_job(held_jobid)
elif action == JobAction.cancel:
fastr.log.debug('Job {} will be cancelled'.format(held_jobid))
ready_jobs.append(held_jobid)
self.cancel_job(held_jobid)
else:
fastr.log.debug('Job {} still has unmet dependencies'.format(held_jobid))
# Remove jobs that no longer need to be held from held_queue
for readyjobid in ready_jobs:
job = self.get_job(readyjobid)
for hold_id in job.hold_jobs:
with self.held_queue_lock:
if hold_id in self.held_queue:
# remove from held_queue. because of the managed dict,
# we need to replace the value (not update)
required_job = self.held_queue[hold_id]
if readyjobid in required_job:
required_job.remove(readyjobid)
else:
fastr.log.warning('Could not remove {} from {}'.format(readyjobid, required_job))
with self.held_queue_lock:
del self.held_queue[jobid]
# Extra subclass callback
fastr.log.debug('Subclass callback')
try:
self._job_finished(result)
except:
exc_type, _, _ = sys.exc_info()
exc_info = traceback.format_exc()
fastr.log.error('Encountered exception ({}) during callback {}._job_finished:\n{}'.format(exc_type.__name__, type(self).__name__, exc_info))
# Extra callback from object
fastr.log.debug('Calling callback for {}'.format(jobid))
if self._finished_callback is not None:
try:
self._finished_callback(result)
except:
if isinstance(self._finished_callback, functools.partial):
args = self._finished_callback.args + tuple('{}={}'.format(k, v) for k, v in self._finished_callback.keywords.items())
callback_name = '{f.__module__}.{f.func_name}({a})'.format(f=self._finished_callback.func, a=','.join(args))
elif isinstance(self._finished_callback, types.FunctionType):
callback_name = '{f.__module__}.{f.func_name}'.format(f=self._finished_callback)
elif isinstance(self._finished_callback, types.MethodType):
callback_name = '{m.__module__}.{m.im_class.__name__}.{m.im_func.func_name}'.format(m=self._finished_callback)
else:
callback_name = repr(self._finished_callback)
exc_type, _, _ = sys.exc_info()
exc_info = traceback.format_exc()
fastr.log.error('Encountered exception ({}) during callback {}:\n{}'.format(exc_type.__name__, callback_name, exc_info))
else:
fastr.log.debug('No callback specified')
# Move the job to archive (to keep the number of working jobs limited
# in the future the archive can be moved to a db/disk if needed
fastr.log.debug('Archiving job {} with status {}'.format(jobid, result.status))
try:
del self.job_status[jobid]
except KeyError:
pass
self.job_archive[jobid] = result
try:
del self.job_dict[jobid]
except KeyError:
pass
fastr.log.debug('Done archiving')
[docs] def get_job(self, jobid):
try:
return self.job_dict[jobid]
except KeyError:
try:
return self.job_archive[jobid]
except:
raise exceptions.FastrKeyError('Could not find job {}'.format(jobid))
[docs] def get_status(self, job):
if not isinstance(job, Job):
job = self.get_job(job)
return job.status
@abstractmethod
def _queue_job(self, job):
"""
Method that a subclass implements to actually queue a Job for execution
:param job: job to queue
"""
pass
@abstractmethod
def _cancel_job(self, jobid):
"""
Method that a subclass implements to actually cancel a Job
:param jobid: job to queue
"""
pass
@abstractmethod
def _release_job(self, jobid):
"""
Method that a subclass implements to actually release a job
:param jobid: job to queue
"""
pass
@abstractmethod
def _job_finished(self, job):
"""
Method that a subclass can implement to add to the default callback.
It will be called by ``_job_finished_body`` right before the user
defined callback will be called.
:param job: Job that resulted from the execution
"""
pass
[docs] def show_jobs(self, req_status=None):
"""
List the queued jobs, possible filtered by status
:param req_status: requested status to filter on
:return: list of jobs
"""
if isinstance(req_status, basestring):
req_status = JobState[req_status]
if not isinstance(req_status, JobState):
return []
results = []
for key, status in self.job_status.items():
if req_status is None or status == req_status:
results.append(self.get_job(key))
return results
[docs] def check_job_status(self, jobid):
"""
Get the status of a specified job
:param jobid: the target job
:return: the status of the job (or None if job not found)
"""
try:
return self.get_status(jobid)
except exceptions.FastrKeyError:
return None
[docs] def check_job_requirements(self, jobid):
"""
Check if the requirements for a job are fulfilled.
:param jobid: job to check
:return: directive what should happen with the job
:rtype: JobAction
"""
job = self.get_job(jobid)
if job.hold_jobs is None or len(job.hold_jobs) == 0:
return JobAction.queue
all_done = True
for hold_id in job.hold_jobs:
status = self.check_job_status(hold_id)
if status is not None and status != JobState.finished:
if status.done:
return JobAction.cancel
else:
fastr.log.debug('Dependency {} for {} is unmet ({})'.format(hold_id, jobid, status))
all_done = False
if all_done:
return JobAction.queue
else:
return JobAction.hold
[docs]class ExecutionPluginManager(PluginSubManager):
"""
Container holding all the ExecutionPlugins known to the Fastr system
"""
[docs] def __init__(self):
"""
Initialize a ExecutionPluginManager and load plugins.
:param path: path to search for plugins
:param recursive: flag for searching recursively
:return: newly created ExecutionPluginManager
"""
super(ExecutionPluginManager, self).__init__(parent=fastr.plugin_manager,
plugin_class=self.plugin_class)
@property
def plugin_class(self):
"""
The class of the Plugins expected in this BasePluginManager
"""
return ExecutionPlugin