# Source code for fastr.plugins.managers.iopluginmanager
# Copyright 2011-2014 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import urllib.parse as up
from .pluginmanager import PluginSubManager
from ...core.ioplugin import IOPlugin
from ...core.tool import Tool
from ...core.version import Version
class IOPluginManager(PluginSubManager):
    """
    A mapping containing the IOPlugins known to this system.

    Entries are addressed by url scheme. A plugin whose ``scheme`` attribute
    is a tuple is reachable under every scheme in that tuple.
    """

    def __init__(self, parent):
        """
        Create the IOPluginManager as a sub-manager of ``parent``.

        :param parent: forwarded to :class:`PluginSubManager` together with
                       ``plugin_class=IOPlugin``
        """
        super().__init__(parent=parent,
                         plugin_class=IOPlugin)
        # Lazily built lookup table: url scheme -> id of the plugin in self.data
        self._key_map = {}

    def cleanup(self):
        """
        Cleanup all plugins, this closes files, connections and other things
        that could be left dangling otherwise.
        """
        # Iterate over a snapshot so the loop is unaffected if the mapping
        # changes while plugins are being cleaned up.
        # NOTE(review): if values() is derived from __iter__, a plugin that
        # registers several schemes may be visited more than once -- confirm
        # that IOPlugin.cleanup() is safe to call repeatedly.
        for ioplugin in list(self.values()):
            ioplugin.cleanup()

    def __keytransform__(self, key):
        """
        Translate a url scheme into the id under which the plugin is stored.

        The scheme table is rebuilt from ``self.data`` whenever the requested
        scheme is not in it yet.

        :param key: the url scheme to look up
        :return: the id of the plugin in ``self.data`` that handles the scheme
        :raises KeyError: if no plugin declares the scheme
        """
        if key not in self._key_map:
            # Miss: rebuild the whole table from the current plugin data.
            self._key_map.clear()
            for id_, value in self.data.items():
                if isinstance(value.scheme, tuple):
                    schemes = value.scheme
                else:
                    schemes = (value.scheme,)
                for scheme in schemes:
                    self._key_map[scheme] = id_

        # Look up outside of any exception handler so that an unknown scheme
        # gives a single, clean KeyError instead of a chained traceback.
        return self._key_map[key]

    def __iter__(self):
        """
        Iterate over all url schemes provided by the plugins in ``self.data``.

        :return: generator yielding each scheme; plugins declaring a tuple of
                 schemes contribute one item per scheme
        """
        for value in self.data.values():
            scheme = value.scheme
            if isinstance(scheme, tuple):
                yield from scheme
            else:
                yield scheme

    def _print_key(self, key):
        """
        Get the printable parts for the IOPlugin key

        :param key: key to get the printable version for
        :return: the status value of the plugin and the key formatted
                 as ``<key>://``
        :rtype: tuple
        """
        return self[key].status.value, '{}://'.format(key)

    def expand_url(self, url):
        """
        Expand the url by filling the wildcards. This function checks the url scheme
        and uses the expand function of the correct IOPlugin.

        :param str url: url to expand
        :return: list of urls
        :rtype: list of str
        """
        parsed_url = up.urlparse(url)
        return self[parsed_url.scheme].expand_url(url)

    def pull_source_data(self, url, outdir, sample_id, datatype=None):
        """
        Retrieve data from an external source. This function checks the url scheme and
        selects the correct IOPlugin to retrieve the data.

        :param url: url to pull
        :param str outdir: the directory to write the data to
        :param sample_id: the sample id, passed on unchanged to the IOPlugin
        :param DataType datatype: the datatype of the data, used for determining
                                  the total contents of the transfer
        :return: the result of the ``pull_source_data`` call of the selected IOPlugin
        """
        parsed_url = up.urlparse(url)
        return self[parsed_url.scheme].pull_source_data(url, outdir, sample_id, datatype)

    def push_sink_data(self, inpath, outurl, datatype=None):
        """
        Send data to an external source. This function checks the url scheme and
        selects the correct IOPlugin to send the data.

        :param str inpath: the path of the data to be pushed
        :param str outurl: the url to write the data to
        :param DataType datatype: the datatype of the data, used for determining
                                  the total contents of the transfer
        :return: the result of the ``push_sink_data`` call of the selected IOPlugin
        """
        parsed_url = up.urlparse(outurl)
        return self[parsed_url.scheme].push_sink_data(inpath=inpath,
                                                      outurl=outurl,
                                                      datatype=datatype)

    def put_url(self, inpath, outurl):
        """
        Put the files to the external data store.

        :param inpath: path to the local data
        :param outurl: url to where to store the data in the external data store.
        :return: the result of the ``put_url`` call of the selected IOPlugin
        """
        parsed_url = up.urlparse(outurl)
        return self[parsed_url.scheme].put_url(inpath, outurl)

    def url_to_path(self, url):
        """
        Retrieve the path for a given url

        :param str url: the url to parse
        :return: the path corresponding to the input url
        :rtype: str
        """
        parsed_url = up.urlparse(url)
        return self[parsed_url.scheme].url_to_path(url)

    @staticmethod
    def register_url_scheme(scheme):
        """
        Register a custom scheme to behave http like. This is needed to parse
        all things properly with urlparse.

        The scheme is appended to every ``uses_*`` list of :mod:`urllib.parse`
        that does not contain it yet, so repeated registration is harmless.

        :param scheme: the scheme to register
        """
        for name in dir(up):
            if not name.startswith('uses_'):
                continue
            registry = getattr(up, name)
            if scheme not in registry:
                registry.append(scheme)

    @staticmethod
    def create_ioplugin_tool(tools, interfaces):
        """
        Create the tools which handle sinks and sources and register them.

        A ``Source`` and a ``Sink`` tool are built and stored in ``tools``
        under the keys ``('fastr/Source:1.0', '1.0')`` and
        ``('fastr/Sink:1.0', '1.0')``. A tool whose key is already present is
        left untouched. The command targets of the tools are ``source.py`` and
        ``sink.py``, run with the current Python interpreter.

        :param tools: mapping in which the tools are registered
        :param interfaces: mapping that provides the ``'FastrInterface'``
                           callable used to construct the tool interfaces
        """
        def build_tool(name, node_class, description, script, interface_id,
                       inputs, outputs):
            # Build one fastr tool; Source and Sink only differ in the
            # arguments given here.
            tool = Tool()
            tool._id = name
            tool.name = name
            tool.version = Version('1.0')
            tool.namespace = 'fastr'
            tool.node_class = node_class
            tool.filename = __file__
            tool.tests = []
            tool._target = None
            tool.command = {'authors': ['fastr devs'],
                            'description': description,
                            'targets': [{
                                'os': '*',
                                'arch': '*',
                                'bin': script,
                                'interpreter': sys.executable,
                                'paths': '../../utils/cmd/',
                                'module': None
                            }],
                            'url': 'http://www.bigr.nl/fastr',
                            'version': Version('1.0')}
            document = {'inputs': inputs, 'outputs': outputs}
            tool.interface = interfaces['FastrInterface'](id_=interface_id,
                                                          document=document)
            return tool

        source_key = ('fastr/Source:1.0', '1.0')
        if source_key not in tools:
            source_inputs = [{'id': 'input',
                              'name': 'input',
                              'description': 'The data to be retrieved by the SourceNode',
                              'datatype': 'String',
                              'prefix': '--input',
                              'repeat_prefix': False,
                              'cardinality': '1-*',
                              'required': True,
                              'hidden': True},
                             {'id': 'datatype',
                              'name': 'datatype',
                              'datatype': 'String',
                              'prefix': '--datatype',
                              'cardinality': '1',
                              'required': True,
                              'hidden': True},
                             {'id': 'targetdir',
                              'name': 'targetdir',
                              'description': 'The location to store the result',
                              'datatype': 'Directory',
                              'prefix': '--output',
                              'cardinality': '1',
                              'required': True,
                              'hidden': True},
                             {'id': 'sample_id',
                              'name': 'sample_id',
                              'description': 'The sample id for the SourceJob that is run',
                              'datatype': 'String',
                              'prefix': '--sample_id',
                              'cardinality': '1',
                              'required': True,
                              'hidden': True}]
            source_outputs = [{'id': 'output',
                               'name': 'The output urls in vfs scheme',
                               'description': '',
                               'datatype': 'AnyType',
                               'cardinality': 'unknown',
                               'automatic': True,
                               'location': '^__IOPLUGIN_OUT__=(.*)$',
                               'method': 'json'}]

            # Register source tool with the ToolManager
            tools[source_key] = build_tool(
                name='Source',
                node_class='SourceNode',
                description='Source Tool: tool to handle SourceNode operation.',
                script='source.py',
                interface_id='source-interface',
                inputs=source_inputs,
                outputs=source_outputs,
            )

        sink_key = ('fastr/Sink:1.0', '1.0')
        if sink_key not in tools:
            sink_inputs = [{'id': 'input',
                            'name': 'The url to process (can also be a list)',
                            'description': 'The data to be store by the SinkNode',
                            'datatype': 'AnyType',
                            'prefix': '--input',
                            'cardinality': '1-*',
                            'required': True},
                           {'id': 'output',
                            'name': 'output',
                            'datatype': 'String',
                            'prefix': '--output',
                            'cardinality': '1',
                            'required': True,
                            'hidden': True},
                           {'id': 'datatype',
                            'name': 'datatype',
                            'datatype': 'String',
                            'prefix': '--datatype',
                            'cardinality': '1',
                            'required': True,
                            'hidden': True}]

            # Register sink tool with the ToolManager
            tools[sink_key] = build_tool(
                name='Sink',
                node_class='SinkNode',
                description='Sink Tool: tool to handle Sink Node operation.',
                script='sink.py',
                interface_id='sink-interface',
                inputs=sink_inputs,
                outputs=[],
            )