plugins Package

plugins Package

The plugins module holds all plugins loaded by Fastr. It is empty on start and gets filled by the BasePluginManager

class fastr.plugins.BlockingExecution(finished_callback=None, cancelled_callback=None)

Bases: fastr.plugins.executionplugin.ExecutionPlugin

The blocking execution plugin is a special plugin which is meant for debug purposes. It will not queue jobs but immediately execute them inline, effectively blocking fastr until the Job is finished. It is the simplest execution plugin and can be used as a template for new plugins or for testing purposes.

__abstractmethods__ = frozenset({})
__init__(finished_callback=None, cancelled_callback=None)[source]

Setup the ExecutionPlugin

Parameters
  • finished_callback – the callback to call after a job finished

  • cancelled_callback – the callback to call after a job cancelled

Returns

newly created ExecutionPlugin

__module__ = 'fastr.plugins'
cleanup()[source]

Method to call to clean up the ExecutionPlugin. This can be to clear temporary data, close connections, etc.

Parameters

force – force cleanup (e.g. kill instead of join a process)

filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/executionplugins/blockingexecution.py'
module = <module 'blockingexecution' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/executionplugins/blockingexecution.py'>
classmethod test()[source]

Test the plugin, default behaviour is just to instantiate the plugin

class fastr.plugins.CommaSeperatedValueFile

Bases: fastr.core.ioplugin.IOPlugin

The CommaSeperatedValueFile an expand-only type of IOPlugin. No URLs can actually be fetched, but it can expand a single URL into a larger amount of URLs.

The csv:// URL is a vfs:// URL with a number of query variables available. The URL mount and path should point to a valid CSV file. The query variable then specify what column(s) of the file should be used.

The following variable can be set in the query:

variable

usage

value

the column containing the value of interest, can be int for index or string for key

id

the column containing the sample id (optional)

header

indicates if the first row is considered the header, can be true or false (optional)

delimiter

the delimiter used in the csv file (optional)

quote

the quote character used in the csv file (optional)

reformat

a reformatting string so that value = reformat.format(value) (used before relative_path)

relative_path

indicates the entries are relative paths (for files), can be true or false (optional)

The header is by default false if the neither the value and id are set as a string. If either of these are a string, the header is required to define the column names and it automatically is assumed true

The delimiter and quota characters of the file should be detected automatically using the Sniffer, but can be forced by setting them in the URL.

Example of valid csv URLs:

# Use the first column in the file (no header row assumed)
csv://mount/some/dir/file.csv?value=0

# Use the images column in the file (first row is assumed header row)
csv://mount/some/dir/file.csv?value=images

# Use the segmentations column in the file (first row is assumed header row)
# and use the id column as the sample id
csv://mount/some/dir/file.csv?value=segmentations&id=id

# Use the first column as the id and the second column as the value
# and skip the first row (considered the header)
csv://mount/some/dir/file.csv?value=1&id=0&header=true

# Use the first column and force the delimiter to be a comma
csv://mount/some/dir/file.csv?value=0&delimiter=,
__abstractmethods__ = frozenset({})
__init__()[source]

Initialization for the IOPlugin

Returns

newly created IOPlugin

__module__ = 'fastr.plugins'
expand_url(url)[source]

(abstract) Expand an URL. This allows a source to collect multiple samples from a single url. The URL will have a wildcard or point to something with info and multiple urls will be returned.

Parameters

url (str) – url to expand

Returns

the resulting url(s), a tuple if multiple, otherwise a str

Return type

str or tuple of str

filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/ioplugins/commaseperatedvaluefile.py'
module = <module 'commaseperatedvaluefile' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/ioplugins/commaseperatedvaluefile.py'>
scheme = 'csv'
class fastr.plugins.CrossValidation

Bases: flowinterface.FlowPlugin

Advanced flow plugin that generated a cross-validation data flow. The node need an input with data and an input number of folds. Based on that the outputs test and train will be supplied with a number of data sets.

__abstractmethods__ = frozenset({})
__module__ = 'fastr.plugins'
static execute(payload)[source]
filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/flowplugins/crossvalidation.py'
module = <module 'crossvalidation' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/flowplugins/crossvalidation.py'>
class fastr.plugins.DRMAAExecution(finished_callback=None, cancelled_callback=None)

Bases: fastr.plugins.executionplugin.ExecutionPlugin

A DRMAA execution plugin to execute Jobs on a Grid Engine cluster. It uses a configuration option for selecting the queue to submit to. It uses the python drmaa package.

Note

To use this plugin, make sure the drmaa package is installed and that the execution is started on an SGE submit host with DRMAA libraries installed.

Note

This plugin is at the moment tailored to SGE, but it should be fairly easy to make different subclasses for different DRMAA supporting systems.

CANCELS_DEPENDENCIES = False
GE_NATIVE_SPEC = {'CWD': '-cwd', 'DEPENDS': '-hold_jid {hold_list}', 'DEPENDS_SEP': ',', 'ERRORLOG': '-e {errorlog}', 'HOLD': '-h', 'MEMORY': '-l h_vmem={memory}', 'NCORES': '-pe smp {ncores:d}', 'OUTPUTLOG': '-o {outputlog}', 'QUEUE': '-q {queue}', 'WALLTIME': '-l h_rt={walltime}'}
NATIVE_SPEC = {'grid_engine': {'CWD': '-cwd', 'DEPENDS': '-hold_jid {hold_list}', 'DEPENDS_SEP': ',', 'ERRORLOG': '-e {errorlog}', 'HOLD': '-h', 'MEMORY': '-l h_vmem={memory}', 'NCORES': '-pe smp {ncores:d}', 'OUTPUTLOG': '-o {outputlog}', 'QUEUE': '-q {queue}', 'WALLTIME': '-l h_rt={walltime}'}, 'torque': {'CWD': '', 'DEPENDS': '-W depend=afterok:{hold_list}', 'DEPENDS_SEP': ':', 'ERRORLOG': '-e {errorlog}', 'HOLD': '-h', 'MEMORY': '-l mem={memory}', 'NCORES': '-l procs={ncores:d}', 'OUTPUTLOG': '-o {outputlog}', 'QUEUE': '-q {queue}', 'WALLTIME': '-l walltime={walltime}'}}
SUPPORTS_CANCEL = True
SUPPORTS_DEPENDENCY = True
SUPPORTS_HOLD_RELEASE = True
TORQUE_NATIVE_SPEC = {'CWD': '', 'DEPENDS': '-W depend=afterok:{hold_list}', 'DEPENDS_SEP': ':', 'ERRORLOG': '-e {errorlog}', 'HOLD': '-h', 'MEMORY': '-l mem={memory}', 'NCORES': '-l procs={ncores:d}', 'OUTPUTLOG': '-o {outputlog}', 'QUEUE': '-q {queue}', 'WALLTIME': '-l walltime={walltime}'}
__abstractmethods__ = frozenset({})
__init__(finished_callback=None, cancelled_callback=None)[source]

Setup the ExecutionPlugin

Parameters
  • finished_callback – the callback to call after a job finished

  • cancelled_callback – the callback to call after a job cancelled

Returns

