Source code for fastr.utils.verify

import os
import gzip
import shutil
from tempfile import mkdtemp

import fastr
from ..abc.serializable import Serializable, ReadWriteHandler
from ..core.tool import Tool
from ..helpers import iohelpers, config
from .. import exceptions


def verify_resource_loading(filename: str, log=fastr.log):
    """
    Verify that a resource file can be loaded. Returns loaded object.

    The serializer is selected from the file extension. For gzipped files
    (``.gz``) the extension *before* ``.gz`` is used, e.g. ``tool.json.gz``
    is read with gzip and loaded with the ``json`` serializer.

    :param filename: path of the object to load
    :param log: the logger to use to send messages to
    :return: loaded resource, or None if reading or loading failed
    """
    name, ext = os.path.splitext(filename)

    # Check if file is gzipped; if so, the serializer extension is the one
    # that precedes '.gz', so split the already-stripped name (not filename).
    if ext == '.gz':
        compressed = True
        name, ext = os.path.splitext(name)
    else:
        compressed = False

    # Read file data
    log.info('Trying to read file with compression {}'.format('ON' if compressed else 'OFF'))
    if compressed:
        try:
            # NOTE(review): mode 'r' on gzip.open is binary, so data is bytes
            # here while the uncompressed branch yields str -- confirm all
            # serializers accept both.
            with gzip.open(filename, 'r') as file_handle:
                data = file_handle.read()
        except Exception:
            log.error('Problem reading gzipped file: {}'.format(filename))
            return None
    else:
        try:
            with open(filename, 'r') as file_handle:
                data = file_handle.read()
        except Exception:
            log.error('Problem reading normal file: {}'.format(filename))
            return None

    log.info('Read data from file successfully')

    # Try to read tool doc based on serializer matching the extension
    serializer_name = ext[1:]
    log.info('Trying to load file using serializer "{}"'.format(serializer_name))

    try:
        serializer = ReadWriteHandler.get_handler(serializer_name)
    except KeyError:
        log.error('No matching serializer found for "{}"'.format(serializer_name))
        return None

    try:
        doc = serializer.loads(data)
    except Exception as exception:
        # Report the serializer by name; the handler object itself is not
        # informative in a log message.
        log.error('Could not load data using serializer "{}", encountered exception: {}'.format(
            serializer_name, exception
        ))
        return None

    return doc
def verify_tool_schema(doc, log=fastr.log):
    """
    Check a loaded document against the Tool schema.

    :param doc: loaded object to check
    :param log: the logger to use to send messages to
    :return: the object produced by the Tool serializer for ``doc``,
             or None if the serializer raised an exception
    """
    log.info('Validating data against Tool schema')
    tool_serializer = Tool.get_serializer()

    try:
        # Let the Tool serializer process the document
        return tool_serializer.instantiate(doc)
    except Exception as error:
        message = 'Encountered a problem when verifying the Tool schema: {}'.format(error)
        log.error(message)

    return None
def verify_tool_instantiate(doc, filename, log=fastr.log):
    """
    Verify that a Tool object can be created from a loaded document.

    :param doc: loaded object
    :param filename: filename of the tool definition, stored on the
                     created Tool as ``tool.filename``
    :param log: the logger to use to send messages to
    :return: Tool object, or None if the Tool could not be created
    """
    # Create the Tool object as the final test
    log.info('Instantiating Tool object')

    try:
        tool = Tool(doc)
        tool.filename = filename
    except Exception as exception:
        log.error('Encountered a problem when creating the Tool object: {}'.format(exception))
        # Without this return, ``tool`` would be unbound below and the
        # function would fail with UnboundLocalError instead of reporting.
        return None

    return tool
def verify_tool(filename, log=fastr.log, perform_tests=True):
    """
    Verify that a tool correctly works. Returns Tool.

    The tool definition is loaded, verified against the Tool schema and
    instantiated; each step must succeed before the next one is attempted.

    :param filename: filename of the tool definition
    :param log: the logger to use to send messages to
    :param perform_tests: if True, also call ``tool.test()`` on the created Tool
    :return: Tool object, or None if loading, schema verification or
             instantiation failed
    """
    # Load the file
    doc = verify_resource_loading(filename, log)

    if not doc:
        log.error('Could not load data successfully from {}'.format(filename))
        return None

    # Match the data to the schema for Tools (the helper logs its own error)
    doc = verify_tool_schema(doc, log)
    if doc is None:
        return None

    # Create the Tool object as the final test
    tool = verify_tool_instantiate(doc, filename, log)
    if tool is None:
        return None

    if perform_tests:
        log.info('Testing tool...')
        try:
            tool.test()
        except fastr.exceptions.FastrValueError as e:
            # A failed test is reported but the Tool is still returned
            log.error('Tool is not valid: {}'.format(e))

    return tool
