Source code for provdbconnector.prov_db

import json
import logging
import os
from collections import namedtuple
from io import StringIO
from uuid import uuid4

from prov.constants import PROV_ATTRIBUTES, PROV_MENTION, PROV_BUNDLE, PROV_LABEL, PROV_TYPE
from prov.model import ProvDocument, ProvEntity, ProvBundle, ProvRecord, ProvElement, ProvRelation, QualifiedName, \
    ProvAssociation, PROV_REC_CLS, ProvActivity, ProvAgent, PROV_AGENT,PROV_ENTITY,PROV_ACTIVITY, PROV_ATTR_AGENT,PROV_ATTR_ACTIVITY, PROV_ATTR_ENTITY,PROV_ATTR_BUNDLE
from provdbconnector.db_adapters.baseadapter import METADATA_KEY_PROV_TYPE, METADATA_KEY_IDENTIFIER, \
    METADATA_KEY_NAMESPACES, \
    METADATA_KEY_TYPE_MAP, METADATA_KEY_IDENTIFIER_ORIGINAL
from provdbconnector.exceptions.provapi import NoDataBaseAdapterException, InvalidArgumentTypeException, \
    InvalidProvRecordException
from provdbconnector.exceptions.utils import ParseException
from provdbconnector.exceptions.database import NotFoundException
from provdbconnector.utils.converter import form_string, to_json, to_provn, to_xml
from provdbconnector.utils.serializer import encode_json_representation, add_namespaces_to_bundle, create_prov_record, \
    PROV_ATTR_BASE_CLS, serialize_namespace

LOG_LEVEL = os.environ.get('LOG_LEVEL', '')
NUMERIC_LEVEL = getattr(logging, LOG_LEVEL.upper(), None)
logging.basicConfig(level=NUMERIC_LEVEL)
logging.getLogger("prov.model").setLevel(logging.WARN)
log = logging.getLogger(__name__)

PROV_API_BUNDLE_IDENTIFIER_PREFIX = "prov:bundle:{}"


