# Copyright 2011-2017 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Some helper functions that aid with NFS file sync issues.
"""
from . import config, log
import time
import hashlib
import os.path
from glob import glob
[docs]def filesynchelper_enabled():
return len(config.filesynchelper_url) > 0
if filesynchelper_enabled():
from redis import Redis
[docs]class FileSyncHelper():
_namespace = 'filesynchelper'
_redis = Redis.from_url(config.filesynchelper_url) if filesynchelper_enabled() else None
[docs] def __init__(self):
pass
[docs] def job_finished(self, jobfile):
key = self._generate_key_for_string('joblock' + str(jobfile))
self._redis.setex(key, '', 300)
[docs] def wait_for_job(self, jobfile):
key = self._generate_key_for_string('joblock' + str(jobfile))
# wait for job or timeout
timeoutafter = time.time() + 300
exists = self._redis.exists(key)
while not exists or time.time() > timeoutafter:
time.sleep(5) # wake-up every 5 seconds
exists = self._redis.exists(key)
self._redis.delete(key)
[docs] def wait_for_pickle(self, url, timeout=300):
log.debug('waiting for pickle {}'.format(url))
# wait for file or timeout
timeoutafter = time.time() + timeout
exists = self.has_file_promise(url)
while not exists or time.time() > timeoutafter:
time.sleep(5) # wake-up every 5 seconds
exists = self.has_file_promise(url)
return exists
[docs] def store(self, url, data):
key = self._generate_key_for_string(str(url))
log.debug('storing {} -> {}'.format(url, key))
self._redis.delete(key)
return self._redis.setex(
key,
data,
300
)
[docs] def load(self, url):
key = self._generate_key_for_string(str(url))
log.debug('loading {} from {}'.format(url, key))
return self._redis.get(key)
def _generate_key_for_string(self, input_string):
return self._namespace + self._generate_hash_from_string(input_string)
def _generate_hash_from_string(self, input_string):
return hashlib.sha256(input_string).hexdigest()
[docs] def make_file_promise(self, url):
# ttl of 86400 seconds is 1 day
# we don't really care about the value if suburls == None, thus an empty string is stored
key = self._generate_key_for_string(url)
log.debug('making file promise {} -> {}'.format(url, key))
if os.path.isdir(url):
dir = url
elif url.startswith('vfs://') and os.path.isdir(fastr.vfs.url_to_path(url)):
dir = fastr.vfs.url_to_path(url)
else:
dir = None
if dir is not None:
val = ','.join([self._generate_hash_from_string(str(suburl)) for suburl in self._glob_dir(dir)])
else:
val = ''
self._redis.setex(
key,
val,
300
)
[docs] def has_file_promise(self, url):
# check if key exists, if it does we have a file promise
result = self._redis.exists(self._generate_key_for_string(str(url)))
log.debug('has_file_promise {} {} {}'.format(url, result, self._generate_key_for_string(str(url))))
return result
[docs] def wait_for_vfs_url(self, vfs_url, timeout=300):
log.debug('wait_for_vfs_url {}'.format(vfs_url))
suburls = self._get_suburl_hashes(vfs_url)
return self._wait_for_file_and_suburls(fastr.vfs.url_to_path(vfs_url), suburls, timeout)
[docs] def wait_for_file(self, path, timeout=300):
log.debug('wait_for_file {}'.format(path))
suburls = self._get_suburl_hashes(path)
return self._wait_for_file_and_suburls(path, suburls, timeout)
def _get_suburl_hashes(self, path):
if self.has_file_promise(path):
suburls = self._redis.get(self._generate_key_for_string(str(path)))
if len(suburls) > 0:
return suburls.split(',')
return None
def _glob_dir(self, dir):
if not dir.endswith('/'):
dir = dir + '/'
return [y[len(dir):] for x in os.walk(dir) for y in glob(os.path.join(x[0], '*'))]
def _wait_for_file_and_suburls(self, path, suburls, timeout):
log.debug('waiting for {}'.format(path))
# wait for file or timeout
timeoutafter = time.time() + timeout
fileexists = os.path.isfile(path)
while not fileexists or time.time() > timeoutafter:
time.sleep(5) # wake-up every 5 seconds
fileexists = (os.path.isdir(path) if suburls is not None else os.path.isfile(path))
if suburls is not None:
fileexists = False
lookup = {}
while len(suburls) > 0 or time.time() > timeoutafter:
files = self._glob_dir(path)
for file in files:
if file not in lookup:
lookup[file] = self._generate_hash_from_string(str(file))
if lookup[file] in suburls:
suburls.remove(lookup[file])
time.sleep(5) # wake-up every 5 seconds
if len(suburls) == 0:
fileexists = True
return fileexists