plugins Package

plugins Package

The plugins module holds all plugins loaded by Fastr. It is empty on start and gets filled by the BasePluginManager

class fastr.plugins.BlockingExecution(finished_callback=None, cancelled_callback=None)

Bases: fastr.plugins.executionplugin.ExecutionPlugin

The blocking execution plugin is a special plugin which is meant for debug purposes. It will not queue jobs but immediately execute them inline, effectively blocking fastr until the Job is finished. It is the simplest execution plugin and can be used as a template for new plugins or for testing purposes.

__abstractmethods__ = frozenset()
__init__(finished_callback=None, cancelled_callback=None)[source]

Setup the ExecutionPlugin

  • finished_callback – the callback to call after a job finished
  • cancelled_callback – the callback to call after a job cancelled

newly created ExecutionPlugin

__module__ = 'fastr.plugins'

Method to call to clean up the ExecutionPlugin. This can be to clear temporary data, close connections, etc.

Parameters:force – force cleanup (e.g. kill instead of join a process)
filename = '/home/docs/checkouts/'
module = <module 'blockingexecution' from '/home/docs/checkouts/'>
classmethod test()[source]

Test the plugin, default behaviour is just to instantiate the plugin

class fastr.plugins.CommaSeperatedValueFile

Bases: fastr.core.ioplugin.IOPlugin

The CommaSeperatedValueFile an expand-only type of IOPlugin. No URLs can actually be fetched, but it can expand a single URL into a larger amount of URLs.

The csv:// URL is a vfs:// URL with a number of query variables available. The URL mount and path should point to a valid CSV file. The query variable then specify what column(s) of the file should be used.

The following variable can be set in the query:

variable usage
value the column containing the value of interest, can be int for index or string for key
id the column containing the sample id (optional)
header indicates if the first row is considered the header, can be true or false (optional)
delimiter the delimiter used in the csv file (optional)
quote the quote character used in the csv file (optional)
reformat a reformatting string so that value = reformat.format(value) (used before relative_path)
relative_path indicates the entries are relative paths (for files), can be true or false (optional)

The header is by default false if the neither the value and id are set as a string. If either of these are a string, the header is required to define the column names and it automatically is assumed true

The delimiter and quota characters of the file should be detected automatically using the Sniffer, but can be forced by setting them in the URL.

Example of valid csv URLs:

# Use the first column in the file (no header row assumed)

# Use the images column in the file (first row is assumed header row)

# Use the segmentations column in the file (first row is assumed header row)
# and use the id column as the sample id

# Use the first column as the id and the second column as the value
# and skip the first row (considered the header)

# Use the first column and force the delimiter to be a comma
__abstractmethods__ = frozenset()

Initialization for the IOPlugin

Returns:newly created IOPlugin
__module__ = 'fastr.plugins'

(abstract) Expand an URL. This allows a source to collect multiple samples from a single url. The URL will have a wildcard or point to something with info and multiple urls will be returned.

Parameters:url (str) – url to expand
Returns:the resulting url(s), a tuple if multiple, otherwise a str
Return type:str or tuple of str
filename = '/home/docs/checkouts/'
module = <module 'commaseperatedvaluefile' from '/home/docs/checkouts/'>
scheme = 'csv'
class fastr.plugins.CrossValidation

Bases: flowinterface.FlowPlugin

Advanced flow plugin that generated a cross-validation data flow. The node need an input with data and an input number of folds. Based on that the outputs test and train will be supplied with a number of data sets.

__abstractmethods__ = frozenset()
__module__ = 'fastr.plugins'
static execute(payload)[source]
filename = '/home/docs/checkouts/'
module = <module 'crossvalidation' from '/home/docs/checkouts/'>
class fastr.plugins.DRMAAExecution(finished_callback=None, cancelled_callback=None)

Bases: fastr.plugins.executionplugin.ExecutionPlugin

A DRMAA execution plugin to execute Jobs on a Grid Engine cluster. It uses a configuration option for selecting the queue to submit to. It uses the python drmaa package.


To use this plugin, make sure the drmaa package is installed and that the execution is started on an SGE submit host with DRMAA libraries installed.


This plugin is at the moment tailored to SGE, but it should be fairly easy to make different subclasses for different DRMAA supporting systems.

GE_NATIVE_SPEC = {'CWD': '-cwd', 'DEPENDS': '-hold_jid {hold_list}', 'DEPENDS_SEP': ',', 'ERRORLOG': '-e {errorlog}', 'HOLD': '-h', 'MEMORY': '-l h_vmem={memory}', 'NCORES': '-pe smp {ncores:d}', 'OUTPUTLOG': '-o {outputlog}', 'QUEUE': '-q {queue}', 'WALLTIME': '-l h_rt={walltime}'}
NATIVE_SPEC = {'grid_engine': {'CWD': '-cwd', 'DEPENDS': '-hold_jid {hold_list}', 'DEPENDS_SEP': ',', 'ERRORLOG': '-e {errorlog}', 'HOLD': '-h', 'MEMORY': '-l h_vmem={memory}', 'NCORES': '-pe smp {ncores:d}', 'OUTPUTLOG': '-o {outputlog}', 'QUEUE': '-q {queue}', 'WALLTIME': '-l h_rt={walltime}'}, 'torque': {'CWD': '', 'DEPENDS': '-W depend=afterok:{hold_list}', 'DEPENDS_SEP': ':', 'ERRORLOG': '-e {errorlog}', 'HOLD': '-h', 'MEMORY': '-l mem={memory}', 'NCORES': '-l procs={ncores:d}', 'OUTPUTLOG': '-o {outputlog}', 'QUEUE': '-q {queue}', 'WALLTIME': '-l walltime={walltime}'}}
TORQUE_NATIVE_SPEC = {'CWD': '', 'DEPENDS': '-W depend=afterok:{hold_list}', 'DEPENDS_SEP': ':', 'ERRORLOG': '-e {errorlog}', 'HOLD': '-h', 'MEMORY': '-l mem={memory}', 'NCORES': '-l procs={ncores:d}', 'OUTPUTLOG': '-o {outputlog}', 'QUEUE': '-q {queue}', 'WALLTIME': '-l walltime={walltime}'}
__abstractmethods__ = frozenset()
__init__(finished_callback=None, cancelled_callback=None)[source]