[docs]class ProvDb(object): """ The public api class. This class provide methods to save and get documents or part of ProvDocuments """ def __init__(self, api_id=None, adapter=None, auth_info=None, *args): """ Save a new instance of ProvAPI :param api_id: The id of the api, optional :type api_id: int or str :param adapter: The adapter class, must enhance from BaseAdapter :type adapter: Baseadapter :param auth_info: A dict object that contains the information for authentication :type auth_info: dict or None """ if api_id is None: self.api_id = uuid4() else: self.api_id = api_id if adapter is None: raise NoDataBaseAdapterException() self._adapter = adapter() self._adapter.connect(auth_info) # Converter Methods
[docs] def save_document_from_json(self, content=None): """ Saves a new document in the database :param content: The content :type content: str or buffer :return: document_id :rtype: str or buffer """ prov_document = form_string(content=content) return self.save_document(content=prov_document)
[docs] def get_document_as_json(self, document_id=None): """ Get a ProvDocument from the database based on the document_id :param document_id: document id :type document_id: str :return: ProvDocument as json string :rtype: str """ prov_document = self.get_document_as_prov(document_id=document_id) return to_json(prov_document)
[docs] def save_document_from_xml(self, content=None): """ Saves a prov document in the database based on the xml file :param content: The content :type content: str or buffer :return: document_id :rtype: str """ prov_document = form_string(content=content) return self.save_document(content=prov_document)
[docs] def get_document_as_xml(self, document_id=None): """ Get a ProvDocument from the database based on the document_id :param document_id: The id :type document_id: str :return: ProvDocument as XML string :rtype: str """ prov_document = self.get_document_as_prov(document_id=document_id) return to_xml(prov_document)
[docs] def save_document_from_provn(self, content=None): """ Saves a prov document in the database based on the provn string or buffer :param content: provn object :type content: str or buffer :return: Document_id :rtype: str """ prov_document = form_string(content=content) return self.save_document(content=prov_document)
[docs] def get_document_as_provn(self, document_id=None): """ Get a ProvDocument from the database based on the document_id :param document_id: The id :type document_id: str :return: ProvDocument :rtype: ProvDocument """ prov_document = self.get_document_as_prov(document_id=document_id) return to_provn(prov_document)
[docs] def save_document_from_prov(self, content=None): """ Saves a prov document in the database based on the prov document :param content: Prov document :type content: ProvDocument :return: document_id :rtype: str """ if not isinstance(content, ProvDocument): raise InvalidArgumentTypeException() return self.save_document(content=content)
# Methods that consume ProvDocument instances and produce ProvDocument instances
[docs] def save_document(self, content=None): """ The main method to Save a document in the db :param content: The content can be a xml, json or provn string or buffer or a ProvDocument instance :type content: str or buffer or ProvDocument :return: Document id :rtype: str """ # Try to convert the content into the provDocument, if it is already a ProvDocument instance the function will return this document try: content = form_string(content=content) except ParseException as e: raise InvalidArgumentTypeException(e) prov_document = content doc_id = self._save_bundle_internal(prov_document) for bundle in prov_document.bundles: self.save_bundle(prov_bundle=bundle) return doc_id
[docs] def get_document_as_prov(self, document_id=None): """ Get a ProvDocument from the database based on the document id :param document_id: The id :type document_id: str :return: Prov Document :rtype: ProvDocument """ if type(document_id) is not str: raise InvalidArgumentTypeException() filter_meta = dict() filter_prop = dict() filter_meta.update({document_id: True}) filter_prop.update({PROV_TYPE: PROV_BUNDLE}) bundle_entities = self._adapter.get_records_by_filter(metadata_dict=filter_meta, attributes_dict=filter_prop) document_records = self._adapter.get_records_by_filter(metadata_dict=filter_meta) # parse document prov_document = ProvDocument() for record in document_records: self._parse_record(prov_document, record) bundle_doc = ProvDocument()# Document with all bundle entities for bundle_record in bundle_entities: # skip if we got some relations instead of only the bundle nodes if str(PROV_TYPE) not in bundle_record.attributes: continue if str(bundle_record.attributes[str(PROV_TYPE)]) != str(PROV_BUNDLE): continue bundle_entity= self._parse_record(bundle_doc,bundle_record) prov_bundle = self.get_bundle(bundle_entity.identifier) prov_document.add_bundle(prov_bundle,identifier=bundle_entity.identifier) return prov_document
[docs] def save_element(self, prov_element, bundle_id=None): """ Saves a activity, entity, agent .. code:: python doc = ProvDocument() agent = doc.agent("ex:yourAgent") activity = doc.activity("ex:yourActivity") entity = doc.entity("ex:yourEntity") # Save the elements agent_id = prov_db.save_element(agent) activity_id = prov_db.save_element(activity) entity_id = prov_db.save_element(entity) :param prov_element: The ProvElement :type prov_element: prov.model.ProvElement :param bundle_id: :type bundle_id: str :return: Identifier of the element :rtype: prov.model.QualifiedName """ if not isinstance(prov_element, ProvElement): raise InvalidArgumentTypeException("Should be {} but was {}".format(ProvElement, type(prov_element))) (metadata, attributes) = self._get_metadata_and_attributes_for_record(prov_element, bundle_id=bundle_id) self._adapter.save_element(attributes=attributes, metadata=metadata) #Add bundle relation only if the record belongs to a bundle not to document if not isinstance(prov_element.bundle, ProvDocument): bundle_id_qualified = prov_element.bundle.valid_qualified_name(prov_element.bundle.identifier) self._create_bundle_association([prov_element], bundle_id_qualified) return prov_element.identifier
[docs] def get_elements(self, prov_element_cls): """ Return a document that contains the requested type .. code:: python from prov.model import ProvEntity, ProvAgent, ProvActivity document_with_all_entities = prov_db.get_elements(ProvEntity) document_with_all_agents = prov_db.get_elements(ProvAgent) document_with_all_activities = prov_db.get_elements(ProvActivity) print(document_with_all_entities) print(document_with_all_agents) print(document_with_all_activities) :param prov_element_cls: :return: Prov document :rtype prov.model.ProvDocument """ if prov_element_cls is ProvAgent: prov_type = PROV_AGENT elif prov_element_cls is ProvActivity: prov_type = PROV_ACTIVITY elif prov_element_cls is ProvEntity: prov_type = PROV_ENTITY else: raise InvalidArgumentTypeException("You provide a wrong type : {}".format(type(prov_element_cls))) meta_filter = dict() meta_filter.update({METADATA_KEY_PROV_TYPE: prov_type}) doc = ProvDocument() raw_results = self._adapter.get_records_by_filter(metadata_dict=meta_filter) for element in raw_results: if element.metadata[METADATA_KEY_PROV_TYPE] == str(prov_type): self._parse_record(doc,element) return doc
[docs] def get_element(self, identifier): """ Get a element (activity, agent, entity) from the database .. code:: python doc = ProvDocument() identifier = QualifiedName(doc, "ex:yourAgent") prov_element = prov_db.get_element(identifier) :param identifier: :type identifier: prov.model.QualifiedName :return: A prov Element class """ if not isinstance(identifier, QualifiedName): raise InvalidArgumentTypeException("Should be {} but was {}".format(QualifiedName, type(identifier))) # Include namespace uri into the identifier to support e.g. different default namespaces global_identifier = identifier.namespace.uri + identifier.localpart # Setup filter meta_filter = dict() meta_filter.update({METADATA_KEY_IDENTIFIER: global_identifier}) # Get the result results = self._adapter.get_records_by_filter(metadata_dict=meta_filter) # Check if there is some unexpected result if len(results) > 1: raise InvalidProvRecordException("Invalid data result, len should be only one, result was: {}".format(list(results))) if len(results) == 0: raise NotFoundException("Can't find the element with identifier {}".format(identifier)) # get the lonely element in the list element = list(results).pop() doc = ProvDocument() return self._parse_record(doc,element)
[docs] def save_record(self, prov_record, bundle_id=None): """ Saves a realtion or a element (Entity, Agent or Activity) .. code:: python doc = ProvDocument() agent = doc.agent("ex:Alice") ass_rel = doc.association("ex:Alice", "ex:Bob") # Save the elements agent_id = prov_db.save_record(agent) relation_id = prov_db.save_record(ass_rel) :param prov_record: The prov record :type prov.model.ProvRecord :param bundle_id: The bundle id that you got back if you created a bundle or document :type str :return: """ if not isinstance(prov_record,ProvRecord): raise InvalidArgumentTypeException("Wrong type, expected: {}, got {}".format(type(ProvRecord), type(prov_record))) if isinstance(prov_record,ProvRelation): return self.save_relation(prov_relation=prov_record,bundle_id=bundle_id) elif isinstance(prov_record,ProvElement): return self.save_element(prov_element=prov_record,bundle_id=bundle_id) else: raise InvalidArgumentTypeException("Oh no... you provided a not supported prov_record type. The type was: {}".format(type(prov_record)))
@staticmethod def _parse_record(prov_bundle, raw_record): """ This method Saves a ProvRecord in the ProvBundle based on the raw database response :param prov_bundle: ProvBundle instance :type prov_bundle: ProvBundle :param raw_record: DbRelation or DbRecord instance as (namedtuple) :type raw_record: DbRelation or DbRecord :return ProvRecord :rtype prov.model.ProvRecord """ # check if record belongs to this bundle prov_type = raw_record.metadata[METADATA_KEY_PROV_TYPE] prov_type = prov_bundle.valid_qualified_name(prov_type) # skip connections between bundle entities and all records that belong to the bundle prov_label = raw_record.attributes.get(str(PROV_LABEL)) if prov_label is not None and prov_label == "belongsToBundle": return prov_id = raw_record.metadata[METADATA_KEY_IDENTIFIER_ORIGINAL] prov_id_qualified = prov_bundle.valid_qualified_name(prov_id) # skip record if prov:identifier starts with "prov:Unknown" if str(prov_id).startswith("prov:Unknown-"): return # set identifier only if it is not a prov type if prov_id_qualified == prov_type: prov_id = None # get type map type_map = raw_record.metadata[METADATA_KEY_TYPE_MAP] if type(type_map) is str: io = StringIO(type_map) type_map = json.load(io) elif type(type_map) is list: type_map_list = type_map type_map = dict() for type_str in type_map_list: if type(type_str) is not str: raise InvalidArgumentTypeException("The type_map must be a string got: {}".format(type_str)) io = StringIO(type_str) type_map.update(json.load(io)) elif type(type_map) is not dict: raise InvalidArgumentTypeException("The type_map must be a dict or json string got: {}".format(type_map)) add_namespaces_to_bundle(prov_bundle, raw_record.metadata) return create_prov_record(prov_bundle, prov_type, prov_id, raw_record.attributes, type_map)
[docs] def get_bundle(self, identifier): """ Returns the whole bundle for the provided identifier .. code:: python doc = ProvDocument() bundle_name = doc.valid_qualified_name("ex:YourBundleName") # get the bundle prov_bundle = prov_db.get_bundle(bundle_name) doc.add_bundle(prov_bundle) :param identifier: The identifier :type identifier: prov.model.QualifiedName :return: The prov bundle instance :rtype prov.model.ProvBundle """ if not isinstance(identifier, QualifiedName): raise InvalidArgumentTypeException() bundle_entity = self.get_element(identifier) doc = ProvDocument() doc.add_record(bundle_entity)#Add bundle entity to document prov_bundle = doc.bundle(identifier=bundle_entity.identifier) # Include namespace uri into the identifier to support e.g. different default namespaces global_identifier = identifier.namespace.uri + identifier.localpart bundle_records = self._adapter.get_bundle_records(global_identifier) for record in bundle_records: self._parse_record(prov_bundle, record) return prov_bundle
[docs] def save_bundle(self,prov_bundle): """ Public method to save a bundle .. code:: python doc = ProvDocument() bundle = doc.bundle("ex:bundle1") # Save the bundle prov_db.save_bundle(bundle) :param prov_bundle: :type prov_bundle: prov.model.ProvBundle :return: """ if not isinstance(prov_bundle, ProvBundle): raise InvalidArgumentTypeException() if isinstance(prov_bundle, ProvDocument): raise InvalidArgumentTypeException() # create bundle entity bundle_record = ProvEntity(prov_bundle.document, identifier=prov_bundle.identifier, attributes={PROV_TYPE: PROV_BUNDLE}) self.save_element(prov_element=bundle_record) return self._save_bundle_internal(prov_bundle)
def _save_bundle_internal(self, prov_bundle): """ Private method to create a bundle in the database :param prov_bundle: ProvBundle :type prov_bundle: prov.model.ProvBundle :return bundle_id: The bundle from the database adapter :rtype: str """ if not isinstance(prov_bundle, ProvBundle): raise InvalidArgumentTypeException() bundle_id = str(uuid4()) # create nodes for record in prov_bundle.get_records(ProvElement): self.save_element(prov_element=record, bundle_id=bundle_id) # create relations for relation in prov_bundle.get_records(ProvRelation): self.save_relation(relation, bundle_id) return bundle_id
[docs] def save_relation(self, prov_relation, bundle_id=None): """ Saves a relation between 2 nodes that are already in the database. .. code:: python doc = ProvDocument() activity = doc.activity("ex:yourActivity") entity = doc.entity("ex:yourEntity") wasGeneratedBy = entity.wasGeneratedBy("ex:yourAgent") # Save the elements rel_id = prov_db.save_relation(wasGeneratedBy) :param prov_relation: The ProvRelation instance :type prov_relation: ProvRelation :param bundle_id :type bundle_id: str :return: Relation id :rtype: str """ if not isinstance(prov_relation, ProvRelation): raise InvalidArgumentTypeException( "prov_relation was {}, expected: {}".format(type(prov_relation), type(ProvRelation))) # get from and to node from_tuple, to_tuple = prov_relation.formal_attributes[:2] from_qualified_name = from_tuple[1] to_qualified_name = to_tuple[1] # if target or origin record is unknown, save node "Unknown" if from_qualified_name is None: uid = uuid4() from_qualified_name = prov_relation.bundle.valid_qualified_name("prov:Unknown-{}".format(uid)) del uid if to_qualified_name is None: uid = uuid4() to_qualified_name = prov_relation.bundle.valid_qualified_name("prov:Unknown-{}".format(uid)) del uid # Ensure that the from and to node exists relation_cls = PROV_REC_CLS[prov_relation.get_type()] from_type,to_type = relation_cls.FORMAL_ATTRIBUTES[:2] # get the class types from_type_cls = PROV_ATTR_BASE_CLS[from_type] to_type_cls = PROV_ATTR_BASE_CLS[to_type] if from_type_cls is None or to_type_cls is None: raise InvalidArgumentTypeException( "Could not determinate typ for relation from: {}, to: {}, prov_relation was {}, ".format(from_type, to_type, type(prov_relation))) #save from and to node self.save_element(prov_element=from_type_cls(prov_relation.bundle, identifier=from_qualified_name), bundle_id=bundle_id) to_bundle = prov_relation.bundle # If it is a link between bundle the to node not belongs to the current bundle, the to node belongs only to the bundle defined as FORMAL_ATTR[3] if prov_relation.get_type() is PROV_MENTION: #Try to get the destination bundle to_bundle_identifier = list(prov_relation.get_attribute(PROV_ATTR_BUNDLE)).pop() if not isinstance(to_bundle_identifier,QualifiedName): raise InvalidProvRecordException("Should be a qualified name {}, mention: {}".format(to_bundle_identifier, prov_relation)) # Create the bundle, it will be automatically created during the save_element method to_bundle = ProvBundle(identifier=to_bundle_identifier) self.save_element(prov_element=to_type_cls(to_bundle, identifier=to_qualified_name), bundle_id=bundle_id) # split metadata and attributes (metadata, attributes) = self._get_metadata_and_attributes_for_record(prov_relation) # Include namespace uri into the identifier to support e.g. different default namespaces global_from_qualified_name = from_qualified_name.namespace.uri + from_qualified_name.localpart global_to_qualified_name = to_qualified_name.namespace.uri + to_qualified_name.localpart return self._adapter.save_relation(global_from_qualified_name, global_to_qualified_name, attributes, metadata)
def _create_bundle_association(self, prov_elements, prov_bundle_identifier): """ This method saves a relation between the bundle entity and all nodes in the bundle :param prov_bundle_identifier: The bundle identifier :type prov_bundle_identifier: QualifiedName :param prov_elements: List of prov elements :type prov_elements: list """ # Ensure that the bundle entity exist doc = ProvDocument() to_bundle = ProvBundle(document=doc,identifier=prov_bundle_identifier) self.save_bundle(to_bundle) # Save the empty bundle to create the bundle entity if necessary belong_relation = ProvAssociation(bundle=to_bundle, identifier=None, attributes={PROV_TYPE: "prov:bundleAssociation"}) (belong_metadata, belong_attributes) = self._get_metadata_and_attributes_for_record(belong_relation) to_qualified_name = prov_bundle_identifier for record in prov_elements: (metadata, attributes) = self._get_metadata_and_attributes_for_record(record) from_qualified_name = metadata[METADATA_KEY_IDENTIFIER] global_prov_to_identifier= to_qualified_name.namespace.uri + to_qualified_name.localpart self._adapter.save_relation(from_qualified_name, global_prov_to_identifier, belong_attributes, belong_metadata) def _save_bundle_links(self, prov_bundle): """ This function saves the links between nodes in bundles, see https://www.w3.org/TR/prov-links/ :param prov_bundle: For this bundle we will create the links :type prov_bundle: ProvBundle """ for mention in prov_bundle.get_records(ProvRelation): if mention.get_type() is not PROV_MENTION: continue self.save_relation(mention) @staticmethod def _get_metadata_and_attributes_for_record(prov_record, bundle_id=None): """ This function generate some meta data for the record for example: * Namespaces: The prov_record use several namespaces and the metadata contain this namespaces * Type_Map: The type map is important to get exactly the same document back, you have to save this information (like what attribute is a datetime) :param prov_record: The ProvRecord (ProvRelation or ProvElement) :type prov_record: ProvRecord :param bundle_id: The id of the document :type bundle_id: str :return: namedtuple(metadata, attributes) :rtype: namedtuple """ if not isinstance(prov_record, ProvRecord): raise InvalidArgumentTypeException() used_namespaces = dict() bundle = prov_record.bundle prov_type = prov_record.get_type() prov_identifier = prov_record.identifier if prov_type is None and isinstance(prov_record, ProvRecord): prov_type = bundle.valid_qualified_name("prov:Unknown") # if relation without identifier -> use prov_type as identifier if prov_identifier is None and prov_record.identifier is None: prov_identifier = prov_type # Be sure that the prov_identifier is a qualified name instance if not isinstance(prov_identifier, QualifiedName): qualified_name = bundle.valid_qualified_name(prov_identifier) if qualified_name is None: raise InvalidProvRecordException( "The prov record {} is invalid because the prov_identifier {} can't be qualified".format( prov_record, prov_identifier)) else: prov_identifier = qualified_name # extract namespaces from record # add namespace from prov_type namespace = prov_type.namespace used_namespaces.update(serialize_namespace(namespace)) # add namespace from prov identifier namespace = prov_identifier.namespace used_namespaces.update(serialize_namespace(namespace)) attributes = dict(prov_record.attributes.copy()) for key, value in attributes.items(): # ensure key is QualifiedName if isinstance(key, QualifiedName): namespace = key.namespace used_namespaces.update(serialize_namespace(namespace)) else: raise InvalidProvRecordException("Not support key type {}".format(type(key))) # try to add if isinstance(value, QualifiedName): namespace = value.namespace used_namespaces.update(serialize_namespace(namespace)) else: qualified_name = bundle.valid_qualified_name(value) if qualified_name is not None: # Don't update the attribute, so we only save the namespace instead of the attribute as a qualified name. # For some reason the prov-library allow a string with a schema: <namespace_prefix>:<identifier> # This line cause an error during the test: "test_primer_example_alternate" # attributes[key] = qualified_name # update attribute namespace = qualified_name.namespace used_namespaces.update(serialize_namespace(namespace)) # create type dict types_dict = dict() for key, value in attributes.items(): if key not in PROV_ATTRIBUTES: return_type = encode_json_representation(value) if return_type is not None: types_dict.update({str(key): return_type}) # Include namespace uri into the identifier to support e.g. different default namespaces global_prov_identifier = prov_identifier.namespace.uri + prov_identifier.localpart metadata = { METADATA_KEY_PROV_TYPE: prov_type, METADATA_KEY_IDENTIFIER: global_prov_identifier, METADATA_KEY_IDENTIFIER_ORIGINAL: prov_identifier, METADATA_KEY_NAMESPACES: used_namespaces, METADATA_KEY_TYPE_MAP: types_dict } # Add document id to metadata, to restore the if bundle_id: metadata.update({bundle_id: True}) meta_and_attributes = namedtuple("MetaAndAttributes", "metadata, attributes") return meta_and_attributes(metadata, attributes)