from abc import ABCMeta, abstractmethod
from collections import deque
import datetime
import getpass
import itertools
import json
from logging import LogRecord
import os
import time
from typing import Dict
from threading import Lock, Thread

import requests
import fastr

from fastr.helpers.classproperty import classproperty
from fastr.plugins.reportingplugin import ReportingPlugin
from fastr.execution.job import JobState, Job
from fastr.execution.networkrun import NetworkRun

class BasePimAPI(metaclass=ABCMeta):
    """
    Base class for PIM API classes which specifies the methods required to
    function
    """

    @abstractmethod
    def pim_update_status(self, job: Job):
        """
        Update the status of a job

        :param job: The job to update
        """

    @abstractmethod
    def pim_register_run(self, network: NetworkRun):
        """
        Send the basic Network layout to PIM and register the run.

        :param network: The network run to register to PIM
        """

    @abstractmethod
    def pim_finish_run(self, network: NetworkRun):
        """
        Set the PIM run to finished and clean up

        :param network: The network run to finish
        """

    @abstractmethod
    def pim_log_line(self, record: LogRecord):
        """
        Send a new line of log record to PIM

        :param record: the log record to send
        """
class PimAPIv2(object):
    """
    Class to publish to PIM

    Job updates are queued by the ``pim_*`` methods and sent to PIM in batches
    by a background thread running :meth:`job_update_loop`. The thread is only
    started once :meth:`pim_register_run` registered the run successfully.

    NOTE(review): this class implements every method of ``BasePimAPI`` but does
    not inherit from it.
    """
    # Mapping from fastr job states to indices into STATUS_TYPES
    PIM_STATUS_MAPPING = {
        JobState.nonexistent: 5,
        JobState.created: 0,
        JobState.queued: 0,
        JobState.hold: 0,
        JobState.running: 1,
        JobState.execution_done: 1,
        JobState.execution_failed: 1,
        JobState.processing_callback: 1,
        JobState.finished: 2,
        JobState.failed: 3,
        JobState.cancelled: 4,
    }

    NODE_CLASSES = {
        'NodeRun': 'node',
        'SourceNodeRun': 'source',
        'ConstantNodeRun': 'constant',
        'SinkNodeRun': 'sink'
    }

    # Status definitions sent along with the run registration
    STATUS_TYPES = [
        {
            "color": "#aaccff",
            "description": "Jobs that are waiting for input",
            "title": "idle"
        },
        {
            "color": "#daa520",
            "description": "Jobs that are running",
            "title": "running"
        },
        {
            "color": "#23b22f",
            "description": "Jobs that finished successfully",
            "title": "success"
        },
        {
            "color": "#dd3311",
            "description": "Jobs that have failed",
            "title": "failed"
        },
        {
            "color": "#334477",
            "description": "Jobs which were cancelled",
            "title": "cancelled"
        },
        {
            "color": "#ccaa99",
            "description": "Jobs with an undefined state",
            "title": "undefined"
        }
    ]

    def __init__(self, uri=None):
        """
        Create the API object, the submitter thread is created but not started

        :param uri: base uri of the PIM server, None disables status updates
        """
        self.pim_uri = uri
        self.registered = False
        self.run_id = None
        self.jobs_uri = None

        # Data for sending will be cached and flushed
        self.running = True
        self.submit_thread = Thread(target=self.job_update_loop,
                                    name='PimSubmitter',
                                    daemon=True)
        self.update_interval = fastr.config.pim_update_interval
        self.batch_size = fastr.config.pim_batch_size
        self.queued_job_updates = deque()
        self.submitted_job_updates = []
        self.updates_lock = Lock()

        # Some data
        self.counter = itertools.count()
        self.scopes = {None: 'root'}
        self.nodes = {}
        self.job_states = {}

    def create_job_data(self, job: Job) -> Dict:
        """
        Create a job data json part that is ready to send to PIM

        :param job: the job to convert
        :return: dictionary with the PIM job data
        :raises KeyError: if the node of the job was not serialized before
        """
        try:
            node = self.nodes[job.node_global_id]
        except KeyError:
            fastr.log.info('NODES: {}'.format(self.nodes))
            raise

        # Create PIM job data
        pim_job_data = {
            "path": '{}/{}'.format(node, job.id),
            "title": "",
            "customData": {
                "sample_id": list(job.sample_id),
                "sample_index": list(job.sample_index),
                "errors": job.errors,
            },
            "status": self.PIM_STATUS_MAPPING[job.status],
            "description": "",
        }

        fastr.log.debug('Updating PIM job status {} => {} ({})'.format(job.id,
                                                                      job.status,
                                                                      self.PIM_STATUS_MAPPING[job.status]))

        # Extra information is only attached for failed jobs or in debug mode
        if (job.status == JobState.failed or fastr.config.pim_debug) and os.path.exists(job.extrainfofile):
            with open(job.extrainfofile) as extra_info_file:
                extra_info = json.load(extra_info_file)

            custom_data = pim_job_data['customData']
            process = extra_info.get('process')

            if process:
                # Process information
                custom_data['__stdout__'] = process.get('stdout')
                custom_data['__stderr__'] = process.get('stderr')
                custom_data['command'] = process.get('command')
                custom_data['returncode'] = process.get('returncode')
                custom_data['time_elapsed'] = process.get('time_elapsed')

                # Host information
                custom_data['hostinfo'] = extra_info.get('hostinfo')

                # Input output hashes for validation of files
                custom_data['input_hash'] = extra_info.get('input_hash')
                custom_data['output_hash'] = extra_info.get('output_hash')

                # Tool information
                custom_data['tool_name'] = extra_info.get('tool_name')
                custom_data['tool_version'] = extra_info.get('tool_version')

        return pim_job_data

    def pim_update_status(self, job):
        """
        Queue a status update for a job, if the status is relevant for PIM

        :param job: The job to update
        """
        if self.pim_uri is None:
            return

        if not self.registered:
            fastr.log.debug('Did not register a RUN with PIM yet! Cannot'
                            ' send status updates!')
            return

        if job.status not in [
            JobState.created,
            JobState.running,
            JobState.finished,
            JobState.cancelled,
            JobState.failed,
        ]:
            return

        pim_status = self.PIM_STATUS_MAPPING[job.status]

        if job.id in self.job_states and self.job_states[job.id] == pim_status:
            # PIM status did not change, so there is nothing to send
            fastr.log.debug('Ignoring non-PIM update')
            return

        self.job_states[job.id] = pim_status

        # Build the payload (may read a file) before taking the lock, so the
        # submitter thread is not blocked on the file read
        job_data = self.create_job_data(job)

        with self.updates_lock:
            self.queued_job_updates.append(job_data)

    def job_update_loop(self):
        """
        Loop that periodically updates job data

        Runs until ``self.running`` is False and both the queued and the
        submitted updates are empty. A batch that fails to send is kept and
        retried on the next iteration.
        """
        while self.running or len(self.submitted_job_updates) != 0 or len(self.queued_job_updates) != 0:
            # Note time of start of this loop iteration
            last_update = time.time()

            # Select job data to submit
            with self.updates_lock:
                # Find number of jobs we can add
                number_of_jobs = min(self.batch_size - len(self.submitted_job_updates),
                                     len(self.queued_job_updates))

                # Append extra jobs to fill the batch size
                for _ in range(number_of_jobs):
                    self.submitted_job_updates.append(
                        self.queued_job_updates.popleft()
                    )

            # If there are jobs update to submit, do so
            if len(self.submitted_job_updates) > 0:
                try:
                    response = requests.put(self.jobs_uri, json=self.submitted_job_updates, timeout=5)
                except requests.ConnectionError as exception:
                    fastr.log.warning('Could no publish status to PIM, encountered exception: {}'.format(exception))
                except requests.Timeout:
                    fastr.log.warning('Connection to PIM timed out during job update submission')
                except requests.RequestException as exception:
                    # Any other requests failure must not kill the submitter
                    # thread, the batch is kept and retried next iteration
                    fastr.log.warning('Could no publish status to PIM, encountered exception: {}'.format(exception))
                else:
                    if response.status_code >= 300:
                        fastr.log.warning('Response of jobs update: [{r.status_code}] {r.text}'.format(r=response))
                    else:
                        # Success
                        self.submitted_job_updates = []

            # Sleep to get a roughly update_interval second loop time
            time_to_sleep = last_update + self.update_interval - time.time()
            time.sleep(max(time_to_sleep, 0))

    def pim_serialize_node(self, node, scope, links):
        """
        Serialize a node to PIM node data and register its PIM path

        :param node: the node run to serialize
        :param scope: PIM path of the scope the node is placed in
        :param links: list to which links are added (used for macro nodes)
        :return: dictionary with the PIM node data
        """
        # Fish out macros and use specialized function
        if type(node).__name__ == 'MacroNodeRun':
            return self.pim_serialize_macro(node, scope, links)

        # NOTE(review): the attribute chains behind the "datatype" values were
        # lost when this source was extracted; they are reconstructed here as
        # ``datatype.id`` / ``resulting_datatype.id`` -- confirm against the
        # original file.
        node_data = {
            "name": node.id,
            "title": node.id,
            "children": [],
            "customData": {},
            "inPorts": [
                {
                    "name": output.id,
                    "title": output.id,
                    "customData": {
                        "input_group": output.input_group,
                        "datatype": output.datatype.id,
                        "dimension_names": [x.name for x in output.dimensions],
                    },
                } for output in node.inputs.values()
            ],
            "outPorts": [
                {
                    "name": output.id,
                    "title": output.id,
                    "customData": {
                        "datatype": output.resulting_datatype.id,
                        "dimension_names": [x.name for x in output.dimensions],
                    },
                } for output in node.outputs.values()
            ],
            "type": type(node).__name__,
        }

        # Sources get an extra 'input' port
        if type(node).__name__ == 'SourceNodeRun':
            node_data['inPorts'].append(
                {
                    "name": 'input',
                    "title": 'input',
                    "customData": {
                        # NOTE(review): reconstructed expression -- confirm
                        "datatype": node.output.datatype.id,
                        "dimension_names": [],
                    },
                }
            )

        # Sinks get an extra 'output' port
        if type(node).__name__ == 'SinkNodeRun':
            node_data['outPorts'].append(
                {
                    "name": 'output',
                    "title": 'output',
                    "customData": {
                        # NOTE(review): reconstructed expression -- confirm
                        "datatype": node.input.datatype.id,
                        "dimension_names": node.dimnames,
                    },
                }
            )

        # Register node id mapping
        self.nodes[node.global_id] = '{}/{}'.format(scope, node.id)

        return node_data

    def pim_serialize_macro(self, node, scope, links):
        """
        Serialize a macro node, including the network it wraps

        :param node: the macro node run to serialize
        :param scope: PIM path of the scope the node is placed in
        :param links: list to which the links of the inner network are added
        :return: dictionary with the PIM node data
        """
        new_scope = '{}/{}'.format(scope, node.id)
        self.nodes[node.global_id] = new_scope

        # Set node data
        node_data = {
            "name": node.id,
            "title": node.id,
            "children": [],
            "customData": {},
            "inPorts": [],
            "outPorts": [],
            "type": type(node).__name__,
        }

        # Serialize underlying network
        self.pim_serialize_network(node.network_run, new_scope, node_data, links)

        return node_data

    def pim_serialize_network(self, network, scope, parent, links):
        """
        Serialize the steps, nodes and links of a network

        :param network: the network run to serialize
        :param scope: PIM path of the scope the network is placed in
        :param parent: PIM node data to whose children the nodes are added
        :param links: list to which the serialized links are appended
        """
        visited_nodes = set()

        for step_name, step_nodes in network.stepids.items():
            step_data = {
                "name": step_name,
                "title": step_name,
                "children": [],
                "customData": {},
                "inPorts": [],
                "outPorts": [],
                "type": "NetworkStep",
            }
            parent['children'].append(step_data)

            # Serialize nodes of the step to the child list of the step
            step_scope = '{}/{}'.format(scope, step_name)
            for node in step_nodes:
                step_data['children'].append(self.pim_serialize_node(node, step_scope, links))
                visited_nodes.add(node.id)

        # Serialize nodes that are not part of a step to parents child list
        for node in network.nodelist.values():
            if node.id not in visited_nodes:
                parent['children'].append(self.pim_serialize_node(node, scope, links))

        # Serialize links to global link list
        for link in network.linklist.values():
            links.append(self.pim_serialize_link(link))

    def pim_register_run(self, network):
        """
        Send the basic Network layout to PIM and register the run.

        On a 200 or 201 response the submitter thread is started. A job
        for ``root/master`` with running status is queued afterwards.

        :param network: The network run to register to PIM
        """
        if self.pim_uri is None:
            fastr.log.warning('No valid PIM uri known. Cannot register to PIM!')
            return

        self.run_id = network.id

        pim_run_data = {
            "title": self.run_id,
            "name": self.run_id,
            "assignedTo": [],
            "user": getpass.getuser(),
            "root": {
                "name": "root",
                "title": network.network_id,
                "description": "",
                "children": [],
                "customData": {},
                "inPorts": [],
                "outPorts": [],
                "type": "NetworkRun",
            },
            "links": [],
            "description": "Run of {} started at {}".format(network.id, network.timestamp),
            "customData": {
                "workflow_engine": "fastr",
                "tmpdir": network.tmpdir,
            },
            "statusTypes": self.STATUS_TYPES,
        }

        self.pim_serialize_network(network=network,
                                   scope="root",
                                   parent=pim_run_data["root"],
                                   links=pim_run_data["links"])

        uri = '{pim}/api/runs/'.format(pim=fastr.config.pim_host)

        fastr.log.info('Registering {} with PIM at {}'.format(self.run_id, uri))
        fastr.log.debug('Send PUT to pim at {}:\n{}'.format(uri, json.dumps(pim_run_data, indent=2)))

        # Cache the jobs uri
        self.jobs_uri = '{pim}/api/runs/{run_id}/jobs'.format(pim=fastr.config.pim_host,
                                                              run_id=self.run_id)

        # Send out the response and record if we registered correctly
        try:
            response = requests.post(uri, json=pim_run_data)
            if response.status_code in [200, 201]:
                self.registered = True
                self.submit_thread.start()
                fastr.log.info('Run registered in PIM at {}/runs/{}'.format(fastr.config.pim_host,
                                                                            self.run_id))
            else:
                fastr.log.warning('Could not register run at PIM, got a {} response'.format(response.status_code))
                fastr.log.warning('Response: {}'.format(response.text))
        except requests.ConnectionError as exception:
            fastr.log.error('Could no register network to PIM, encountered'
                            ' exception: {}'.format(exception))

        # Create dummy job
        root_job_data = {
            "path": 'root/master',
            "title": "",
            "customData": {},
            "status": self.PIM_STATUS_MAPPING[JobState.running],
            "description": "",
        }

        self.queued_job_updates.append(root_job_data)

    def pim_finish_run(self, run):
        """
        Set the PIM run to finished and wait for pending updates to be sent

        :param run: The network run to finish, ``run.result`` determines if
                    the root job is marked as finished or failed
        """
        # Finish dummy job
        final_state = JobState.finished if run.result else JobState.failed
        root_job_data = {
            "path": 'root/master',
            "title": "",
            "customData": {},
            "status": self.PIM_STATUS_MAPPING[final_state],
            "description": "",
        }

        self.queued_job_updates.append(root_job_data)
        self.running = False

        # Only wait while the submitter thread is alive: if the registration
        # failed the thread was never started and nothing would ever drain
        # the queues, which would make this loop spin forever.
        while self.submit_thread.is_alive() and (
                len(self.queued_job_updates) != 0 or len(self.submitted_job_updates) != 0):
            time.sleep(self.update_interval)
            fastr.log.info('Waiting for all jobs to be published to PIM...')

        if len(self.queued_job_updates) != 0 or len(self.submitted_job_updates) != 0:
            fastr.log.warning('PIM submitter is not running, some updates were not published to PIM')

    def pim_log_line(self, record: LogRecord):
        """
        Send a new line of log record to PIM

        The record is queued as custom data of the ``root/master`` job, keyed
        by the UTC creation time of the record.

        :param record: the log record to send
        """
        timestamp = datetime.datetime.utcfromtimestamp(record.created).strftime('%Y-%m-%d %H:%M:%S.%f')

        root_job_data = {
            "path": 'root/master',
            "customData": {
                '__log__': {
                    timestamp: {
                        'process_name': record.processName,
                        'thread_name': record.threadName,
                        'level_name': record.levelname,
                        'module': record.module,
                        'function': record.funcName,
                        'lineno': record.lineno,
                        # getMessage() merges the %-style arguments into the
                        # message and always returns a str, whereas
                        # record.msg can be an arbitrary object that cannot
                        # be serialized to JSON
                        'message': record.getMessage(),
                    }
                }
            }
        }

        self.queued_job_updates.append(root_job_data)
