Source code for provdbconnector.db_adapters.neo4j.neo4jadapter

import os
import provdbconnector.db_adapters.neo4j.cypher_commands as cypher_commands
from provdbconnector.db_adapters.baseadapter import BaseAdapter
from provdbconnector.db_adapters.baseadapter import METADATA_KEY_PROV_TYPE, METADATA_KEY_TYPE_MAP, \
    METADATA_KEY_IDENTIFIER, METADATA_KEY_NAMESPACES

from provdbconnector.exceptions.database import InvalidOptionsException, AuthException, \
    DatabaseException, CreateRecordException, NotFoundException, CreateRelationException, MergeException

from neo4j.v1.exceptions import ProtocolError
from neo4j.v1 import GraphDatabase, basic_auth, Relationship
from prov.constants import PROV_N_MAP
from collections import namedtuple
from provdbconnector.utils.serializer import encode_string_value_to_primitive, encode_dict_values_to_primitive, \
    split_into_formal_and_other_attributes

import logging

# Only let warnings and errors through from the neo4j bolt driver logger.
logging.getLogger("neo4j.bolt").setLevel(logging.WARN)
log = logging.getLogger(__name__)

# Connection settings; each can be overridden through the environment.
NEO4J_USER = os.getenv('NEO4J_USERNAME', 'neo4j')
NEO4J_PASS = os.getenv('NEO4J_PASSWORD', 'neo4jneo4j')
NEO4J_HOST = os.getenv('NEO4J_HOST', 'localhost')
NEO4J_BOLT_PORT = os.getenv('NEO4J_BOLT_PORT', '7687')
NEO4J_HTTP_PORT = os.getenv('NEO4J_HTTP_PORT', '7474')

# Property-name prefix that marks a stored property as metadata instead of an attribute.
NEO4J_META_PREFIX = "meta:"



