# Copyright 2011-2014 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import jsonpickle
import json
import base64
import os
import subprocess
import sys
import threading
import time
import traceback
import fastr
import fastr.resources
from fastr.core.baseplugin import PluginState
from fastr.execution.executionpluginmanager import ExecutionPlugin, JobAction
from fastr.execution.job import Job, JobState
from fastr.utils.classproperty import classproperty
class StrongrExecution(ExecutionPlugin):
    """
    An execution plugin that hands jobs to Strongr through its command line
    tools.

    Every job is submitted by calling the ``addtask`` executable with a
    base64-encoded shell command that runs the fastr execution script on the
    command file of the job. A background collector thread polls the
    ``queryqueue`` executable and reports a job as finished as soon as its
    task id is no longer listed in the reported queue.

    .. note::
        The locations of the Strongr executables, the shell environment file
        and the fastr execution script are hard-coded in the class attributes
        below and have to match the installation.
    """
    # Executables used to submit tasks and to list the tasks still queued
    _ADDTASK_EXECUTABLE = '/opt/strongr/addtask'
    _QUERYQUEUE_EXECUTABLE = '/opt/strongr/queryqueue'

    # Pieces of the shell command that is executed for every job
    _ENVIRONMENT_SETUP = 'source /opt/bbmri/bashrcappend && '
    # NOTE: the script location is hard-coded; it could be derived from
    # ``fastr.execution.executionscript.__file__`` instead.
    _EXECUTION_SCRIPT = '/home/ubuntu/fastr/fastr/execution/executionscript.py'

    # Seconds to wait between two polls, and after a poll without usable data
    _POLL_INTERVAL = 1.0
    _RETRY_INTERVAL = 5.0

    def __init__(self, finished_callback=None, cancelled_callback=None, status_callback=None):
        """
        Create the plugin and start the collector thread.

        :param finished_callback: passed on to :class:`ExecutionPlugin`
        :param cancelled_callback: passed on to :class:`ExecutionPlugin`
        :param status_callback: passed on to :class:`ExecutionPlugin`
        """
        super(StrongrExecution, self).__init__(finished_callback, cancelled_callback, status_callback)

        # Per-instance bookkeeping (not shared between plugin instances).
        # Both containers are accessed from the submitting thread and from
        # the collector thread, so every access is guarded by ``_lock``.
        self._queue = []      # task ids submitted and not yet seen finished
        self._mappings = {}   # task id -> job
        self._lock = threading.Lock()

        self.running = True

        fastr.log.debug('Creating strongr job collector')
        self.collector = threading.Thread(name='StrongrJobCollector-0', target=self.check_finished, args=())
        self.collector.daemon = True
        fastr.log.debug('Starting strongr job collector')
        self.collector.start()

    # NOTE(review): ``classproperty`` stacked under ``classmethod`` is kept
    # exactly as it was; confirm that this combination is intended.
    @classmethod
    @classproperty
    def configuration_fields(cls):
        """
        Configuration fields contributed by this plugin (none).
        """
        return {}

    def cleanup(self):
        """
        Clean up the plugin and ask the collector thread to stop.

        The collector is a daemon thread and is not joined; it leaves its
        loop the next time it checks the ``running`` flag.
        """
        super(StrongrExecution, self).cleanup()
        self.running = False

    def _job_finished(self, result):
        """
        Hook called for a finished job; nothing needs to be done here.
        """
        pass

    def _cancel_job(self, job):
        """
        Hook called to cancel a job; cancelling is not implemented, so the
        submitted task is left untouched.
        """
        pass

    @staticmethod
    def _encode_command(command):
        """
        Return ``command`` base64-encoded as a native string.

        ``base64.b64encode`` works on bytes, so text is encoded before and
        the result decoded after the call where needed. This gives the same
        plain string on Python 2 and Python 3.
        """
        if not isinstance(command, bytes):
            command = command.encode('utf-8')

        encoded = base64.b64encode(command)
        if not isinstance(encoded, str):
            encoded = encoded.decode('ascii')

        return encoded

    def _queue_job(self, job):
        """
        Submit a job to Strongr and register the returned task id.

        :param job: the job to submit; its ``commandfile``, ``stdoutfile``
                    and ``stderrfile`` attributes are used
        :raises subprocess.CalledProcessError: if ``addtask`` exits non-zero
        """
        command = '/bin/bash -c "{} {} {} {}"'.format(
            self._ENVIRONMENT_SETUP,
            'python',
            self._EXECUTION_SCRIPT,
            job.commandfile
        )

        # NOTE(review): the encoded command is wrapped in literal single
        # quotes and no shell is involved, so addtask receives the quotes
        # verbatim - presumably it expects them. The meaning of the two
        # trailing '1' arguments is defined by addtask - TODO confirm.
        cmd = [
            self._ADDTASK_EXECUTABLE,
            '\'{}\''.format(self._encode_command(command)),
            '1',
            '1',
        ]
        fastr.log.debug('Submitting strongr task: {}'.format(cmd))

        # The stdout file is only opened (append mode creates it if it is
        # missing); the output of addtask itself is captured and parsed.
        with open(job.stdoutfile, 'a'), open(job.stderrfile, 'a') as fh_stderr:
            taskinfo = subprocess.check_output(cmd, stderr=fh_stderr)

        taskid = json.loads(taskinfo)['taskid']

        # Register the job before the task id becomes visible to the
        # collector, so the collector can always resolve the id to a job.
        with self._lock:
            self._mappings[taskid] = job
            self._queue.append(taskid)

    def check_finished(self):
        """
        Poll the Strongr queue and report finished jobs until stopped.

        This is the target of the collector thread. A task counts as
        finished when it was submitted before the queue was queried and is
        not part of the queue information returned by that query.
        """
        while self.running:
            # Snapshot BEFORE querying: a task submitted after the query can
            # not be part of the answer and must not be taken for finished.
            with self._lock:
                submitted = list(self._queue)

            try:
                sout = subprocess.check_output([self._QUERYQUEUE_EXECUTABLE])
                queueinfo = json.loads(sout)
            except (OSError, subprocess.CalledProcessError, ValueError) as exception:
                # Keep the collector alive, otherwise no job would ever be
                # reported as finished again.
                fastr.log.warning('Could not query the strongr queue: {}'.format(exception))
                time.sleep(self._RETRY_INTERVAL)
                continue

            fastr.log.debug('Strongr queue: {}'.format(sout))

            if queueinfo is None:
                time.sleep(self._RETRY_INTERVAL)
                continue

            finished = [taskid for taskid in submitted if taskid not in queueinfo]

            finished_jobs = []
            with self._lock:
                self._queue = [taskid for taskid in self._queue if taskid not in finished]
                remaining = list(self._queue)
                for taskid in finished:
                    # Drop the mapping so it does not grow without bound
                    job = self._mappings.pop(taskid, None)
                    if job is not None:
                        finished_jobs.append((taskid, job))

            fastr.log.info('# FINISHED: {}'.format(finished))
            fastr.log.info('# QUEUE: {}'.format(remaining))

            # Report outside the lock: the callbacks may queue new jobs,
            # which needs the lock again.
            for taskid, job in finished_jobs:
                fastr.log.info('## TASK ID: {}'.format(taskid))
                self.job_finished(job)

            time.sleep(self._POLL_INTERVAL)