execution Package¶
This package contains all modules related directly to the execution.
environmentmodules
Module¶
This module contains a class to interact with EnvironmentModules
-
class
fastr.execution.environmentmodules.
EnvironmentModules
(protected=None)[source]¶ Bases:
object
This class can control the module environments in Python. It can list, load and unload environment modules. These modules are then used when a subprocess is called from Python.
-
__init__
(protected=None)[source]¶ Create the environmentmodules control object
Parameters: protected (list) – list of modules that should never be unloaded Returns: newly created EnvironmentModules
-
__module__
= 'fastr.execution.environmentmodules'¶
-
__weakref__
¶ list of weak references to the object (if defined)
-
avail
(namestart=None)[source]¶ Print available modules in the same way as the command-line version
Parameters: namestart – filter on modules that start with namestart
-
avail_modules
¶ List of available modules
-
clear
()[source]¶ Unload all modules (except the protected modules as they cannot be unloaded). This should result in a clean environment.
-
isloaded
(module)[source]¶ Check if a specific module is loaded
Parameters: module – module to check Returns: flag indicating the module is loaded
-
loaded_modules
¶ List of currently loaded modules
-
swap
(module1, module2)[source]¶ Swap one module for another one
Parameters: - module1 – module to unload
- module2 – module to load
-
sync
()[source]¶ Sync the object with the underlying environment. Re-checks the available and loaded modules.
-
static
tostring_modvalue
(value)[source]¶ Turn a representation of a module into a string representation
Parameters: value – module representation (either str or tuple) Returns: string representation
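The bookkeeping described for this class (protected modules, load, unload, swap, clear) can be illustrated with a small in-memory sketch. It does not call the real `module` command or the fastr class. The class name `ModuleSetSketch` is hypothetical, and the `name/version` string form for tuple values is an assumption made for illustration.

```python
class ModuleSetSketch:
    """In-memory stand-in that mimics the load/unload bookkeeping only."""

    def __init__(self, protected=None):
        # Modules listed in `protected` are never unloaded.
        self.protected = set(protected or [])
        self.loaded = sorted(self.protected)

    @staticmethod
    def tostring_modvalue(value):
        # Assumption: a tuple is (name, version) and renders as "name/version".
        if isinstance(value, tuple):
            return "/".join(str(part) for part in value)
        return str(value)

    def isloaded(self, module):
        return self.tostring_modvalue(module) in self.loaded

    def load(self, module):
        name = self.tostring_modvalue(module)
        if name not in self.loaded:
            self.loaded.append(name)

    def unload(self, module):
        name = self.tostring_modvalue(module)
        # Protected modules are silently kept.
        if name in self.loaded and name not in self.protected:
            self.loaded.remove(name)

    def swap(self, module1, module2):
        # Unload the first module, then load the second.
        self.unload(module1)
        self.load(module2)

    def clear(self):
        # Everything except the protected modules is dropped.
        self.loaded = [name for name in self.loaded if name in self.protected]


env = ModuleSetSketch(protected=["base/1.0"])
env.load(("python", "2.7"))
env.swap(("python", "2.7"), "python/3.6")
swapped = list(env.loaded)
env.clear()
```

After `clear()`, only the protected module remains in `env.loaded`, which matches the "clean environment" behaviour described for `clear`.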
-
executionpluginmanager
Module¶
This module holds the ExecutionPluginManager as well as the base-class for all ExecutionPlugins.
-
class
fastr.execution.executionpluginmanager.
ExecutionPlugin
(finished_callback=None, cancelled_callback=None, status_callback=None)[source]¶ Bases:
fastr.core.baseplugin.Plugin
This class is the base for all plugins that execute jobs somewhere. Many methods are already in place to handle the common work. Most plugins should only need to redefine a few abstract methods:
- __init__ – the constructor
- cleanup – a clean-up function that frees resources, closes connections, etc.
- _queue_job – the method that queues the job for execution
- _cancel_job – cancels a previously queued job
- _release_job – releases a job that is currently held
- _job_finished – extra callback for when a job finishes
Not all of these functions need to actually do anything for a plugin. There are plugins that do not really need a cleanup, but for safety you need to implement it. Just using a pass for the method body could be fine in such a case.
Warning
When overriding other functions, extreme care must be taken not to break the plugin's workings.
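The set of abstract methods listed above can be pictured with a self-contained sketch built on the standard `abc` module. The classes `ExecutionPluginSketch` and `ImmediatePlugin` are hypothetical stand-ins, not the fastr classes, and the single `job` argument on each hook is an assumption, since the hook signatures are not shown here.

```python
from abc import ABC, abstractmethod


class ExecutionPluginSketch(ABC):
    """Hypothetical base class exposing the same six hooks."""

    @abstractmethod
    def __init__(self, finished_callback=None, cancelled_callback=None):
        self.finished_callback = finished_callback
        self.cancelled_callback = cancelled_callback

    @abstractmethod
    def cleanup(self):
        """Free resources, close connections, etc."""

    @abstractmethod
    def _queue_job(self, job):
        """Queue the job for execution."""

    @abstractmethod
    def _cancel_job(self, job):
        """Cancel a previously queued job."""

    @abstractmethod
    def _release_job(self, job):
        """Release a job that is currently held."""

    @abstractmethod
    def _job_finished(self, job):
        """Extra callback for when a job finishes."""


class ImmediatePlugin(ExecutionPluginSketch):
    """Toy plugin that 'runs' a job the moment it is queued."""

    def __init__(self, finished_callback=None, cancelled_callback=None):
        super().__init__(finished_callback, cancelled_callback)
        self.ran = []

    def cleanup(self):
        pass  # nothing to free, but the method must still exist

    def _queue_job(self, job):
        self.ran.append(job)
        self._job_finished(job)

    def _cancel_job(self, job):
        pass  # jobs finish immediately, so there is nothing to cancel

    def _release_job(self, job):
        pass  # jobs are never held in this toy plugin

    def _job_finished(self, job):
        if self.finished_callback is not None:
            self.finished_callback(job)


finished = []
plugin = ImmediatePlugin(finished_callback=finished.append)
plugin._queue_job("job-1")
```

Leaving out any of the six methods in the subclass makes instantiation fail with a `TypeError`, which is why even an empty `cleanup` has to be written.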
-
__abstractmethods__
= frozenset(['cleanup', '_queue_job', '_cancel_job', '_job_finished', '_release_job', '__init__'])¶
-
__init__
(finished_callback=None, cancelled_callback=None, status_callback=None)[source]¶ Setup the ExecutionPlugin
Parameters: - finished_callback – the callback to call after a job has finished
- cancelled_callback – the callback to call after a job is cancelled
Returns: newly created ExecutionPlugin
-
__module__
= 'fastr.execution.executionpluginmanager'¶
-
check_job_requirements
(jobid)[source]¶ Check if the requirements for a job are fulfilled.
Parameters: jobid – job to check Returns: directive what should happen with the job Return type: JobAction
-
check_job_status
(jobid)[source]¶ Get the status of a specified job
Parameters: jobid – the target job Returns: the status of the job (or None if job not found)
-
cleanup
()[source]¶ Method to call to clean up the ExecutionPlugin. This can be to clear temporary data, close connections, etc.
Parameters: force – force cleanup (e.g. kill instead of join a process)
-
class
fastr.execution.executionpluginmanager.
ExecutionPluginManager
[source]¶ Bases:
fastr.core.pluginmanager.PluginSubManager
Container holding all the ExecutionPlugins known to the Fastr system
-
__abstractmethods__
= frozenset([])¶
-
__init__
()[source]¶ Initialize an ExecutionPluginManager and load plugins.
Parameters: - path – path to search for plugins
- recursive – flag for searching recursively
Returns: newly created ExecutionPluginManager
-
__module__
= 'fastr.execution.executionpluginmanager'¶
-
plugin_class
¶ The class of the Plugins expected in this BasePluginManager
-
-
class
fastr.execution.executionpluginmanager.
JobAction
[source]¶ Bases:
enum.Enum
Job actions that can be performed. This is used for checking if held jobs should be queued, held longer or be cancelled.
-
__format__
(format_spec)¶
-
__module__
= 'fastr.execution.executionpluginmanager'¶
-
static
__new__
(value)¶
-
__reduce_ex__
(proto)¶
-
__repr__
()¶
-
__str__
()¶
-
cancel
= 'cancel'¶
-
hold
= 'hold'¶
-
queue
= 'queue'¶
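As an illustration of how such an action could be chosen for a held job, the sketch below re-declares the three members listed above and applies a simple rule. The `decide` function and its rule (cancel on any failed dependency, hold while any dependency is unfinished) are assumptions for illustration, not the logic used by `check_job_requirements`.

```python
from enum import Enum


class JobAction(Enum):
    # Member names and values as listed in this reference.
    cancel = 'cancel'
    hold = 'hold'
    queue = 'queue'


def decide(dependency_states):
    """Hypothetical rule mapping dependency states to a JobAction."""
    if any(state == 'failed' for state in dependency_states):
        return JobAction.cancel
    if any(state != 'finished' for state in dependency_states):
        return JobAction.hold
    return JobAction.queue


ready = decide(['finished', 'finished'])
waiting = decide(['finished', 'running'])
broken = decide(['failed', 'running'])
```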
-
executionscript
Module¶
The executionscript is the script that wraps around a tool executable. It takes a job, builds the command, executes the command (while profiling it) and collects the results.
job
Module¶
This module contains the Job class and some related classes.
-
class
fastr.execution.job.
InlineJob
(*args, **kwargs)[source]¶ Bases:
fastr.execution.job.Job
-
__module__
= 'fastr.execution.job'¶
-
-
class
fastr.execution.job.
Job
(jobid, tool_name, tool_version, nodeid, sample_id, sample_index, input_arguments, output_arguments, tmpdir, hold_jobs=None, timestamp=None, cores=None, memory=None, walltime=None, status_callback=None, preferred_types=None)[source]¶ Bases:
fastr.core.serializable.Serializable
Class describing a job.
Arguments:
- tool_name – the name of the tool (str)
- tool_version – the version of the tool (Version)
- argument – the arguments used when calling the tool (list)
- tmpdir – temporary directory to use to store output data
- hold_jobs – list of jobs that need to finish before this job can run (list)
-
COMMAND_DUMP
= '__fastr_command__.pickle.gz'¶
-
RESULT_DUMP
= '__fastr_result__.pickle.gz'¶
-
STDERR_DUMP
= '__fastr_stderr__.txt'¶
-
STDOUT_DUMP
= '__fastr_stdout__.txt'¶
-
__init__
(jobid, tool_name, tool_version, nodeid, sample_id, sample_index, input_arguments, output_arguments, tmpdir, hold_jobs=None, timestamp=None, cores=None, memory=None, walltime=None, status_callback=None, preferred_types=None)[source]¶ Create a job
Parameters: - jobid (str) – the job id
- tool_name (str) – the id of the tool
- tool_version (fastr.core.version.Version) – the version of the tool
- nodeid (str) – the id of the creating node
- sample_id (fastr.core.samples.SampleId) – the id of the sample
- input_arguments (list[dict]) – the argument list
- output_arguments (list[dict]) – the argument list
- tmpdir (str) – the workdir for this job
- hold_jobs (list[str]) – the jobs on which this job depends
- timestamp (datetime.datetime) – the time this job was spawned
- cores (int) – number of cores this job is allowed to consume
- memory (str) – max amount of memory that this job is allowed to consume
- walltime (str) – max amount of time this job is allowed to run
Returns:
-
__module__
= 'fastr.execution.job'¶
-
commandfile
¶ The path of the command pickle
-
commandurl
¶ The url of the command pickle
-
create_payload
()[source]¶ Create the payload for this object based on all the input/output arguments
Returns: the payload Return type: dict
-
fill_output_argument
(output_spec, cardinality, desired_type)[source]¶ This is an abstract class method. The method should take the argument_dict generated from calling self.get_argument_dict() and turn it into a list of commandline arguments that represent this Input/Output.
Parameters: Returns: the values for this output
Return type:
-
fullid
¶ The full id of the job
-
get_output_datatype
(output_id)[source]¶ Get the datatype for a specific output
Parameters: output_id (str) – the id of the output to get the datatype for Returns: the requested datatype Return type: BaseDataType
-
get_result
()[source]¶ Get the result of the job if it is available. Load the output file if found and check if the job matches the current object. If so, load and return the result.
Returns: Job after execution or None if not available Return type: Job | None
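The load-and-check behaviour described for `get_result` can be sketched with the standard library alone. The helper `load_result` is hypothetical. Storing the result as a gzipped pickle of a plain dict inside the job's tmpdir, and matching on an `id` key, are assumptions suggested by the `RESULT_DUMP` file name rather than confirmed details of the implementation.

```python
import gzip
import os
import pickle
import tempfile

RESULT_DUMP = '__fastr_result__.pickle.gz'  # file name as listed for Job


def load_result(tmpdir, expected_id):
    """Return the stored result, or None if it is missing or mismatched."""
    path = os.path.join(tmpdir, RESULT_DUMP)
    if not os.path.exists(path):
        return None
    with gzip.open(path, 'rb') as handle:
        result = pickle.load(handle)
    # Assumption: the stored result is a dict that carries the job id.
    if result.get('id') != expected_id:
        return None
    return result


workdir = tempfile.mkdtemp()
missing = load_result(workdir, 'job_1')  # no file written yet

with gzip.open(os.path.join(workdir, RESULT_DUMP), 'wb') as handle:
    pickle.dump({'id': 'job_1', 'status': 'finished'}, handle)

found = load_result(workdir, 'job_1')
mismatch = load_result(workdir, 'job_2')
```

Note that unpickling is only safe for files the pipeline wrote itself; a pickle from an untrusted source should never be loaded this way.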
-
classmethod
get_value
(value)[source]¶ Get a value
Parameters: - value – the url of the value
- datatype – datatype of the value
Returns: the retrieved value
-
id
¶ The id of this job
-
logfile
¶ The path of the result pickle
-
logurl
¶ The url of the result pickle
-
required_cores
¶ Number of required cores
-
required_memory
¶ Amount of required memory
-
required_time
¶ Amount of required runtime
-
status
¶ The status of the job
-
stderrfile
¶ The path where the stderr text is saved
-
stderrurl
¶ The url where the stderr text is saved
-
stdoutfile
¶ The path where the stdout text is saved
-
stdouturl
¶ The url where the stdout text is saved
-
tool
¶
-
translate_argument
(value)[source]¶ Translate an argument from a URL to an actual path.
Parameters: - value – value to translate
- datatype – the datatype of the value
Returns: the translated value
-
translate_results
(result)[source]¶ Translate the results of an interface (using paths etc.) to the proper form using URIs instead.
Parameters: result (dict) – the result data of an interface Returns: the translated result Return type: dict
-
-
class
fastr.execution.job.
JobState
(_, stage, error)[source]¶ Bases:
enum.Enum
The possible states a Job can be in. An overview of the states and the advised transitions is depicted in the following figure:
-
__format__
(format_spec)¶
-
__module__
= 'fastr.execution.job'¶
-
static
__new__
(value)¶
-
__reduce_ex__
(proto)¶
-
__repr__
()¶
-
__str__
()¶
-
cancelled
= ('cancelled', 'done', True)¶
-
created
= ('created', 'idle', False)¶
-
done
¶
-
execution_done
= ('execution_done', 'in_progress', False)¶
-
execution_failed
= ('execution_failed', 'in_progress', True)¶
-
failed
= ('failed', 'done', True)¶
-
finished
= ('finished', 'done', False)¶
-
hold
= ('hold', 'idle', False)¶
-
in_progress
¶
-
nonexistent
= ('nonexistent', 'idle', False)¶
-
processing_callback
= ('processing_callback', 'in_progress', False)¶
-
queued
= ('queued', 'idle', False)¶
-
running
= ('running', 'in_progress', False)¶
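The `(_, stage, error)` signature together with the tuple values suggests an enum whose members unpack their value in `__init__`. The sketch below re-declares the members with the values listed above. Deriving the `done` and `in_progress` properties from the stage string is an assumption about how those two attributes behave.

```python
from enum import Enum


class JobState(Enum):
    # Values copied from this reference: (name, stage, error flag).
    nonexistent = ('nonexistent', 'idle', False)
    created = ('created', 'idle', False)
    queued = ('queued', 'idle', False)
    hold = ('hold', 'idle', False)
    running = ('running', 'in_progress', False)
    execution_done = ('execution_done', 'in_progress', False)
    execution_failed = ('execution_failed', 'in_progress', True)
    processing_callback = ('processing_callback', 'in_progress', False)
    finished = ('finished', 'done', False)
    cancelled = ('cancelled', 'done', True)
    failed = ('failed', 'done', True)

    def __init__(self, _, stage, error):
        # Enum unpacks the tuple value into these arguments.
        self.stage = stage
        self.error = error

    @property
    def done(self):
        # Assumption: "done" means the stage is 'done'.
        return self.stage == 'done'

    @property
    def in_progress(self):
        # Assumption: "in_progress" means the stage is 'in_progress'.
        return self.stage == 'in_progress'


terminal_errors = [state.name for state in JobState if state.done and state.error]
```

With these values, `terminal_errors` collects the states that are both finished and flagged as errors, namely `cancelled` and `failed`.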
-
-
class
fastr.execution.job.
SinkJob
(jobid, tool_name, tool_version, nodeid, sample_id, sample_index, input_arguments, output_arguments, tmpdir, hold_jobs=None, timestamp=None, cores=None, memory=None, walltime=None, substitutions=None, status_callback=None, preferred_types=None)[source]¶ Bases:
fastr.execution.job.Job
Special SinkJob for the Sink
-
__init__
(jobid, tool_name, tool_version, nodeid, sample_id, sample_index, input_arguments, output_arguments, tmpdir, hold_jobs=None, timestamp=None, cores=None, memory=None, walltime=None, substitutions=None, status_callback=None, preferred_types=None)[source]¶
-
__module__
= 'fastr.execution.job'¶
-
create_payload
()[source]¶ Create the payload for this object based on all the input/output arguments
Returns: the payload Return type: dict
-
get_result
()[source]¶ Get the result of the job if it is available. Load the output file if found and check if the job matches the current object. If so, load and return the result.
Returns: Job after execution
-
substitute
(value, datatype=None)[source]¶ Substitute the special fields that can be used in a SinkJob.
Parameters: - value (str) – the value to substitute fields in
- datatype (BaseDataType) – the datatype for the value
Returns: string with substitutions performed
Return type:
-
-
class
fastr.execution.job.
SourceJob
(jobid, tool_name, tool_version, nodeid, sample_id, sample_index, input_arguments, output_arguments, tmpdir, hold_jobs=None, timestamp=None, cores=None, memory=None, walltime=None, status_callback=None, preferred_types=None)[source]¶ Bases:
fastr.execution.job.Job
Special SourceJob for the Source
-
__module__
= 'fastr.execution.job'¶
-
get_output_datatype
(output_id)[source]¶ Get the datatype for a specific output
Parameters: output_id (str) – the id of the output to get the datatype for Returns: the requested datatype Return type: BaseDataType
-
networkanalyzer
Module¶
Module that defines the NetworkAnalyzer and holds the reference implementation.
-
class
fastr.execution.networkanalyzer.
DefaultNetworkAnalyzer
[source]¶ Bases:
fastr.execution.networkanalyzer.NetworkAnalyzer
Default implementation of the NetworkAnalyzer.
-
__module__
= 'fastr.execution.networkanalyzer'¶
-
-
class
fastr.execution.networkanalyzer.
NetworkAnalyzer
[source]¶ Bases:
object
Base class for NetworkAnalyzers
-
__module__
= 'fastr.execution.networkanalyzer'¶
-
__weakref__
¶ list of weak references to the object (if defined)
-
networkchunker
Module¶
This module contains the NetworkChunker class and its default implementation, the DefaultNetworkChunker.
-
class
fastr.execution.networkchunker.
DefaultNetworkChunker
[source]¶ Bases:
fastr.execution.networkchunker.NetworkChunker
The default implementation of the NetworkChunker. It tries to create chunks that are as large as possible, so that the execution blocks as little as possible.
-
__module__
= 'fastr.execution.networkchunker'¶
-
chunck_network
(network)[source]¶ Create a list of Network chunks that can be pre-analyzed completely. Each chunk needs to be executed before the next can be analyzed and executed.
The returned chunks are (at the moment) in the format of a tuple (start, nodes), both of which are tuples. They contain the nodes at which to start execution (these should be ready once the previous chunks are done) and all nodes of the chunk, respectively.
Parameters: network – Network to split into chunks Returns: tuple containing chunks
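To make the `(start, nodes)` chunk format concrete, here is a toy chunker over a plain list of node names and edges. The function `single_chunk` is hypothetical and much simpler than the default implementation: it places the whole network in one chunk and takes the nodes without incoming edges as start nodes. Representing the network as name and edge lists is also an assumption for illustration.

```python
def single_chunk(nodes, edges):
    """Hypothetical chunker: one chunk that covers the whole network."""
    targets = {target for _, target in edges}
    # Start nodes are the nodes that nothing else feeds into.
    start = tuple(node for node in nodes if node not in targets)
    return ((start, tuple(nodes)),)


nodes = ['source', 'filter', 'sink']
edges = [('source', 'filter'), ('filter', 'sink')]
chunks = single_chunk(nodes, edges)

# Chunks are processed in order: each one is analyzed and executed
# before the next one is looked at.
for start, chunk_nodes in chunks:
    print('start at', start, 'covering', chunk_nodes)
```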
-
-
class
fastr.execution.networkchunker.
NetworkChunker
[source]¶ Bases:
object
The base class for NetworkChunkers. A Network chunker is a class that takes a Network and produces a list of chunks that can each be analyzed and executed in one go.
-
__module__
= 'fastr.execution.networkchunker'¶
-
__weakref__
¶ list of weak references to the object (if defined)
-