class Neo4jAdapter(BaseAdapter):
    """
    This is the neo4j adapter to store prov. data in a neo4j database.

    Metadata entries are stored as node/relation properties whose names are
    prefixed with ``NEO4J_META_PREFIX``; all other properties are the PROV
    attributes of the record.
    """

    # (attributes, metadata) pair returned by the read methods. Built once here
    # instead of re-creating the namedtuple type on every call.
    _Record = namedtuple('Record', 'attributes, metadata')

    def __init__(self, *args):
        """
        Setup the class

        :param args: Ignored, not used by this adapter
        """
        super(Neo4jAdapter, self).__init__()
        # Set by connect()
        self.driver = None

    def _create_session(self):
        """
        Get a new session from the driver.

        The caller owns the returned session and must close it.

        :return: Session
        :rtype: Session
        :raises DatabaseException: if connect() was not called before
        :raises AuthException: if no healthy session can be opened
        """
        if self.driver is None:
            raise DatabaseException("Not connected to the database, call connect() first")
        try:
            session = self.driver.session()
        except OSError as e:
            raise AuthException(e) from e
        if not session.healthy:
            # Release the unusable session before reporting the failure
            session.close()
            raise AuthException()
        return session

    def _run(self, command, parameters):
        """
        Run a single cypher command in a fresh session and close the session.

        All result rows are read before the session is closed, so the returned
        records remain usable afterwards.

        :param command: The cypher command
        :type command: str
        :param parameters: The cypher parameters
        :type parameters: dict
        :return: All result rows
        :rtype: list
        """
        session = self._create_session()
        try:
            return list(session.run(command, parameters))
        finally:
            session.close()

    def connect(self, authentication_options):
        """
        The connect method to create a new instance of the db_driver

        :param authentication_options: Username, password and host
        :return: None
        :rtype: None
        :raises: InvalidOptionsException
        """
        if authentication_options is None:
            raise InvalidOptionsException()

        user_name = authentication_options.get("user_name")
        user_pass = authentication_options.get("user_password")
        host = authentication_options.get("host")

        if user_name is None or user_pass is None or host is None:
            raise InvalidOptionsException()

        try:
            self.driver = GraphDatabase.driver("bolt://{}".format(host), auth=basic_auth(user_name, user_pass))
        except ProtocolError as e:
            raise InvalidOptionsException(e) from e

        # Open one session to verify the connection works, then release it again
        self._create_session().close()

    @staticmethod
    def _meta_key(key):
        """
        Prefix a single metadata key with NEO4J_META_PREFIX

        :param key: The metadata key
        :return: The prefixed key
        :rtype: str
        """
        return "{meta_prefix}{key}".format(key=key, meta_prefix=NEO4J_META_PREFIX)

    @staticmethod
    def _prefix_metadata(metadata):
        """
        Prefix all keys of a dict, only for the neo4j adapter

        :param metadata: The metadata dict
        :return: A dict with prefixed keys
        :rtype: dict
        """
        return {Neo4jAdapter._meta_key(key): value for key, value in metadata.items()}

    @staticmethod
    def _parse_to_primitive_attributes(attributes, prefixed_metadata):
        """
        Convert the dict values and keys into a neo4j friendly type
        (dict=>json, list, int, float, QualifiedName=>str, datetime=>str)

        :param attributes: The attributes dict
        :param prefixed_metadata: The metadata dict with already prefixed keys
        :return: Merged dict with all keys and values encoded
        :rtype: dict
        """
        all_attributes = attributes.copy()
        all_attributes.update(prefixed_metadata)
        return {encode_string_value_to_primitive(key): encode_string_value_to_primitive(value)
                for key, value in all_attributes.items()}

    @staticmethod
    def _get_attributes_identifiers_cypher_string(key_list):
        """
        Return a cypher string with all keys as cypher parameters,
        e.g. ["a", "b"] => "`a`: {`a`},`b`: {`b`}"

        :param key_list: The keys
        :return: The comma separated cypher string
        :rtype: str
        """
        return ",".join("`{0}`: {{`{0}`}}".format(key) for key in key_list)

    @staticmethod
    def _get_attributes_set_cypher_string(key_list, cypher_template=cypher_commands.NEO4J_CREATE_NODE_SET_PART):
        """
        Return the cypher_template filled in once per key (attr_name=key), joined by spaces

        :param key_list: The keys
        :param cypher_template: Template with an {attr_name} placeholder
        :return: The space separated cypher string
        :rtype: str
        """
        return " ".join(cypher_template.format(attr_name=key) for key in key_list)

    def _get_merge_cypher_parts(self, attributes, metadata):
        """
        Build the cypher fragments and parameters shared by save_element and save_relation.

        :param attributes: The attributes dict
        :type attributes: dict
        :param metadata: The metadata dict
        :type metadata: dict
        :return: (formal attributes cypher string, set statement, merge check statement,
                  encoded db attributes, keys of all non formal attributes incl. prefixed metadata)
        :rtype: tuple
        """
        prefixed_metadata = self._prefix_metadata(metadata)

        # setup merge attributes: identifier + formal attributes
        (formal_attributes, other_attributes) = split_into_formal_and_other_attributes(attributes, metadata)
        merge_relevant_keys = [self._meta_key(METADATA_KEY_IDENTIFIER)] + list(formal_attributes.keys())
        other_db_attribute_keys = list(other_attributes.keys()) + list(prefixed_metadata.keys())

        # Namespaces and type map are merged into a list instead of being set directly
        list_merge_keys = [self._meta_key(METADATA_KEY_NAMESPACES), self._meta_key(METADATA_KEY_TYPE_MAP)]
        simple_set_keys = list(other_db_attribute_keys)
        for key in list_merge_keys:
            # raises ValueError when the metadata lacks the key (unchanged behavior)
            simple_set_keys.remove(key)

        # Join with a space so the two groups of fragments can never run together
        set_statement = " ".join([
            self._get_attributes_set_cypher_string(simple_set_keys),
            self._get_attributes_set_cypher_string(list_merge_keys,
                                                   cypher_commands.NEO4J_CREATE_NODE_SET_PART_MERGE_ATTR),
        ])
        # CASE WHEN ... fragments that check whether an attribute is different
        merge_check_statement = self._get_attributes_set_cypher_string(
            simple_set_keys, cypher_commands.NEO4J_CREATE_NODE_MERGE_CHECK_PART)
        formal_attributes_str = self._get_attributes_identifiers_cypher_string(merge_relevant_keys)
        db_attributes = self._parse_to_primitive_attributes(attributes, prefixed_metadata)

        return formal_attributes_str, set_statement, merge_check_statement, db_attributes, other_db_attribute_keys

    def _run_merge_command(self, command, db_attributes, missing_id_exception):
        """
        Run a create/merge command in a transaction.

        The transaction is committed only if the returned "check" value is 0,
        otherwise it is rolled back.

        :param command: The cypher command, expected to return "ID" and "check" columns
        :param db_attributes: The cypher parameters
        :param missing_id_exception: Exception class raised when no ID is returned
        :return: (record id, True if the merge succeeded)
        :rtype: tuple
        """
        session = self._create_session()
        try:
            with session.begin_transaction() as tx:
                record_id = None
                merge_check = 0
                for record in tx.run(command, db_attributes):
                    record_id = record["ID"]
                    merge_check = record["check"]

                if record_id is None:
                    raise missing_id_exception(
                        "No ID property returned by database for the command {}".format(command))

                tx.success = merge_check == 0
        finally:
            session.close()
        return record_id, merge_check == 0

    def save_element(self, attributes, metadata):
        """
        Saves a single record

        :param attributes: The attributes dict
        :type attributes: dict
        :param metadata: The metadata dict
        :type metadata: dict
        :return: The id of the record
        :rtype: str
        :raises CreateRecordException: if the database returns no id
        :raises MergeException: if the attributes conflict with an existing node
        """
        metadata = metadata.copy()
        (formal_attributes_str, set_statement, merge_check_statement,
         db_attributes, other_db_attribute_keys) = self._get_merge_cypher_parts(attributes, metadata)

        provtype = metadata[METADATA_KEY_PROV_TYPE]
        command = cypher_commands.NEO4J_CREATE_NODE_RETURN_ID.format(label=provtype.localpart,
                                                                     formal_attributes=formal_attributes_str,
                                                                     set_statement=set_statement,
                                                                     merge_check_statement=merge_check_statement)

        record_id, merged = self._run_merge_command(command, db_attributes, CreateRecordException)
        if not merged:
            raise MergeException(
                "The attributes {other} could not merged into the existing node, All attributes: {all} ".format(
                    other=other_db_attribute_keys, all=db_attributes))
        return str(record_id)

    def save_relation(self, from_node, to_node, attributes, metadata):
        """
        Save a single relation

        :param from_node: The from node as QualifiedName
        :type from_node: QualifiedName
        :param to_node: The to node as QualifiedName
        :type to_node: QualifiedName
        :param attributes: The attributes dict
        :type attributes: dict
        :param metadata: The metadata dict
        :type metadata: dict
        :return: Id of the relation
        :rtype: str
        :raises CreateRelationException: if the database returns no id
        :raises MergeException: if the attributes conflict with an existing relation
        """
        metadata = metadata.copy()
        (formal_attributes_str, set_statement, merge_check_statement,
         db_attributes, other_db_attribute_keys) = self._get_merge_cypher_parts(attributes, metadata)

        relationtype = PROV_N_MAP[metadata[METADATA_KEY_PROV_TYPE]]
        command = cypher_commands.NEO4J_CREATE_RELATION_RETURN_ID.format(from_identifier=str(from_node),
                                                                         to_identifier=str(to_node),
                                                                         relation_type=relationtype,
                                                                         formal_attributes=formal_attributes_str,
                                                                         merge_check_statement=merge_check_statement,
                                                                         set_statement=set_statement)

        record_id, merged = self._run_merge_command(command, db_attributes, CreateRelationException)
        if not merged:
            raise MergeException("The attributes {other} could not merged into the existing node ".format(
                other=other_db_attribute_keys))
        return str(record_id)

    @classmethod
    def _split_attributes_metadata_from_node(cls, db_node):
        """
        Split a db node (or relation) back into attributes and metadata, based on the prefix

        :param db_node: The node or relationship returned by the driver
        :return: namedTuple(attributes, metadata)
        """
        prefix_len = len(NEO4J_META_PREFIX)
        attributes = dict()
        metadata = dict()
        for key, value in db_node.properties.items():
            if key.startswith(NEO4J_META_PREFIX):
                # Strip only the leading prefix; str.replace would also remove later occurrences
                metadata[key[prefix_len:]] = value
            else:
                attributes[key] = value

        # Namespaces and type map are stored as lists; a single entry is returned as the raw JSON string.
        # @todo Kind of a hack to pass all test, it is also allowed to return a list of JSON encoded strings
        for key in (METADATA_KEY_NAMESPACES, METADATA_KEY_TYPE_MAP):
            value = metadata.get(key)
            if isinstance(value, list) and len(value) == 1:
                # Index instead of pop() so the driver's property list is not mutated
                metadata[key] = value[0]

        return cls._Record(attributes, metadata)

    def _get_cypher_filter_params(self, properties_dict, metadata_dict):
        """
        Build the parameters and the cypher filter string for a search

        :param properties_dict: Search dict for attributes
        :param metadata_dict: Search dict for metadata (keys get prefixed)
        :return: Tuple(encoded parameters, cypher filter str)
        """
        search = properties_dict.copy()
        search.update({self._meta_key(key): value for key, value in metadata_dict.items()})

        encoded_params = encode_dict_values_to_primitive(search)
        cypher_str = self._get_attributes_identifiers_cypher_string(search.keys())
        return encoded_params, cypher_str

    def _rows_to_records(self, rows, column="re"):
        """
        Convert result rows into (attributes, metadata) records

        :param rows: Result rows
        :param column: Name of the column that holds the node/relation
        :return: list of records
        :raises DatabaseException: if a row holds None
        """
        records = list()
        for row in rows:
            db_record = row[column]
            if db_record is None:
                raise DatabaseException("Record response should not be None")
            records.append(self._split_attributes_metadata_from_node(db_record))
        return records

    def get_records_by_filter(self, attributes_dict=None, metadata_dict=None):
        """
        Return the records by a certain filter

        :param attributes_dict: Filter dict
        :type attributes_dict: dict
        :param metadata_dict: Filter dict for metadata
        :type metadata_dict: dict
        :return: list of all nodes and relations that fit the conditions
        :rtype: list(DbRecord and DbRelation)
        """
        if attributes_dict is None:
            attributes_dict = dict()
        if metadata_dict is None:
            metadata_dict = dict()

        (encoded_params, cypher_str) = self._get_cypher_filter_params(attributes_dict, metadata_dict)
        rows = self._run(cypher_commands.NEO4J_GET_RECORDS_BY_PROPERTY_DICT.format(filter_dict=cypher_str),
                         encoded_params)
        return self._rows_to_records(rows)

    def get_records_tail(self, attributes_dict=None, metadata_dict=None, depth=None):
        """
        Return all connected nodes from the origin.

        :param attributes_dict: Filter dict
        :type attributes_dict: dict
        :param metadata_dict: Filter dict for metadata
        :type metadata_dict: dict
        :param depth: Max steps (converted with int())
        :return: list of all nodes and relations that fit the conditions
        :rtype: list(DbRecord and DbRelation)
        """
        if attributes_dict is None:
            attributes_dict = dict()
        if metadata_dict is None:
            metadata_dict = dict()

        (encoded_params, cypher_str) = self._get_cypher_filter_params(attributes_dict, metadata_dict)

        # depth is pasted into the cypher text, so force it to an integer
        depth_str = "" if depth is None else "1..{max}".format(max=int(depth))

        rows = self._run(cypher_commands.NEO4J_GET_RECORDS_TAIL_BY_FILTER.format(filter_dict=cypher_str,
                                                                                  depth=depth_str),
                         encoded_params)
        return self._rows_to_records(rows)

    def get_bundle_records(self, bundle_identifier):
        """
        Return all records and relations for the bundle

        :param bundle_identifier: The bundle identifier
        :return: list of all nodes and relations of the bundle
        :rtype: list(DbRecord and DbRelation)
        """
        rows = self._run(cypher_commands.NEO4J_GET_BUNDLE_RECORDS,
                         {self._meta_key(METADATA_KEY_IDENTIFIER): str(bundle_identifier)})
        return self._rows_to_records(rows)

    def get_record(self, record_id):
        """
        Try to find the record in the database

        :param record_id: The database id
        :return: DbRecord
        :rtype: DbRecord
        :raises DatabaseException: if more than one node is returned
        :raises NotFoundException: if no node is found
        """
        rows = self._run(cypher_commands.NEO4J_GET_RECORD_RETURN_NODE, {"record_id": int(record_id)})

        node = None
        for row in rows:
            if node is not None:
                raise DatabaseException(
                    "get_record should return only one node for the id {}, command {}".format(
                        record_id, cypher_commands.NEO4J_GET_RECORD_RETURN_NODE))
            node = row["node"]

        if node is None:
            raise NotFoundException("We cant find the node with the id: {}, database command {}".format(
                record_id, cypher_commands.NEO4J_GET_RECORD_RETURN_NODE))

        return self._split_attributes_metadata_from_node(node)

    def get_relation(self, relation_id):
        """
        Get a relation

        :param relation_id: The database id
        :return: The relation
        :rtype: DbRelation
        :raises DatabaseException: if a returned value is not a Relationship
        :raises NotFoundException: if no relation is found
        """
        rows = self._run(cypher_commands.NEO4J_GET_RELATION_RETURN_NODE, {"relation_id": int(relation_id)})

        relation = None
        for row in rows:
            if not isinstance(row["relation"], Relationship):
                raise DatabaseException(
                    "get_relation should return only relationships for the id {}, command {}".format(
                        relation_id, cypher_commands.NEO4J_GET_RELATION_RETURN_NODE))
            relation = row["relation"]

        if relation is None:
            raise NotFoundException("We cant find the relation with the id: {}, database command {}".format(
                relation_id, cypher_commands.NEO4J_GET_RELATION_RETURN_NODE))

        return self._split_attributes_metadata_from_node(relation)

    def delete_records_by_filter(self, attributes_dict=None, metadata_dict=None):
        """
        Delete records and relations by a filter

        :param attributes_dict: Filter dict
        :param metadata_dict: Filter dict for metadata
        :return: True
        :rtype: bool
        """
        if attributes_dict is None:
            attributes_dict = dict()
        if metadata_dict is None:
            metadata_dict = dict()

        (encoded_params, cypher_str) = self._get_cypher_filter_params(attributes_dict, metadata_dict)
        self._run(cypher_commands.NEO4J_DELETE_NODE_BY_PROPERTIES.format(filter_dict=cypher_str), encoded_params)
        return True

    def delete_record(self, record_id):
        """
        Delete a single record

        :param record_id: The database id
        :return: True
        :rtype: bool
        """
        self._run(cypher_commands.NEO4J_DELETE__NODE_BY_ID, {"node_id": int(record_id)})
        return True

    def delete_relation(self, relation_id):
        """
        Delete a single relation

        :param relation_id: The database id
        :return: True
        :rtype: bool
        """
        self._run(cypher_commands.NEO4J_DELETE_RELATION_BY_ID, {"relation_id": int(relation_id)})
        return True