# Copyright 2011-2014 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import urlparse
import prov
from prov.model import ProvDocument
import fastr
from fastr import exceptions
from fastr.data import url
from fastr.datatypes import Deferred
from fastr.utils import iohelpers
class Provenance(object):
    """
    The Provenance object keeps track of everything that happens to a data object.

    It wraps a ``ProvDocument`` and registers a fixed set of namespaces
    (``fastr``, ``tool``, ``node``, ``job``, ``data``, ``worker``, ``network``
    and ``fastrinfo``) below the configured host url.
    """

    def __init__(self, host=None):
        """
        Create the provenance document and register the default namespaces.

        :param str host: base url under which all namespaces are created,
                         trailing slashes are stripped. If ``None`` the result
                         of ``fastr.config.web_url()`` is used as-is.
        """
        if host is not None:
            self.host = host.rstrip('/')
        else:
            self.host = fastr.config.web_url()

        self.namespaces = {}
        self.document = ProvDocument()

        # Define default namespaces
        self.fastr = self._add_namespace('fastr')
        self.tool = self._add_namespace('tool')
        self.node = self._add_namespace('node')
        self.job = self._add_namespace('job')
        self.data = self._add_namespace('data')
        self.worker = self._add_namespace('worker')
        self.network = self._add_namespace('network')
        self.fastr_info = self._add_namespace('fastrinfo')

        # Add fastr agent
        self.fastr_agent = self.agent(self.fastr[fastr.version.version])

        # These are filled in by init_provenance(); they are defined here so
        # the attributes exist on every instance, even before a job is known.
        self.job_activity = None
        self.tool_agent = None
        self.node_agent = None
        self.network_agent = None

    def _add_namespace(self, name, parent=None, url=None):
        """
        Register a namespace in the document and remember it by name.

        :param str name: prefix of the namespace
        :param str parent: name of an already registered namespace whose uri
                           is used as base instead of the host
        :param str url: path appended to the base, if ``None`` the uri
                        becomes ``<base>/<name>/``
        :return: the registered namespace, or ``None`` if ``parent`` is given
                 but not registered
        """
        if parent is None:
            host = self.host
        elif parent in self.namespaces:
            host = self.namespaces[parent].uri
        else:
            # Unknown parent: nothing is registered
            return None

        if url is None:
            uri = "{}/{}/".format(host, name)
        else:
            uri = "{}/{}".format(host, url)

        self.namespaces[name] = self.document.add_namespace(name, uri)
        return self.namespaces[name]

    def agent(self, identifier, other_attributes=None):
        """
        Add an agent to the provenance document.

        :param identifier: identifier of the agent
        :param other_attributes: extra attributes passed on to the document
        :return: the result of ``self.document.agent``
        """
        return self.document.agent(identifier, other_attributes)

    def activity(self, identifier, start_time=None, end_time=None, other_attributes=None):
        """
        Add an activity to the provenance document.

        :param identifier: identifier of the activity
        :param start_time: start time passed on to the document
        :param end_time: end time passed on to the document
        :param other_attributes: extra attributes passed on to the document
        :return: the result of ``self.document.activity``
        """
        return self.document.activity(identifier, start_time, end_time, other_attributes)

    def entity(self, identifier, other_attributes=None):
        """
        Add an entity to the provenance document.

        :param identifier: identifier of the entity
        :param other_attributes: extra attributes passed on to the document
        :return: the result of ``self.document.entity``
        """
        return self.document.entity(identifier, other_attributes)

    def init_provenance(self, job):
        """
        Create initial provenance document

        Creates the job activity and the tool, node and network agents for
        the job and records the relations between them.

        :param job: the job to describe, its ``id``, ``tool_name``,
                    ``tool_version``, ``node_id``, ``network_id`` and
                    ``network_version`` attributes are used
        """
        self.job_activity = self.activity(self.job[job.id])
        self.tool_agent = self.agent(
            self.tool["{}/{}".format(job.tool_name, job.tool_version)],
            {
                'fastrinfo:tool_dump': fastr.toollist[job.tool_name, job.tool_version].dumps()
            }
        )
        self.node_agent = self.agent(self.node[job.node_id])

        # FIXME: Find a better way to collect the current network id
        self.network_agent = self.agent(
            self.network["{}/{}".format(job.network_id, job.network_version)]
        )

        self.document.actedOnBehalfOf(self.network_agent, self.fastr_agent)
        self.document.actedOnBehalfOf(self.node_agent, self.tool_agent)
        self.document.actedOnBehalfOf(self.node_agent, self.network_agent)
        self.document.wasAssociatedWith(self.job_activity, self.node_agent)

    def collect_provenance(self, job, advanced_flow=False):
        """
        Collect the provenance for this job

        :param job: the job to collect the provenance for, its ``tool`` and
                    ``input_arguments`` attributes are used
        :param bool advanced_flow: if set, every input argument is iterated
                                   and the provenance is collected per sample
        """
        tool = job.tool

        for input_argument_key, input_argument_value in job.input_arguments.items():
            input_description = tool.inputs[input_argument_key]

            if input_description.hidden:
                # Skip hidden inputs (they are used for the system to feed
                # additional data to sources/sinks etc) not interesting
                # for provenance
                continue

            # NOTE(review): collect_input_argument_provenance is not defined
            # in the part of the class visible here -- confirm it exists.
            if not advanced_flow:
                self.collect_input_argument_provenance(input_argument_value)
            else:
                for sample in input_argument_value:
                    self.collect_input_argument_provenance(sample)

        # Remove duplicate records, keeping the first occurrence so that the
        # order of the records stays stable
        seen = set()
        unique_records = []
        for record in self.document._records:
            if record not in seen:
                seen.add(record)
                unique_records.append(record)
        self.document._records = unique_records

    @staticmethod
    def data_uri(value, job):
        """
        Create the uri that identifies a value in the provenance.

        :param value: the value to create the uri for, its ``raw_value`` is used
        :param job: the job the value belongs to, its ``id`` (and
                    ``sample_id`` if the url has no ``sampleid`` query field)
                    is used for ``val://`` urls
        :return: a ``fastr://data/job/...`` uri for ``val://`` urls, the raw
                 value itself if it is a url, otherwise a
                 ``fastr://data/constant/...`` uri
        :rtype: str
        """
        fastr.log.info('DATA URI for: {}, job: {}'.format(value.raw_value, job))

        if isinstance(value.raw_value, str) and value.raw_value.startswith('val://'):
            val_uri = urlparse.urlparse(value.raw_value)
            val_query = urlparse.parse_qs(val_uri.query)

            # The sample id in the url takes precedence over the one of the job
            sample_id = val_query['sampleid'][0] if 'sampleid' in val_query else job.sample_id

            return 'fastr://data/job/{job_id}/output/{outputid}/sample/{sample}/cardinality/{cardinality}'.format(
                job_id=job.id,
                outputid=val_query['outputname'][0],
                sample=sample_id,
                cardinality=val_query['nr'][0]
            )

        if url.isurl(value.raw_value):
            return value.raw_value

        return 'fastr://data/constant/{}'.format(value.raw_value)

    @staticmethod
    def get_parent_provenance(value):
        """
        Find the provenance of the parent job

        :param value: the value for which to find the job, only ``Deferred``
                      values are looked up
        :return: tuple with the provenance document read from the prov file
                 of the job and the loaded job data, or ``(None, None)`` if
                 the value is not a ``Deferred``
        :raises FastrValueError: if the value is not a valid deferred url
                                 (the scheme is not ``val``)

        NOTE(review): earlier documentation also listed ``FastrKeyError`` for
        a deferred that is not available (yet); nothing in this method raises
        it directly -- confirm against the helpers that are called.
        """
        if not isinstance(value, Deferred):
            return None, None

        parsed_url = urlparse.urlparse(value.raw_value)

        if parsed_url.scheme != 'val':
            raise exceptions.FastrValueError(
                'Cannot lookup value {}, wrong url scheme'.format(value.raw_value)
            )

        # First load job to find location of the prov file
        datafile = os.path.join(fastr.config.mounts[parsed_url.netloc],
                                os.path.normpath(parsed_url.path[1:]))

        # Open Job file
        # NOTE(review): load_gpickle presumably unpickles the file; make sure
        # the val:// url can only point to trusted job files.
        data = iohelpers.load_gpickle(datafile)

        # Get provenance file; the parsed document (not the prov module) is
        # what has to be handed back to the caller
        parent_provenance = prov.read(data.provfile, 'json')

        return parent_provenance, data

    def serialize(self, filename, format):
        """
        Write the provenance document to a file.

        :param str filename: path of the file to write
        :param str format: serialization format passed on to the document
        """
        with open(filename, 'w') as fh_out:
            self.document.serialize(destination=fh_out, format=format)

            # Make sure the data gets flushed
            fh_out.flush()
            os.fsync(fh_out.fileno())