executionplugins Package

blockingexecution Module

class fastr.resources.plugins.executionplugins.blockingexecution.BlockingExecution(finished_callback=None, cancelled_callback=None)[source]

Bases: fastr.plugins.executionplugin.ExecutionPlugin

The blocking execution plugin is a special plugin which is meant for debug purposes. It will not queue jobs but immediately execute them inline, effectively blocking fastr until the Job is finished. It is the simplest execution plugin and can be used as a template for new plugins or for testing purposes.

__abstractmethods__ = frozenset()
__init__(finished_callback=None, cancelled_callback=None)[source]

Set up the ExecutionPlugin

Parameters:
  • finished_callback – the callback to call after a job has finished
  • cancelled_callback – the callback to call after a job is cancelled
Returns:

newly created ExecutionPlugin

__module__ = 'fastr.resources.plugins.executionplugins.blockingexecution'
cleanup()[source]

Method to call to clean up the ExecutionPlugin. This can be to clear temporary data, close connections, etc.

Parameters: force – force cleanup (e.g. kill instead of join a process)
classmethod test()[source]

Test the plugin, default behaviour is just to instantiate the plugin

fastr.resources.plugins.executionplugins.blockingexecution.run_job(job, job_status)[source]
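
The inline behaviour described for BlockingExecution can be illustrated with a minimal sketch. This is not the fastr implementation: the class InlineExecutor, its queue_job method and the callback signatures are hypothetical names chosen for the example. It only shows the idea that a job is run at the moment it is handed over, so the caller blocks until the job is done and the finished callback has fired.

```python
from typing import Callable, Optional


class InlineExecutor:
    """Hypothetical stand-in for a blocking plugin; not the fastr class."""

    def __init__(self,
                 finished_callback: Optional[Callable[[str, object], None]] = None,
                 cancelled_callback: Optional[Callable[[str], None]] = None):
        self.finished_callback = finished_callback
        self.cancelled_callback = cancelled_callback

    def queue_job(self, job_id: str, func: Callable[[], object]) -> None:
        # There is no queue: the job runs right here, so the caller
        # blocks until func() returns.
        result = func()
        if self.finished_callback is not None:
            self.finished_callback(job_id, result)


finished = []
executor = InlineExecutor(
    finished_callback=lambda job_id, result: finished.append((job_id, result))
)
executor.queue_job("job-1", lambda: 2 + 2)

# By the time queue_job returns, the callback has already been called.
print(finished)
```

Because nothing runs in the background, a failure surfaces directly in the calling code, which is what makes this style convenient for debugging.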

drmaaexecution Module

class fastr.resources.plugins.executionplugins.drmaaexecution.DRMAAExecution(finished_callback=None, cancelled_callback=None)[source]

Bases: fastr.plugins.executionplugin.ExecutionPlugin

A DRMAA execution plugin that executes Jobs on a Grid Engine cluster. The queue to submit to is selected through a configuration option. The plugin uses the Python drmaa package.

Note

To use this plugin, make sure the drmaa package is installed and that the execution is started on an SGE submit host with DRMAA libraries installed.

Note

This plugin is at the moment tailored to SGE, but it should be fairly easy to make different subclasses for different DRMAA supporting systems.

CANCELS_DEPENDENCIES = False
GE_NATIVE_SPEC = {'CWD': '-cwd', 'DEPENDS': '-hold_jid {hold_list}', 'DEPENDS_SEP': ',', 'ERRORLOG': '-e {errorlog}', 'HOLD': '-h', 'MEMORY': '-l h_vmem={memory}', 'NCORES': '-pe smp {ncores:d}', 'OUTPUTLOG': '-o {outputlog}', 'QUEUE': '-q {queue}', 'WALLTIME': '-l h_rt={walltime}'}
NATIVE_SPEC = {'grid_engine': {'CWD': '-cwd', 'DEPENDS': '-hold_jid {hold_list}', 'DEPENDS_SEP': ',', 'ERRORLOG': '-e {errorlog}', 'HOLD': '-h', 'MEMORY': '-l h_vmem={memory}', 'NCORES': '-pe smp {ncores:d}', 'OUTPUTLOG': '-o {outputlog}', 'QUEUE': '-q {queue}', 'WALLTIME': '-l h_rt={walltime}'}, 'torque': {'CWD': '', 'DEPENDS': '-W depend=afterok:{hold_list}', 'DEPENDS_SEP': ':', 'ERRORLOG': '-e {errorlog}', 'HOLD': '-h', 'MEMORY': '-l mem={memory}', 'NCORES': '-l procs={ncores:d}', 'OUTPUTLOG': '-o {outputlog}', 'QUEUE': '-q {queue}', 'WALLTIME': '-l walltime={walltime}'}}
SUPPORTS_CANCEL = True
SUPPORTS_DEPENDENCY = True
SUPPORTS_HOLD_RELEASE = True
TORQUE_NATIVE_SPEC = {'CWD': '', 'DEPENDS': '-W depend=afterok:{hold_list}', 'DEPENDS_SEP': ':', 'ERRORLOG': '-e {errorlog}', 'HOLD': '-h', 'MEMORY': '-l mem={memory}', 'NCORES': '-l procs={ncores:d}', 'OUTPUTLOG': '-o {outputlog}', 'QUEUE': '-q {queue}', 'WALLTIME': '-l walltime={walltime}'}
__abstractmethods__ = frozenset()
__init__(finished_callback=None, cancelled_callback=None)[source]

Set up the ExecutionPlugin

Parameters:
  • finished_callback – the callback to call after a job has finished
  • cancelled_callback – the callback to call after a job is cancelled
Returns:

newly created ExecutionPlugin

__module__ = 'fastr.resources.plugins.executionplugins.drmaaexecution'
cleanup()[source]

Method to call to clean up the ExecutionPlugin. This can be to clear temporary data, close connections, etc.

Parameters: force – force cleanup (e.g. kill instead of join a process)
collect_jobs()[source]
configuration_fields = {'drmaa_engine': (<class 'str'>, 'grid_engine', 'The engine to use (options: grid_engine, torque'), 'drmaa_job_check_interval': (<class 'int'>, 900, 'The interval in which the job checker will startto check for stale jobs'), 'drmaa_max_jobs': (<class 'int'>, 0, 'The maximum jobs that can be send to the scheduler at the same time (0 for no limit)'), 'drmaa_queue': (<class 'str'>, 'week', 'The default queue to use for jobs send to the scheduler')}
create_native_spec(queue, walltime, memory, ncores, outputLog, errorLog, hold_job, hold)[source]

Create the native spec for the DRMAA scheduler. Needs to be implemented in the subclasses

Parameters:
  • queue (str) – the queue to submit to
  • walltime (str) – walltime specified
  • memory (str) – memory requested
  • ncores (int) – number of cores requested
  • outputLog (str) – the location of the stdout log
  • errorLog (str) – the location of stderr log
  • hold_job (list) – list of jobs to depend on
  • hold (bool) – flag if job should be submitted in hold mode
Returns:

current_jobs
dispatch_callbacks()[source]
job_regression_check()[source]
send_job(command, arguments, queue=None, resources=None, job_name=None, joinLogFiles=False, outputLog=None, errorLog=None, hold_job=None, hold=False)[source]
spec_fields
submit_jobs()[source]
classmethod test()[source]

Test the plugin, default behaviour is just to instantiate the plugin
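
The templates in GE_NATIVE_SPEC and the parameters of create_native_spec suggest how a native specification string might be assembled. The sketch below is an assumption about that process, not the actual create_native_spec code: the function build_native_spec, the order of the options and the rule that unset options are skipped are all hypothetical. Only the template dictionary is copied from the class attribute above.

```python
# Copied from the GE_NATIVE_SPEC class attribute documented above.
GE_NATIVE_SPEC = {
    'CWD': '-cwd',
    'DEPENDS': '-hold_jid {hold_list}',
    'DEPENDS_SEP': ',',
    'ERRORLOG': '-e {errorlog}',
    'HOLD': '-h',
    'MEMORY': '-l h_vmem={memory}',
    'NCORES': '-pe smp {ncores:d}',
    'OUTPUTLOG': '-o {outputlog}',
    'QUEUE': '-q {queue}',
    'WALLTIME': '-l h_rt={walltime}',
}


def build_native_spec(spec, queue, walltime=None, memory=None, ncores=None,
                      output_log=None, error_log=None, hold_job=None,
                      hold=False):
    """Hypothetical helper: fill in the templates and join them with spaces."""
    parts = []
    if spec['CWD']:  # the torque spec uses an empty string here
        parts.append(spec['CWD'])
    parts.append(spec['QUEUE'].format(queue=queue))
    if walltime:
        parts.append(spec['WALLTIME'].format(walltime=walltime))
    if memory:
        parts.append(spec['MEMORY'].format(memory=memory))
    if ncores:
        parts.append(spec['NCORES'].format(ncores=ncores))
    if output_log:
        parts.append(spec['OUTPUTLOG'].format(outputlog=output_log))
    if error_log:
        parts.append(spec['ERRORLOG'].format(errorlog=error_log))
    if hold_job:
        # Job ids are joined with the engine-specific separator.
        hold_list = spec['DEPENDS_SEP'].join(str(job) for job in hold_job)
        parts.append(spec['DEPENDS'].format(hold_list=hold_list))
    if hold:
        parts.append(spec['HOLD'])
    return ' '.join(parts)


native_spec = build_native_spec(GE_NATIVE_SPEC, queue='week',
                                walltime='01:00:00', memory='2G', ncores=2,
                                hold_job=[101, 102])
print(native_spec)
# -cwd -q week -l h_rt=01:00:00 -l h_vmem=2G -pe smp 2 -hold_jid 101,102
```

Keeping the engine-specific flags in a dictionary of templates means that supporting another scheduler only requires another dictionary with the same keys, as the torque entry in NATIVE_SPEC shows.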

exception fastr.resources.plugins.executionplugins.drmaaexecution.FastrDRMAANotFoundError(*args, **kwargs)[source]

Bases: fastr.exceptions.FastrImportError

Indicate the DRMAA module was not found on the system.

__module__ = 'fastr.resources.plugins.executionplugins.drmaaexecution'
exception fastr.resources.plugins.executionplugins.drmaaexecution.FastrDRMAANotFunctionalError(*args, **kwargs)[source]

Bases: fastr.exceptions.FastrError

Indicate DRMAA is found but creating a session did not work

__module__ = 'fastr.resources.plugins.executionplugins.drmaaexecution'

linearexecution Module

class fastr.resources.plugins.executionplugins.linearexecution.LinearExecution(finished_callback=None, cancelled_callback=None)[source]

Bases: fastr.plugins.executionplugin.ExecutionPlugin

An execution engine that has a background thread that executes the jobs in order. The queue is a simple FIFO queue and there is one worker thread that operates in the background. This plugin is meant as a fallback when other plugins do not function properly. It does not use multiprocessing, so it is safe to use in environments that do not support it.

__abstractmethods__ = frozenset()
__init__(finished_callback=None, cancelled_callback=None)[source]

Set up the ExecutionPlugin

Parameters:
  • finished_callback – the callback to call after a job has finished
  • cancelled_callback – the callback to call after a job is cancelled
Returns:

newly created ExecutionPlugin

__module__ = 'fastr.resources.plugins.executionplugins.linearexecution'
cleanup()[source]

Method to call to clean up the ExecutionPlugin. This can be to clear temporary data, close connections, etc.

Parameters: force – force cleanup (e.g. kill instead of join a process)
exec_worker()[source]
classmethod test()[source]

Test the plugin, default behaviour is just to instantiate the plugin
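
The pattern described for LinearExecution, a FIFO queue drained by one background thread, can be sketched with the standard library alone. This is an illustration under assumptions, not the plugin's code: the worker function merely borrows the name exec_worker, and the job representation (an id plus a callable) and the None sentinel used to stop the worker are choices made for this example.

```python
import queue
import threading

job_queue = queue.Queue()  # queue.Queue hands items out in FIFO order
results = []


def exec_worker():
    """Run queued jobs one at a time until the stop sentinel arrives."""
    while True:
        job = job_queue.get()
        if job is None:  # sentinel: no more jobs will be queued
            break
        job_id, func = job
        results.append((job_id, func()))


# A single worker thread, so jobs never run concurrently with each other.
worker = threading.Thread(target=exec_worker, daemon=True)
worker.start()

for n in range(3):
    # n=n binds the current value of n to each job.
    job_queue.put((f"job-{n}", lambda n=n: n * n))

job_queue.put(None)  # ask the worker to stop after the queued jobs
worker.join()

print(results)
# [('job-0', 0), ('job-1', 1), ('job-2', 4)]
```

Since only threads are involved, this approach also works where spawning extra processes is not possible.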

processpoolexecution Module

class fastr.resources.plugins.executionplugins.processpoolexecution.ProcessPoolExecution(finished_callback=None, cancelled_callback=None, nr_of_workers=None)[source]

Bases: fastr.plugins.executionplugin.ExecutionPlugin

A local execution plugin that uses multiprocessing to create a pool of worker processes. This allows fastr to execute jobs in parallel with true concurrency. The number of workers can be specified in the fastr configuration; the default is the number of cores minus one, with a minimum of 1.

Warning

The ProcessPoolExecution does not check memory requirements of jobs and running many workers might lead to memory starvation and thus an unresponsive system.

__abstractmethods__ = frozenset()
__init__(finished_callback=None, cancelled_callback=None, nr_of_workers=None)[source]

Set up the ExecutionPlugin

Parameters:
  • finished_callback – the callback to call after a job has finished
  • cancelled_callback – the callback to call after a job is cancelled
Returns:

newly created ExecutionPlugin

__module__ = 'fastr.resources.plugins.executionplugins.processpoolexecution'
cleanup()[source]

Method to call to clean up the ExecutionPlugin. This can be to clear temporary data, close connections, etc.

Parameters: force – force cleanup (e.g. kill instead of join a process)
configuration_fields = {'process_pool_worker_number': (<class 'int'>, 3, 'Number of workers to use in a process pool')}
job_finished_callback(result)[source]

Receiver for the callback; it will split the result tuple and call job_finished

Parameters:result (tuple) – return value of run_job
classmethod test()[source]

Test the plugin, default behaviour is just to instantiate the plugin

fastr.resources.plugins.executionplugins.processpoolexecution.run_job(job, job_status)[source]
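
The worker-count rule and the callback mechanism described for ProcessPoolExecution can be sketched with the standard multiprocessing module. The helper names default_worker_count, square and run_all are hypothetical and do not come from fastr; the sketch assumes only the rule stated in the class description (number of cores minus one, with a minimum of 1) and the general idea of a callback that receives each job's return value.

```python
import multiprocessing


def default_worker_count(n_cores=None):
    """Number of cores minus one, but never fewer than one worker."""
    if n_cores is None:
        n_cores = multiprocessing.cpu_count()
    return max(1, n_cores - 1)


def square(x):
    # Stand-in for a job; must be defined at module level to be picklable.
    return x * x


def run_all(values, nr_of_workers=None):
    """Run square() on every value in a pool and collect results by callback."""
    results = []
    workers = nr_of_workers or default_worker_count()
    with multiprocessing.Pool(processes=workers) as pool:
        for value in values:
            # The callback runs in the parent process once a job returns.
            pool.apply_async(square, (value,), callback=results.append)
        pool.close()  # no more jobs will be submitted
        pool.join()   # wait until all workers are done
    return sorted(results)


if __name__ == "__main__":
    print(default_worker_count(8))
    print(run_all([1, 2, 3]))
```

Nothing in this sketch looks at memory use, which mirrors the warning above: the pool will start as many workers as requested, regardless of what the jobs need.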

rqexecution Module

class fastr.resources.plugins.executionplugins.rqexecution.RQExecution(finished_callback=None, cancelled_callback=None)[source]

Bases: fastr.plugins.executionplugin.ExecutionPlugin

An execution plugin based on Redis Queue. Fastr will submit jobs to the redis queue and workers will peel the jobs from the queue and process them.

This system requires a running redis database and the database url has to be set in the fastr configuration.

Note

This execution plugin requires the redis and rq packages to be installed before it can be loaded properly.

__abstractmethods__ = frozenset()
__init__(finished_callback=None, cancelled_callback=None)[source]

Set up the ExecutionPlugin

Parameters:
  • finished_callback – the callback to call after a job has finished
  • cancelled_callback – the callback to call after a job is cancelled
Returns:

newly created ExecutionPlugin

__module__ = 'fastr.resources.plugins.executionplugins.rqexecution'
check_finished()[source]
cleanup()[source]

Method to call to clean up the ExecutionPlugin. This can be to clear temporary data, close connections, etc.

Parameters: force – force cleanup (e.g. kill instead of join a process)
configuration_fields = {'rq_host': (<class 'str'>, 'redis://localhost:6379/0', 'The url of the redis serving the redis queue'), 'rq_queue': (<class 'str'>, 'default', 'The redis queue to use')}
classmethod run_job(job_id, job_command, job_stdout, job_stderr)[source]
classmethod test()[source]

Test the plugin, default behaviour is just to instantiate the plugin

slurmexecution Module

class fastr.resources.plugins.executionplugins.slurmexecution.SlurmExecution(finished_callback=None, cancelled_callback=None)[source]

Bases: fastr.plugins.executionplugin.ExecutionPlugin

SBATCH = 'sbatch'
SCANCEL = 'scancel'
SCONTROL = 'scontrol'
SQUEUE = 'squeue'
SQUEUE_FORMAT = '{"id": %.18i, "status": "%.2t"}'
STATUS_MAPPING = {' F': <JobState.failed: ('failed', 'done', True)>, ' R': <JobState.running: ('running', 'in_progress', False)>, 'CA': <JobState.cancelled: ('cancelled', 'done', True)>, 'CD': <JobState.finished: ('finished', 'done', False)>, 'CF': <JobState.running: ('running', 'in_progress', False)>, 'CG': <JobState.running: ('running', 'in_progress', False)>, 'NF': <JobState.failed: ('failed', 'done', True)>, 'PD': <JobState.queued: ('queued', 'idle', False)>, 'RV': <JobState.cancelled: ('cancelled', 'done', True)>, 'SE': <JobState.failed: ('failed', 'done', True)>, 'TO': <JobState.queued: ('queued', 'idle', False)>}
SUPPORTS_CANCEL = True
SUPPORTS_DEPENDENCY = True
SUPPORTS_HOLD_RELEASE = True
__abstractmethods__ = frozenset()
__init__(finished_callback=None, cancelled_callback=None)[source]

Set up the ExecutionPlugin

Parameters:
  • finished_callback – the callback to call after a job has finished
  • cancelled_callback – the callback to call after a job is cancelled
Returns:

newly created ExecutionPlugin

__module__ = 'fastr.resources.plugins.executionplugins.slurmexecution'
cleanup()[source]

Method to call to clean up the ExecutionPlugin. This can be to clear temporary data, close connections, etc.

Parameters: force – force cleanup (e.g. kill instead of join a process)
configuration_fields = {'slurm_job_check_interval': (<class 'int'>, 30, 'The interval in which the job checker will startto check for stale jobs'), 'slurm_partition': (<class 'str'>, '', 'The slurm partition to use')}
job_status_check()[source]
classmethod test()[source]

Test the plugin, default behaviour is just to instantiate the plugin
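
SQUEUE_FORMAT is written so that each line printed by squeue is a small JSON object, and STATUS_MAPPING translates the two-character Slurm state codes into job states. How SlurmExecution actually calls squeue and processes its output is not shown here, so the sketch below is an assumption: parse_squeue is a hypothetical helper, the sample output is invented for illustration and assumes no header line, and plain strings stand in for the JobState values.

```python
import json

# Simplified subset of STATUS_MAPPING, with strings instead of JobState values.
# Single-letter codes carry a leading space because "%.2t" pads to width 2.
STATUS_TO_STATE = {
    'PD': 'queued',
    ' R': 'running',
    'CF': 'running',
    'CG': 'running',
    'CD': 'finished',
    ' F': 'failed',
    'CA': 'cancelled',
}


def parse_squeue(output):
    """Map job id to state for every JSON line in the squeue output."""
    states = {}
    for line in output.splitlines():
        line = line.strip()
        if not line:
            continue
        record = json.loads(line)  # padding before the id is valid JSON
        states[record['id']] = STATUS_TO_STATE.get(record['status'], 'unknown')
    return states


# Invented sample, shaped like SQUEUE_FORMAT would print it.
sample_output = '''
{"id":               1001, "status": "PD"}
{"id":               1002, "status": " R"}
{"id":               1003, "status": "CD"}
'''

job_states = parse_squeue(sample_output)
print(job_states)
# {1001: 'queued', 1002: 'running', 1003: 'finished'}
```

Emitting JSON from the scheduler avoids hand-written column parsing; the only subtlety is the padded status code, which is why the mapping keys ' F' and ' R' start with a space.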

strongrexecution Module

class fastr.resources.plugins.executionplugins.strongrexecution.StrongrExecution(finished_callback=None, cancelled_callback=None)[source]

Bases: fastr.plugins.executionplugin.ExecutionPlugin

An execution plugin based on Redis Queue. Fastr will submit jobs to the redis queue and workers will peel the jobs from the queue and process them.

This system requires a running redis database and the database url has to be set in the fastr configuration.

Note

This execution plugin requires the redis and rq packages to be installed before it can be loaded properly.

__abstractmethods__ = frozenset()
__init__(finished_callback=None, cancelled_callback=None)[source]

Set up the ExecutionPlugin

Parameters:
  • finished_callback – the callback to call after a job has finished
  • cancelled_callback – the callback to call after a job is cancelled
Returns:

newly created ExecutionPlugin

__module__ = 'fastr.resources.plugins.executionplugins.strongrexecution'
check_finished()[source]
cleanup()[source]

Method to call to clean up the ExecutionPlugin. This can be to clear temporary data, close connections, etc.

Parameters: force – force cleanup (e.g. kill instead of join a process)
configuration_fields = {}
classmethod test()[source]

Test the plugin, default behaviour is just to instantiate the plugin