Source code for elasticsearchreporter

# Copyright 2011-2014 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import datetime
import os
import json

import fastr
from fastr.helpers.classproperty import classproperty
from fastr.abc.baseplugin import PluginState
from fastr.plugins.reportingplugin import ReportingPlugin
from fastr.execution.job import JobState, Job
from fastr.exceptions import FastrOptionalModuleNotAvailableError


try:
    from elasticsearch import Elasticsearch
    elasticsearch_loaded = True
except ImportError:
    elasticsearch_loaded = False


class ElasticsearchReporter(ReportingPlugin):
    if not elasticsearch_loaded:
        _status = (PluginState.failed, 'Could not load elasticsearch module required for cluster communication')

[docs] def __init__(self): super().__init__()
[docs] @classmethod def test(cls): if not elasticsearch_loaded: raise FastrOptionalModuleNotAvailableError('Could not import the required elasticsearch for this plugin')
[docs] def activate(self): """ Activate the reporting plugin """ super().activate() # Parse URI if fastr.config.elasticsearch_host == '': fastr.log.info("No valid elasticsearchsearch host given, elasticsearch Reporting will be disabled!") self.elasticsearch_uri = None return fastr.log.info("") self.elasticsearch_uri = fastr.config.elasticsearch_host self.elasticsearch_index = fastr.config.elasticsearch_index # Create Elastic search index fastr.log.info("ES Logging to {} at index {}".format(self.elasticsearch_uri, self.elasticsearch_index)) es = Elasticsearch([self.elasticsearch_uri]) es.indices.create(index=self.elasticsearch_index, ignore=400)
@classproperty def configuration_fields(cls): return { "elasticsearch_host": (str, "", "The elasticsearch host to report to"), "elasticsearch_index": (str, "fastr", "The elasticsearch index to store data in"), "elasticsearch_debug": (bool, False, "Setup elasticsearch debug mode to send stdout stderr on job succes"), }
[docs] def elasticsearch_update_status(self, job): es = Elasticsearch([self.elasticsearch_uri]) es.indices.create(index=self.elasticsearch_index, ignore=400) node = job.node job_data = { "timestamp": datetime.datetime.utcnow().isoformat(), "network_id" : node.parent.long_id, "network_version" : str(node.parent.network_version), "network_tmpurl" : node.parent.tmpurl, "run_id" : node.parent.id, "node" : str(node), "node_id" : node.id, "node_global_id" : node.global_id, "tool_name" : node.tool.ns_id, "tool_version" : str(node.tool.command['version']), "sample_index" : list(job.sample_index), "sample_id": list(job.sample_id), "errors": str(job.errors), "status": str(job.status), } if os.path.exists(job.extrainfofile): with open(job.extrainfofile) as extra_info_file: extra_info = json.load(extra_info_file) process = extra_info.get('process') job_data['process'] = process es.index(index=self.elasticsearch_index, doc_type='fastr-job', body=job_data)
[docs] def job_updated(self, job: Job): if self.elasticsearch_uri: self.elasticsearch_update_status(job)
# def run_started(self, run: NetworkRun): # if self.elasticsearch_uri: # self.api.elasticsearch_register_run(run) # def run_finished(self, run: NetworkRun): # if self.elasticsearch_uri: # self.api.elasticsearch_finish_run(run) # def log_record_emitted(self, record: LogRecord): # if self.elasticsearch_uri: # self.api.elasticsearch_log_line(record)