Source code for strongrexecution

# Copyright 2011-2014 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import base64
import subprocess
import threading
import time

import fastr
import fastr.resources
from fastr.plugins.executionplugin import ExecutionPlugin
from fastr.helpers.classproperty import classproperty


class StrongrExecution(ExecutionPlugin):
    _queue = []
    _mappings = {}

[docs] def __init__(self, finished_callback=None, cancelled_callback=None): super(StrongrExecution, self).__init__(finished_callback, cancelled_callback) fastr.log.debug('Creating strongr job collector') self.collector = threading.Thread(name='StrongrJobCollector-0', target=self.check_finished, args=()) self.collector.daemon = True fastr.log.debug('Starting strongr job collector') self.collector.start()
[docs] def cleanup(self): # Stop thread super(StrongrExecution, self).cleanup()
[docs] @classmethod def test(cls): pass
@classproperty def configuration_fields(cls): return {} def _job_finished(self, result): pass def _cancel_job(self, job): pass def _queue_job(self, job): cmd = ['/opt/strongr/addtask', '\'{}\''.format( base64.b64encode('/bin/bash -c "{} {} {} {}"'.format( '', 'python', '`python -c \'from fastr.execution import executionscript; print(executionscript.__file__)\'`', str(job.commandfile) ))), '1', '1'] print(cmd) with open(job.stdoutfile, 'a') as fh_stdout, open(job.stderrfile, 'a') as fh_stderr: taskinfo = subprocess.check_output(cmd, stderr=fh_stderr) taskid = json.loads(taskinfo)['job_id'] self._queue.append(taskid) self._mappings[taskid] = job
[docs] def check_finished(self): while self.running: sout = subprocess.check_output('/opt/strongr/queryqueue') print(sout) queueinfo = json.loads(sout) if queueinfo == None: time.sleep(5.0) continue finished = [t for t in self._queue if t not in queueinfo] self._queue = [t for t in self._queue if t in queueinfo] fastr.log.info('# FINISHED: {}'.format(finished)) fastr.log.info('# QUEUE: {}'.format(self._queue)) for taskid in finished: fastr.log.info('## TASK ID: {}'.format(taskid)) self.job_finished(self._mappings[taskid]) time.sleep(1.0)