newly created ExecutionPlugin

__module__ = 'fastr.plugins'
cleanup()[source]

Method to call to clean up the ExecutionPlugin. This can be to clear temporary data, close connections, etc.

Parameters

force – force cleanup (e.g. kill instead of join a process)

collect_jobs()[source]
configuration_fields = {'drmaa_engine': (<class 'str'>, 'grid_engine', 'The engine to use (options: grid_engine, torque'), 'drmaa_job_check_interval': (<class 'int'>, 900, 'The interval in which the job checker will startto check for stale jobs'), 'drmaa_max_jobs': (<class 'int'>, 0, 'The maximum jobs that can be send to the scheduler at the same time (0 for no limit)'), 'drmaa_queue': (<class 'str'>, 'week', 'The default queue to use for jobs send to the scheduler')}
create_native_spec(queue, walltime, memory, ncores, outputLog, errorLog, hold_job, hold)[source]

Create the native spec for the DRMAA scheduler. Needs to be implemented in the subclasses

Parameters
  • queue (str) – the queue to submit to

  • walltime (str) – walltime specified

  • memory (str) – memory requested

  • ncores (int) – number of cores requested

  • outputLog (str) – the location of the stdout log

  • errorLog (str) – the location of stderr log

  • hold_job (list) – list of jobs to depend on

  • hold (bool) – flag if job should be submitted in hold mode

Returns

current_jobs
dispatch_callbacks()[source]
filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/executionplugins/drmaaexecution.py'
job_regression_check()[source]
module = <module 'drmaaexecution' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/executionplugins/drmaaexecution.py'>
send_job(command, arguments, queue=None, resources=None, job_name=None, joinLogFiles=False, outputLog=None, errorLog=None, hold_job=None, hold=False)[source]
spec_fields
submit_jobs()[source]
classmethod test()[source]

Test the plugin, default behaviour is just to instantiate the plugin

class fastr.plugins.DockerTarget(binary, docker_image)

Bases: fastr.core.target.Target

A tool target that is located in a Docker images. Can be run using docker-py. A docker target only need two variables: the binary to call within the docker container, and the docker container to use.

{
  "arch": "*",
  "os": "*",
  "binary": "bin/test.py",
  "docker_image": "fastr/test"
}
<target os="*" arch="*" binary="bin/test.py" docker_image="fastr/test">
__abstractmethods__ = frozenset({})
__enter__()[source]

Set the environment in such a way that the target will be on the path.

__exit__(exc_type, exc_value, traceback)[source]

Cleanup the environment where needed

__init__(binary, docker_image)[source]

Define a new docker target.

Parameters

docker_image (str) – Docker image to use

__module__ = 'fastr.plugins'
container
filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/targetplugins/dockertarget.py'
module = <module 'dockertarget' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/targetplugins/dockertarget.py'>
monitor_docker(container, resources)[source]

Monitor a docker container and profile the cpu, memory and io use. Register the resource use every _MONITOR_INTERVAL seconds.

Parameters
  • container (ContainerCollection) – process to monitor

  • resources (ProcessUsageCollection) – list to append measurements to

run_command(command)[source]

Run a command with the target

Return type

TargetResult

class fastr.plugins.ElasticsearchReporter

Bases: fastr.plugins.reportingplugin.ReportingPlugin

__abstractmethods__ = frozenset({})
__init__()[source]

The BasePlugin constructor.

Returns

the created plugin

Return type

BasePlugin

Raises

FastrPluginNotLoaded – if the plugin did not load correctly

__module__ = 'fastr.plugins'
activate()[source]

Activate the reporting plugin

configuration_fields = {'elasticsearch_debug': (<class 'bool'>, False, 'Setup elasticsearch debug mode to send stdout stderr on job succes'), 'elasticsearch_host': (<class 'str'>, '', 'The elasticsearch host to report to'), 'elasticsearch_index': (<class 'str'>, 'fastr', 'The elasticsearch index to store data in')}
elasticsearch_update_status(job)[source]
filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/reportingplugins/elasticsearchreporter.py'
job_updated(job)[source]
module = <module 'elasticsearchreporter' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/reportingplugins/elasticsearchreporter.py'>
classmethod test()[source]

Test the plugin, default behaviour is just to instantiate the plugin

class fastr.plugins.FastrInterface(id_, document)

Bases: fastr.core.interface.Interface

The default Interface for fastr. For the command-line Tools as used by fastr. It build a commandline call based on the input/output specification.

The fields that can be set in the interface:

Attribute

Description

id

The id of this Tool (used internally in fastr)

inputs[]

List of Inputs that can are accepted by the Tool

id

ID of the Input

name

Longer name of the Input (more human readable)

datatype

The ID of the DataType of the Input 1

enum[]

List of possible values for an EnumType (created on the fly by fastr) 1

prefix

Commandline prefix of the Input (e.g. –in, -i)

cardinality

Cardinality of the Input

repeat_prefix

Flag indicating if for every value of the Input the prefix is repeated

required

Flag indicating if the input is required

nospace

Flag indicating if there is no space between prefix and value (e.g. –in=val)

format

For DataTypes that have multiple representations, indicate which one to use

default

Default value for the Input

description

Long description for an input

outputs[]

List of Outputs that are generated by the Tool (and accessible to fastr)

id

ID of the Output

name

Longer name of the Output (more human readable)

datatype

The ID of the DataType of the Output 1

enum[]

List of possible values for an EnumType (created on the fly by fastr) 1

prefix

Commandline prefix of the Output (e.g. –out, -o)

cardinality

Cardinality of the Output

repeat_prefix

Flag indicating if for every value of the Output the prefix is repeated

required

Flag indicating if the input is required

nospace

Flag indicating if there is no space between prefix and value (e.g. –out=val)

format

For DataTypes that have multiple representations, indicate which one to use

description

Long description for an input

action

Special action (defined per DataType) that needs to be performed before creating output value (e.g. ‘ensure’ will make sure an output directory exists)

automatic

Indicate that output doesn’t require commandline argument, but is created automatically by a Tool 2

method

The collector plugin to use for the gathering automatic output, see the Collector plugins

location

Definition where to an automatically, usage depends on the method 2

Footnotes

1(1,2,3,4)

datatype and enum are conflicting entries, if both specified datatype has presedence

2(1,2)

More details on defining automatica output are given in [TODO]

__abstractmethods__ = frozenset({})
__dataschemafile__ = 'FastrInterface.schema.json'
__eq__(other)[source]

Return self==value.

__getstate__()[source]

Get the state of the FastrInterface object.

Returns

state of interface

Return type

dict

__hash__ = None
__init__(id_, document)[source]

The BasePlugin constructor.

Returns

the created plugin

Return type

BasePlugin

Raises

FastrPluginNotLoaded – if the plugin did not load correctly

__module__ = 'fastr.plugins'
__setstate__(state)[source]

Set the state of the Interface

check_input_id(id_)[source]

Check if an id for an object is valid and unused in the Tool. The method will always returns True if it does not raise an exception.

Parameters

id (str) – the id to check

Returns

True

Raises
check_output_id(id_)[source]