Setup the ExecutionPlugin

  • finished_callback – the callback to call after a job finished
  • cancelled_callback – the callback to call after a job cancelled

newly created ExecutionPlugin

__module__ = 'fastr.plugins'

Method to call to clean up the ExecutionPlugin. This can be to clear temporary data, close connections, etc.

Parameters:force – force cleanup (e.g. kill instead of join a process)
configuration_fields = {'drmaa_engine': (<class 'str'>, 'grid_engine', 'The engine to use (options: grid_engine, torque'), 'drmaa_job_check_interval': (<class 'int'>, 900, 'The interval in which the job checker will startto check for stale jobs'), 'drmaa_max_jobs': (<class 'int'>, 0, 'The maximum jobs that can be send to the scheduler at the same time (0 for no limit)'), 'drmaa_queue': (<class 'str'>, 'week', 'The default queue to use for jobs send to the scheduler')}
create_native_spec(queue, walltime, memory, ncores, outputLog, errorLog, hold_job, hold)[source]

Create the native spec for the DRMAA scheduler. Needs to be implemented in the subclasses

  • queue (str) – the queue to submit to
  • walltime (str) – walltime specified
  • memory (str) – memory requested
  • ncores (int) – number of cores requested
  • outputLog (str) – the location of the stdout log
  • errorLog (str) – the location of stderr log
  • hold_job (list) – list of jobs to depend on
  • hold (bool) – flag if job should be submitted in hold mode

filename = '/home/docs/checkouts/'
module = <module 'drmaaexecution' from '/home/docs/checkouts/'>
send_job(command, arguments, queue=None, resources=None, job_name=None, joinLogFiles=False, outputLog=None, errorLog=None, hold_job=None, hold=False)[source]
classmethod test()[source]

Test the plugin, default behaviour is just to instantiate the plugin

class fastr.plugins.DockerTarget(binary, docker_image)


A tool target that is located in a Docker images. Can be run using docker-py. A docker target only need two variables: the binary to call within the docker container, and the docker container to use.

  "arch": "*",
  "os": "*",
  "binary": "bin/",
  "docker_image": "fastr/test"
<target os="*" arch="*" binary="bin/" docker_image="fastr/test">
__abstractmethods__ = frozenset()

Set the environment in such a way that the target will be on the path.

__exit__(exc_type, exc_value, traceback)[source]

Cleanup the environment where needed

__init__(binary, docker_image)[source]

Define a new docker target.

Parameters:docker_image (str) – Docker image to use
__module__ = 'fastr.plugins'
filename = '/home/docs/checkouts/'
module = <module 'dockertarget' from '/home/docs/checkouts/'>
monitor_docker(container, resources)[source]

Monitor a docker container and profile the cpu, memory and io use. Register the resource use every _MONITOR_INTERVAL seconds.

  • container (ContainerCollection) – process to monitor
  • resources (ProcessUsageCollection) – list to append measurements to

Run a command with the target

Return type:TargetResult
class fastr.plugins.ElasticsearchReporter

Bases: fastr.plugins.reportingplugin.ReportingPlugin

__abstractmethods__ = frozenset()

The BasePlugin constructor.

Returns:the created plugin
Return type:BasePlugin
Raises:FastrPluginNotLoaded – if the plugin did not load correctly
__module__ = 'fastr.plugins'

Activate the reporting plugin

configuration_fields = {'elasticsearch_debug': (<class 'bool'>, False, 'Setup elasticsearch debug mode to send stdout stderr on job succes'), 'elasticsearch_host': (<class 'str'>, '', 'The elasticsearch host to report to'), 'elasticsearch_index': (<class 'str'>, 'fastr', 'The elasticsearch index to store data in')}
filename = '/home/docs/checkouts/'
module = <module 'elasticsearchreporter' from '/home/docs/checkouts/'>
classmethod test()[source]

Test the plugin, default behaviour is just to instantiate the plugin

class fastr.plugins.FastrInterface(id_, document)

Bases: fastr.core.interface.Interface

The default Interface for fastr. For the command-line Tools as used by fastr. It build a commandline call based on the input/output specification.

The fields that can be set in the interface:

Attribute Description
id The id of this Tool (used internally in fastr)
inputs[]   List of Inputs that can are accepted by the Tool
id ID of the Input
name Longer name of the Input (more human readable)
datatype The ID of the DataType of the Input [1]
enum[] List of possible values for an EnumType (created on the fly by fastr) [1]
prefix Commandline prefix of the Input (e.g. –in, -i)
cardinality Cardinality of the Input
repeat_prefix Flag indicating if for every value of the Input the prefix is repeated
required Flag indicating if the input is required
nospace Flag indicating if there is no space between prefix and value (e.g. –in=val)
format For DataTypes that have multiple representations, indicate which one to use
default Default value for the Input
description Long description for an input
outputs[]   List of Outputs that are generated by the Tool (and accessible to fastr)
id ID of the Output
name Longer name of the Output (more human readable)
datatype The ID of the DataType of the Output [1]
enum[] List of possible values for an EnumType (created on the fly by fastr) [1]
prefix Commandline prefix of the Output (e.g. –out, -o)
cardinality Cardinality of the Output
repeat_prefix Flag indicating if for every value of the Output the prefix is repeated
required Flag indicating if the input is required
nospace Flag indicating if there is no space between prefix and value (e.g. –out=val)
format For DataTypes that have multiple representations, indicate which one to use
description Long description for an input
action Special action (defined per DataType) that needs to be performed before creating output value (e.g. ‘ensure’ will make sure an output directory exists)
automatic Indicate that output doesn’t require commandline argument, but is created automatically by a Tool [2]
method The collector plugin to use for the gathering automatic output, see the Collector plugins
location Definition where to an automatically, usage depends on the method [2]


[1](1, 2, 3, 4) datatype and enum are conflicting entries, if both specified datatype has presedence
[2](1, 2) More details on defining automatica output are given in [TODO]
__abstractmethods__ = frozenset()
__dataschemafile__ = 'FastrInterface.schema.json'

Return self==value.