def create_tool_test(filename, log=fastr.log):
    """
    Create test for fastr verify tool.

    By running `fastr verify -c tool FILENAME` the input data in the folders
    under 'tests' in the tool definition is processed by the tool. The output
    data is written to a folder in each test folder. In each test folder a
    gzipped pickle is created which is used to verify the working of the tool
    at a later time.

    :param filename: filename of the tool definition
    :param log: the logger to use to send messages to
    :return: None
    """
    # Load the file
    doc = verify_resource_loading(filename, log)

    if not doc:
        log.error('Could not load data successfully from {}'.format(filename))
        return None

    # Both helpers log their own error and return None on failure
    doc = verify_tool_schema(doc, log)
    if doc is None:
        return None

    tool = verify_tool_instantiate(doc, filename, log)
    if tool is None:
        return None

    log.info('Loaded tool {} successfully'.format(tool))
    tool_dir = os.path.dirname(tool.filename)

    for test in tool.tests:
        reference_data_dir = os.path.abspath(os.path.join(tool_dir, test))
        try:
            if not isinstance(reference_data_dir, str):
                raise exceptions.FastrTypeError('reference_data_dir should be a string!')

            # NOTE(review): the former 'vfs://' translation branch was dropped.
            # os.path.abspath always yields an absolute local path, so the
            # branch could never run, and the vfs_plugin name it used is not
            # defined in the visible part of this module.

            if not os.path.isdir(reference_data_dir):
                raise exceptions.FastrTypeError('The reference_data_dir should be pointing to an existing directory!'
                                                ' {} does not exist'.format(reference_data_dir))

            reference_file = os.path.join(reference_data_dir, tool.TOOL_REFERENCE_FILE_NAME)
            test_data = iohelpers.load_json(reference_file)

            input_data = {}
            for key, value in test_data['input_data'].items():
                if not isinstance(value, (tuple, list)):
                    value = value,

                # Set the $REFDIR correctly (to avoid problems with moving the reference dir)
                value = tuple(x.replace('$REFDIR', reference_data_dir) if isinstance(x, str) else x
                              for x in value)
                input_data[key] = value

            temp_results_dir = None
            try:
                # Create temporary output directory
                temp_results_dir = os.path.normpath(mkdtemp(
                    prefix='fastr_tool_test_{}_'.format(tool.id),
                    dir=config.mounts['tmp']
                ))

                # Create a new reference for comparison
                log.info('Creating new reference data for comparison...')
                try:
                    result_file = os.path.join(reference_data_dir, tool.TOOL_RESULT_FILE_NAME)
                    if not os.path.exists(result_file):
                        # Copy original reference file so it doesn't get
                        # overwritten. Afterwards move it back.
                        backup_file = os.path.join(
                            reference_data_dir,
                            ''.join([tool.TOOL_REFERENCE_FILE_NAME, 'bak'])
                        )
                        shutil.copy(reference_file, backup_file)
                        try:
                            tool.create_reference(input_data,
                                                  reference_data_dir,
                                                  mount_name='__ref_tmp__',
                                                  copy_input=False,
                                                  input_datatypes=test_data['input_datatypes'])
                            log.info(
                                'Reference result for testing the {}/{} created in {}.'.format(
                                    tool.ns_id, tool.command_version, reference_data_dir
                                ))
                            # Keep what is in the reference file after
                            # create_reference under an '__output'-prefixed name
                            shutil.copy(reference_file,
                                        os.path.join(reference_data_dir,
                                                     ''.join(['__output', tool.TOOL_REFERENCE_FILE_NAME])))
                        finally:
                            # Restore the original reference file even when
                            # creating the reference failed part-way
                            shutil.move(backup_file, reference_file)
                    else:
                        log.warning(
                            'Reference result for testing the {}/{} tool already exists in {}!'.format(
                                tool.ns_id, tool.command_version, reference_data_dir
                            ))
                except Exception as exception:
                    log.warning('Encountered exception when trying to run the {}/{} tool!'.format(
                        tool.ns_id, tool.command_version)
                    )
                    log.warning('Exception: [{}] {}'.format(type(exception).__name__, exception))
            finally:
                # Clean up
                if temp_results_dir is not None and os.path.isdir(temp_results_dir):
                    log.info('Removing temp result directory {}'.format(temp_results_dir))
                    shutil.rmtree(temp_results_dir, ignore_errors=True)
        except exceptions.FastrTypeError:
            message = 'Reference data in {} is not valid!'.format(reference_data_dir)
            log.warning(message)
            # Stop at the first invalid reference directory
            return