import datetime
import functools
import multiprocessing
import threading
import fastr
from fastr.web import app
from flask import url_for, abort
from flask_restplus import Api, Resource, reqparse, fields
# REST API object wrapped around the Fastr Flask application; ``doc`` is the
# path under which flask_restplus is asked to expose the API documentation.
api = Api(
    app,
    doc='/api/doc/',
    version='0.1',
    title='Fastr REST API',
    description='A REST API to interact with Fastr via the web',
    default_mediatype='application/json',
)

# Registry of the Run objects created through the API, keyed by run id.
runs = {}

# Manager that hands out the shared list/dict proxies stored on Run objects.
STATUS_MANAGER = multiprocessing.Manager()
def update_status(job, job_status):
    """
    Record the current status of a job in a status mapping.

    :param job: object exposing ``id``, ``status`` and ``node_id``
    :param job_status: mapping to update; the entry for ``job.id`` is set to
                       a ``(status, node_id)`` tuple of strings
    """
    entry = (str(job.status), str(job.node_id))
    job_status[job.id] = entry
def update_job_result(job, job_status, job_results):
    """
    Store the string form of a job's output data in a results mapping.

    :param job: object exposing ``id`` and ``output_data``
    :param job_status: accepted for signature compatibility; not used here
    :param job_results: mapping to update; the entry for ``job.id`` is set to
                        ``str(job.output_data)``
    """
    output_text = str(job.output_data)
    job_results[job.id] = output_text
def network_lock_thread(lock, network):
    """
    Block until ``lock`` can be acquired, then abort ``network``.

    Used as a thread target by ``network_runner``: the thread sits waiting on
    the lock and calls ``network.abort()`` as soon as it obtains it.

    :param lock: lock (context manager) to wait on
    :param network: object whose ``abort()`` method is called
    """
    log = fastr.log
    log.debug('WAITING FOR LOCK')
    with lock:
        log.debug('CALLING NETWORK ABORT')
        network.abort()
def network_runner(network, source_data, sink_data, chuck_status, job_status, job_results, abort_lock):
    """
    Execute a network while publishing job updates and listening for abort.

    Installs callbacks on the network that write job status and job results
    into the supplied mappings, starts a helper thread that aborts the
    network once ``abort_lock`` is released by someone else, and then runs
    the network.

    :param network: network to execute
    :param source_data: source data passed to ``network.execute``
    :param sink_data: sink data passed to ``network.execute``
    :param chuck_status: accepted for signature compatibility; not used here
    :param job_status: mapping updated by ``update_status``
    :param job_results: mapping updated by ``update_job_result``
    :param abort_lock: lock that is acquired here; releasing it from outside
                       lets the helper thread call ``network.abort()``
    """
    network.job_finished_callback = functools.partial(update_job_result,
                                                      job_status=job_status,
                                                      job_results=job_results)
    network.job_status_callback = functools.partial(update_status,
                                                    job_status=job_status)

    # Hold the lock so the watcher thread below blocks until it is released.
    abort_lock.acquire()
    abort_thread = threading.Thread(name="NetworkAbort",
                                    target=network_lock_thread,
                                    args=(abort_lock, network))
    # The watcher blocks on the lock for as long as no abort is requested.
    # Mark it as a daemon so that a network that finishes normally does not
    # leave this process waiting on a thread that will never wake up.
    abort_thread.daemon = True
    abort_thread.start()

    network.execute(source_data, sink_data)