Check if an id for an object is valid and unused in the Tool. The method will always returns True if it does not raise an exception.

Parameters

id (str) – the id to check

Returns

True

Raises
static collect_errors(result)[source]

Special error collection for fastr interfaces

collect_results(result)[source]

Collect all results of the interface

collector_plugin_type

alias of fastrinterface.CollectorPlugin

collectors = CollectorPluginManager Loaded json : <CollectorPlugin: JsonCollector> Loaded path : <CollectorPlugin: PathCollector> Loaded stdout : <CollectorPlugin: StdoutCollector>
execute(target, payload)[source]

Execute the interface using a specific target and payload (containing a set of values for the arguments)

Parameters
  • target (SampleId) – the target to use

  • payload (dict) – the values for the arguments

Returns

result of the execution

Return type

InterfaceResult

expanding

Indicates whether or not this Interface will result in multiple samples per run. If the flow is unaffected, this will be zero, if it is nonzero it means that number of dimension will be added to the sample array.

filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/interfaceplugins/fastrinterface.py'
get_arguments(values)[source]

Get the argument list for this interface

Returns

return list of arguments

get_command(target, payload)[source]
get_specials(payload, output, cardinality_nr)[source]

Get special attributes. Returns tuples for specials, inputs and outputs that are used for formatting substitutions.

Parameters
  • output – Output for which to get the specials

  • cardinality_nr (int) – the cardinality number

inputs

OrderedDict of Inputs connected to the Interface. The format should be {input_id: InputSpec}.

module = <module 'fastrinterface' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/interfaceplugins/fastrinterface.py'>
outputs

OrderedDict of Output connected to the Interface. The format should be {output_id: OutputSpec}.

class fastr.plugins.FileSystem

Bases: fastr.core.ioplugin.IOPlugin

The FileSystem plugin is create to handle file:// type or URLs. This is generally not a good practice, as this is not portable over between machines. However, for test purposes it might be useful.

The URL scheme is rather simple: file://host/path (see wikipedia for details)

We do not make use of the host part and at the moment only support localhost (just leave the host empty) leading to file:/// URLs.

Warning

This plugin ignores the hostname in the URL and does only accept driver letters on Windows in the form c:/

__abstractmethods__ = frozenset({})
__init__()[source]

Initialization for the IOPlugin

Returns

newly created IOPlugin

__module__ = 'fastr.plugins'
fetch_url(inurl, outpath)[source]

Fetch the files from the file.

Parameters
  • inurl – url to the item in the data store, starts with file://

  • outpath – path where to store the fetch data locally

fetch_value(inurl)[source]

Fetch a value from an external file file.

Parameters

inurl – url of the value to read

Returns

the fetched value

filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/ioplugins/filesystem.py'
module = <module 'filesystem' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/ioplugins/filesystem.py'>
path_to_url(path, mountpoint=None)[source]

Construct an url from a given mount point and a relative path to the mount point.

put_url(inpath, outurl)[source]

Put the files to the external data store.

Parameters
  • inpath – path of the local data

  • outurl – url to where to store the data, starts with file://

put_value(value, outurl)[source]

Put the value in the external data store.

Parameters
  • value – value to store

  • outurl – url to where to store the data, starts with file://

scheme = 'file'
url_to_path(url)[source]

Get the path to a file from a url. Currently supports the file:// scheme

Examples:

>>> 'file:///d:/data/project/file.ext'
'd:\data\project\file.ext'

Warning

file:// will not function cross platform and is mainly for testing

class fastr.plugins.FlowInterface(id_, document)

Bases: fastr.core.interface.Interface

The Interface use for AdvancedFlowNodes to create the advanced data flows that are not implemented in the fastr. This allows nodes to implement new data flows using the plugin system.

The definition of FlowInterfaces are very similar to the default FastrInterfaces.

Note

A flow interface should be using a specific FlowPlugin

__abstractmethods__ = frozenset({})
__dataschemafile__ = 'FastrInterface.schema.json'
__eq__(other)[source]

Return self==value.

__getstate__()[source]

Get the state of the FastrInterface object.

Returns

state of interface

Return type

dict

__hash__ = None
__init__(id_, document)[source]

The BasePlugin constructor.

Returns

the created plugin

Return type

BasePlugin

Raises

FastrPluginNotLoaded – if the plugin did not load correctly

__module__ = 'fastr.plugins'
__setstate__(state)[source]

Set the state of the Interface

execute(target, payload)[source]

Execute the interface given the a target and payload. The payload should have the form:

{
  'input': {
    'input_id_a': (value, value),
    'input_id_b': (value, value)
  },
  'output': {
    'output_id_a': (value, value),
    'output_id_b': (value, value)
  }
}
Parameters
  • target – the target to call

  • payload – the payload to use

Returns

the result of the execution

Return type

(tuple of) InterfaceResult

expanding

Indicates whether or not this Interface will result in multiple samples per run. If the flow is unaffected, this will be zero, if it is nonzero it means that number of dimension will be added to the sample array.

filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/interfaceplugins/flowinterface.py'
flow_plugin_type

alias of flowinterface.FlowPlugin

flow_plugins = FlowPluginManager Loaded CrossValidation : <FlowPlugin: CrossValidation>
inputs

OrderedDict of Inputs connected to the Interface. The format should be {input_id: InputSpec}.

module = <module 'flowinterface' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/interfaceplugins/flowinterface.py'>
outputs

OrderedDict of Output connected to the Interface. The format should be {output_id: OutputSpec}.

class fastr.plugins.HTTPPlugin

Bases: fastr.core.ioplugin.IOPlugin

Warning

This Plugin is still under development and has not been tested at all. example url: https://server.io/path/to/resource

__abstractmethods__ = frozenset({})
__init__()[source]

Initialization for the IOPlugin

Returns

newly created IOPlugin

__module__ = 'fastr.plugins'
fetch_url(inurl, outpath)[source]

Download file from server.

Parameters
  • inurl – url to the file.

  • outpath – path to store file

filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/ioplugins/httpplugin.py'
module = <module 'httpplugin' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/ioplugins/httpplugin.py'>
scheme = ('https', 'http')
class fastr.plugins.LinearExecution(finished_callback=None, cancelled_callback=None)

Bases: fastr.plugins.executionplugin.ExecutionPlugin

An execution engine that has a background thread that executes the jobs in order. The queue is a simple FIFO queue and there is one worker thread that operates in the background. This plugin is meant as a fallback when other plugins do not function properly. It does not multi-processing so it is safe to use in environments that do no support that.

__abstractmethods__ = frozenset({})
__init__(finished_callback=None, cancelled_callback=None)[source]

Setup the ExecutionPlugin

Parameters
  • finished_callback – the callback to call after a job finished

  • cancelled_callback – the callback to call after a job cancelled

Returns

newly created ExecutionPlugin

__module__ = 'fastr.plugins'
cleanup()[source]

Method to call to clean up the ExecutionPlugin. This can be to clear temporary data, close connections, etc.

Parameters

force – force cleanup (e.g. kill instead of join a process)

exec_worker()[source]
filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/executionplugins/linearexecution.py'
module = <module 'linearexecution' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/executionplugins/linearexecution.py'>
classmethod test()[source]

