Source code for drmaaexecution

# Copyright 2011-2014 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import os
import Queue
import sys
import threading

import fastr
from fastr.core.baseplugin import PluginState

try:
    import drmaa
    load_drmaa = True
except (ImportError, RuntimeError):
    load_drmaa = False

from fastr.execution.executionpluginmanager import ExecutionPlugin
from fastr.utils.classproperty import classproperty


class DRMAAExecution(ExecutionPlugin):
    """
    A DRMAA execution plugin to execute Jobs on a Grid Engine cluster. It is
    built on the python ``drmaa`` package and offers a configuration option
    for selecting the queue to submit to.

    .. note::

        To use this plugin, make sure the ``drmaa`` package is installed and
        that the execution is started on an SGE submit host with DRMAA
        libraries installed.

    .. note::

        This plugin is currently tailored to SGE, but it should be fairly
        easy to create subclasses for other DRMAA-supporting systems.
    """
    if load_drmaa:
        try:
            # Try to create a session
            session = drmaa.Session()
            session.initialize()
            session.exit()
            del session
        except:
            _status = (PluginState.failed, 'Could not create a DRMAA session')
    else:
        _status = (PluginState.failed, 'Could not load DRMAA module required for cluster communication')

    def __init__(self, finished_callback=None, cancelled_callback=None, status_callback=None):
        super(DRMAAExecution, self).__init__(finished_callback, cancelled_callback, status_callback)

        # Some defaults
        self.default_queue = fastr.config.drmaa_queue

        # Create the DRMAA session
        self.session = drmaa.Session()
        self.session.initialize()
        fastr.log.info('A DRMAA session was started successfully')
        response = self.session.contact
        fastr.log.info('session contact returns: ' + response)

        # Create job translation table
        self.job_translation_table = dict()
        self.job_lookup_table = dict()

        # Create submit queue and queue event lock
        self.submit_queue = Queue.Queue()
        self.queue_lock = threading.Event()

        # Create callback collector and job submitter
        self.running = True
        fastr.log.debug('Creating job collector')
        self.collector = threading.Thread(name='DRMAAJobCollector-0', target=self.collect_jobs, args=())
        self.collector.daemon = True
        fastr.log.debug('Starting job collector')
        self.collector.start()

        fastr.log.debug('Creating job submitter')
        self.submitter = threading.Thread(name='DRMAAJobSubmitter-0', target=self.submit_jobs, args=())
        self.submitter.daemon = True
        fastr.log.debug('Starting job submitter')
        self.submitter.start()
    @classproperty
    def configuration_fields(cls):
        return {
            "drmaa_queue": (str, "week", "The default queue to use for jobs sent to the scheduler")
        }
    def cleanup(self):
        # Stop submissions and callbacks
        self.running = False  # Signal collector thread to stop running
        super(DRMAAExecution, self).cleanup()

        # See if there are leftovers in the job translation table that can be cancelled
        while len(self.job_translation_table) > 0:
            drmaa_job_id, job = self.job_translation_table.popitem()
            fastr.log.info('Terminating left-over job {}'.format(drmaa_job_id))
            self.session.control(drmaa_job_id, 'terminate')

        fastr.log.debug('Stopping DRMAA executor')

        # Destroy DRMAA
        try:
            self.session.exit()
            fastr.log.debug('Exiting DRMAA session')
        except drmaa.NoActiveSessionException:
            pass

        if self.collector.isAlive():
            fastr.log.debug('Terminating job collector thread')
            self.collector.join()

        if self.submitter.isAlive():
            fastr.log.debug('Terminating job submitter thread')
            self.submitter.join()

        fastr.log.debug('DRMAA executor stopped!')
    def _queue_job(self, job):
        self.submit_queue.put(job, block=True)

    def _cancel_job(self, job):
        if job.id not in self.job_lookup_table:
            fastr.log.info('Job {} not found in DRMAA lookup'.format(job.id))
            return

        drmaa_job_id = self.job_lookup_table[job.id]
        fastr.log.info('Cancelling job {}'.format(drmaa_job_id))
        try:
            self.session.control(drmaa_job_id, 'terminate')
        except drmaa.InvalidJobException:
            fastr.log.warning('Trying to cancel an unknown job, already finished/cancelled?')

        del self.job_translation_table[drmaa_job_id]

    def _release_job(self, job):
        pass

    def _job_finished(self, result):
        pass

    # FIXME This needs to be more generic! This is for our SGE cluster only!
    def send_job(self, command, arguments, queue=None, walltime=None, job_name=None,
                 memory=None, ncores=None, joinLogFiles=False, outputLog=None,
                 errorLog=None, hold_job=None):

        # Create job template
        jt = self.session.createJobTemplate()
        jt.remoteCommand = command
        jt.args = arguments
        jt.joinFiles = joinLogFiles
        env = os.environ
        # Make sure environment modules do not annoy us with bash warnings
        # after the shellshock bug was fixed
        env.pop('BASH_FUNC_module()', None)
        jt.jobEnvironment = env

        if queue is None:
            queue = self.default_queue

        native_spec = '-cwd -q %s' % queue

        if walltime is not None:
            native_spec += ' -l h_rt=%s' % walltime

        if memory is not None:
            native_spec += ' -l h_vmem=%s' % memory

        if ncores is not None:
            native_spec += ' -pe smp %d' % ncores

        if outputLog is not None:
            native_spec += ' -o %s' % outputLog

        if errorLog is not None:
            native_spec += ' -e %s' % errorLog

        if hold_job is not None:
            if isinstance(hold_job, int):
                native_spec += ' -hold_jid {}'.format(hold_job)
            elif isinstance(hold_job, list) or isinstance(hold_job, tuple):
                if len(hold_job) > 0:
                    jid_list = ','.join([str(x) for x in hold_job])
                    native_spec += ' -hold_jid {}'.format(jid_list)
            else:
                fastr.log.error('Incorrect hold_job type!')

        fastr.log.debug('Setting native spec to: {}'.format(native_spec))
        jt.nativeSpecification = native_spec

        if job_name is None:
            job_name = command
            job_name = job_name.replace(' ', '_')
            job_name = job_name.replace('"', '')
            if len(job_name) > 32:
                job_name = job_name[0:32]

        jt.jobName = job_name

        # Send job to cluster
        jobid = self.session.runJob(jt)

        # Remove job template
        self.session.deleteJobTemplate(jt)

        return jobid
    def submit_jobs(self):
        while self.running:
            try:
                job = self.submit_queue.get(block=True, timeout=2)

                # Get job command and write to file
                command = [sys.executable,
                           os.path.join(fastr.config.executionscript),
                           job.commandfile]
                fastr.log.debug('Command to queue: {}'.format(command))

                # Make sure we do not submit after it stopped running
                if not self.running:
                    break

                fastr.log.info('Queueing {} via DRMAA'.format(job.jobid))

                # Submit command to scheduler
                cl_job_id = self.send_job(command[0], command[1:],
                                          job_name='fastr_{}'.format(job.jobid),
                                          memory=job.required_memory,
                                          ncores=job.required_cores,
                                          walltime=job.required_time,
                                          outputLog=job.stdoutfile,
                                          errorLog=job.stderrfile,
                                          hold_job=[self.job_lookup_table[x] for x in job.hold_jobs if x in self.job_lookup_table],
                                          )

                # Register job in the translation tables
                self.job_translation_table[cl_job_id] = job
                fastr.log.debug('Inserting {} in lookup table pointing to {}'.format(job.jobid, cl_job_id))
                self.job_lookup_table[job.jobid] = cl_job_id
                fastr.log.info('Job {} queued via DRMAA'.format(job.jobid))

                # Set the queue lock to indicate there is content in the queue
                if not self.queue_lock.is_set():
                    fastr.log.debug('Setting queue_lock')
                    self.queue_lock.set()
            except Queue.Empty:
                pass

        fastr.log.info('DRMAA submission thread ended!')
    def collect_jobs(self):
        first_wait = True
        second_wait = True

        while self.running:
            # Wait for the queue to contain jobs
            if first_wait and not self.queue_lock.is_set():
                fastr.log.debug('Waiting for jobs to be queued...')

            if not self.queue_lock.wait(2):
                first_wait = False
                continue

            first_wait = True

            if second_wait:
                fastr.log.debug('Waiting for a job to finish...')

            try:
                info = self.session.wait(drmaa.Session.JOB_IDS_SESSION_ANY, 5)
                second_wait = True
            except drmaa.ExitTimeoutException:
                second_wait = False
                continue
            except drmaa.InvalidJobException:
                fastr.log.debug('No valid jobs (session queue appears to be empty)')
                fastr.log.debug('Clearing queue_lock')
                self.queue_lock.clear()
                continue
            except drmaa.NoActiveSessionException:
                if not self.running:
                    fastr.log.info('DRMAA session no longer active, quitting collector...')
                else:
                    fastr.log.critical('DRMAA session no longer active, but DRMAA executor not stopped properly! Quitting')
                    self.running = False
                continue
            except drmaa.errors.DrmaaException as exception:
                # Avoid the collector getting completely killed on another DRMAA exception
                fastr.log.warning('Encountered unexpected DRMAA exception: {}'.format(exception))
                second_wait = True
                continue
            except Exception as exception:
                # if exception.message == 'code 24: no usage information was returned for the completed job':
                if exception.message.startswith('code 24:'):
                    # Avoid the collector getting completely killed on this specific exception
                    fastr.log.warning('Encountered unexpected DRMAA exception: {}'.format(exception))
                    second_wait = True
                    continue
                else:
                    raise

            fastr.log.info('Cluster DRMAA job {} finished'.format(info.jobId))

            # Create a copy of the job that finished and remove from the translation table
            job = copy.deepcopy(self.job_translation_table.get(info.jobId))
            if job is not None:
                del self.job_translation_table[info.jobId]

                # Send the result to the callback function
                self.job_finished(job)
            else:
                fastr.log.warning('Job {} no longer available (got cancelled?)'.format(info.jobId))

        fastr.log.info('DRMAA collection thread ended!')
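
For reference, the sketch below shows the bare ``drmaa`` calls that DRMAAExecution wraps: session setup, job template creation, submission through a native specification, and waiting for completion. It is a minimal standalone example, not part of the fastr source; the remote command, arguments, queue name, and job name are placeholders.

import drmaa

# Minimal sketch of the drmaa-python calls used by DRMAAExecution above.
# The remote command, arguments and queue are illustrative placeholders.
session = drmaa.Session()
session.initialize()

jt = session.createJobTemplate()
jt.remoteCommand = '/bin/sleep'          # placeholder command
jt.args = ['10']                         # placeholder arguments
jt.jobName = 'fastr_example'             # placeholder job name
jt.nativeSpecification = '-cwd -q week'  # same style of native spec as send_job builds

job_id = session.runJob(jt)
print 'Submitted job: {}'.format(job_id)

# Block until the job finishes; wait() returns a JobInfo object
info = session.wait(job_id, drmaa.Session.TIMEOUT_WAIT_FOREVER)
print 'Job {} exited with status {}'.format(info.jobId, info.exitStatus)

session.deleteJobTemplate(jt)
session.exit()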