class Run(object):
    """
    A single execution of a network in a separate process.

    Creating a Run immediately starts a ``multiprocessing.Process`` that
    calls ``network_runner``. Job status and results are exchanged through
    proxies obtained from ``STATUS_MANAGER``.
    """

    def __init__(self, id_, network, source_data, sink_data):
        """
        Create the run and start executing the network.

        :param id_: identifier of the run (also used as the process name)
        :param network: network to execute; only ``network.id`` is kept on
                        the Run itself
        :param source_data: source data handed to the network
        :param sink_data: sink data handed to the network
        """
        self.id = id_
        # Shared containers created via the module-level manager
        self.chunks = STATUS_MANAGER.list()
        self.jobs = STATUS_MANAGER.dict()
        self.job_results = STATUS_MANAGER.dict()
        self.source_data = source_data
        self.sink_data = sink_data
        self.network = network.id
        # Acquired inside the runner process; released by abort()
        self.abort_lock = multiprocessing.Lock()
        self.process = self.run_network(network, source_data, sink_data, self.abort_lock)

    def run_network(self, network, source_data, sink_data, abort_lock):
        """
        Start a process running ``network_runner`` and return it.

        :param network: network to execute
        :param source_data: source data handed to the network
        :param sink_data: sink data handed to the network
        :param abort_lock: lock passed to the runner for abort signalling
        :return: the started ``multiprocessing.Process``
        """
        process = multiprocessing.Process(target=network_runner,
                                          args=(network,
                                                source_data,
                                                sink_data,
                                                self.chunks,
                                                self.jobs,
                                                self.job_results,
                                                abort_lock),
                                          name=self.id)
        process.start()
        return process

    def status(self):
        """
        Return plain-dict copies of the job status and job result mappings.

        :return: dict with the keys ``job_status`` and ``job_results``
        """
        return {'job_status': dict(self.jobs),
                'job_results': dict(self.job_results)}

    def abort(self):
        """
        Request an abort of the run and make sure the process stops.

        Releases the abort lock, waits up to 3 seconds for the process to
        finish and terminates it if it is still alive afterwards.
        """
        fastr.log.debug('RELEASING ABORT LOCK')
        try:
            self.abort_lock.release()
        except ValueError:
            # multiprocessing locks raise ValueError when released while not
            # held, e.g. on a repeated abort or when the runner process has
            # not acquired the lock yet. The join/terminate below still
            # stops the process, so this is not fatal.
            fastr.log.debug('ABORT LOCK WAS NOT HELD')

        if self.process:
            fastr.log.debug('JOINING PROCESS')
            self.process.join(timeout=3)
            if self.process.is_alive():
                fastr.log.debug('TERMINATING PROCESS')
                self.process.terminate()
class ObjectUrl(fields.Raw):
    """
    Raw field, reported with schema type ``string``, that remembers a target
    name for the object it refers to.
    """
    __schema_type__ = "string"

    def __init__(self, object_classs, **kwargs):
        """
        :param object_classs: name stored on the field as ``_object_class``
                              (the models in this file pass endpoint names
                              such as ``'api_network'``)
        :param kwargs: forwarded unchanged to ``fields.Raw``
        """
        super(ObjectUrl, self).__init__(**kwargs)
        self._object_class = object_classs
class SubUrl(fields.Raw):
    """
    Raw field, reported with schema type ``string``, that remembers a target
    name and a sub-field name.
    """
    __schema_type__ = "string"

    def __init__(self, object_classs, subfield, **kwargs):
        """
        :param object_classs: name stored on the field as ``_object_class``
        :param subfield: value stored on the field as ``_subfield``
        :param kwargs: forwarded unchanged to ``fields.Raw``
        """
        super(SubUrl, self).__init__(**kwargs)
        self._object_class = object_classs
        self._subfield = subfield
# Marshalling model for a tool listing: one ``tools`` field, a list of strings.
tool_list_model = api.model(
    'ToolList',
    {
        'tools': fields.List(fields.String),
    },
)
class NetworkApi(Resource):
    """
    Resource for a single Network, looked up by id in ``fastr.networklist``.
    """
    @api.response(200, 'Success')
    @api.response(404, 'Network not found')
    def get(self, id_=None, **view_args):
        """
        Get a Network json description from the server
        """
        # Flask hands URL variables to the view as keyword arguments, and the
        # rule registered for this resource at the bottom of the file names
        # its variable ``id``. Accept that spelling as well, so the lookup
        # works whichever name the route uses.
        id_ = view_args.get('id', id_)

        try:
            return fastr.networklist[id_].dumps(method='dict')
        except KeyError:
            abort(404)
# Marshalling model for the network listing: a ``networks`` list whose items
# are ObjectUrl fields built for 'api_network' from each item's ``id``.
network_list_model = api.model(
    'NetworkList',
    {
        'networks': fields.List(ObjectUrl('api_network', attribute='id')),
    },
)
class NetworkListApi(Resource):
    """
    Resource listing all networks present in ``fastr.networklist``.
    """
    @api.marshal_with(network_list_model)
    def get(self):
        """
        Get a list of the networks
        """
        data = {'networks': fastr.networklist.values()}
        # Diagnostic output goes through the fastr logger (as elsewhere in
        # this module) rather than being printed to stdout on every request.
        fastr.log.debug('Data: {}'.format(data))
        return data