Test the plugin, default behaviour is just to instantiate the plugin

class fastr.plugins.LocalBinaryTarget(binary, paths=None, environment_variables=None, initscripts=None, modules=None, interpreter=None, **kwargs)

Bases: fastr.core.target.SubprocessBasedTarget

A tool target that is a local binary on the system. Can be found using environmentmodules or a path on the executing machine. A local binary target has a number of fields that can be supplied:

  • binary (required): the name of the binary/script to call, can also be called bin for backwards compatibility.

  • modules: list of modules to load, this can be environmentmodules or lmod modules. If modules are given, the paths, environment_variables and initscripts are ignored.

  • paths: a list of paths to add following the structure {"value": "/path/to/dir", "type": "bin"}. The types can be bin if the it should be added to $PATH or lib if it should be added to te library path (e.g. $LD_LIBRARY_PATH for linux).

  • environment_variables: a dictionary of environment variables to set.

  • initscript: a list of script to run before running the main tool

  • interpreter: the interpreter to use to call the binary e.g. python

The LocalBinaryTarget will first check if there are modules given and the module subsystem is loaded. If that is the case it will simply unload all current modules and load the given modules. If not it will try to set up the environment itself by using the following steps:

  1. Prepend the bin paths to $PATH

  2. Prepend the lib paths to the correct environment variable

  3. Setting the other environment variables given ($PATH and the system library path are ignored and cannot be set that way)

  4. Call the initscripts one by one

The definition of the target in JSON is very straightforward:

{
  "binary": "bin/test.py",
  "interpreter": "python",
  "paths": [
    {
      "type": "bin",
      "value": "vfs://apps/test/bin"
    },
    {
      "type": "lib",
      "value": "./lib"
    }
  ],
  "environment_variables": {
    "othervar": 42,
    "short_var": 1,
    "testvar": "value1"
  },
  "initscripts": [
    "bin/init.sh"
  ],
  "modules": ["elastix/4.8"]
}

In XML the definition would be in the form of:

<target os="linux" arch="*" modules="elastix/4.8" bin="bin/test.py" interpreter="python">
  <paths>
    <path type="bin" value="vfs://apps/test/bin" />
    <path type="lib" value="./lib" />
  </paths>
  <environment_variables short_var="1">
    <testvar>value1</testvar>
    <othervar>42</othervar>
  </environment_variables>
  <initscripts>
    <initscript>bin/init.sh</initscript>
  </initscripts>
</target>
DYNAMIC_LIBRARY_PATH_DICT = {'darwin': 'DYLD_LIBRARY_PATH', 'linux': 'LD_LIBRARY_PATH', 'windows': 'PATH'}
__abstractmethods__ = frozenset({})
__enter__()[source]

Set the environment in such a way that the target will be on the path.

__exit__(exc_type, exc_value, traceback)[source]

Cleanup the environment

__init__(binary, paths=None, environment_variables=None, initscripts=None, modules=None, interpreter=None, **kwargs)[source]

Define a new local binary target. Must be defined either using paths and optionally environment_variables and initscripts, or enviroment modules.

__module__ = 'fastr.plugins'
filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/targetplugins/localbinarytarget.py'
module = <module 'localbinarytarget' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/targetplugins/localbinarytarget.py'>
paths
run_command(command)[source]

Run a command with the target

Return type

TargetResult

class fastr.plugins.MacroTarget(network_file, method=None, function='main')

Bases: fastr.core.target.Target

A target for MacroNodes. This target cannot be executed as the MacroNode handles execution differently. But this contains the information for the MacroNode to find the internal Network.

__abstractmethods__ = frozenset({})
__init__(network_file, method=None, function='main')[source]

Define a new local binary target. Must be defined either using paths and optionally environment_variables and initscripts, or enviroment modules.

__module__ = 'fastr.plugins'
filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/targetplugins/macrotarget.py'
module = <module 'macrotarget' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/targetplugins/macrotarget.py'>
run_command(command)[source]

Run a command with the target

classmethod test()[source]

Test if singularity is availble on the path

class fastr.plugins.NipypeInterface(id_, nipype_cls=None, document=None)

Bases: fastr.core.interface.Interface

Experimental interfaces to using nipype interfaces directly in fastr tools, only using a simple reference.

To create a tool using a nipype interface just create an interface with the correct type and set the nipype argument to the correct class. For example in an xml tool this would become:

<interface class="NipypeInterface">
  <nipype_class>nipype.interfaces.elastix.Registration</nipype_class>
</interface>

Note

To use these interfaces nipype should be installed on the system.

Warning

This interface plugin is basically functional, but highly experimental!

__abstractmethods__ = frozenset({})
__eq__(other)[source]

Return self==value.

__getstate__()[source]

Retrieve the state of the Interface

Returns

the state of the object

Rtype dict

__hash__ = None
__init__(id_, nipype_cls=None, document=None)[source]

The BasePlugin constructor.

Returns

the created plugin

Return type

BasePlugin

Raises

FastrPluginNotLoaded – if the plugin did not load correctly

__module__ = 'fastr.plugins'
__setstate__(state)[source]

Set the state of the Interface

execute(target, payload)[source]

Execute the interface using a specific target and payload (containing a set of values for the arguments)

Parameters
  • target (SampleId) – the target to use

  • payload (dict) – the values for the arguments

Returns

result of the execution

Return type

InterfaceResult

expanding

Indicates whether or not this Interface will result in multiple samples per run. If the flow is unaffected, this will be zero, if it is nonzero it means that number of dimension will be added to the sample array.

filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/interfaceplugins/nipypeinterface.py'
get_type(trait)[source]
inputs

OrderedDict of Inputs connected to the Interface. The format should be {input_id: InputSpec}.

module = <module 'nipypeinterface' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/interfaceplugins/nipypeinterface.py'>
outputs

OrderedDict of Output connected to the Interface. The format should be {output_id: OutputSpec}.

classmethod test()[source]

Test the plugin, interfaces do not need to be tested on import

class fastr.plugins.Null

Bases: fastr.core.ioplugin.IOPlugin

The Null plugin is create to handle null:// type or URLs. These URLs are indicating the sink should not do anything. The data is not written to anywhere. Besides the scheme, the rest of the URL is ignored.

__abstractmethods__ = frozenset({})
__init__()[source]

Initialization for the IOPlugin

Returns

newly created IOPlugin

__module__ = 'fastr.plugins'
filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/ioplugins/null.py'
module = <module 'null' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/ioplugins/null.py'>
put_url(inpath, outurl)[source]

Put the files to the external data store.

Parameters
  • inpath – path of the local data

  • outurl – url to where to store the data, starts with file://

put_value(value, outurl)[source]

Put the value in the external data store.

Parameters
  • value – value to store

  • outurl – url to where to store the data, starts with file://

scheme = 'null'
class fastr.plugins.PimReporter

Bases: fastr.plugins.reportingplugin.ReportingPlugin

SUPPORTED_APIS = {2: <class 'pimreporter.PimAPIv2'>}
__abstractmethods__ = frozenset({})
__init__()[source]

