# Source code for fastr.helpers.checksum

# Copyright 2011-2014 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains a number of functions for checksumming files and objects
"""
import hashlib
import _hashlib
import io
import os
import pickle
from fastr import exceptions


def checksum(filepath, algorithm='md5', hasher=None, chunksize=32768):
    """
    Generate the checksum of a file

    :param filepath: path of the file(s) to checksum
    :type filepath: str, list
    :param str algorithm: the algorithm to use
    :param _hashlib.HASH hasher: a hasher to continue updating (rather than
                                 creating a new one)
    :param int chunksize: number of bytes read per update for regular files
    :return: the checksum
    :rtype: str
    """
    # Accept a single path or a list of paths uniformly
    paths = [filepath] if isinstance(filepath, str) else filepath

    if hasher is None:
        hasher = hashlib.new(algorithm)

    for entry in paths:
        # Directories are hashed recursively, regular files chunk by chunk
        if os.path.isdir(entry):
            checksum_directory(entry, hasher=hasher)
        else:
            _checksum_file(entry, hasher=hasher, chunksize=chunksize)

    return hasher.hexdigest()
def _checksum_file(path: str, hasher: _hashlib.HASH, chunksize: int=32768):
    """
    Feed the binary contents of a file into a hasher, chunk by chunk.

    :param path: path of the file to hash
    :param hasher: hash object to update in place
    :param chunksize: number of bytes to read per iteration
    """
    with open(path, 'rb') as handle:
        # iter() with a b'' sentinel stops exactly when read() returns
        # an empty chunk, i.e. at end of file
        for chunk in iter(lambda: handle.read(chunksize), b''):
            hasher.update(chunk)
def checksum_directory(directory, algorithm='md5', hasher=None):
    """
    Generate the checksum of an entire directory

    :param str directory: path of the file(s) to checksum
    :param str algorithm: the algorithm to use
    :param _hashlib.HASH hasher: a hasher to continue updating (rather than
                                 creating a new one)
    :return: the checksum
    :rtype: str
    :raises FastrValueError: if ``directory`` is not an existing directory
    """
    if not os.path.isdir(directory):
        raise exceptions.FastrValueError('{} does not point to a valid directory!'.format(directory))

    if hasher is None:
        hasher = hashlib.new(algorithm)

    # NOTE(review): os.walk yields entries in filesystem order, which is not
    # guaranteed to be stable across platforms, so identical trees could hash
    # differently on different systems — confirm whether that is acceptable.
    for root, subdirs, files in os.walk(directory):
        rel = os.path.relpath(root, directory)

        # Relative names are hashed so the checksum is independent of where
        # the directory itself is located
        for name in subdirs:
            hasher.update(os.path.join(rel, name).encode())

        for name in files:
            hasher.update(os.path.join(rel, name).encode())
            _checksum_file(os.path.join(root, name), hasher)

    return hasher.hexdigest()
def hashsum(objects, hasher=None):
    """
    Generate the md5 checksum of (a) python object(s)

    :param objects: the objects to hash
    :param hasher: the hasher to use as a base
    :return: the hash generated
    :rtype: str
    """
    if hasher is None:
        hasher = hashlib.new('md5')

    if not isinstance(objects, list):
        objects = [objects]

    def _hash_sequence(items, open_tag, close_tag):
        # Wrap the container and terminate each element with sentinel
        # bytes so differently-nested structures cannot collide
        hasher.update(open_tag)
        for item in items:
            hashsum(item, hasher=hasher)
            hasher.update(b'_|ENDELEM|_')
        hasher.update(close_tag)

    for obj in objects:
        if isinstance(obj, str):
            hasher.update(obj.encode())
        elif isinstance(obj, (bool, int, float)):
            # Note: bool is matched here as well, hashed as "True"/"False"
            hasher.update(str(obj).encode())
        elif isinstance(obj, io.TextIOBase):
            # Text stream: rewind and hash the whole content in chunks
            obj.seek(0)
            data = obj.read(32768)
            while data:
                hasher.update(data.encode())
                data = obj.read(32768)
        elif isinstance(obj, io.IOBase):
            # Binary stream: rewind and hash the whole content in chunks
            obj.seek(0)
            data = obj.read(32768)
            while data:
                hasher.update(data)
                data = obj.read(32768)
        elif isinstance(obj, list):
            _hash_sequence(obj, b'_||LIST||_', b'_||ENDLIST||_')
        elif isinstance(obj, tuple):
            _hash_sequence(obj, b'_||TUPLE||_', b'_||ENDTUPLE||_')
        elif isinstance(obj, dict):
            hasher.update(b'_||DICT||_')
            # Keys are sorted so equal dicts hash equally regardless of
            # insertion order; assumes keys are mutually sortable
            for key in sorted(obj.keys()):
                hashsum(key, hasher=hasher)
                hasher.update(b'_|ENDKEY|_')
                hashsum(obj[key], hasher=hasher)
                hasher.update(b'_|ENDVALUE|_')
            hasher.update(b'_||ENDDICT||_')
        elif isinstance(obj, set):
            # Sorted for order-independence; assumes elements are sortable
            _hash_sequence(sorted(obj), b'_||SET||_', b'_||ENDSET||_')
        else:
            # Fallback: hash the pickled representation of the object
            hasher.update(pickle.dumps(obj))

    return hasher.hexdigest()
def md5_checksum(filepath):
    """
    Generate the md5 checksum of a file

    :param filepath: path of the file(s) to checksum
    :type filepath: str, list
    :return: the checksum
    :rtype: str
    """
    # Convenience wrapper around checksum() with the md5 algorithm
    return checksum(filepath, algorithm='md5')
def sha1_checksum(filepath):
    """
    Generate the sha1 checksum of a file

    :param filepath: path of the file(s) to checksum
    :type filepath: str, list
    :return: the checksum
    :rtype: str
    """
    # Convenience wrapper around checksum() with the sha1 algorithm
    return checksum(filepath, algorithm='sha1')