execution Package

This package contains all modules directly related to execution.

environmentmodules Module

This module contains a class to interact with EnvironmentModules

class fastr.execution.environmentmodules.EnvironmentModules(protected=None)[source]

Bases: object

This class can control the module environments from Python. It can list, load and unload environment modules. These modules are then used if subprocess is called from Python.
__dict__ = dict_proxy({'load': <function load>, '__module__': 'fastr.execution.environmentmodules', '__repr__': <function __repr__>, '__dict__': <attribute '__dict__' of 'EnvironmentModules' objects>, '_sync_avail': <function _sync_avail>, 'sync': <function sync>, '_run_commands_string': <function _run_commands_string>, 'reload': <function reload>, '_sync_loaded': <function _sync_loaded>, '_module_settings_warning': 'Cannot find Environment Modules home directory (environment variables not setup properly?)', '__weakref__': <attribute '__weakref__' of 'EnvironmentModules' objects>, 'tostring_modvalue': <staticmethod object>, '__init__': <function __init__>, '_module': <function _module>, 'exception': FastrImportError('Cannot find Environment Modules home directory (environment variables not setup properly?)',), 'avail_modules': <property object>, 'unload': <function unload>, 'clear': <function clear>, '_module_settings_loaded': False, 'isloaded': <function isloaded>, 'avail': <function avail>, 'totuple_modvalue': <staticmethod object>, 'swap': <function swap>, '__doc__': '\n This class can control the module environments in python. It can list, load\n and unload environmentmodules. These modules are then used if subprocess is\n called from python.\n ', 'loaded_modules': <property object>})
__init__(protected=None)[source]

Create the environmentmodules control object

Parameters:protected (list) – list of modules that should never be unloaded
Returns:newly created EnvironmentModules
__module__ = 'fastr.execution.environmentmodules'
__repr__()[source]
__weakref__

list of weak references to the object (if defined)

avail(namestart=None)[source]

Print available modules in the same way as the command-line version

Parameters:namestart – filter on modules that start with namestart
avail_modules

List of available modules

clear()[source]

Unload all modules (except the protected modules as they cannot be unloaded). This should result in a clean environment.

exception = FastrImportError('Cannot find Environment Modules home directory (environment variables not setup properly?)',)
isloaded(module)[source]

Check if a specific module is loaded

Parameters:module – module to check
Returns:flag indicating the module is loaded
load(module)[source]

Load specified module

Parameters:module – module to load
loaded_modules

List of currently loaded modules

reload(module)[source]

Reload specified module

Parameters:module – module to reload
swap(module1, module2)[source]

Swap one module for another one

Parameters:
  • module1 – module to unload
  • module2 – module to load
sync()[source]

Sync the object with the underlying environment. Re-checks the available and loaded modules

static tostring_modvalue(value)[source]

Turn a representation of a module into a string representation

Parameters:value – module representation (either str or tuple)
Returns:string representation
static totuple_modvalue(value)[source]

Turn a representation of a module into a tuple representation

Parameters:value – module representation (either str or tuple)
Returns:tuple representation (name, version, default)
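
The two static converters above can be sketched in pure Python. This is an illustrative re-implementation, not the fastr source; it assumes module strings use the usual Environment Modules form `name/version` with an optional `(default)` suffix:

```python
def totuple_modvalue(value):
    """Convert a module representation to a (name, version, default) tuple.

    Strings like "gcc/4.8" are split on the first "/"; a trailing
    "(default)" marks the default version. Tuples pass through unchanged.
    """
    if isinstance(value, tuple):
        return value
    default = value.endswith('(default)')
    if default:
        value = value[:-len('(default)')]
    name, sep, version = value.partition('/')
    return (name, version if sep else None, default)


def tostring_modvalue(value):
    """Convert a module representation back to its string form."""
    if isinstance(value, str):
        return value
    name, version, _ = value
    return '{}/{}'.format(name, version) if version else name
```
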
unload(module)[source]

Unload specified module

Parameters:module – module to unload
class fastr.execution.environmentmodules.ModuleSystem[source]

Bases: enum.Enum

__format__(format_spec)
__module__ = 'fastr.execution.environmentmodules'
static __new__(value)
__reduce_ex__(proto)
__repr__()
__str__()
envmod = 'enviromentmodules'
lmod = 'Lmod'

executionpluginmanager Module

This module holds the ExecutionPluginManager as well as the base-class for all ExecutionPlugins.

class fastr.execution.executionpluginmanager.ExecutionPlugin(finished_callback=None, cancelled_callback=None, status_callback=None)[source]

Bases: fastr.core.baseplugin.Plugin

This class is the base class for all plugins that execute jobs somewhere. Many methods are already in place to take care of the common bookkeeping.

There are fall-backs for certain features, but if a system already implements them it is usually preferable to skip the fall-back and let the external system handle it. A few flags enable or disable these features:

  • cls.SUPPORTS_CANCEL indicates that the plugin can cancel queued jobs

  • cls.SUPPORTS_HOLD_RELEASE indicates that the plugin can queue jobs in a hold state and can release them again (if not, the base plugin will create a hidden queue for held jobs). The plugin should respect the Job.status == JobState.hold when queueing jobs.

  • cls.SUPPORTS_DEPENDENCY indicates that the plugin can manage job dependencies; if not, the base plugin job dependency system will be used and jobs will only be submitted when all dependencies are met.

  • cls.CANCELS_DEPENDENCIES indicates that if a job is cancelled, it will automatically cancel all jobs depending on that job. If not, the plugin will traverse the dependency graph and cancel each job manually.

    Note

    If a plugin supports dependencies, it is assumed that when a job gets cancelled, the depending jobs also get cancelled automatically!
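
The fall-back for CANCELS_DEPENDENCIES = False can be sketched as a traversal of the reverse dependency graph. The helper names here (hold_map, cancel_one) are hypothetical stand-ins for the plugin's internal bookkeeping:

```python
def cancel_with_dependencies(job_id, hold_map, cancel_one):
    """Cancel a job and everything that (transitively) depends on it.

    hold_map maps job_id -> list of job ids it depends on (Job.hold_jobs);
    cancel_one is called once per job to cancel.
    """
    # Build the reverse mapping: job -> jobs that depend on it
    dependents = {}
    for jid, holds in hold_map.items():
        for dep in holds:
            dependents.setdefault(dep, []).append(jid)

    # Traverse the dependents, cancelling each job exactly once
    to_cancel, seen = [job_id], {job_id}
    while to_cancel:
        current = to_cancel.pop()
        cancel_one(current)
        for child in dependents.get(current, []):
            if child not in seen:
                seen.add(child)
                to_cancel.append(child)
```
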

Most plugins should only need to redefine a few abstract methods:

  • __init__ the constructor
  • cleanup a clean up function that frees resources, closes connections, etc
  • _queue_job the method that queues the job for execution

Optionally an extra job finished callback could be added:

  • _job_finished extra callback for when a job finishes

If SUPPORTS_CANCEL is set to True, the plugin should also implement:

  • _cancel_job cancels a previously queued job

If SUPPORTS_HOLD_RELEASE is set to True, the plugin should also implement:

  • _hold_job puts a queued job on hold
  • _release_job releases a job that is currently held

If SUPPORTS_DEPENDENCY is set to True, the plugin should:

  • Make sure to use the Job.hold_jobs as a list of its dependencies

Not all of these functions need to actually do anything for a given plugin. There are examples of plugins that do not really need a cleanup, but for safety you need to implement it anyway; just using a pass for the method body is fine in such a case.
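
A minimal plugin skeleton following the contract above could look like this. SketchPlugin and LocalSketchPlugin are illustrative stand-ins, not the real fastr base class:

```python
from abc import ABC, abstractmethod


class SketchPlugin(ABC):
    """Stand-in for the ExecutionPlugin contract described above."""

    # Capability flags; the base class supplies fall-backs when False
    SUPPORTS_CANCEL = False
    SUPPORTS_HOLD_RELEASE = False
    SUPPORTS_DEPENDENCY = False
    CANCELS_DEPENDENCIES = False

    @abstractmethod
    def _queue_job(self, job):
        """Hand the job to the underlying execution system."""

    @abstractmethod
    def cleanup(self):
        """Free resources, close connections, etc."""


class LocalSketchPlugin(SketchPlugin):
    """Minimal concrete plugin that just records queued jobs."""

    def __init__(self):
        self.finished = []

    def _queue_job(self, job):
        # A real plugin would submit to a scheduler; here we just record
        self.finished.append(job)

    def cleanup(self):
        # Nothing to free in this toy example, but it must be implemented
        pass
```
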

Warning

When overwriting other functions, extreme care must be taken not to break the plugin's operation, as there is a lot of bookkeeping that can go wrong.

CANCELS_DEPENDENCIES = False

Indicates that when a job is cancelled, all jobs depending on it are cancelled automatically as well

SUPPORTS_CANCEL = False

Indicates if the plugin can cancel queued jobs

SUPPORTS_DEPENDENCY = False

Indicates if the plugin can manage job dependencies; if not, the base plugin job dependency system will be used and jobs will only be submitted when all dependencies are met.

SUPPORTS_HOLD_RELEASE = False

Indicates if the plugin can queue jobs in a hold state and can release them again (if not, the base plugin will create a hidden queue for held jobs)
__abstractmethods__ = frozenset(['cleanup', '_queue_job', '__init__'])
__del__()[source]

Cleanup if the variable was deleted on purpose

__enter__()[source]
__exit__(type_, value, tb)[source]
__init__(finished_callback=None, cancelled_callback=None, status_callback=None)[source]

Setup the ExecutionPlugin

Parameters:
  • finished_callback – the callback to call after a job finished
  • cancelled_callback – the callback to call after a job is cancelled
Returns:

newly created ExecutionPlugin

__module__ = 'fastr.execution.executionpluginmanager'
cancel_job(job)[source]

Cancel a job previously queued

Parameters:job – job to cancel
check_job_requirements(job_id)[source]

Check if the requirements for a job are fulfilled.

Parameters:job_id – job to check
Returns:directive what should happen with the job
Return type:JobAction
check_job_status(job_id)[source]

Get the status of a specified job

Parameters:job_id – the target job
Returns:the status of the job (or None if job not found)
cleanup()[source]

Method to call to clean up the ExecutionPlugin. This can be to clear temporary data, close connections, etc.

Parameters:force – force cleanup (e.g. kill instead of join a process)
get_job(job_id)[source]
get_status(job)[source]
hold_job(job)[source]
job_finished(job, errors=None, blocking=False)[source]

The default callback that is called when a Job finishes. This will create a new thread that handles the actual callback.

Parameters:
  • job (Job) – the job that finished
  • errors – optional list of errors encountered
  • blocking (bool) – if blocking, do not create threads
Returns:

queue_job(job)[source]

Add a job to the execution queue

Parameters:job (Job) – job to add
register_job(job)[source]
release_job(job)[source]

Release a job that has been put on hold

Parameters:job – job to release
show_jobs(req_status=None)[source]

List the queued jobs, possible filtered by status

Parameters:req_status – requested status to filter on
Returns:list of jobs
class fastr.execution.executionpluginmanager.ExecutionPluginManager[source]

Bases: fastr.core.pluginmanager.PluginSubManager

Container holding all the ExecutionPlugins known to the Fastr system

__abstractmethods__ = frozenset([])
__init__()[source]

Initialize a ExecutionPluginManager and load plugins.

Parameters:
  • path – path to search for plugins
  • recursive – flag for searching recursively
Returns:

newly created ExecutionPluginManager

__module__ = 'fastr.execution.executionpluginmanager'
class fastr.execution.executionpluginmanager.JobAction[source]

Bases: enum.Enum

Job actions that can be performed. This is used for checking if held jobs should be queued, held longer or be cancelled.

__format__(format_spec)
__module__ = 'fastr.execution.executionpluginmanager'
static __new__(value)
__reduce_ex__(proto)
__repr__()
__str__()
cancel = 'cancel'
hold = 'hold'
queue = 'queue'
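
The way such a directive might be derived from the states of a job's dependencies can be sketched as follows. The state names and decision rules here are assumptions for illustration, not the exact fastr logic:

```python
from enum import Enum


class JobAction(Enum):
    """Directives for a held job, mirroring the enum documented above."""
    cancel = 'cancel'
    hold = 'hold'
    queue = 'queue'


def decide_job_action(hold_jobs, states):
    """Decide what to do with a held job based on its dependencies.

    Queue when every dependency finished, cancel when any dependency
    failed or was cancelled, otherwise keep holding.
    """
    dep_states = [states[jid] for jid in hold_jobs]
    if any(s in ('failed', 'cancelled') for s in dep_states):
        return JobAction.cancel
    if all(s == 'finished' for s in dep_states):
        return JobAction.queue
    return JobAction.hold
```
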

executionscript Module

The executionscript is the script that wraps around a tool executable. It takes a job, builds the command, executes the command (while profiling it) and collects the results.

fastr.execution.executionscript.execute_job(job)[source]

Execute a Job and save the result to disk

Parameters:job – the job to execute
fastr.execution.executionscript.main(joblist=None)[source]

This is the main code, wrapped inside a function to avoid the variables being treated as globals and to silence pylint. If the joblist argument is given it can run any given job; otherwise it takes the first command-line argument.

job Module

This module contains the Job class and some related classes.

class fastr.execution.job.InlineJob(*args, **kwargs)[source]

Bases: fastr.execution.job.Job

__init__(*args, **kwargs)[source]
__module__ = 'fastr.execution.job'
get_result()[source]
class fastr.execution.job.Job(node, sample_id, sample_index, input_arguments, output_arguments, hold_jobs=None, status_callback=None, preferred_types=None)[source]

Bases: fastr.core.serializable.Serializable

Class describing a job.

Arguments:
  • tool_name – the name of the tool (str)
  • tool_version – the version of the tool (Version)
  • argument – the arguments used when calling the tool (list)
  • tmpdir – temporary directory to use to store output data
  • hold_jobs – list of jobs that need to be finished before this job can run (list)

COMMAND_DUMP = '__fastr_command__.pickle.gz'
RESULT_DUMP = '__fastr_result__.pickle.gz'
STDERR_DUMP = '__fastr_stderr__.txt'
STDOUT_DUMP = '__fastr_stdout__.txt'
__getstate__()[source]

Get the state of the job

Returns:job state
Return type:dict
__init__(node, sample_id, sample_index, input_arguments, output_arguments, hold_jobs=None, status_callback=None, preferred_types=None)[source]

Create a job

Parameters:
  • node (Node) – the node the job is based on
  • sample_id (fastr.core.samples.SampleId) – the id of the sample
  • sample_index (fastr.core.samples.SampleIndex) – the index of the sample
  • input_arguments (list[dict]) – the argument list
  • output_arguments (list[dict]) – the argument list
  • hold_jobs (list[str]) – the jobs on which this jobs depend
  • status_callback (callable) – The callback to call when the status changed
  • preferred_types – The list of preferred types to use
Returns:

__module__ = 'fastr.execution.job'
__repr__()[source]

String representation of the Job

__setstate__(state)[source]

Set the state of the job

Parameters:state (dict) –
static calc_cardinality(description, payload)[source]
commandfile

The path of the command pickle

commandurl

The url of the command pickle

create_payload()[source]

Create the payload for this object based on all the input/output arguments

Returns:the payload
Return type:dict
ensure_tmp_dir()[source]
execute()[source]

Execute this job

Returns:The result of the execution
Return type:InterFaceResult
fill_output_argument(output_spec, cardinality, desired_type)[source]

This is an abstract class method. The method should take the argument_dict generated from calling self.get_argument_dict() and turn it into a list of commandline arguments that represent this Input/Output.

Parameters:
  • cardinality (int) – the cardinality for this output (can be None for automatic outputs)
  • desired_type (DataType) – the desired datatype for this output
Returns:

the values for this output

Return type:

list

fullid

The full id of the job

get_deferred(output_id, cardinality_nr, sample_id=None)[source]

Get a deferred pointing to a specific output value in the Job

Parameters:
  • output_id (str) – the output to select from
  • cardinality_nr (int) – the index of the cardinality
  • sample_id (str) – the sample id to select (optional)
Returns:

The deferred

get_output_datatype(output_id)[source]

Get the datatype for a specific output

Parameters:output_id (str) – the id of the output to get the datatype for
Returns:the requested datatype
Return type:BaseDataType
get_result()[source]

Get the result of the job if it is available. Load the output file if found and check if the job matches the current object. If so, load and return the result.

Returns:Job after execution or None if not available
Return type:Job | None
classmethod get_value(value)[source]

Get a value

Parameters:
  • value – the url of the value
  • datatype – datatype of the value
Returns:

the retrieved value

hash_inputs()[source]

Create hashes for all input values and store them in the info store

hash_results()[source]

Create hashes of all output values and store them in the info store
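
The idea behind hashing inputs and results can be sketched with the standard library. The choice of JSON serialization and sha1 here is an assumption for illustration, not necessarily what fastr uses:

```python
import hashlib
import json


def hash_values(values):
    """Create a deterministic digest per key of a value mapping.

    Serializing each value with sorted keys makes equal inputs hash
    equally, so a later run can detect whether cached results match.
    """
    hashes = {}
    for key, value in values.items():
        blob = json.dumps(value, sort_keys=True).encode('utf-8')
        hashes[key] = hashlib.sha1(blob).hexdigest()
    return hashes
```
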

id

The id of this job

logfile

The path of the result pickle

logurl

The url of the result pickle

required_cores

Number of required cores

required_memory

Amount of required memory

required_time

Amount of required runtime

status

The status of the job

stderrfile

The path where the stderr text is saved

stderrurl

The url where the stderr text is saved

stdoutfile

The path where the stdout text is saved

stdouturl

The url where the stdout text is saved

tmpurl

The URL of the tmpdir to use

tool
translate_argument(value)[source]

Translate an argument from a URL to an actual path.

Parameters:
  • value – value to translate
  • datatype – the datatype of the value
Returns:

the translated value
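
A sketch of such a URL-to-path translation, modelled on fastr's vfs:// URLs. The signature differs from the method above: the mount-table argument is a hypothetical stand-in for fastr's configured mounts:

```python
def translate_argument(value, mounts):
    """Rewrite vfs://mount/relative/path values to filesystem paths.

    Anything that is not a vfs:// URL string passes through untouched.
    """
    prefix = 'vfs://'
    if not isinstance(value, str) or not value.startswith(prefix):
        return value
    mount, _, rest = value[len(prefix):].partition('/')
    return '{}/{}'.format(mounts[mount], rest)
```
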

translate_results(result)[source]

Translate the results of an interface (which uses paths etc.) to the proper form using URIs instead.

Parameters:result (dict) – the result data of an interface
Returns:the translated result
Return type:dict
validate_results(payload)[source]

Validate the results of the Job

Returns:flag indicating the results are complete and valid
write()[source]
class fastr.execution.job.JobState(_, stage, error)[source]

Bases: enum.Enum

The possible states a Job can be in. An overview of the states and the advised transitions is depicted in the following figure:

digraph jobstate {
    nonexistent [shape=box];
    created [shape=box];
    queued [shape=box];
    hold [shape=box];
    running [shape=box];
    execution_done [shape=box];
    execution_failed [shape=box];
    processing_callback [shape=box];
    finished [shape=box];
    failed [shape=box];
    cancelled [shape=box];

    nonexistent -> created;
    created -> queued;
    created -> hold;
    hold -> queued;
    queued -> running;
    running -> execution_done;
    running -> execution_failed;
    execution_done -> processing_callback;
    execution_failed -> processing_callback;
    processing_callback -> finished;
    processing_callback -> failed;
    running -> cancelled;
    queued -> cancelled;
    hold -> cancelled;
}

__format__(format_spec)
__init__(_, stage, error)[source]
__module__ = 'fastr.execution.job'
static __new__(value)
__reduce_ex__(proto)
__repr__()
__str__()
cancelled = ('cancelled', 'done', True)
created = ('created', 'idle', False)
done
execution_done = ('execution_done', 'in_progress', False)
execution_failed = ('execution_failed', 'in_progress', True)
failed = ('failed', 'done', True)
finished = ('finished', 'done', False)
hold = ('hold', 'idle', False)
in_progress
nonexistent = ('nonexistent', 'idle', False)
processing_callback = ('processing_callback', 'in_progress', False)
queued = ('queued', 'idle', False)
running = ('running', 'in_progress', False)
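
The (name, stage, error) triples above can be mirrored with a standard Enum; the done and in_progress helpers show what the stage field is for. This is an illustrative sketch, not the fastr implementation:

```python
from enum import Enum


class JobState(Enum):
    """Job states as (name, stage, error) triples, as documented above."""
    nonexistent = ('nonexistent', 'idle', False)
    created = ('created', 'idle', False)
    queued = ('queued', 'idle', False)
    hold = ('hold', 'idle', False)
    running = ('running', 'in_progress', False)
    execution_done = ('execution_done', 'in_progress', False)
    execution_failed = ('execution_failed', 'in_progress', True)
    processing_callback = ('processing_callback', 'in_progress', False)
    finished = ('finished', 'done', False)
    failed = ('failed', 'done', True)
    cancelled = ('cancelled', 'done', True)

    def __init__(self, _, stage, error):
        # Enum calls __init__ with the unpacked tuple value
        self.stage = stage
        self.error = error

    @property
    def done(self):
        return self.stage == 'done'

    @property
    def in_progress(self):
        return self.stage == 'in_progress'
```
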
class fastr.execution.job.SinkJob(node, sample_id, sample_index, input_arguments, output_arguments, hold_jobs=None, substitutions=None, status_callback=None, preferred_types=None)[source]

Bases: fastr.execution.job.Job

Special SinkJob for the Sink

__init__(node, sample_id, sample_index, input_arguments, output_arguments, hold_jobs=None, substitutions=None, status_callback=None, preferred_types=None)[source]
__module__ = 'fastr.execution.job'
__repr__()[source]

String representation for the SinkJob

create_payload()[source]

Create the payload for this object based on all the input/output arguments

Returns:the payload
Return type:dict
get_result()[source]

Get the result of the job if it is available. Load the output file if found and check if the job matches the current object. If so, load and return the result.

Returns:Job after execution
hash_inputs()[source]

Create hashes for all input values and store them in the info store

id

The id of this job

substitute(value, datatype=None)[source]

Substitute the special fields that can be used in a SinkJob.

Parameters:
  • value (str) – the value to substitute fields in
  • datatype (BaseDataType) – the datatype for the value
Returns:

string with substitutions performed

Return type:

str
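
The substitution mechanism can be sketched with str.format. The exact set of fields ({sample_id}, {cardinality}, {ext}) is an assumption for illustration; fastr documents the mechanism, not necessarily this exact set:

```python
def substitute(value, sample_id, cardinality, extension):
    """Fill the per-sample placeholder fields in a sink template string."""
    return value.format(sample_id=sample_id,
                        cardinality=cardinality,
                        ext=extension)
```
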

tmpurl

The URL of the tmpdir to use

validate_results(payload)[source]

Validate the results of the SinkJob

Returns:flag indicating the results are complete and valid
class fastr.execution.job.SourceJob(node, sample_id, sample_index, input_arguments, output_arguments, hold_jobs=None, status_callback=None, preferred_types=None)[source]

Bases: fastr.execution.job.Job

Special SourceJob for the Source

__module__ = 'fastr.execution.job'
__repr__()[source]

String representation for the SourceJob

get_output_datatype(output_id)[source]

Get the datatype for a specific output

Parameters:output_id (str) – the id of the output to get the datatype for
Returns:the requested datatype
Return type:BaseDataType
hash_inputs()[source]

Create hashes for all input values and store them in the info store

validate_results(payload)[source]

Validate the results of the Job

Returns:flag indicating the results are complete and valid

networkanalyzer Module

Module that defines the NetworkAnalyzer and holds the reference implementation.

class fastr.execution.networkanalyzer.DefaultNetworkAnalyzer[source]

Bases: fastr.execution.networkanalyzer.NetworkAnalyzer

Default implementation of the NetworkAnalyzer.

__module__ = 'fastr.execution.networkanalyzer'
analyze_network(network, chunk)[source]

Analyze a chunk of a Network. Simply process the Nodes in the chunk sequentially.

Parameters:
  • network – Network corresponding with the chunk
  • chunk – The chunk of the network to analyze
class fastr.execution.networkanalyzer.NetworkAnalyzer[source]

Bases: object

Base class for NetworkAnalyzers

__dict__ = dict_proxy({'__dict__': <attribute '__dict__' of 'NetworkAnalyzer' objects>, '__weakref__': <attribute '__weakref__' of 'NetworkAnalyzer' objects>, '__module__': 'fastr.execution.networkanalyzer', 'analyze_network': <function analyze_network>, '__doc__': '\n Base class for NetworkAnalyzers\n '})
__module__ = 'fastr.execution.networkanalyzer'
__weakref__

list of weak references to the object (if defined)

analyze_network(network, chunk)[source]

Analyze a chunk of a Network.

Parameters:
  • network – Network corresponding with the chunk
  • chunk – The chunk of the network to analyze

networkchunker Module

This module contains the NetworkChunker class and its default implementation the DefaultNetworkChunker

class fastr.execution.networkchunker.DefaultNetworkChunker[source]

Bases: fastr.execution.networkchunker.NetworkChunker

The default implementation of the NetworkChunker. It tries to create chunks that are as large as possible so that execution blocks as little as possible.

__init__()[source]
__module__ = 'fastr.execution.networkchunker'
chunck_network(network)[source]

Create a list of Network chunks that can be pre-analyzed completely. Each chunk needs to be executed before the next can be analyzed and executed.

The returned chunks are (at the moment) in the format of a tuple (start, nodes), both of which are tuples themselves. They contain the nodes where execution should start (which should be ready to run once the previous chunks are done) and all nodes of the chunk, respectively.

Parameters:network – Network to split into chunks
Returns:tuple containing chunks
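
The chunking idea can be sketched on a plain dependency mapping: walk the graph in topological order and start a new chunk whenever a node depends on a "blocking" node, i.e. one whose output cannot be analyzed before it has run. The blocking set here is a hypothetical stand-in for fastr's runtime checks:

```python
def chunk_network(dependencies, blocking):
    """Split a dependency mapping {node: [deps]} into executable chunks.

    Assumes the mapping describes a DAG. A dependent of a blocking node
    is pushed into a later chunk than the blocking node itself.
    """
    # Topological order (Kahn-style): repeatedly take nodes whose
    # dependencies have all been emitted already
    order, pending = [], dict(dependencies)
    while pending:
        ready = sorted(n for n, deps in pending.items()
                       if all(d not in pending for d in deps))
        order.extend(ready)
        for node in ready:
            del pending[node]

    # A node lands one chunk after any blocking dependency, and never
    # before the chunk of any of its dependencies
    chunk_index = {}
    for node in order:
        chunk_index[node] = max(
            (chunk_index[d] + (1 if d in blocking else 0)
             for d in dependencies[node]), default=0)

    chunks = [[] for _ in range(max(chunk_index.values(), default=-1) + 1)]
    for node in order:
        chunks[chunk_index[node]].append(node)
    return chunks
```
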
class fastr.execution.networkchunker.NetworkChunker[source]

Bases: object

The base class for NetworkChunkers. A Network chunker is a class that takes a Network and produces a list of chunks that can each be analyzed and executed in one go.

__dict__ = dict_proxy({'__dict__': <attribute '__dict__' of 'NetworkChunker' objects>, '__module__': 'fastr.execution.networkchunker', 'chunck_network': <function chunck_network>, '__weakref__': <attribute '__weakref__' of 'NetworkChunker' objects>, '__doc__': '\n The base class for NetworkChunkers. A Network chunker is a class that takes\n a Network and produces a list of chunks that can each be analyzed and\n executed in one go.\n '})
__module__ = 'fastr.execution.networkchunker'
__weakref__

list of weak references to the object (if defined)

chunck_network(network)[source]

Create a list of Network chunks that can be pre-analyzed completely. Each chunk needs to be executed before the next can be analyzed and executed.

Parameters:network – Network to split into chunks
Returns:list containing chunks