class PimReporter(ReportingPlugin):
    """
    Reporting plugin that forwards run, job and log events to a PIM server

    The events are delegated to an API object that is selected in
    :meth:`activate` based on the version reported by the PIM server. If no
    API object could be created, all events are silently ignored.
    """
    # Mapping from PIM API version to the class implementing it
    SUPPORTED_APIS = {
        2: PimAPIv2,
    }

    def __init__(self):
        super().__init__()
        # Initialise here as well, so callbacks that arrive before activate()
        # find the attribute and are ignored instead of raising AttributeError
        self.pim_uri = None
        self.api = None

    def activate(self):
        """
        Activate the reporting plugin
        """
        super().activate()

        # Parse URI
        if fastr.config.pim_host == '':
            fastr.log.info("No valid PIM host given, PIM publishing will be disabled!")
            self.pim_uri = None
            self.api = None
            return

        self.pim_uri = fastr.config.pim_host

        # Ask the server for its API version, fall back to version 1 if the
        # info endpoint returns an error status or reports no version
        try:
            response = requests.get('{pim}/api/info'.format(pim=self.pim_uri))
            if response.status_code >= 300:
                version = 1
            else:
                version = response.json().get('version', 1)
        except requests.ConnectionError as exception:
            fastr.log.error('Could no publish status to PIM, encountered exception: {}'.format(exception))
            return

        try:
            api_class = self.SUPPORTED_APIS[version]
            fastr.log.info('Using PIM API version {}'.format(version))
        except KeyError:
            fastr.log.error('PIM API version {} not supported!'.format(version))
            return

        self.api = api_class(self.pim_uri)

    @classproperty
    def configuration_fields(cls):
        # Each entry is a (type, default value, description) tuple
        return {
            "pim_host": (str, "", "The PIM host to report to"),
            "pim_update_interval": (float, 2.5, "The interval in which to send jobs to PIM"),
            "pim_batch_size": (int, 100, "Maximum number of jobs that can be send to PIM in a single interval"),
            "pim_debug": (bool, False, "Setup PIM debug mode to send stdout stderr on job succes"),
        }

    def job_updated(self, job: Job):
        """
        Forward a job update to the PIM API, if one is active

        :param job: the job that was updated
        """
        if self.pim_uri and self.api:
            self.api.pim_update_status(job)

    def run_started(self, run: NetworkRun):
        """
        Register a started run via the PIM API, if one is active

        :param run: the network run that started
        """
        if self.pim_uri and self.api:
            self.api.pim_register_run(run)

    def run_finished(self, run: NetworkRun):
        """
        Mark a run as finished via the PIM API, if one is active

        :param run: the network run that finished
        """
        if self.pim_uri and self.api:
            self.api.pim_finish_run(run)

    def log_record_emitted(self, record: LogRecord):
        """
        Forward a log record to the PIM API, if one is active and running

        :param record: the log record that was emitted
        """
        if self.pim_uri and self.api and self.api.running:
            self.api.pim_log_line(record)