# Marshalling model for a single run. ``network`` and ``status`` are
# ObjectUrl fields fed from the run's ``network`` and ``id`` attributes;
# the source and sink data are passed through as raw values.
run_model = api.model(
    'Run',
    {
        'url': fields.Url,
        'network': ObjectUrl('api_network', attribute='network'),
        'status': ObjectUrl('api_status', attribute='id'),
        'source_data': fields.Raw,
        'sink_data': fields.Raw,
    },
)
class RunApi(Resource):
    """
    Run API documentation
    """
    @api.response(200, 'Success')
    @api.response(404, 'Network not found')
    @api.marshal_with(run_model)
    def get(self, id_=None, **view_args):
        """
        Get information about a Network run
        """
        # Flask hands URL variables to the view as keyword arguments, and the
        # rule registered for this resource at the bottom of the file names
        # its variable ``id``. Accept that spelling as well, so the lookup
        # works whichever name the route uses.
        id_ = view_args.get('id', id_)

        try:
            return runs[id_]
        except KeyError:
            abort(404)

    @api.response(204, 'Aborted Network run')
    @api.response(404, 'Network not found')
    def delete(self, id_=None, **view_args):
        """
        Abort a Network run and stop all associated execution
        """
        # Same keyword handling as in get(): the route variable is ``id``.
        id_ = view_args.get('id', id_)

        if id_ in runs:
            runs[id_].abort()
            return None, 204
        else:
            return None, 404
# Marshalling model for the run listing: a ``runs`` list whose items are
# ObjectUrl fields built for 'api_run' from each item's ``id``.
run_list_model = api.model(
    "RunList",
    {
        'runs': fields.List(ObjectUrl('api_run', attribute='id')),
    },
)
class RunListApi(Resource):
    """
    Resource for the collection of runs: list them or start a new one.
    """
    # Parser for the JSON body of POST requests; all three fields are required.
    request_parser = reqparse.RequestParser()
    request_parser.add_argument('network', type=str, required=True, location='json',
                                help='No network id specified')
    request_parser.add_argument('source_data', type=dict, required=True, location='json',
                                help='No source data was supplied')
    request_parser.add_argument('sink_data', type=dict, required=True, location='json',
                                help='No sink data was supplied')

    @api.marshal_with(run_list_model)
    def get(self):
        """
        Get a list of all Network runs on the server
        """
        return {'runs': runs.values()}

    @api.expect(request_parser)
    @api.response(201, "Created Network run")
    @api.response(404, "Network not found")
    def post(self):
        """
        Create a new Network run and start execution
        """
        args = self.request_parser.parse_args()

        try:
            network = fastr.networklist[args['network']]
        except KeyError:
            # An unknown network id is a client error: answer 404, as the
            # single-network resource does, instead of an uncaught KeyError.
            abort(404)

        # Run ids combine the network id with the creation timestamp.
        run_id = '{}_{}'.format(network.id, datetime.datetime.now().isoformat())
        runs[run_id] = Run(run_id, network, args['source_data'], args['sink_data'])

        body = {'run_id': run_id,
                'run': url_for('api_run', id=run_id, _external=True),
                'status': url_for('api_status', id=run_id, _external=True)}
        headers = {'Location': url_for('api_run', id=run_id)}
        return body, 201, headers
class StatusApi(Resource):
    """
    Resource reporting the job status and job results of a run.
    """
    @api.response(200, "Success")
    @api.response(404, "Run not found")
    def get(self, id_=None, **view_args):
        """
        Get the status of a Network Run on the server
        """
        # Flask hands URL variables to the view as keyword arguments, and the
        # rule registered for this resource at the bottom of the file names
        # its variable ``id``. Accept that spelling as well, so the lookup
        # works whichever name the route uses.
        id_ = view_args.get('id', id_)

        try:
            return runs[id_].status()
        except KeyError:
            abort(404)
# Route registration: bind every resource class to its URL rule and endpoint
# name. The endpoint names are the ones used with url_for() and ObjectUrl
# elsewhere in this module.
# NOTE(review): the URL variable in these rules is ``id``, whereas the handler
# methods in this file name their parameter ``id_``; Flask passes URL
# variables to the view as keyword arguments, so verify the names line up.
api.add_resource(NetworkApi, '/api/networks/<id>', endpoint='api_network')
api.add_resource(NetworkListApi, '/api/networks', endpoint='api_networks')
# NOTE(review): ToolApi and ToolListApi are not defined in the visible part of
# this file - confirm they are defined or imported before this point.
api.add_resource(ToolApi, '/api/tools/<id>', endpoint='api_tool')
api.add_resource(ToolApi, '/api/tools/<id>/<version>', endpoint='api_version_tool')
api.add_resource(ToolListApi, '/api/tools', endpoint='api_tools')
api.add_resource(RunApi, '/api/runs/<id>', endpoint='api_run')
api.add_resource(RunListApi, '/api/runs', endpoint='api_runs')
api.add_resource(StatusApi, '/api/runs/<id>/status', endpoint='api_status')