The BasePlugin constructor.

Returns

the created plugin

Return type

BasePlugin

Raises

FastrPluginNotLoaded – if the plugin did not load correctly

__module__ = 'fastr.plugins'
activate()[source]

Activate the reporting plugin

configuration_fields = {'pim_batch_size': (<class 'int'>, 100, 'Maximum number of jobs that can be send to PIM in a single interval'), 'pim_debug': (<class 'bool'>, False, 'Setup PIM debug mode to send stdout stderr on job succes'), 'pim_host': (<class 'str'>, '', 'The PIM host to report to'), 'pim_update_interval': (<class 'float'>, 2.5, 'The interval in which to send jobs to PIM')}
filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/reportingplugins/pimreporter.py'
job_updated(job)[source]
log_record_emitted(record)[source]
module = <module 'pimreporter' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/reportingplugins/pimreporter.py'>
run_finished(run)[source]
run_started(run)[source]
class fastr.plugins.ProcessPoolExecution(finished_callback=None, cancelled_callback=None, nr_of_workers=None)

Bases: fastr.plugins.executionplugin.ExecutionPlugin

A local execution plugin that uses multiprocessing to create a pool of worker processes. This allows fastr to execute jobs in parallel with true concurrency. The number of workers can be specified in the fastr configuration, but the default amount is the number of cores - 1 with a minimum of 1.

Warning

The ProcessPoolExecution does not check memory requirements of jobs and running many workers might lead to memory starvation and thus an unresponsive system.

__abstractmethods__ = frozenset({})
__init__(finished_callback=None, cancelled_callback=None, nr_of_workers=None)[source]

Setup the ExecutionPlugin

Parameters
  • finished_callback – the callback to call after a job finished

  • cancelled_callback – the callback to call after a job cancelled

Returns

newly created ExecutionPlugin

__module__ = 'fastr.plugins'
cleanup()[source]

Method to call to clean up the ExecutionPlugin. This can be to clear temporary data, close connections, etc.

Parameters

force – force cleanup (e.g. kill instead of join a process)

configuration_fields = {'process_pool_worker_number': (<class 'int'>, 3, 'Number of workers to use in a process pool')}
filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/executionplugins/processpoolexecution.py'
job_finished_callback(result)[source]

Reciever for the callback, it will split the result tuple and call job_finished

Parameters

result (tuple) – return value of run_job

module = <module 'processpoolexecution' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/executionplugins/processpoolexecution.py'>
classmethod test()[source]

Test the plugin, default behaviour is just to instantiate the plugin

class fastr.plugins.RQExecution(finished_callback=None, cancelled_callback=None)

Bases: fastr.plugins.executionplugin.ExecutionPlugin

A execution plugin based on Redis Queue. Fastr will submit jobs to the redis queue and workers will peel the jobs from the queue and process them.

This system requires a running redis database and the database url has to be set in the fastr configuration.

Note

This execution plugin required the redis and rq packages to be installed before it can be loaded properly.

__abstractmethods__ = frozenset({})
__init__(finished_callback=None, cancelled_callback=None)[source]

Setup the ExecutionPlugin

Parameters
  • finished_callback – the callback to call after a job finished

  • cancelled_callback – the callback to call after a job cancelled

Returns

newly created ExecutionPlugin

__module__ = 'fastr.plugins'
check_finished()[source]
cleanup()[source]

Method to call to clean up the ExecutionPlugin. This can be to clear temporary data, close connections, etc.

Parameters

force – force cleanup (e.g. kill instead of join a process)

configuration_fields = {'rq_host': (<class 'str'>, 'redis://localhost:6379/0', 'The url of the redis serving the redis queue'), 'rq_queue': (<class 'str'>, 'default', 'The redis queue to use')}
filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/executionplugins/rqexecution.py'
module = <module 'rqexecution' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/executionplugins/rqexecution.py'>
classmethod run_job(job_id, job_command, job_stdout, job_stderr)[source]
classmethod test()[source]

Test the plugin, default behaviour is just to instantiate the plugin

class fastr.plugins.Reference

Bases: fastr.core.ioplugin.IOPlugin

The Reference plugin is create to handle ref:// type or URLs. These URLs are to make the sink just write a simple reference file to the data. The reference file contains the DataType and the value so the result can be reconstructed. It for files just leaves the data on disk by reference. This plugin is not useful for production, but is used for testing purposes.

__abstractmethods__ = frozenset({})
__init__()[source]

Initialization for the IOPlugin

Returns

newly created IOPlugin

__module__ = 'fastr.plugins'
filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/ioplugins/reference.py'
module = <module 'reference' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/ioplugins/reference.py'>
push_sink_data(value, outurl, datatype=None)[source]

Write out the sink data from the inpath to the outurl.

Parameters
  • value (str) – the path of the data to be pushed

  • outurl (str) – the url to write the data to

  • datatype (DataType) – the datatype of the data, used for determining the total contents of the transfer

Returns

None

scheme = 'ref'
class fastr.plugins.S3Filesystem

Bases: fastr.core.ioplugin.IOPlugin

Warning

As this IOPlugin is under development, it has not been thoroughly tested.

example url: s3://bucket.server/path/to/resource

__abstractmethods__ = frozenset({})
__init__()[source]

Initialization for the IOPlugin

Returns

newly created IOPlugin

__module__ = 'fastr.plugins'
cleanup()[source]

(abstract) Clean up the IOPlugin. This is to do things like closing files or connections. Will be called when the plugin is no longer required.

expand_url(url)[source]

Expand an S3 URL. This allows a source to collect multiple samples from a single url.

Parameters

url (str) – url to expand

Returns

the resulting url(s), a tuple if multiple, otherwise a str

Return type

str or tuple of str

fetch_url(inurl, outpath)[source]

Get the file(s) or values from s3.

Parameters
  • inurl – url to the item in the data store

  • outpath – path where to store the fetch data locally

fetch_value(inurl)[source]

Fetch a value from S3

Parameters

inurl – url of the value to read

Returns

the fetched value

filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/ioplugins/s3filesystem.py'
module = <module 's3filesystem' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/ioplugins/s3filesystem.py'>
put_url(inpath, outurl)[source]

Upload the files to the S3 storage

Parameters
  • inpath – path to the local data

  • outurl – url to where to store the data in the external data store.

put_value(value, outurl)[source]

Put the value in S3

Parameters
  • value – value to store

  • outurl – url to where to store the data, starts with file://

scheme = ('s3', 's3list')
classmethod test()[source]

Test the plugin, default behaviour is just to instantiate the plugin

class fastr.plugins.SimpleReport

Bases: fastr.plugins.reportingplugin.ReportingPlugin

__abstractmethods__ = frozenset({})
__module__ = 'fastr.plugins'
filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/reportingplugins/simplereport.py'
module = <module 'simplereport' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/reportingplugins/simplereport.py'>
run_finished(run)[source]
class fastr.plugins.SingularityTarget(binary, container, interpreter=None)

Bases: fastr.core.target.SubprocessBasedTarget