Get the state of the FastrInterface object.

Returns:state of interface
Return type:dict
__hash__ = None
__init__(id_, document)[source]

The BasePlugin constructor.

Returns:the created plugin
Return type:BasePlugin
Raises:FastrPluginNotLoaded – if the plugin did not load correctly
__module__ = 'fastr.plugins'

Set the state of the Interface


Check if an id for an object is valid and unused in the Tool. The method will always returns True if it does not raise an exception.


id (str) – the id to check




Check if an id for an object is valid and unused in the Tool. The method will always returns True if it does not raise an exception.


id (str) – the id to check



static collect_errors(result)[source]

Special error collection for fastr interfaces


Collect all results of the interface


alias of fastrinterface.CollectorPlugin

collectors = CollectorPluginManager Loaded json : <CollectorPlugin: JsonCollector> Loaded path : <CollectorPlugin: PathCollector> Loaded stdout : <CollectorPlugin: StdoutCollector>
execute(target, payload)[source]

Execute the interface using a specific target and payload (containing a set of values for the arguments)

  • target (SampleId) – the target to use
  • payload (dict) – the values for the arguments

result of the execution

Return type:



Indicates whether or not this Interface will result in multiple samples per run. If the flow is unaffected, this will be zero, if it is nonzero it means that number of dimension will be added to the sample array.

filename = '/home/docs/checkouts/'

Get the argument list for this interface

Returns:return list of arguments
get_command(target, payload)[source]
get_specials(payload, output, cardinality_nr)[source]

Get special attributes. Returns tuples for specials, inputs and outputs that are used for formatting substitutions.

  • output – Output for which to get the specials
  • cardinality_nr (int) – the cardinality number

OrderedDict of Inputs connected to the Interface. The format should be {input_id: InputSpec}.

module = <module 'fastrinterface' from '/home/docs/checkouts/'>

OrderedDict of Output connected to the Interface. The format should be {output_id: OutputSpec}.

class fastr.plugins.FileSystem

Bases: fastr.core.ioplugin.IOPlugin

The FileSystem plugin is create to handle file:// type or URLs. This is generally not a good practice, as this is not portable over between machines. However, for test purposes it might be useful.

The URL scheme is rather simple: file://host/path (see wikipedia for details)

We do not make use of the host part and at the moment only support localhost (just leave the host empty) leading to file:/// URLs.


This plugin ignores the hostname in the URL and does only accept driver letters on Windows in the form c:/

__abstractmethods__ = frozenset()

Initialization for the IOPlugin

Returns:newly created IOPlugin
__module__ = 'fastr.plugins'
fetch_url(inurl, outpath)[source]

Fetch the files from the file.

  • inurl – url to the item in the data store, starts with file://
  • outpath – path where to store the fetch data locally

Fetch a value from an external file file.

Parameters:inurl – url of the value to read
Returns:the fetched value
filename = '/home/docs/checkouts/'
module = <module 'filesystem' from '/home/docs/checkouts/'>
path_to_url(path, mountpoint=None)[source]

Construct an url from a given mount point and a relative path to the mount point.

put_url(inpath, outurl)[source]

Put the files to the external data store.

  • inpath – path of the local data
  • outurl – url to where to store the data, starts with file://
put_value(value, outurl)[source]

Put the value in the external data store.

  • value – value to store
  • outurl – url to where to store the data, starts with file://
scheme = 'file'

Get the path to a file from a url. Currently supports the file:// scheme


>>> 'file:///d:/data/project/file.ext'


file:// will not function cross platform and is mainly for testing

class fastr.plugins.FlowInterface(id_, document)

Bases: fastr.core.interface.Interface

The Interface use for AdvancedFlowNodes to create the advanced data flows that are not implemented in the fastr. This allows nodes to implement new data flows using the plugin system.

The definition of FlowInterfaces are very similar to the default FastrInterfaces.


A flow interface should be using a specific FlowPlugin

__abstractmethods__ = frozenset()
__dataschemafile__ = 'FastrInterface.schema.json'

Return self==value.


Get the state of the FastrInterface object.

Returns:state of interface
Return type:dict
__hash__ = None
__init__(id_, document)[source]

The BasePlugin constructor.

Returns:the created plugin
Return type:BasePlugin
Raises:FastrPluginNotLoaded – if the plugin did not load correctly
__module__ = 'fastr.plugins'

Set the state of the Interface

execute(target, payload)[source]

Execute the interface given the a target and payload. The payload should have the form {‘input’: {‘input_id_a’: (value, value), ‘input_id_b’: (value, value)},

‘output’: {‘output_id_a’: (value, value), ‘output_id_b’: (value, value)}}
  • target – the target to call
  • payload – the payload to use

the result of the execution

Return type:

(tuple of) InterfaceResult


Indicates whether or not this Interface will result in multiple samples per run. If the flow is unaffected, this will be zero, if it is nonzero it means that number of dimension will be added to the sample array.

filename = '/home/docs/checkouts/'

alias of flowinterface.FlowPlugin

flow_plugins = FlowPluginManager Loaded CrossValidation : <FlowPlugin: CrossValidation>

OrderedDict of Inputs connected to the Interface. The format should be {input_id: InputSpec}.

module = <module 'flowinterface' from '/home/docs/checkouts/'>

OrderedDict of Output connected to the Interface. The format should be {output_id: OutputSpec}.

class fastr.plugins.HTTPPlugin

Bases: fastr.core.ioplugin.IOPlugin


This Plugin is still under development and has not been tested at all. example url:

__abstractmethods__ = frozenset()

Initialization for the IOPlugin

Returns:newly created IOPlugin
__module__ = 'fastr.plugins'
fetch_url(inurl, outpath)[source]

Download file from server.

  • inurl – url to the file.
  • outpath – path to store file
filename = '/home/docs/checkouts/'
module = <module 'httpplugin' from '/home/docs/checkouts/'>
scheme = ('https', 'http')
class fastr.plugins.LinearExecution(finished_callback=None, cancelled_callback=None)

Bases: fastr.plugins.executionplugin.ExecutionPlugin

