Source code for strongrexecution

# Copyright 2011-2014 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import jsonpickle
import json
import base64
import os
import subprocess
import sys
import threading
import time
import traceback

import fastr
import fastr.resources
from fastr.core.baseplugin import PluginState
from fastr.execution.executionpluginmanager import ExecutionPlugin, JobAction
from fastr.execution.job import Job, JobState
from fastr.utils.classproperty import classproperty


class StrongrExecution(ExecutionPlugin):
    """
    A execution plugin based on Redis Queue. Fastr will submit jobs to the
    redis queue and workers will peel the jobs from the queue and process
    them.

    This system requires a running redis database and the database url has to
    be set in the fastr configuration.

    .. note::

        This execution plugin required the ``redis`` and ``rq`` packages to
        be installed before it can be loaded properly.
    """
    _queue = []
    _mappings = {}

    def __init__(self, finished_callback=None, cancelled_callback=None, status_callback=None):
        super(StrongrExecution, self).__init__(finished_callback, cancelled_callback, status_callback)

        self.running = True

        fastr.log.debug('Creating strongr job collector')
        self.collector = threading.Thread(name='StrongrJobCollector-0',
                                          target=self.check_finished,
                                          args=())
        self.collector.daemon = True
        fastr.log.debug('Starting strongr job collector')
        self.collector.start()

    @classmethod
    def test(cls):
        pass

    @classproperty
    def configuration_fields(cls):
        return {}

    def cleanup(self):
        # Signal the collector thread to stop polling before cleaning up
        self.running = False
        super(StrongrExecution, self).cleanup()

    def _job_finished(self, result):
        pass

    def _cancel_job(self, job):
        pass

    def _queue_job(self, job):
        # Build the strongr addtask call: the actual command to run is passed
        # as a base64-encoded /bin/bash invocation of the fastr execution script
        cmd = ['/opt/strongr/addtask',
               '\'{}\''.format(
                   base64.b64encode('/bin/bash -c "{} {} {} {}"'.format(
                       'source /opt/bbmri/bashrcappend && ',
                       'python',
                       # '`python -c \'from fastr.execution import executionscript; print(executionscript.__file__)\'`',
                       '/home/ubuntu/fastr/fastr/execution/executionscript.py',
                       job.commandfile))),
               '1',
               '1']
        fastr.log.debug('Submitting strongr task: {}'.format(cmd))

        with open(job.stdoutfile, 'a') as fh_stdout, open(job.stderrfile, 'a') as fh_stderr:
            taskinfo = subprocess.check_output(cmd, stderr=fh_stderr)
            taskid = json.loads(taskinfo)['taskid']
            self._queue.append(taskid)
            self._mappings[taskid] = job
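
    # Illustrative note (assumed command contract): the base64 payload built
    # in _queue_job decodes to a shell command along the lines of
    #
    #     /bin/bash -c "source /opt/bbmri/bashrcappend &&  python \
    #         /home/ubuntu/fastr/fastr/execution/executionscript.py <job.commandfile>"
    #
    # and /opt/strongr/addtask is expected to answer with JSON containing a
    # 'taskid' field, which is what _queue and _mappings use to track the job.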

    def check_finished(self):
        while self.running:
            sout = subprocess.check_output('/opt/strongr/queryqueue')
            fastr.log.debug(sout)

            queueinfo = json.loads(sout)
            if queueinfo is None:
                time.sleep(5.0)
                continue

            # Tasks that were queued but no longer show up in the strongr
            # queue are considered finished
            finished = [t for t in self._queue if t not in queueinfo]
            self._queue = [t for t in self._queue if t in queueinfo]

            fastr.log.info('# FINISHED: {}'.format(finished))
            fastr.log.info('# QUEUE: {}'.format(self._queue))

            for taskid in finished:
                fastr.log.info('## TASK ID: {}'.format(taskid))
                self.job_finished(self._mappings[taskid])

            time.sleep(1.0)
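
# Illustrative note (assumed output format): /opt/strongr/queryqueue is
# expected to print a JSON collection of the task ids that are still queued
# or running, for example something like
#
#     ["task-1", "task-3"]
#
# check_finished treats every tracked task id that is missing from that
# collection as finished and reports it through self.job_finished() using
# the taskid -> Job mapping kept in self._mappings.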