Source code for fastr.utils.pim_publisher

import requests
import fastr

from fastr.core.node import MacroNode
from fastr.execution.job import JobState
from fastr import exceptions


[docs]class PimPublisher(object): """ Class to publish to PIM """ PIM_STATUS_MAPPING = { JobState.nonexistent: 'unknown', JobState.created: 'idle', JobState.queued: 'idle', JobState.hold: 'idle', JobState.running: 'running', JobState.execution_done: 'running', JobState.execution_failed: 'running', JobState.processing_callback: 'running', JobState.finished: 'success', JobState.failed: 'failed', JobState.cancelled: 'failed', }
[docs] def __init__(self, uri=None): if uri is None and fastr.config.pim_host == '': fastr.log.info("No valid PIM host given, PIM publishing will be disabled!") self.pim_uri = None else: self.pim_uri = uri or fastr.config.pim_host self.registered = False self.run_id = None
[docs] def pim_update_status(self, job): if not self.registered: fastr.log.debug('Did not register a RUN with PIM yet! Cannot' ' send status updates!') return run_id = job.run_id # Create PIM job data pim_job_data = { "id": job.id, "node_id": job.node_id, "run_id": run_id, "sample_id": str(job.sample_id), "status": self.PIM_STATUS_MAPPING[job.status] } # Send the data to PIM fastr.log.debug('Updating PIM job status {} => {} ({})'.format(job.id, job.status, self.PIM_STATUS_MAPPING[job.status])) uri = '{pim}/api/runs/{run_id}/jobs/{job_id}'.format(pim=fastr.config.pim_host, run_id=run_id, job_id=job.id) fastr.log.debug('Send PUT to pim at {}:\n{}'.format(uri, pim_job_data)) try: response = requests.put(uri, json=pim_job_data) except requests.ConnectionError as exception: fastr.log.error('Could no publish status to PIM, encountered exception: {}'.format(exception))
@staticmethod
[docs] def pim_serialize_network(network): """ Serialize Network in the correct for to use with PIM. :return: json data for PIM """ node_classes = {'Node': 'node', 'SourceNode': 'source', 'ConstantNode': 'constant', 'SinkNode': 'sink'} network_data = { "description": network.description, "nodes": [], "links": [], "groups": [{"id": 'step_' + x, "description": "undefined", "parent_group": "root"} for x in network.stepids.keys()] } # Add the nodes for node in network.nodelist.values(): if isinstance(node, MacroNode): # MacroNodes are a weird tool-less Node that will fail continue node_class = node.__class__.__name__ step = None for stepid, nodes in network.stepids.items(): if node in nodes: step = stepid break node_data = { "group_id": ('step_' + step) if step else "root", "id": node.id, "in_ports": [{'id': 'in_' + x.id, 'description': x.description} for x in node.tool.inputs.values()], "out_ports": [{'id': 'out_' + x.id, 'description': x.description} for x in node.tool.outputs.values()], "type": node_classes[node_class] if node_class in node_classes else 'node' } network_data["nodes"].append(node_data) # Add the links for link in network.linklist.values(): link_data = { "id": link.id, "from_node": link.source.node.id, "from_port": 'out_' + link.source.id, "to_node": link.target.node.id, "to_port": 'in_' + link.target.id, "type": link.source.resulting_datatype.id } network_data["links"].append(link_data) return network_data
[docs] def pim_register_run(self, network): if self.pim_uri is None: fastr.log.debug('No valid PIM uri known. Cannot register to PIM!') return self.run_id = network.run_id pim_run_data = { "collapse": False, "description": "Run of {} started at {}".format(network.id, network.timestamp), "id": self.run_id, "network": self.pim_serialize_network(network), "workflow_engine": "fastr" } uri = '{pim}/api/runs/'.format(pim=fastr.config.pim_host) fastr.log.info('Registering {} with PIM at {}'.format(self.run_id, uri)) fastr.log.debug('Send PUT to pim at {}:\n{}'.format(uri, pim_run_data)) # Send out the response and record if we registered correctly try: response = requests.put(uri, json=pim_run_data) if response.status_code in [200, 201]: self.registered = True except requests.ConnectionError as exception: fastr.log.error('Could no register network to PIM, encountered' ' exception: {}'.format(exception))