An execution engine that has a background thread that executes the jobs in order. The queue is a simple FIFO queue and there is one worker thread that operates in the background. This plugin is meant as a fallback when other plugins do not function properly. It does not multi-processing so it is safe to use in environments that do no support that.

__abstractmethods__ = frozenset()
__init__(finished_callback=None, cancelled_callback=None)[source]

Setup the ExecutionPlugin

  • finished_callback – the callback to call after a job finished
  • cancelled_callback – the callback to call after a job cancelled

newly created ExecutionPlugin

__module__ = 'fastr.plugins'

Method to call to clean up the ExecutionPlugin. This can be to clear temporary data, close connections, etc.

Parameters:force – force cleanup (e.g. kill instead of join a process)
filename = '/home/docs/checkouts/'
module = <module 'linearexecution' from '/home/docs/checkouts/'>
classmethod test()[source]

Test the plugin, default behaviour is just to instantiate the plugin

class fastr.plugins.LocalBinaryTarget(binary, paths=None, environment_variables=None, initscripts=None, modules=None, interpreter=None, **kwargs)


A tool target that is a local binary on the system. Can be found using environmentmodules or a path on the executing machine. A local binary target has a number of fields that can be supplied:

  • binary (required): the name of the binary/script to call, can also be called bin for backwards compatibility.
  • modules: list of modules to load, this can be environmentmodules or lmod modules. If modules are given, the paths, environment_variables and initscripts are ignored.
  • paths: a list of paths to add following the structure {"value": "/path/to/dir", "type": "bin"}. The types can be bin if the it should be added to $PATH or lib if it should be added to te library path (e.g. $LD_LIBRARY_PATH for linux).
  • environment_variables: a dictionary of environment variables to set.
  • initscript: a list of script to run before running the main tool
  • interpreter: the interpreter to use to call the binary e.g. python

The LocalBinaryTarget will first check if there are modules given and the module subsystem is loaded. If that is the case it will simply unload all current modules and load the given modules. If not it will try to set up the environment itself by using the following steps:

  1. Prepend the bin paths to $PATH
  2. Prepend the lib paths to the correct environment variable
  3. Setting the other environment variables given ($PATH and the system library path are ignored and cannot be set that way)
  4. Call the initscripts one by one

