Source code for fastr.datatypes

# Copyright 2011-2014 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The datatypes module holds all DataTypes generated by fastr and all the base
classes for these datatypes.
"""

# Empty module to be populated by the fastr.plugins.managers.datatypemanager.DataTypeManager
from abc import abstractmethod, abstractproperty
import os
import sys
import traceback
from typing import Generic, Iterable, Optional, Sequence, Tuple, Type, Union
import urllib.parse
import xml.etree.ElementTree as ElementTree

from .. import exceptions
from ..abc.baseplugin import BasePlugin, PluginState
from ..abc.basepluginmanager import BasePluginManager
from ..abc.serializable import Serializable, load
from ..core.version import Version
from ..core import vfs_plugin
from ..data import url
from ..helpers import config, log
from ..helpers.checksum import md5_checksum, hashsum
from ..helpers.classproperty import classproperty


# Placeholder until the actual datatype manager is running
types: 'DataTypeManager'


# Helper functions that tell pickle how to reconstruct pickled DataTypes;
# they are referenced from __reduce_ex__
def _get_type(id_: str) -> 'BaseDataType':
    """
    Pickle helper: build an empty instance of the datatype registered
    under ``id_``.

    :param id_: key of the datatype in the module-level ``types`` mapping
    :return: a new object created by calling that datatype without arguments
    """
    datatype = types[id_]
    return datatype()


def _get_enum(id_: str, options: Iterable[str]) -> 'EnumType':
    """
    Pickle helper: obtain the enum datatype ``id_`` via
    ``types.create_enumtype`` and return an empty instance of it.

    :param id_: id of the enum datatype
    :param options: the options of the enum datatype
    :return: a new object created by calling the enum datatype without arguments
    """
    return types.create_enumtype(type_id=id_, options=options)()


class BaseDataType(BasePlugin):
    """
    Root class of every datatype in the fastr type system.

    An object wraps a single ``value`` together with an optional ``format``.
    """

    filename: str = __file__

    #: Version of the DataType definition
    version: Version = Version('1.0')

    #: Description of the DataType
    description: str = ''

    #: Extension related to the Type
    extension = None

    # DataTypes do not need a separate loading step, so the plugin status is
    # fixed to "loaded"
    _status = (PluginState.loaded, 'DataTypes are always loaded', '')

    @abstractmethod
    def __init__(self, value=None, format_: str=None):
        """
        Create a new BaseDataType object.

        :param value: value to assign to the new object
        :param format_: the format stored alongside the value
        """
        super().__init__()
        self._value = None
        # Assigned through the ``value`` setter, which also accepts other
        # BaseDataType objects
        self.value = value
        self.format = format_

    def __repr__(self) -> str:
        """
        Give the ``<id: value>`` representation of the object.

        :return: string representation
        :rtype: str
        """
        type_id = self.id
        shown_value = repr(self.value)
        return "<{}: {}>".format(type_id, shown_value)

    def __str__(self) -> str:
        """
        Give the string version of the wrapped value.

        :return: string version
        :rtype: str
        """
        return str(self.value)

    def __eq__(self, other: 'BaseDataType') -> bool:
        """
        Compare two DataType objects by their value.

        :parameter DataType other: the object to compare against
        :return: flag indicating equality, or ``NotImplemented`` when self
                 is not an instance of the type of ``other``
        :rtype: bool
        """
        if isinstance(self, type(other)):
            return self.value == other.value
        return NotImplemented

    def __ne__(self, other: 'BaseDataType') -> bool:
        """
        Negation of :py:meth:`__eq__`.

        :parameter DataType other: the object to compare against
        :return: flag indicating inequality, or ``NotImplemented`` when self
                 is not an instance of the type of ``other``
        :rtype: bool
        """
        if isinstance(self, type(other)):
            equal = self.__eq__(other)
            return not equal
        return NotImplemented

    def __reduce_ex__(self, *args, **kwargs):
        """
        Pickle support: recreate the object through ``_get_type`` using the
        datatype id, then restore the state from :py:meth:`__getstate__`.
        """
        state = self.__getstate__()
        return _get_type, (self.id,), state

    def __getstate__(self):
        """
        :return: tuple of (datatype id, raw value, format)
        """
        return (type(self).id, self._value, self.format)

    def __setstate__(self, state):
        """
        Restore raw value and format from a state tuple.

        :param state: tuple as produced by :py:meth:`__getstate__`
        :raises FastrValueError: if the state was created for a different datatype id
        """
        stored_id = state[0]
        if self.id != stored_id:
            raise exceptions.FastrValueError('Invalid state for {}, state is for type {}'.format(self.id, state[0]))

        self._value = state[1]
        self.format = state[2]

    @classproperty
    def id(cls) -> str:
        """
        Internal id used for the DataType (the class name)
        """
        return cls.__name__

    @classproperty
    def dot_extension(cls):
        """
        Extension(s) with a prefixed dot: ``None`` when no extension is set,
        a str for a single extension, otherwise a tuple of str.
        """
        extension = cls.extension
        if not extension:
            return None

        if isinstance(extension, str):
            return '.{}'.format(extension)

        # Must be tuple/list
        return tuple('.{}'.format(x) for x in extension)

    @classproperty
    def fullid(cls):
        """
        The full fastr id of the DataType: the fullid of the parent followed
        by the id of the DataType
        """
        return '{}/{}'.format(cls.parent.fullid, cls.id)

    @classproperty
    def name(cls):
        """
        Display friendly name of the DataType (same as the id)
        """
        return cls.id

    @property
    def value(self):
        """
        The value of object instantiation of this DataType.
        """
        return self._value

    @property
    def raw_value(self):
        """
        The raw content of the ``_value`` field. Useful for datatypes that
        override ``value`` (like Deferred).
        """
        return self._value

    @value.setter
    def value(self, value):
        """
        Set the value. When given another BaseDataType object, its value and
        format are copied.

        :raises FastrTypeError: if a BaseDataType object is given for which
                                ``self.isinstance`` returns False
        """
        if not isinstance(value, BaseDataType):
            self._value = value
            return

        if not self.isinstance(value):
            raise exceptions.FastrTypeError('Cannot create a {} based on a {} (non-matching datatypes)'.format(
                type(self).id,
                type(value).id
            ))

        self._value = value.value
        self.format = value.format

    @classproperty
    def parent(cls):
        """
        The parent container of the DataType (the module-level ``types``)
        """
        return types

    @property
    def parsed_value(self):
        """
        The parsed value of this object; for the base class this is the raw value.
        """
        return self._value

    @classmethod
    def test(cls):
        """
        Plugin test hook: reject the basic datatype classes themselves.

        :raises FastrTypeError: if cls is one of the basic datatype classes
        """
        basic_types = (BaseDataType, DataType, TypeGroup, EnumType, ValueType, URLType)
        if cls in basic_types:
            raise exceptions.FastrTypeError('Cannot use a basic datatype {} as a plugin!'.format(cls))

    @property
    def valid(self):
        """
        A boolean flag that indicates whether or not the value assigned to
        this DataType is valid, as reported by :py:meth:`_validate`.

        Any exception raised by the validation is logged as a warning and
        then re-raised.
        """
        # The validation is potentially external code, so a broad except is
        # used on purpose
        # pylint: disable=broad-except
        try:
            return self._validate()
        except Exception:
            exc_type = sys.exc_info()[0]
            exc_info = traceback.format_exc()
            log.warning('Could not validate {}: encountered exception ({}) during execution:\n{}'.format(repr(self), exc_type.__name__, exc_info))
            raise

    def _validate(self):
        """
        The actual validation function, to be overridden by subclasses.
        The base implementation accepts everything.
        """
        # This method does not use self, but it is meant to be overridden
        # pylint: disable=no-self-use
        return True

    def checksum(self):
        """
        Generate a checksum for the value of this DataType

        :return: the result of ``hashsum`` applied to the value
        :rtype: str
        """
        return hashsum(self.value)

    @classmethod
    def isinstance(cls, value):
        """
        Indicate whether value is an instance of this DataType.

        :return: the flag indicating the value is of this DataType
        :rtype: bool
        """
        return isinstance(value, cls)
class DataType(BaseDataType, Serializable):
    """
    Base class for all DataTypes that can hold a value.
    """

    @abstractmethod
    def __init__(self, value=None, format_=None):
        """
        Create a new DataType object.

        :param value: value to assign to the new DataType object
        :param format_: the format stored alongside the value
        """
        super().__init__(value, format_)

    def serialize(self) -> dict:
        """
        Describe the object as a plain dict.

        :returns: dict with the keys ``id``, ``format`` and ``value``
        """
        return dict(
            id=self.id,
            format=self.format,
            value=self._value,
        )

    @classmethod
    def deserialize(cls, doc: dict, _=None) -> 'DataType':
        """
        Build an object from a dict as produced by :py:meth:`serialize`.

        The datatype is looked up in ``types`` using ``doc['id']``, so the
        result is of the datatype named in the doc, not necessarily ``cls``.

        :param doc: the state of the object to create
        :return: newly created object
        """
        return types[doc['id']](value=doc['value'], format_=doc.get('format'))

    def action(self, name):
        """
        Hook for subclasses to implement named actions. For example, the
        *Directory* DataType has an action *ensure*, which makes sure the
        Directory exists. The base implementation knows no actions and only
        logs a warning for any name that is not None.

        :param str name: name of the action to execute
        :return: None
        """
        if name is None:
            return

        log.warning("unknown action '{}' for DataType {}".format(name, self.id))
class Missing(DataType):
    """
    Singleton DataType to annotate missing data
    """
    # The one shared object, created on first instantiation
    _instance = None
    # Set to True on the object once the base initialiser has run
    _initialized = False
    # Plain class attribute that replaces the ``value`` property of the base class
    value = 'MISSING'

    def __new__(cls, *args, **kwargs):
        """
        Return the shared object, creating it on first use.
        """
        # NOTE(review): the arguments are forwarded to the parent ``__new__``;
        # if that resolves to ``object.__new__`` any non-empty arguments will
        # raise TypeError -- confirm against the base classes.
        if not isinstance(cls._instance, cls):
            cls._instance = super().__new__(cls, *args, **kwargs)

        return cls._instance

    def __init__(self, _=None, __=None):
        """
        Initialise the singleton once; both arguments are ignored.
        """
        if not self._initialized:
            super().__init__('MISSING', None)
            # Record that initialisation happened, otherwise every
            # ``Missing()`` call would re-initialise the shared object
            self._initialized = True
class TypeGroup(BaseDataType):
    """
    The TypeGroup is a special DataType that does not hold a value of its own
    but is used to group a number of DataTypes. For example ITK has a list of
    supported file formats that all tools built on ITK support. A group can be
    used to conveniently specify this in multiple Tools that use the same set
    of DataTypes.
    """
    # Per-class caches filled on first access of ``members`` / ``preference``
    _member_types = None
    _preference = None

    def __new__(cls, value=None, format_=None):
        """
        Instantiate a TypeGroup. This matches the value to the best matching
        member type and instantiates that. Note that the returned object is
        not of type TypeGroup but of one of the TypeGroup members.

        :raises FastrValueError: if no matching member type could be found
        """
        # Avoid casting values that are already in a member type
        if isinstance(value, tuple(cls.members)):
            return value

        selected = types.guess_type(value, options=cls)

        # Only fall back to plain matching if we have value urls
        is_value_url = isinstance(value, str) and value.startswith('val://')
        if is_value_url:
            if selected is None:
                # Just try a simple match rather than guessing
                selected = types.match_types(cls, type(value))

            if selected is None:
                # Just try a simple match rather than guessing
                selected = types.match_types(cls)

        if selected is None:
            raise exceptions.FastrValueError('Cannot matching value {} [{}] to any of {}'.format(value, type(value).__name__, cls.members))

        return selected(value, format_)

    def __init__(self, value=None):
        """
        Dummy constructor. TypeGroups are not instantiable and cannot hold a
        value of their own.

        :raises FastrDataTypeNotInstantiableError: if called
        """
        # All type groups are per definition not instantiable
        # pylint: disable=super-init-not-called
        raise exceptions.FastrDataTypeNotInstantiableError('TypeGroups are not instantiable')

    @property
    @abstractmethod
    def _members(self):
        """
        The ids of the members, this should be set in the subclass. It should
        be a frozen set of str.
        """

    @classmethod
    def _member_list(cls):
        """
        Resolve the member ids to datatypes, skipping ids not present in
        ``types`` and replacing nested TypeGroups by their own members.

        :return: tuple of member datatypes
        """
        resolved = []
        for type_id in cls._members:
            if type_id not in types:
                continue

            datatype = types[type_id]
            if issubclass(datatype, TypeGroup):
                resolved.extend(datatype.members)
            else:
                resolved.append(datatype)

        return tuple(resolved)

    @classproperty
    def members(cls):
        """
        The members of the TypeGroup as a frozenset (computed once and cached)
        """
        cached = cls._member_types
        if cached is None:
            cached = frozenset(cls._member_list())
            cls._member_types = cached

        return cached

    @classproperty
    def preference(cls):
        """
        Tuple of member datatypes (computed once and cached): taken from
        ``preferred_types`` when the class defines it, otherwise from the
        member list with nested TypeGroups replaced by their own preference.
        """
        if cls._preference is not None:
            return cls._preference

        if hasattr(cls, "preferred_types"):
            cls._preference = tuple(types[x] for x in cls.preferred_types)
        else:
            ordered = []
            for member in cls._member_list():
                if issubclass(member, TypeGroup):
                    ordered.extend(member.preference)
                else:
                    ordered.append(member)
            cls._preference = tuple(ordered)

        return cls._preference

    @classmethod
    def isinstance(cls, value):
        """
        Indicate whether value is an instance of any member of the TypeGroup.

        :rtype: bool
        """
        for member in cls.members:
            if member.isinstance(value):
                return True
        return False
class AnyType(TypeGroup):
    """
    Special Datatype in fastr that is a TypeGroup with all known DataTypes
    as its members.
    """

    @classproperty
    def _members(cls):
        """
        A "class-property" that gives a frozenset of the ids of all currently
        loaded DataTypes
        """
        # During object construction this is executed and types does not
        # yet exist; there are no members to report in that case. (The
        # previous fallback was a list, on which ``.values()`` fails.)
        try:
            known_types = types
        except NameError:
            return frozenset()

        return frozenset(x.id for x in known_types.values() if issubclass(x, DataType))

    @classproperty
    def description(cls):
        """
        The description of the AnyType, including the list of member types.
        """
        disp_members = [' - {}'.format(member) for member in cls.members]
        # NOTE(review): the line breaks inside this template were lost in the
        # flattened listing this block was rebuilt from -- confirm the layout
        # against the upstream file.
        return """ TypeGroup {id} {name} ({id}) is a group of consisting of all DataTypes known by fastr, currently: {members} """.strip().format(id='AnyType', name='AnyType', members='\n'.join(disp_members))
class AnyFile(TypeGroup):
    """
    Special Datatype in fastr that is a TypeGroup with all known URLTypes
    as its members.
    """

    @classproperty
    def _members(cls):
        """
        A "class-property" that gives a frozenset of the ids of all currently
        loaded URLTypes
        """
        # During object construction this is executed and types does not
        # yet exist; there are no members to report in that case. (The
        # previous fallback was a list, on which ``.values()`` fails.)
        try:
            known_types = types
        except NameError:
            return frozenset()

        return frozenset(x.id for x in known_types.values() if issubclass(x, URLType))

    @classproperty
    def description(cls):
        """
        The description of the AnyFile, including the list of member types.
        """
        disp_members = [' - {}'.format(member) for member in cls.members]
        # NOTE(review): the line breaks inside this template were lost in the
        # flattened listing this block was rebuilt from -- confirm the layout
        # against the upstream file.
        return """ TypeGroup {id} {name} ({id}) is a group of consisting of all URLTypes known by fastr, currently: {members} """.strip().format(id='AnyFile', name='AnyFile', members='\n'.join(disp_members))
class EnumType(DataType):
    """
    The EnumType is the base for DataTypes that can have a value which is an
    option from a predefined set of possibilities (similar to an enum type in
    many programming languages).
    """

    #: Enums always have version 1.0
    version = Version('1.0')

    # The allowed values; filled in by the concrete enum subclasses
    _options = frozenset()

    def __init__(self, value=None, format_=None):
        """
        Create a new EnumType object.

        :param value: value to assign to the new EnumType object
        :param format_: the format stored alongside the value
        :raises FastrDataTypeNotInstantiableError: if EnumType itself (not a
                                                   subclass) is instantiated
        """
        super().__init__(value, format_)

        if type(self) is EnumType:
            raise exceptions.FastrDataTypeNotInstantiableError('EnumType is not instantiable')

    def __reduce_ex__(self, *args, **kwargs):
        """
        Pickle support: recreate the enum datatype through ``_get_enum``
        using its id and options, then restore the state.
        """
        state = self.__getstate__()
        return _get_enum, (self.id, self.options), state

    @classproperty
    def description(cls):
        """
        The description of the EnumType, including the list of options.
        """
        option_lines = '\n'.join(' - "{}"'.format(option) for option in cls.options)
        # NOTE(review): the line breaks inside this template were lost in the
        # flattened listing this block was rebuilt from -- confirm the layout
        # against the upstream file.
        return """ {name} ({id}) is a enumerate type with options: {options} {name} can take the value of any of the option, but any other value is considered invalid. """.strip().format(id=cls.id, name=cls.name, options=option_lines)

    @classproperty
    def options(cls):
        """
        A frozenset holding the options that the value of the EnumType
        object can have.

        :return: the options the value can hold
        :rtype: frozenset
        """
        return cls._options

    def _validate(self):
        """
        The value is valid when it is one of the options.
        """
        allowed = self._options
        return self._value in allowed
class ValueType(DataType):
    """
    The ValueType is the base for DataTypes that hold simple values (not an
    EnumType and not a file/URL). The value is generally represented by a
    string.
    """

    def __init__(self, value=None, format_=None):
        """
        Create a new ValueType object.

        :param value: value to assign to the new ValueType
        :param format_: the format stored alongside the value
        """
        super().__init__(value, format_)
class URLType(DataType):
    """
    The URLType is the base for DataTypes that point to a resource somewhere
    else (typically a filesystem). The true value is actually the resource
    referenced by the value in this object.
    """

    def __init__(self, value=None, format_=None):
        """
        Create a new URLType object.

        :param value: value to assign to the new URLType
        :param format_: the format stored alongside the value
        """
        super().__init__(value, format_)

    def __eq__(self, other):
        """
        Compare two URLType objects by their checksum.

        :parameter URLType other: the object to compare against
        :return: flag indicating equality, or ``NotImplemented`` when self
                 is not an instance of the type of ``other``
        :rtype: bool
        """
        if isinstance(self, type(other)):
            return self.checksum() == other.checksum()
        return NotImplemented

    def checksum(self):
        """
        Return the checksum of this URL type: ``md5_checksum`` applied to the
        content list of the parsed value.

        :return: checksum string
        :rtype: str
        """
        return md5_checksum(self.content(self.parsed_value))

    @property
    def parsed_value(self):
        """
        The parsed value of this object: the value itself when it is not a
        url, or the path a ``vfs`` url translates to.

        :raises FastrValueError: if the value is a url with a scheme other than vfs
        """
        if not url.isurl(self.value):
            return self.value

        scheme = urllib.parse.urlparse(self.value).scheme
        if scheme != 'vfs':
            raise exceptions.FastrValueError('Cannot get parsed value for non-vfs url: {} (scheme {})'.format(self.value, scheme))

        return vfs_plugin.url_to_path(self.value)

    @classmethod
    def content(cls, inval, outval=None):
        """
        Give the contents of a URLType, this is generally useful for filetypes
        that consist of multiple files (e.g. AnalyzeImageFile, DICOM). The
        value will indicate the main file, and the contents function can
        determine all files that form a single data value.

        The base implementation reports just the value itself.

        :param inval: a value to figure out contents for this type
        :param outval: the place where the copy should point to
        :return: ``[inval]``, or ``[(inval, outval)]`` when outval is given
        :rtype: list
        """
        if outval is None:
            return [inval]
        return [(inval, outval)]

    @property
    def valid(self):
        """
        A boolean flag that indicates whether or not the value assigned to
        this DataType is valid. Values that are not a str are never valid;
        otherwise the validation of the parent class is used.
        """
        if isinstance(self.value, str):
            return super().valid
        return False

    def _validate(self):
        """
        Validate the value: when an extension is set the parsed value has to
        end with it, and every item of the content list has to exist on disk.
        A TypeError, ValueError or IOError while checking the content counts
        as invalid.
        """
        if self.extension and not self.parsed_value.endswith(self.dot_extension):
            return False

        try:
            return all(os.path.exists(item) for item in self.content(self.parsed_value))
        except (TypeError, ValueError, IOError):
            return False
class Deferred(DataType):
    """
    DataType whose raw value is a ``val://`` url; the object that url refers
    to is looked up on first use and exposed as ``target``.
    """

    def __init__(self, value=None, format_=None):
        """
        Create a new Deferred object.

        :param value: value to assign to the new DataType object
        :param format_: stored on the object, here for compatibility
        """
        # The parent constructor is not called: it assigns ``self.value``,
        # which this class only defines as a getter
        self._value = value
        self._data = None
        self.format = format_

    def __repr__(self):
        """
        Give the ``<id: ...>`` representation, showing the target when it is
        available and the raw value otherwise.

        :return: string representation
        :rtype: str
        """
        target = self.target
        shown = self._value if target is None else target
        return "<{}: {}>".format(self.id, repr(shown))

    def __getstate__(self):
        """
        :return: tuple of (``'Deferred'``, raw value)
        """
        return 'Deferred', self._value

    def __setstate__(self, state):
        """
        Restore the raw value from a state tuple.

        :raises FastrValueError: if the state was created for a different datatype id
        """
        stored_id = state[0]
        if self.id != stored_id:
            raise exceptions.FastrValueError('Unvalid state for {}, state is for type {}'.format(self.id, state[0]))

        self._value = state[1]

    @classmethod
    def lookup(cls, value):
        """
        Look up the deferred target and return that object.

        The url is resolved to a file under ``config.mounts[netloc]``, which
        is read with ``load``; the query fields ``outputname``, ``nr`` and
        optionally ``sampleid`` select the entry in its ``output_data``.

        :param value: the ``val://`` url to resolve
        :return: The value the deferred points to
        :rtype: DataType
        :raises FastrKeyError: if the deferred is not available (yet)
        :raises FastrValueError: if the value is not a valid deferred url
        """
        parsed_url = urllib.parse.urlparse(value)
        if parsed_url.scheme != 'val':
            raise exceptions.FastrValueError('Cannot lookup value {}, wrong url scheme'.format(value))

        mount = config.mounts[parsed_url.netloc]
        relative_path = os.path.normpath(parsed_url.path[1:])
        datafile = os.path.join(mount, relative_path)
        query = urllib.parse.parse_qs(parsed_url.query)

        # Open Job file
        data = load(datafile)

        # Attempt to extract data
        try:
            outputname = query['outputname'][0]
            cardinality_nr = int(query['nr'][0])
            output = data.output_data[outputname]

            if 'sampleid' in query:
                output = output[query['sampleid'][0]]

            value = output[cardinality_nr]
        except (IndexError, KeyError) as exception:
            log.debug('Output data for query: {}'.format(data.output_data))
            message = 'Could not get value from {}, encountered {}: {}'.format(value,
                                                                               type(exception).__name__,
                                                                               exception.args[0])
            raise exceptions.FastrKeyError(message)

        # A deferred pointing at another deferred is followed
        if isinstance(value, Deferred):
            value = value.target

        return value

    @property
    def target(self):
        """
        Target object for this deferred, looked up on first access and cached.
        ``None`` when the lookup fails with a FastrKeyError, FastrValueError
        or FastrFileNotFound.
        """
        if self._data is not None:
            return self._data

        try:
            self._data = self.lookup(self._value)
        except (exceptions.FastrKeyError,
                exceptions.FastrValueError,
                exceptions.FastrFileNotFound):
            return None

        return self._data

    @property
    def value(self):
        """
        The value of the target, or ``None`` when no target is available.
        """
        try:
            target = self.target
        except exceptions.FastrKeyError:
            return None

        return None if target is None else target.value

    @property
    def parsed_value(self):
        """
        The parsed value of the target, or ``None`` when no target is available.
        """
        try:
            target = self.target
        except exceptions.FastrKeyError:
            return None

        return None if target is None else target.parsed_value

    # NOTE(review): ``_get_data`` is not defined in this class or in the
    # DataType/BaseDataType classes of this module -- confirm it is provided
    # elsewhere, otherwise ``provenance`` and ``job`` raise AttributeError.
    @property
    def provenance(self):
        return self._get_data()[1]

    @property
    def job(self):
        try:
            return self._get_data()[2]
        except exceptions.FastrValueError:
            return None

    def _validate(self):
        """
        A deferred is valid when its target is available and valid.
        """
        target = self.target
        return False if target is None else target.valid

    def checksum(self):
        """
        Generate a checksum for the value of this DataType

        :return: the checksum of the target, or the hashsum of a fixed
                 placeholder string when no target is available
        :rtype: str
        """
        target = self.target
        if target is not None:
            return target.checksum()

        return hashsum('__FASTR_NOT_AVAILABLE_HASH__')
def fastr_isinstance(obj, datatype):
    """
    Check if an object is of a specific datatype.

    A TypeGroup matches when the exact type of the object is one of its
    members; any other datatype is checked with a regular ``isinstance``.

    :param obj: Object to inspect
    :param datatype: The datatype(s) to check
    :type datatype: tuple, BaseDataType
    :return: flag indicating object is of datatype
    :rtype: bool
    """
    if not isinstance(datatype, tuple):
        datatype = (datatype,)

    for dtype in datatype:
        if issubclass(dtype, TypeGroup):
            if type(obj) in dtype.members:
                return True
        # Test against the current candidate only, not the whole tuple
        elif isinstance(obj, dtype):
            return True

    return False
# Type alias: a class object that is BaseDataType or a subclass of it
DataTypeClass = Type[BaseDataType]
[docs]class DataTypeManager(BasePluginManager[DataTypeClass]): """ The DataTypeManager hold a mapping of all DataTypes in the fast system and can create new DataTypes from files/data structures. """
[docs] def __init__(self): """ The DataTypeManager constructor will create a new DataTypeManager and populate it with all DataTypes it can find in the paths set in ``config.types_path``. :return: the created DataTypeManager """ self.types_map = {} super(DataTypeManager, self).__init__(config.types_path)
@property def preferred_types(self): return [self.data[t] for t in config.preferred_types if t in self.data] @property def fullid(self): """ The fullid of the datatype manager """ return 'fastr://types' @property def plugin_class(self): """ The PluginClass of the items of the BasePluginManager """ return BaseDataType # Allow key to be a id string or DataType
[docs] def __keytransform__(self, key): """ Key transformation for this mapping. The key transformation allows indexing by both the DataType name as well as the DataType it self. :param key: The name of the requested datatype or the datatype itself :type key: fastr.datatypes.BaseDataType or str :return: The requested datatype """ if self.isdatatype(key): if key.name in self.data and self.data[key.name] is key: return key.name else: raise exceptions.FastrDataTypeMismatchError('key DataType {} not {}'.format(key.name, type(self).__name__)) else: return key
[docs] def populate(self): """ Populate Manager. After scanning for DataTypes, create the AnyType and set the preferred types """ super(DataTypeManager, self).populate() # Add the any type self['AnyType'] = AnyType self['AnyFile'] = AnyFile self['Deferred'] = Deferred self['Missing'] = Missing
@property def _instantiate(self): """ Flag indicating that the plugin should NOT be instantiated prior to saving """ return False def _print_key(self, key): if key.startswith('__') and key.endswith('__'): return None return key
[docs] def has_type(self, name): """ Check if the datatype with requested name exists :param str name: the name of the requested datatype :return: flag indicating if the datatype exists :rtype: bool """ return name in self.data
[docs] def poll_datatype(self, filename): """ Poll an xml file to see if there is a definition of a datatype in it. :param str filename: path of the file to poll :return: tuple with (id, version, basetype) if a datatype is found or (None, None, None) if no datatype is found """ if os.path.exists(filename): tree = ElementTree.parse(filename) root = tree.getroot() if root.tag not in ('type', 'typegroup'): message = 'Invalid root tag ({}) in file!'.format(root.tag) log.warning(message) return (None, None, None) id_ = root.get('id') version = Version(root.get('version')) return (id_, version, root.tag) else: message = '{} not a valid filename'.format(filename) log.warning(message) return (None, None, None)
[docs] def get_type(self, name: str) -> DataTypeClass: """Read a type given a typename. This will scan all directories in types_path and attempt to load the newest version of the DataType. :param str name: Name of the datatype that should be imported in the system :return: the datatype with the requested name, or None if datatype is not found .. note:: If type is already in TypeManager it will not load anything and return the already loaded version. """ log.debug('Attemping to get datatype {}'.format(name)) if name in self: return self[name] latest_version = Version('0.0') latest_filename = '' for path in config.types_path: filename = os.path.join(path, name + '.xml') if os.path.exists(filename): (pollname, version, _) = self.poll_datatype(filename) if pollname == name and version > latest_version: latest_version = version latest_filename = filename if latest_filename == '': message = 'Could not find type with name {}'.format(name) log.error(message) return None log.debug('Found {} (version {}) in {}'.format(name, latest_version, latest_filename)) self._load_item(latest_filename) return self[name]
def _store_item(self, name, value):
    """
    Store an item through the parent class and expose it on ``types``.

    The actual storing is delegated to the ``_store_item`` of the parent
    class. Items that have an ``id`` get their ``__module__`` set to
    ``fastr.datatypes`` and are additionally set as an attribute (named
    after their id) on the module-level ``types`` object.

    :param name: the key of the item to save
    :param value: the value of the item to save
    :return: None
    """
    super(DataTypeManager, self)._store_item(name, value)

    # Items without an id cannot be exposed by name
    if value.id is None:
        return

    value.__module__ = 'fastr.datatypes'
    setattr(types, value.id, value)
def create_enumtype(self, type_id: str, options: Iterable[str], name: Optional[str] = None) -> Type[EnumType]:
    """
    Create (or fetch) an EnumType subclass for a set of string options.

    If a type with ``type_id`` is already registered in this manager, the
    existing class is returned as long as its options match the requested
    ones. Otherwise a new subclass of EnumType is created, registered in
    the manager under ``type_id`` and set as an attribute on ``types``.

    :param str type_id: the id of the new class
    :param iterable options: an iterable of options, each option should be
                             str; a single str is treated as one option
    :param str name: optional name, only used as input for the hash of
                     the new type
    :return: the newly created (or already registered) subclass of EnumType
    :raises FastrDataTypeMismatchError: if ``type_id`` is already registered
                                        with a different set of options
    :raises FastrTypeError: if the options is not an iterable of str
    """
    # Normalise a bare string *before* comparing against an existing
    # definition. Otherwise set('abc') would yield the individual
    # characters and re-requesting an existing single-option enum would
    # be reported as a conflicting definition.
    if isinstance(options, str):
        options = (options,)

    if type_id in self:
        # NOTE(review): assumes the ``options`` attribute of a registered
        # enum compares equal to a plain set of its option strings
        if self[type_id].options != set(options):
            raise exceptions.FastrDataTypeMismatchError('Conflicting definition of Enum {}!'
                                                        ' (options {} vs {})'.format(type_id,
                                                                                     self[type_id].options,
                                                                                     options))
        log.debug('Returning existing DataType {}!'.format(type_id))
        return self[type_id]

    try:
        option_set = frozenset(options)
    except TypeError:
        message = 'options must be a iterable containing the valid options for the Enum, found options {}'.format(options)
        log.error(message)
        raise exceptions.FastrTypeError(message)

    if not all(isinstance(x, str) for x in option_set):
        message = 'all options for an Enum must be of type str, found options {}'.format(options)
        log.error(message)
        raise exceptions.FastrTypeError(message)

    # Class namespace for the dynamically created EnumType subclass
    attributes = {
        '_options': option_set,
        'parent': self,
        '_sourcepath': None,
        '_hash': hashsum([type_id, name, options]),
        '__module__': 'fastr.datatypes',
    }

    log.debug('Creating EnumType {} from script'.format(type_id))
    out: Type[EnumType] = type(type_id, (EnumType,), attributes)

    # Register in the manager and expose on the ``types`` object
    self[type_id] = out
    setattr(types, out.id, out)
    return out
def guess_type(self,
               value: str,
               exists: Optional[bool] = True,
               options: Optional[Union[DataTypeClass, Tuple[DataTypeClass, ...]]] = None,
               preferred: Optional[Sequence[DataTypeClass]] = None) -> Optional[DataTypeClass]:
    """
    Guess the DataType based on a value str.

    :param str value: the value to guess the type for
    :param bool exists: Indicate the value exists (if file) and can be
                        checked for validity, if false skip validity check
    :param options: The options that are allowed to be guessed from
    :type options: TypeGroup, DataType or tuple/list/set of DataTypes
    :param iterable preferred: An iterable of preferred types in case
                               multiple types match.
    :return: The resulting DataType or None if no match was found
    :raises FastrTypeError: if the options argument is of the wrong type

    The function will first create a list of all candidate DataTypes.
    Subsequently, it will check for each candidate if the value would be
    valid. If there are multiple matches, the preferred types (from the
    argument, the TypeGroup and finally the manager) are consulted to
    break the ties. If none of the DataTypes are in the preferred types
    list, a somewhat random DataType will be picked as the result.
    """
    extra_preferred = None
    log.debug('Guesstype value: {}, options: {}, preferred: {}'.format(value, options, preferred))

    if options is None:
        options = {x for x in self.values() if issubclass(x, DataType)}
    elif isinstance(options, (tuple, list, set, frozenset)):
        # Collections must be handled before any issubclass() test:
        # issubclass() raises a plain TypeError for non-class arguments,
        # which previously made this branch unreachable.
        options = set(options)
    elif isinstance(options, type) and issubclass(options, TypeGroup):
        extra_preferred = options.preference
        options = set(options.members)
    elif isinstance(options, type) and issubclass(options, DataType):
        options = {options}
    else:
        raise exceptions.FastrTypeError('Invalid type for options ({})'.format(options))

    log.debug('Guesstype options: {}'.format(options))

    # Only vfs:// and val:// urls can be used to determine a DataType
    if url.isurl(value):
        scheme = url.get_url_scheme(value)
        if scheme not in ['vfs', 'val']:
            log.warning('Cannot determine DataType based on URL with scheme {}'.format(scheme))
            return None

    candidates = set()
    for option in options:
        if not issubclass(option, DataType):
            continue
        elif issubclass(option, Deferred):
            continue  # We never want to find a deferred (it is not a valid type for an instance)
        elif option.dot_extension is None or (isinstance(value, str) and value.endswith(option.dot_extension)):
            candidates.add(option)

    log.debug('Guesstype candidates: {}'.format(candidates))

    if len(candidates) == 0:
        log.debug('No valid combinations of options and candidates!')
        return None

    if len(candidates) != 1 and exists:
        # Test validity of value for each DataType
        final_candidates = []
        for candidate in candidates:
            temp = candidate(value)
            if temp.valid:
                final_candidates.append(candidate)
    else:
        # A single candidate (or a value that cannot be checked) is
        # accepted without instantiating the type
        final_candidates = list(candidates)

    # Remove types in this order in case of multiple matches, that means
    # the Int has precedence over the Boolean, Float and String in case
    # of a tie
    types_to_remove = ['String', 'Float', 'Boolean', 'Int', 'UnsignedInt']
    log.debug('Final candidates: {}'.format(final_candidates))

    if len(final_candidates) > 1:
        for type_to_remove in types_to_remove:
            if types[type_to_remove] in final_candidates:
                final_candidates.remove(types[type_to_remove])

            # Stop as soon as the tie is resolved
            if len(final_candidates) == 1:
                break

    if len(final_candidates) == 0:
        return None
    elif len(final_candidates) == 1:
        log.debug('Matched a single type: {}'.format(final_candidates[0]))
        return final_candidates[0]
    else:
        log.debug('Multiple DataTypes match, trying to find a preferred match! Remaining candidates: {}'.format(final_candidates))

        # Get preferred types from argument list
        if preferred is not None:
            for type_ in preferred:
                if type_ in final_candidates:
                    log.info('Found preferred match (from keyword): {}'.format(type_))
                    return type_

        # Get preferred types from the TypeGroup given as options
        if extra_preferred is not None:
            for type_ in extra_preferred:
                if type_ in final_candidates:
                    log.info('Found preferred match (from keyword): {}'.format(type_))
                    return type_

        # Get preferred types known to the manager
        for type_ in self.preferred_types:
            if type_ in final_candidates:
                log.debug('Found preferred match: {}'.format(type_))
                return type_

        # Last resort: prefer types that define an extension
        log.debug('Mutliple matches, removing matches without extension')
        backup_candidate = final_candidates[0]
        final_candidates = [x for x in final_candidates if x.extension is not None]

        if len(final_candidates) == 1:
            return final_candidates[0]
        else:
            if len(final_candidates) > 1:
                log.error('Multiple DataTypes match, but no preferred match, picking one at random! Remaining candidates: {}'.format(final_candidates))
                return final_candidates[0]
            else:
                log.error('No final DataTypes match value "{}", using the backup type: {}'.format(value, backup_candidate))
                return backup_candidate
def match_types(self, *args, **kwargs):
    """
    Find the match between a list of DataTypes/TypeGroups, see
    :ref:`resolve-datatype` for details

    :param args: A list of DataType/TypeGroup objects to match
    :param kwargs: A 'preferred' keyword argument can be used to indicate
                   a list of DataTypes to prefer in case of ties (first has
                   precedence over later in list)
    :return: The best DataType match, or None if no match is possible.
    :raises FastrTypeError: if not all args are subclasses of BaseDataType
    """
    options = self.match_types_any(*args)

    # Determine which list of preferred types is used to break ties
    preferred = kwargs.get('preferred')
    if preferred is not None:
        if not all([self.isdatatype(item) for item in preferred]):
            # Only warn: the given list is still used as-is
            message = 'All preferred types must be DataTypes!'
            log.warning(message)
    else:
        preferred = self.preferred_types

    # match_types_any returns None (rather than an empty set) when there
    # are no arguments left to match, so both cases are treated as
    # "no match" instead of calling len() on None.
    if not options:
        log.warning("No matching DataType available (args {})".format(args))
        return None

    if len(options) == 1:
        # This is a perfect match, no preferences needed
        return options.pop()

    # Multiple options left, first try the preferred types in order
    for type_ in preferred:
        if type_ in options:
            return type_

    # Find a single argument that is a list
    if len(args) == 1 and isinstance(args[0], (list, tuple)):
        args = args[0]

    # Check all typegroups in args for preferred types and use those if possible
    for option in args:
        if not issubclass(option, TypeGroup):
            continue

        for type_ in option.preference:
            if type_ in options:
                return type_

    log.debug("No preferred DataType matches, (options {}, preferred {})".format(options, preferred))
    return None
def match_types_any(self, *args):
    """
    Collect every DataType that is compatible with all given
    DataTypes/TypeGroups, see :ref:`resolve-datatype` for details

    :param args: DataType/TypeGroup classes to match, either as separate
                 arguments or as a single list/tuple
    :return: A set with all DataTypes that match, or None when there are
             no arguments to match
    :rtype: set
    :raises FastrTypeError: if not all args are subclasses of BaseDataType
    """
    # Allow the arguments to be passed as one list or tuple
    if len(args) == 1 and isinstance(args[0], (list, tuple)):
        args = args[0]

    # The plain str class carries no type information, drop it
    args = tuple(arg for arg in args if arg != str)

    rejected = [item for item in args if not self.isdatatype(item)]
    if rejected:
        message = 'All arguments must be DataTypes! (Found {})'.format(args)
        log.error(message)
        raise exceptions.FastrTypeError(message)

    # Nothing left to match
    if not args:
        log.debug("No DataTypes given to match")
        return None

    # Start from a fresh set, so the members of a TypeGroup are never
    # modified by the in-place intersections below
    first, remaining = args[0], args[1:]
    if issubclass(first, TypeGroup):
        options = set(first.members)
    else:
        options = {first}

    # Narrow the options down using every remaining argument
    for datatype in remaining:
        if isinstance(datatype, DataType):
            datatype = type(datatype)

        options &= datatype.members if issubclass(datatype, TypeGroup) else {datatype}

    return options
@staticmethod
def isdatatype(item):
    """
    Tell whether an item is a class derived from BaseDataType.

    :param item: item to check
    :return: ``True`` if the item is a class and a subclass of
             BaseDataType, ``False`` otherwise
    :rtype: bool
    """
    # Anything that is not a class can never be a datatype
    if not isinstance(item, type):
        return False

    return issubclass(item, BaseDataType)
# Create the DataTypeManager instance and bind it to the module-level name
# ``types``; the manager methods above refer to this name when exposing and
# looking up datatypes.
types = DataTypeManager()