A tool target that is run using a singularity container, see the singulary website

  • binary (required): the name of the binary/script to call, can also be called bin for backwards compatibility.

  • container (required): the singularity container to run, this can be in url form for singularity

    pull or as a path to a local container

  • interpreter: the interpreter to use to call the binary e.g. python

SINGULARITY_BIN = 'singularity'
__abstractmethods__ = frozenset({})
__enter__()[source]

Set the environment in such a way that the target will be on the path.

__exit__(exc_type, exc_value, traceback)[source]

Cleanup the environment

__init__(binary, container, interpreter=None)[source]

Define a new local binary target. Must be defined either using paths and optionally environment_variables and initscripts, or enviroment modules.

__module__ = 'fastr.plugins'
filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/targetplugins/singularitytarget.py'
module = <module 'singularitytarget' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/targetplugins/singularitytarget.py'>
run_command(command)[source]

Run a command with the target

classmethod test()[source]

Test if singularity is availble on the path

class fastr.plugins.SlurmExecution(finished_callback=None, cancelled_callback=None)

Bases: fastr.plugins.executionplugin.ExecutionPlugin

SBATCH = 'sbatch'
SCANCEL = 'scancel'
SCONTROL = 'scontrol'
SQUEUE = 'squeue'
SQUEUE_FORMAT = '{"id": %.18i, "status": "%.2t"}'
STATUS_MAPPING = {' F': <JobState.failed: ('failed', 'done', True)>, ' R': <JobState.running: ('running', 'in_progress', False)>, 'CA': <JobState.cancelled: ('cancelled', 'done', True)>, 'CD': <JobState.finished: ('finished', 'done', False)>, 'CF': <JobState.running: ('running', 'in_progress', False)>, 'CG': <JobState.running: ('running', 'in_progress', False)>, 'NF': <JobState.failed: ('failed', 'done', True)>, 'PD': <JobState.queued: ('queued', 'idle', False)>, 'RV': <JobState.cancelled: ('cancelled', 'done', True)>, 'SE': <JobState.failed: ('failed', 'done', True)>, 'TO': <JobState.queued: ('queued', 'idle', False)>}
SUPPORTS_CANCEL = True
SUPPORTS_DEPENDENCY = True
SUPPORTS_HOLD_RELEASE = True
__abstractmethods__ = frozenset({})
__init__(finished_callback=None, cancelled_callback=None)[source]

Setup the ExecutionPlugin

Parameters
  • finished_callback – the callback to call after a job finished

  • cancelled_callback – the callback to call after a job cancelled

Returns

newly created ExecutionPlugin

__module__ = 'fastr.plugins'
cleanup()[source]

Method to call to clean up the ExecutionPlugin. This can be to clear temporary data, close connections, etc.

Parameters

force – force cleanup (e.g. kill instead of join a process)

configuration_fields = {'slurm_job_check_interval': (<class 'int'>, 30, 'The interval in which the job checker will startto check for stale jobs'), 'slurm_partition': (<class 'str'>, '', 'The slurm partition to use')}
filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/executionplugins/slurmexecution.py'
job_status_check()[source]
module = <module 'slurmexecution' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/executionplugins/slurmexecution.py'>
classmethod test()[source]

Test the plugin, default behaviour is just to instantiate the plugin

class fastr.plugins.StrongrExecution(finished_callback=None, cancelled_callback=None)

Bases: fastr.plugins.executionplugin.ExecutionPlugin

A execution plugin based on Redis Queue. Fastr will submit jobs to the redis queue and workers will peel the jobs from the queue and process them.

This system requires a running redis database and the database url has to be set in the fastr configuration.

Note

This execution plugin required the redis and rq packages to be installed before it can be loaded properly.

__abstractmethods__ = frozenset({})
__init__(finished_callback=None, cancelled_callback=None)[source]

Setup the ExecutionPlugin

Parameters
  • finished_callback – the callback to call after a job finished

  • cancelled_callback – the callback to call after a job cancelled

Returns

newly created ExecutionPlugin

__module__ = 'fastr.plugins'
check_finished()[source]
cleanup()[source]

Method to call to clean up the ExecutionPlugin. This can be to clear temporary data, close connections, etc.

Parameters

force – force cleanup (e.g. kill instead of join a process)

configuration_fields = {}
filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/executionplugins/strongrexecution.py'
module = <module 'strongrexecution' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/executionplugins/strongrexecution.py'>
classmethod test()[source]

Test the plugin, default behaviour is just to instantiate the plugin

class fastr.plugins.VirtualFileSystem

Bases: fastr.core.vfs.VirtualFileSystem, fastr.core.ioplugin.IOPlugin

The virtual file system class. This is an IOPlugin, but also heavily used internally in fastr for working with directories. The VirtualFileSystem uses the vfs:// url scheme.

A typical virtual filesystem url is formatted as vfs://mountpoint/relative/dir/from/mount.ext

Where the mountpoint is defined in the Config file. A list of the currently known mountpoints can be found in the fastr.config object

>>> fastr.config.mounts
{'example_data': '/home/username/fastr-feature-documentation/fastr/fastr/examples/data',
 'home': '/home/username/',
 'tmp': '/home/username/FastrTemp'}

This shows that a url with the mount home such as vfs://home/tempdir/testfile.txt would be translated into /home/username/tempdir/testfile.txt.

There are a few default mount points defined by Fastr (that can be changed via the config file).

mountpoint

default location

home

the users home directory (expanduser('~/'))

tmp

the fastr temprorary dir, defaults to tempfile.gettempdir()

example_data

the fastr example data directory, defaults $FASTRDIR/example/data

__abstractmethods__ = frozenset({})
__module__ = 'fastr.plugins'
filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/ioplugins/virtualfilesystem.py'
module = <module 'virtualfilesystem' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/ioplugins/virtualfilesystem.py'>
scheme = 'vfs'
class fastr.plugins.VirtualFileSystemRegularExpression

Bases: fastr.core.ioplugin.IOPlugin

The VirtualFileSystemValueList an expand-only type of IOPlugin. No URLs can actually be fetched, but it can expand a single URL into a larger amount of URLs.

A vfsregex:// URL is a vfs URL that can contain regular expressions on every level of the path. The regular expressions follow the re module definitions.

An example of a valid URLs would be:

vfsregex://tmp/network_dir/.*/.*/__fastr_result__.pickle.gz
vfsregex://tmp/network_dir/nodeX/(?P<id>.*)/__fastr_result__.pickle.gz

The first URL would result in all the __fastr_result__.pickle.gz in the working directory of a Network. The second URL would only result in the file for a specific node (nodeX), but by adding the named group id using (?P<id>.*) the sample id of the data is automatically set to that group (see Regular Expression Syntax under the special characters for more info on named groups in regular expression).

Concretely if we would have a directory vfs://mount/somedir containing:

image_1/Image.nii
image_2/image.nii
image_3/anotherimage.nii
image_5/inconsistentnamingftw.nii

we could match these files using vfsregex://mount/somedir/(?P<id>image_\d+)/.*\.nii which would result in the following source data after expanding the URL:

{'image_1': 'vfs://mount/somedir/image_1/Image.nii',
 'image_2': 'vfs://mount/somedir/image_2/image.nii',
 'image_3': 'vfs://mount/somedir/image_3/anotherimage.nii',
 'image_5': 'vfs://mount/somedir/image_5/inconsistentnamingftw.nii'}

Showing the power of this regular expression filtering. Also it shows how the ID group from the URL can be used to have sensible sample ids.

Warning

due to the nature of regexp on multiple levels, this method can be slow when having many matches on the lower level of the path (because the tree of potential matches grows) or when directories that are parts of the path are very large.

__abstractmethods__ = frozenset({})
__init__()[source]

Initialization for the IOPlugin

Returns

newly created IOPlugin

__module__ = 'fastr.plugins'
expand_url(url)[source]

(abstract) Expand an URL. This allows a source to collect multiple samples from a single url. The URL will have a wildcard or point to something with info and multiple urls will be returned.

Parameters

url (str) – url to expand

Returns

the resulting url(s), a tuple if multiple, otherwise a str

Return type

str or tuple of str

filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/ioplugins/virtualfilesystemregularexpression.py'
module = <module 'virtualfilesystemregularexpression' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/ioplugins/virtualfilesystemregularexpression.py'>
scheme = 'vfsregex'
class fastr.plugins.VirtualFileSystemValueList

Bases: fastr.core.ioplugin.IOPlugin

The VirtualFileSystemValueList an expand-only type of IOPlugin. No URLs can actually be fetched, but it can expand a single URL into a larger amount of URLs. A vfslist:// URL basically is a url that points to a file using vfs. This file then contains a number lines each containing another URL.

If the contents of a file vfs://mount/some/path/contents would be:

vfs://mount/some/path/file1.txt
vfs://mount/some/path/file2.txt
vfs://mount/some/path/file3.txt
vfs://mount/some/path/file4.txt

Then using the URL vfslist://mount/some/path/contents as source data would result in the four files being pulled.

Note

The URLs in a vfslist file do not have to use the vfs scheme, but can use any scheme known to the Fastr system.

__abstractmethods__ = frozenset({})
__init__()[source]

Initialization for the IOPlugin

Returns

newly created IOPlugin

__module__ = 'fastr.plugins'
expand_url(url)[source]

(abstract) Expand an URL. This allows a source to collect multiple samples from a single url. The URL will have a wildcard or point to something with info and multiple urls will be returned.

Parameters

url (str) – url to expand

Returns

the resulting url(s), a tuple if multiple, otherwise a str

Return type

str or tuple of str

filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/ioplugins/virtualfilesystemvaluelist.py'
module = <module 'virtualfilesystemvaluelist' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/ioplugins/virtualfilesystemvaluelist.py'>
scheme = 'vfslist'
class fastr.plugins.XNATStorage

Bases: fastr.core.ioplugin.IOPlugin

Warning

As this IOPlugin is under development, it has not been thoroughly tested.

The XNATStorage plugin is an IOPlugin that can download data from and upload data to an XNAT server. It uses its own xnat:// URL scheme. This is a scheme specific for this plugin and though it looks somewhat like the XNAT rest interface, a different type or URL.

Data resources can be access directly by a data url:

xnat://xnat.example.com/data/archive/projects/sandbox/subjects/subject001/experiments/experiment001/scans/T1/resources/DICOM
xnat://xnat.example.com/data/archive/projects/sandbox/subjects/subject001/experiments/*_BRAIN/scans/T1/resources/DICOM

In the second URL you can see a wildcard being used. This is possible at long as it resolves to exactly one item.

The id query element will change the field from the default experiment to subject and the label query element sets the use of the label as the fastr id (instead of the XNAT id) to True (the default is False)

To disable https transport and use http instead the query string can be modified to add insecure=true. This will make the plugin send requests over http:

xnat://xnat.example.com/data/archive/projects/sandbox/subjects/subject001/experiments/*_BRAIN/scans/T1/resources/DICOM?insecure=true

For sinks it is import to know where to save the data. Sometimes you want to save data in a new assessor/resource and it needs to be created. To allow the Fastr sink to create an object in XNAT, you have to supply the type as a query parameter:

xnat://xnat.bmia.nl/data/archive/projects/sandbox/subjects/S01/experiments/_BRAIN/assessors/test_assessor/resources/IMAGE/files/image.nii.gz?resource_type=xnat:resourceCatalog&assessor_type=xnat:qcAssessmentData

Valid options are: subject_type, experiment_type, assessor_type, scan_type, and resource_type.

If you want to do a search where multiple resources are returned, it is possible to use a search url:

xnat://xnat.example.com/search?projects=sandbox&subjects=subject[0-9][0-9][0-9]&experiments=*_BRAIN&scans=T1&resources=DICOM

This will return all DICOMs for the T1 scans for experiments that end with _BRAIN that belong to a subjectXXX where XXX is a 3 digit number. By default the ID for the samples will be the experiment XNAT ID (e.g. XNAT_E00123). The wildcards that can be the used are the same UNIX shell-style wildcards as provided by the module fnmatch.

It is possible to change the id to a different fields id or label. Valid fields are project, subject, experiment, scan, and resource:

xnat://xnat.example.com/search?projects=sandbox&subjects=subject[0-9][0-9][0-9]&experiments=*_BRAIN&scans=T1&resources=DICOM&id=subject&label=true

The following variables can be set in the search query:

variable

default

usage

projects

*

The project(s) to select, can contain wildcards (see fnmatch)

subjects

*

The subject(s) to select, can contain wildcards (see fnmatch)

experiments

*

The experiment(s) to select, can contain wildcards (see fnmatch)

scans

*

The scan(s) to select, can contain wildcards (see fnmatch)

resources

*

The resource(s) to select, can contain wildcards (see fnmatch)

id

experiment

What field to use a the id, can be: project, subject, experiment, scan, or resource

label

false

Indicate the XNAT label should be used as fastr id, options true or false

insecure

false

Change the url scheme to be used to http instead of https

verify

true

(Dis)able the verification of SSL certificates

regex

false

Change search to use regex re.match() instead of fnmatch for matching

overwrite

false

Tell XNAT to overwrite existing files if a file with the name is already present

For storing credentials the .netrc file can be used. This is a common way to store credentials on UNIX systems. It is required that the file is only accessible by the owner only or a NetrcParseError will be raised. A netrc file is really easy to create, as its entries look like:

machine xnat.example.com
        login username
        password secret123

See the netrc module or the GNU inet utils website for more information about the .netrc file.

Note

On windows the location of the netrc file is assumed to be os.path.expanduser('~/_netrc'). The leading underscore is because windows does not like filename starting with a dot.

Note

For scan the label will be the scan type (this is initially the same as the series description, but can be updated manually or the XNAT scan type cleanup).

Warning

labels in XNAT are not guaranteed to be unique, so be careful when using them as the sample ID.

For background on XNAT, see the XNAT API DIRECTORY for the REST API of XNAT.

__abstractmethods__ = frozenset({})
__init__()[source]

Initialization for the IOPlugin

Returns

newly created IOPlugin

__module__ = 'fastr.plugins'
cleanup()[source]

(abstract) Clean up the IOPlugin. This is to do things like closing files or connections. Will be called when the plugin is no longer required.