The definition of the target in JSON is very straightforward:

  "binary": "bin/",
  "interpreter": "python",
  "paths": [
      "type": "bin",
      "value": "vfs://apps/test/bin"
      "type": "lib",
      "value": "./lib"
  "environment_variables": {
    "othervar": 42,
    "short_var": 1,
    "testvar": "value1"
  "initscripts": [
  "modules": ["elastix/4.8"]

In XML the definition would be in the form of:

<target os="linux" arch="*" modules="elastix/4.8" bin="bin/" interpreter="python">
    <path type="bin" value="vfs://apps/test/bin" />
    <path type="lib" value="./lib" />
  <environment_variables short_var="1">
__abstractmethods__ = frozenset()

Set the environment in such a way that the target will be on the path.

__exit__(exc_type, exc_value, traceback)[source]

Cleanup the environment

__init__(binary, paths=None, environment_variables=None, initscripts=None, modules=None, interpreter=None, **kwargs)[source]

Define a new local binary target. Must be defined either using paths and optionally environment_variables and initscripts, or enviroment modules.

__module__ = 'fastr.plugins'
filename = '/home/docs/checkouts/'
module = <module 'localbinarytarget' from '/home/docs/checkouts/'>

Run a command with the target

Return type:TargetResult
class fastr.plugins.MacroTarget(network_file, method=None, function='main')


A target for MacroNodes. This target cannot be executed as the MacroNode handles execution differently. But this contains the information for the MacroNode to find the internal Network.

__abstractmethods__ = frozenset()
__init__(network_file, method=None, function='main')[source]

Define a new local binary target. Must be defined either using paths and optionally environment_variables and initscripts, or enviroment modules.

__module__ = 'fastr.plugins'
filename = '/home/docs/checkouts/'
module = <module 'macrotarget' from '/home/docs/checkouts/'>

Run a command with the target

classmethod test()[source]

Test if singularity is availble on the path

class fastr.plugins.NipypeInterface(id_, nipype_cls=None, document=None)

Bases: fastr.core.interface.Interface

Experimental interfaces to using nipype interfaces directly in fastr tools, only using a simple reference.

To create a tool using a nipype interface just create an interface with the correct type and set the nipype argument to the correct class. For example in an xml tool this would become:

<interface class="NipypeInterface">


To use these interfaces nipype should be installed on the system.


This interface plugin is basically functional, but highly experimental!

__abstractmethods__ = frozenset()

Return self==value.


Retrieve the state of the Interface

Returns:the state of the object
Rtype dict:
__hash__ = None
__init__(id_, nipype_cls=None, document=None)[source]

The BasePlugin constructor.

Returns:the created plugin
Return type:BasePlugin
Raises:FastrPluginNotLoaded – if the plugin did not load correctly
__module__ = 'fastr.plugins'

Set the state of the Interface

execute(target, payload)[source]

Execute the interface using a specific target and payload (containing a set of values for the arguments)

  • target (SampleId) – the target to use
  • payload (dict) – the values for the arguments

result of the execution

Return type:



Indicates whether or not this Interface will result in multiple samples per run. If the flow is unaffected, this will be zero, if it is nonzero it means that number of dimension will be added to the sample array.

filename = '/home/docs/checkouts/'

OrderedDict of Inputs connected to the Interface. The format should be {input_id: InputSpec}.

module = <module 'nipypeinterface' from '/home/docs/checkouts/'>

OrderedDict of Output connected to the Interface. The format should be {output_id: OutputSpec}.

classmethod test()[source]

Test the plugin, interfaces do not need to be tested on import

class fastr.plugins.Null

Bases: fastr.core.ioplugin.IOPlugin

The Null plugin is create to handle null:// type or URLs. These URLs are indicating the sink should not do anything. The data is not written to anywhere. Besides the scheme, the rest of the URL is ignored.

__abstractmethods__ = frozenset()

Initialization for the IOPlugin

Returns:newly created IOPlugin
__module__ = 'fastr.plugins'
filename = '/home/docs/checkouts/'
module = <module 'null' from '/home/docs/checkouts/'>
put_url(inpath, outurl)[source]

Put the files to the external data store.

  • inpath – path of the local data
  • outurl – url to where to store the data, starts with file://
put_value(value, outurl)[source]

Put the value in the external data store.

  • value – value to store
  • outurl – url to where to store the data, starts with file://
scheme = 'null'
class fastr.plugins.PimReporter

Bases: fastr.plugins.reportingplugin.ReportingPlugin

SUPPORTED_APIS = {2: <class 'pimreporter.PimAPIv2'>}
__abstractmethods__ = frozenset()

The BasePlugin constructor.

Returns:the created plugin
Return type:BasePlugin
Raises:FastrPluginNotLoaded – if the plugin did not load correctly
__module__ = 'fastr.plugins'

Activate the reporting plugin

configuration_fields = {'pim_batch_size': (<class 'int'>, 100, 'Maximum number of jobs that can be send to PIM in a single interval'), 'pim_debug': (<class 'bool'>, False, 'Setup PIM debug mode to send stdout stderr on job succes'), 'pim_host': (<class 'str'>, '', 'The PIM host to report to'), 'pim_update_interval': (<class 'float'>, 2.5, 'The interval in which to send jobs to PIM')}
filename = '/home/docs/checkouts/'
module = <module 'pimreporter' from '/home/docs/checkouts/'>
class fastr.plugins.ProcessPoolExecution(finished_callback=None, cancelled_callback=None, nr_of_workers=None)

Bases: fastr.plugins.executionplugin.ExecutionPlugin

A local execution plugin that uses multiprocessing to create a pool of worker processes. This allows fastr to execute jobs in parallel with true concurrency. The number of workers can be specified in the fastr configuration, but the default amount is the number of cores - 1 with a minimum of 1.


The ProcessPoolExecution does not check memory requirements of jobs and running many workers might lead to memory starvation and thus an unresponsive system.

__abstractmethods__ = frozenset()
__init__(finished_callback=None, cancelled_callback=None, nr_of_workers=None)[source]

Setup the ExecutionPlugin

  • finished_callback – the callback to call after a job finished
  • cancelled_callback – the callback to call after a job cancelled

newly created ExecutionPlugin

__module__ = 'fastr.plugins'

Method to call to clean up the ExecutionPlugin. This can be to clear temporary data, close connections, etc.

Parameters:force – force cleanup (e.g. kill instead of join a process)
configuration_fields = {'process_pool_worker_number': (<class 'int'>, 3, 'Number of workers to use in a process pool')}
filename = '/home/docs/checkouts/'

Reciever for the callback, it will split the result tuple and call job_finished

Parameters:result (tuple) – return value of run_job
module = <module 'processpoolexecution' from '/home/docs/checkouts/'>
classmethod test()[source]

Test the plugin, default behaviour is just to instantiate the plugin

class fastr.plugins.RQExecution(finished_callback=None, cancelled_callback=None)

Bases: fastr.plugins.executionplugin.ExecutionPlugin

A execution plugin based on Redis Queue. Fastr will submit jobs to the redis queue and workers will peel the jobs from the queue and process them.

This system requires a running redis database and the database url has to be set in the fastr configuration.


This execution plugin required the redis and rq packages to be installed before it can be loaded properly.

__abstractmethods__ = frozenset()
__init__(finished_callback=None, cancelled_callback=None)[source]

Setup the ExecutionPlugin

  • finished_callback – the callback to call after a job finished
  • cancelled_callback – the callback to call after a job cancelled

newly created ExecutionPlugin

__module__ = 'fastr.plugins'

Method to call to clean up the ExecutionPlugin. This can be to clear temporary data, close connections, etc.

Parameters:force – force cleanup (e.g. kill instead of join a process)
configuration_fields = {'rq_host': (<class 'str'>, 'redis://localhost:6379/0', 'The url of the redis serving the redis queue'), 'rq_queue': (<class 'str'>, 'default', 'The redis queue to use')}
filename = '/home/docs/checkouts/'
module = <module 'rqexecution' from '/home/docs/checkouts/'>
classmethod run_job(job_id, job_command, job_stdout, job_stderr)[source]
classmethod test()[source]

Test the plugin, default behaviour is just to instantiate the plugin

class fastr.plugins.Reference

Bases: fastr.core.ioplugin.IOPlugin

The Reference plugin is create to handle ref:// type or URLs. These URLs are to make the sink just write a simple reference file to the data. The reference file contains the DataType and the value so the result can be reconstructed. It for files just leaves the data on disk by reference. This plugin is not useful for production, but is used for testing purposes.

__abstractmethods__ = frozenset()

Initialization for the IOPlugin

Returns:newly created IOPlugin
__module__ = 'fastr.plugins'
filename = '/home/docs/checkouts/'
module = <module 'reference' from '/home/docs/checkouts/'>
push_sink_data(value, outurl, datatype=None)[source]

Write out the sink data from the inpath to the outurl.

  • value (str) – the path of the data to be pushed
  • outurl (str) – the url to write the data to
  • datatype (DataType) – the datatype of the data, used for determining the total contents of the transfer


scheme = 'ref'
class fastr.plugins.S3Filesystem

Bases: fastr.core.ioplugin.IOPlugin


As this IOPlugin is under development, it has not been thoroughly tested.

example url: s3://bucket.server/path/to/resource

__abstractmethods__ = frozenset()

Initialization for the IOPlugin

Returns:newly created IOPlugin
__module__ = 'fastr.plugins'

(abstract) Clean up the IOPlugin. This is to do things like closing files or connections. Will be called when the plugin is no longer required.


Expand an S3 URL. This allows a source to collect multiple samples from a single url.

Parameters:url (str) – url to expand
Returns:the resulting url(s), a tuple if multiple, otherwise a str
Return type:str or tuple of str
fetch_url(inurl, outpath)[source]

Get the file(s) or values from s3.

  • inurl – url to the item in the data store
  • outpath – path where to store the fetch data locally

Fetch a value from S3

Parameters:inurl – url of the value to read
Returns:the fetched value
filename = '/home/docs/checkouts/'
module = <module 's3filesystem' from '/home/docs/checkouts/'>
put_url(inpath, outurl)[source]

Upload the files to the S3 storage

  • inpath – path to the local data
  • outurl – url to where to store the data in the external data store.
put_value(value, outurl)[source]

Put the value in S3

  • value – value to store
  • outurl – url to where to store the data, starts with file://
scheme = ('s3', 's3list')
classmethod test()[source]

Test the plugin, default behaviour is just to instantiate the plugin

class fastr.plugins.SimpleReport

Bases: fastr.plugins.reportingplugin.ReportingPlugin

__abstractmethods__ = frozenset()
__module__ = 'fastr.plugins'
filename = '/home/docs/checkouts/'
module = <module 'simplereport' from '/home/docs/checkouts/'>
class fastr.plugins.SingularityTarget(binary, container, interpreter=None)


A tool target that is run using a singularity container, see the singulary website

  • binary (required): the name of the binary/script to call, can also be called bin for backwards compatibility.
  • container (required): the singularity container to run, this can be in url form for singularity
    pull or as a path to a local container
  • interpreter: the interpreter to use to call the binary e.g. python
SINGULARITY_BIN = 'singularity'
__abstractmethods__ = frozenset()

Set the environment in such a way that the target will be on the path.

__exit__(exc_type, exc_value, traceback)[source]

Cleanup the environment

__init__(binary, container, interpreter=None)[source]

Define a new local binary target. Must be defined either using paths and optionally environment_variables and initscripts, or enviroment modules.

__module__ = 'fastr.plugins'
filename = '/home/docs/checkouts/'
module = <module 'singularitytarget' from '/home/docs/checkouts/'>

Run a command with the target

classmethod test()[source]

Test if singularity is availble on the path

class fastr.plugins.SlurmExecution(finished_callback=None, cancelled_callback=None)

Bases: fastr.plugins.executionplugin.ExecutionPlugin

SBATCH = 'sbatch'
SCANCEL = 'scancel'
SCONTROL = 'scontrol'
SQUEUE = 'squeue'
SQUEUE_FORMAT = '{"id": %.18i, "status": "%.2t"}'
STATUS_MAPPING = {' F': <JobState.failed: ('failed', 'done', True)>, ' R': <JobState.running: ('running', 'in_progress', False)>, 'CA': <JobState.cancelled: ('cancelled', 'done', True)>, 'CD': <JobState.finished: ('finished', 'done', False)>, 'CF': <JobState.running: ('running', 'in_progress', False)>, 'CG': <JobState.running: ('running', 'in_progress', False)>, 'NF': <JobState.failed: ('failed', 'done', True)>, 'PD': <JobState.queued: ('queued', 'idle', False)>, 'RV': <JobState.cancelled: ('cancelled', 'done', True)>, 'SE': <JobState.failed: ('failed', 'done', True)>, 'TO': <JobState.queued: ('queued', 'idle', False)>}
__abstractmethods__ = frozenset()
__init__(finished_callback=None, cancelled_callback=None)[source]

Setup the ExecutionPlugin

  • finished_callback – the callback to call after a job finished
  • cancelled_callback – the callback to call after a job cancelled

newly created ExecutionPlugin

__module__ = 'fastr.plugins'

Method to call to clean up the ExecutionPlugin. This can be to clear temporary data, close connections, etc.

Parameters:force – force cleanup (e.g. kill instead of join a process)
configuration_fields = {'slurm_job_check_interval': (<class 'int'>, 30, 'The interval in which the job checker will startto check for stale jobs'), 'slurm_partition': (<class 'str'>, '', 'The slurm partition to use')}
filename = '/home/docs/checkouts/'
module = <module 'slurmexecution' from '/home/docs/checkouts/'>
classmethod test()[source]

Test the plugin, default behaviour is just to instantiate the plugin

class fastr.plugins.StrongrExecution(finished_callback=None, cancelled_callback=None)

Bases: fastr.plugins.executionplugin.ExecutionPlugin

A execution plugin based on Redis Queue. Fastr will submit jobs to the redis queue and workers will peel the jobs from the queue and process them.

This system requires a running redis database and the database url has to be set in the fastr configuration.


This execution plugin required the redis and rq packages to be installed before it can be loaded properly.

__abstractmethods__ = frozenset()
__init__(finished_callback=None, cancelled_callback=None)[source]

Setup the ExecutionPlugin

  • finished_callback – the callback to call after a job finished
  • cancelled_callback – the callback to call after a job cancelled

newly created ExecutionPlugin

__module__ = 'fastr.plugins'

Method to call to clean up the ExecutionPlugin. This can be to clear temporary data, close connections, etc.

Parameters:force – force cleanup (e.g. kill instead of join a process)
configuration_fields = {}
filename = '/home/docs/checkouts/'
module = <module 'strongrexecution' from '/home/docs/checkouts/'>
classmethod test()[source]

Test the plugin, default behaviour is just to instantiate the plugin

class fastr.plugins.VirtualFileSystem

Bases: fastr.core.vfs.VirtualFileSystem, fastr.core.ioplugin.IOPlugin

The virtual file system class. This is an IOPlugin, but also heavily used internally in fastr for working with directories. The VirtualFileSystem uses the vfs:// url scheme.

A typical virtual filesystem url is formatted as vfs://mountpoint/relative/dir/from/mount.ext

Where the mountpoint is defined in the Config file. A list of the currently known mountpoints can be found in the fastr.config object

>>> fastr.config.mounts
{'example_data': '/home/username/fastr-feature-documentation/fastr/fastr/examples/data',
 'home': '/home/username/',
 'tmp': '/home/username/FastrTemp'}

This shows that a url with the mount home such as vfs://home/tempdir/testfile.txt would be translated into /home/username/tempdir/testfile.txt.

There are a few default mount points defined by Fastr (that can be changed via the config file).

mountpoint default location
home the users home directory (expanduser('~/'))
tmp the fastr temprorary dir, defaults to tempfile.gettempdir()
example_data the fastr example data directory, defaults $FASTRDIR/example/data
__abstractmethods__ = frozenset()
__module__ = 'fastr.plugins'
filename = '/home/docs/checkouts/'
module = <module 'virtualfilesystem' from '/home/docs/checkouts/'>
scheme = 'vfs'
class fastr.plugins.VirtualFileSystemRegularExpression

Bases: fastr.core.ioplugin.IOPlugin

The VirtualFileSystemValueList an expand-only type of IOPlugin. No URLs can actually be fetched, but it can expand a single URL into a larger amount of URLs.

A vfsregex:// URL is a vfs URL that can contain regular expressions on every level of the path. The regular expressions follow the re module definitions.

An example of a valid URLs would be:


The first URL would result in all the __fastr_result__.pickle.gz in the working directory of a Network. The second URL would only result in the file for a specific node (nodeX), but by adding the named group id using (?P<id>.*) the sample id of the data is automatically set to that group (see Regular Expression Syntax under the special characters for more info on named groups in regular expression).

Concretely if we would have a directory vfs://mount/somedir containing:


we could match these files using vfsregex://mount/somedir/(?P<id>image_\d+)/.*\.nii which would result in the following source data after expanding the URL:

{'image_1': 'vfs://mount/somedir/image_1/Image.nii',
 'image_2': 'vfs://mount/somedir/image_2/image.nii',
 'image_3': 'vfs://mount/somedir/image_3/anotherimage.nii',
 'image_5': 'vfs://mount/somedir/image_5/inconsistentnamingftw.nii'}

Showing the power of this regular expression filtering. Also it shows how the ID group from the URL can be used to have sensible sample ids.


due to the nature of regexp on multiple levels, this method can be slow when having many matches on the lower level of the path (because the tree of potential matches grows) or when directories that are parts of the path are very large.

__abstractmethods__ = frozenset()

Initialization for the IOPlugin

Returns:newly created IOPlugin
__module__ = 'fastr.plugins'

(abstract) Expand an URL. This allows a source to collect multiple samples from a single url. The URL will have a wildcard or point to something with info and multiple urls will be returned.

Parameters:url (str) – url to expand
Returns:the resulting url(s), a tuple if multiple, otherwise a str
Return type:str or tuple of str
filename = '/home/docs/checkouts/'
module = <module 'virtualfilesystemregularexpression' from '/home/docs/checkouts/'>
scheme = 'vfsregex'
class fastr.plugins.VirtualFileSystemValueList

Bases: fastr.core.ioplugin.IOPlugin

The VirtualFileSystemValueList an expand-only type of IOPlugin. No URLs can actually be fetched, but it can expand a single URL into a larger amount of URLs. A vfslist:// URL basically is a url that points to a file using vfs. This file then contains a number lines each containing another URL.

If the contents of a file vfs://mount/some/path/contents would be:


Then using the URL vfslist://mount/some/path/contents as source data would result in the four files being pulled.


The URLs in a vfslist file do not have to use the vfs scheme, but can use any scheme known to the Fastr system.

__abstractmethods__ = frozenset()

Initialization for the IOPlugin

Returns:newly created IOPlugin
__module__ = 'fastr.plugins'

(abstract) Expand an URL. This allows a source to collect multiple samples from a single url. The URL will have a wildcard or point to something with info and multiple urls will be returned.

Parameters:url (str) – url to expand
Returns:the resulting url(s), a tuple if multiple, otherwise a str
Return type:str or tuple of str
filename = '/home/docs/checkouts/'
module = <module 'virtualfilesystemvaluelist' from '/home/docs/checkouts/'>
scheme = 'vfslist'
class fastr.plugins.XNATStorage

Bases: fastr.core.ioplugin.IOPlugin


As this IOPlugin is under development, it has not been thoroughly tested.

The XNATStorage plugin is an IOPlugin that can download data from and upload data to an XNAT server. It uses its own xnat:// URL scheme. This is a scheme specific for this plugin and though it looks somewhat like the XNAT rest interface, a different type or URL.

Data resources can be access directly by a data url:


In the second URL you can see a wildcard being used. This is possible at long as it resolves to exactly one item.

The id query element will change the field from the default experiment to subject and the label query element sets the use of the label as the fastr id (instead of the XNAT id) to True (the default is False)

To disable https transport and use http instead the query string can be modified to add insecure=true. This will make the plugin send requests over http:


For sinks it is import to know where to save the data. Sometimes you want to save data in a new assessor/resource and it needs to be created. To allow the Fastr sink to create an object in XNAT, you have to supply the type as a query parameter:


Valid options are: subject_type, experiment_type, assessor_type, scan_type, and resource_type.

If you want to do a search where multiple resources are returned, it is possible to use a search url:


This will return all DICOMs for the T1 scans for experiments that end with _BRAIN that belong to a subjectXXX where XXX is a 3 digit number. By default the ID for the samples will be the experiment XNAT ID (e.g. XNAT_E00123). The wildcards that can be the used are the same UNIX shell-style wildcards as provided by the module fnmatch.

It is possible to change the id to a different fields id or label. Valid fields are project, subject, experiment, scan, and resource:


The following variables can be set in the search query:

variable default usage
projects * The project(s) to select, can contain wildcards (see fnmatch)
subjects * The subject(s) to select, can contain wildcards (see fnmatch)
experiments * The experiment(s) to select, can contain wildcards (see fnmatch)
scans * The scan(s) to select, can contain wildcards (see fnmatch)
resources * The resource(s) to select, can contain wildcards (see fnmatch)
id experiment What field to use a the id, can be: project, subject, experiment, scan, or resource
label false Indicate the XNAT label should be used as fastr id, options true or false
insecure false Change the url scheme to be used to http instead of https
verify true (Dis)able the verification of SSL certificates
regex false Change search to use regex re.match() instead of fnmatch for matching
overwrite false Tell XNAT to overwrite existing files if a file with the name is already present

For storing credentials the .netrc file can be used. This is a common way to store credentials on UNIX systems. It is required that the file is only accessible by the owner only or a NetrcParseError will be raised. A netrc file is really easy to create, as its entries look like:

        login username
        password secret123

See the netrc module or the GNU inet utils website for more information about the .netrc file.


On windows the location of the netrc file is assumed to be os.path.expanduser('~/_netrc'). The leading underscore is because windows does not like filename starting with a dot.


For scan the label will be the scan type (this is initially the same as the series description, but can be updated manually or the XNAT scan type cleanup).


labels in XNAT are not guaranteed to be unique, so be careful when using them as the sample ID.

For background on XNAT, see the XNAT API DIRECTORY for the REST API of XNAT.

__abstractmethods__ = frozenset()

Initialization for the IOPlugin

Returns:newly created IOPlugin
__module__ = 'fastr.plugins'

(abstract) Clean up the IOPlugin. This is to do things like closing files or connections. Will be called when the plugin is no longer required.

connect(server, path='', insecure=False, verify=True)[source]

(abstract) Expand an URL. This allows a source to collect multiple samples from a single url. The URL will have a wildcard or point to something with info and multiple urls will be returned.

Parameters:url (str) – url to expand
Returns:the resulting url(s), a tuple if multiple, otherwise a str
Return type:str or tuple of str
fetch_url(inurl, outpath)[source]

Get the file(s) or values from XNAT.

  • inurl – url to the item in the data store
  • outpath – path where to store the fetch data locally
filename = '/home/docs/checkouts/'
module = <module 'xnatstorage' from '/home/docs/checkouts/'>
put_url(inpath, outurl)[source]

Upload the files to the XNAT storage

  • inpath – path to the local data
  • outurl – url to where to store the data in the external data store.
scheme = ('xnat', 'xnat+http', 'xnat+https')
static upload(resource, in_path, location, retries=3, overwrite=False)[source]

alias of fastr.plugins.JsonCollector


alias of fastr.plugins.PathCollector


alias of fastr.plugins.StdoutCollector

executionplugin Module

class fastr.plugins.executionplugin.ExecutionPlugin(finished_callback=None, cancelled_callback=None)[source]


This class is the base for all Plugins to execute jobs somewhere. There are many methods already in place for taking care of stuff.

There are fall-backs for certain features, but if a system already implements those it is usually preferred to skip the fall-back and let the external system handle it. There are a few flags to enable disable these features:

  • cls.SUPPORTS_CANCEL indicates that the plugin can cancel queued jobs

  • cls.SUPPORTS_HOLD_RELEASE indicates that the plugin can queue jobs in a hold state and can release them again (if not, the base plugin will create a hidden queue for held jobs). The plugin should respect the Job.status == JobState.hold when queueing jobs.

  • cls.SUPPORTS_DEPENDENCY indicate that the plugin can manage job dependencies, if not the base plugin job dependency system will be used and jobs with only be submitted when all dependencies are met.

  • cls.CANCELS_DEPENDENCIES indicates that if a job is cancelled it will automatically cancel all jobs depending on that job. If not the plugin traverse the dependency graph and kill each job manual.


    If a plugin supports dependencies it is assumed that when a job gets cancelled, the depending job also get cancelled automatically!

Most plugins should only need to redefine a few abstract methods:

  • __init__ the constructor
  • cleanup a clean up function that frees resources, closes connections, etc
  • _queue_job the method that queues the job for execution

Optionally an extra job finished callback could be added:

  • _job_finished extra callback for when a job finishes

If SUPPORTS_CANCEL is set to True, the plugin should also implement:

  • _cancel_job cancels a previously queued job

If SUPPORTS_HOLD_RELEASE is set to True, the plugin should also implement:

  • _hold_job hold_job a job that is currently held
  • _release_job releases a job that is currently held

If SUPPORTED_DEPENDENCY is set to True, the plugin should:

  • Make sure to use the Job.hold_jobs as a list of its dependencies

Not all of the functions need to actually do anything for a plugin. There are examples of plugins that do not really need a cleanup, but for safety you need to implement it. Just using a pass for the method could be fine in such a case.


When overwriting other functions, extreme care must be taken not to break the plugins working, as there is a lot of bookkeeping that can go wrong.


Indicates that when a job is cancelled the dependencies


Indicates if the plugin can cancel queued jobs


Indicate if the plugin can manage job dependencies, if not the base plugin job dependency system will be used and jobs with only be submitted when all dependencies are met.


Indicates if the plugin can queue jobs in a hold state and can release them again (if not, the base plugin will create a hidden queue for held

__abstractmethods__ = frozenset({'__init__', 'cleanup', '_queue_job'})

Cleanup if the variable was deleted on purpose

__exit__(type_, value, tb)[source]
__init__(finished_callback=None, cancelled_callback=None)[source]

Setup the ExecutionPlugin

  • finished_callback – the callback to call after a job finished
  • cancelled_callback – the callback to call after a job cancelled

newly created ExecutionPlugin

__module__ = 'fastr.plugins.executionplugin'

Cancel a job previously queued

Parameters:job – job to cancel

Check if the requirements for a job are fulfilled.

Parameters:job_id – job to check
Returns:directive what should happen with the job
Return type:JobAction

Get the status of a specified job

Parameters:job_id – the target job
Returns:the status of the job (or None if job not found)

Method to call to clean up the ExecutionPlugin. This can be to clear temporary data, close connections, etc.

Parameters:force – force cleanup (e.g. kill instead of join a process)
job_finished(job, errors=None, blocking=False)[source]

The default callback that is called when a Job finishes. This will create a new thread that handles the actual callback.

  • job (Job) – the job that finished
  • errors – optional list of errors encountered
  • blocking (bool) – if blocking, do not create threads


Add a job to the execution queue

Parameters:job (Job) – job to add

Release a job that has been put on hold

Parameters:job – job to release

List the queued jobs, possible filtered by status

Parameters:req_status – requested status to filter on
Returns:list of jobs

Check all depedent jobs and process them if all their dependencies are met. :param job_id: :return:

class fastr.plugins.executionplugin.JobAction[source]

Bases: enum.Enum

Job actions that can be performed. This is used for checking if held jobs should be queued, held longer or be cancelled.

__module__ = 'fastr.plugins.executionplugin'
cancel = 'cancel'
hold = 'hold'
queue = 'queue'

reportingplugin Module

class fastr.plugins.reportingplugin.ReportingPlugin[source]


Base class for all reporting plugins. The plugin has a number of methods that can be implemented that will be called on certain events. On these events the plugin can inspect the presented data and take reporting actions.

__abstractmethods__ = frozenset()
__module__ = 'fastr.plugins.reportingplugin'