connect(server, path='', insecure=False, verify=True)[source]
expand_url(url)[source]

(abstract) Expand an URL. This allows a source to collect multiple samples from a single url. The URL will have a wildcard or point to something with info and multiple urls will be returned.

Parameters

url (str) – url to expand

Returns

the resulting url(s), a tuple if multiple, otherwise a str

Return type

str or tuple of str

fetch_url(inurl, outpath)[source]

Get the file(s) or values from XNAT.

Parameters
  • inurl – url to the item in the data store

  • outpath – path where to store the fetch data locally

filename = '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/ioplugins/xnatstorage.py'
module = <module 'xnatstorage' from '/home/docs/checkouts/readthedocs.org/user_builds/fastr/envs/3.1.0/lib/python3.6/site-packages/fastr/resources/plugins/ioplugins/xnatstorage.py'>
parse_uri(url)[source]
put_url(inpath, outurl)[source]

Upload the files to the XNAT storage

Parameters
  • inpath – path to the local data

  • outurl – url to where to store the data in the external data store.

scheme = ('xnat', 'xnat+http', 'xnat+https')
server
static upload(resource, in_path, location, retries=3, overwrite=False)[source]
xnat
fastr.plugins.json

alias of fastr.plugins.JsonCollector

fastr.plugins.path

alias of fastr.plugins.PathCollector

fastr.plugins.stdout

alias of fastr.plugins.StdoutCollector

executionplugin Module

class fastr.plugins.executionplugin.ExecutionPlugin(finished_callback=None, cancelled_callback=None)[source]

Bases: fastr.abc.baseplugin.Plugin

This class is the base for all Plugins to execute jobs somewhere. There are many methods already in place for taking care of stuff.

There are fall-backs for certain features, but if a system already implements those it is usually preferred to skip the fall-back and let the external system handle it. There are a few flags to enable disable these features:

  • cls.SUPPORTS_CANCEL indicates that the plugin can cancel queued jobs

  • cls.SUPPORTS_HOLD_RELEASE indicates that the plugin can queue jobs in a hold state and can release them again (if not, the base plugin will create a hidden queue for held jobs). The plugin should respect the Job.status == JobState.hold when queueing jobs.

  • cls.SUPPORTS_DEPENDENCY indicate that the plugin can manage job dependencies, if not the base plugin job dependency system will be used and jobs with only be submitted when all dependencies are met.

  • cls.CANCELS_DEPENDENCIES indicates that if a job is cancelled it will automatically cancel all jobs depending on that job. If not the plugin traverse the dependency graph and kill each job manual.

    Note

    If a plugin supports dependencies it is assumed that when a job gets cancelled, the depending job also get cancelled automatically!

Most plugins should only need to redefine a few abstract methods:

  • __init__ the constructor

  • cleanup a clean up function that frees resources, closes connections, etc

  • _queue_job the method that queues the job for execution

Optionally an extra job finished callback could be added:

  • _job_finished extra callback for when a job finishes

If SUPPORTS_CANCEL is set to True, the plugin should also implement:

  • _cancel_job cancels a previously queued job

If SUPPORTS_HOLD_RELEASE is set to True, the plugin should also implement:

  • _hold_job hold_job a job that is currently held

  • _release_job releases a job that is currently held

If SUPPORTED_DEPENDENCY is set to True, the plugin should:

  • Make sure to use the Job.hold_jobs as a list of its dependencies

Not all of the functions need to actually do anything for a plugin. There are examples of plugins that do not really need a cleanup, but for safety you need to implement it. Just using a pass for the method could be fine in such a case.

Warning

When overwriting other functions, extreme care must be taken not to break the plugins working, as there is a lot of bookkeeping that can go wrong.

CANCELS_DEPENDENCIES = False

Indicates that when a job is cancelled the dependencies

SUPPORTS_CANCEL = False

Indicates if the plugin can cancel queued jobs

SUPPORTS_DEPENDENCY = False

Indicate if the plugin can manage job dependencies, if not the base plugin job dependency system will be used and jobs with only be submitted when all dependencies are met.

SUPPORTS_HOLD_RELEASE = False

Indicates if the plugin can queue jobs in a hold state and can release them again (if not, the base plugin will create a hidden queue for held jobs)

__abstractmethods__ = frozenset({'__init__', '_queue_job', 'cleanup'})
__del__()[source]

Cleanup if the variable was deleted on purpose

__enter__()[source]
__exit__(type_, value, tb)[source]
__init__(finished_callback=None, cancelled_callback=None)[source]

Setup the ExecutionPlugin

Parameters
  • finished_callback – the callback to call after a job finished

  • cancelled_callback – the callback to call after a job cancelled

Returns

newly created ExecutionPlugin

__module__ = 'fastr.plugins.executionplugin'
cancel_job(job)[source]

Cancel a job previously queued

Parameters

job – job to cancel

check_job_requirements(job_id)[source]

Check if the requirements for a job are fulfilled.

Parameters

job_id – job to check

Returns

directive what should happen with the job

Return type

JobAction

check_job_status(job_id)[source]

Get the status of a specified job

Parameters

job_id – the target job

Returns

the status of the job (or None if job not found)

clean_free_jobs(job)[source]
cleanup()[source]

Method to call to clean up the ExecutionPlugin. This can be to clear temporary data, close connections, etc.

Parameters

force – force cleanup (e.g. kill instead of join a process)

get_job(job_id)[source]
get_status(job)[source]
hold_job(job)[source]
job_finished(job, errors=None, blocking=False)[source]

The default callback that is called when a Job finishes. This will create a new thread that handles the actual callback.

Parameters
  • job (Job) – the job that finished

  • errors – optional list of errors encountered

  • blocking (bool) – if blocking, do not create threads

Returns

process_callbacks()[source]
queue_job(job)[source]

Add a job to the execution queue

Parameters

job (Job) – job to add

register_job(job)[source]
release_job(job)[source]

Release a job that has been put on hold

Parameters

job – job to release

show_jobs(req_status=None)[source]

List the queued jobs, possible filtered by status

Parameters

req_status – requested status to filter on

Returns

list of jobs

signal_dependent_jobs(job_id)[source]

Check all depedent jobs and process them if all their dependencies are met. :param job_id: :return:

class fastr.plugins.executionplugin.JobAction[source]

Bases: enum.Enum

Job actions that can be performed. This is used for checking if held jobs should be queued, held longer or be cancelled.

__module__ = 'fastr.plugins.executionplugin'
cancel = 'cancel'
hold = 'hold'
queue = 'queue'

reportingplugin Module

class fastr.plugins.reportingplugin.ReportingPlugin[source]

Bases: fastr.abc.baseplugin.Plugin

Base class for all reporting plugins. The plugin has a number of methods that can be implemented that will be called on certain events. On these events the plugin can inspect the presented data and take reporting actions.

__abstractmethods__ = frozenset({})
__module__ = 'fastr.plugins.reportingplugin'
activate()[source]
deactivate()[source]
job_updated(job)[source]
log_record_emitted(record)[source]
run_finished(run)[source]
run_started(run)[source]