import logging from typing import List, Tuple, Dict import psycopg2 from psycopg2.errorcodes import UNIQUE_VIOLATION class DBAdapter: EXPECTED_TABLES = ['art', 'artist', 'tag', 'topic', 'presence', 'artist_to_topic', 'art_to_presence', 'art_to_tag', 'tag_alias', 'tag_implication', 'tag_category', 'art_collection', 'art_to_art_collection'] def __init__(self, user: str, password: str, host: str = "localhost", port: int = 5432, database: str = "artnet"): self.db = None self.db_cursor = None try: self.db = psycopg2.connect(host=host, port=port, database=database, user=user, password=password, connect_timeout=3) self.db_cursor = self.db.cursor() except psycopg2.OperationalError as e: raise ValueError("Invalid DB credentials!") logging.debug("DB connection established to {0}:{1}/{2}".format(host, port, database)) if not self.check_tables(): logging.debug("Database schema check has failed! The expected tables have not been present.") raise ValueError("Invalid DB schema!") def check_tables(self) -> bool: """ Check if all the expected tables are present in the connected database. :return: """ try: self.db_cursor.execute("select relname from pg_class where relkind='r' and relname !~ '^(pg_|sql_)';") present_tables = self.db_cursor.fetchall() except psycopg2.DatabaseError as e: logging.error(f"Database error occured on check_tables(): {e}") self.db.rollback() return False unknown_tables = [] missing = [x for x in DBAdapter.EXPECTED_TABLES] for table in present_tables: table = table[0] if table not in DBAdapter.EXPECTED_TABLES: unknown_tables.append(table) else: missing.remove(table) if len(missing) > 0: logging.error("The following tables are missing from the currently connected database: {0}".format(missing)) return False if len(unknown_tables) > 0: logging.error( "The following tables are unknown and not expected inside the database: {0}".format(unknown_tables)) return True def save_image(self, ID: str, title: str, authors: list, path: str, tags: list, link: str, md5_hash: str, desc: str, collections: list): """ Updates or saves the given image data to the DB :param ID: :param title: :param authors: list of presences List[Tuple[str, str]] :param path: :param tags: list of tag names List[str] :param link: :param md5_hash: md5 hash as a hex digest :param desc: image description or None for empty :param collections: list of collections included by the image :return: """ logging.debug("Saving Image {0}:{1} authors: {2} path: {3} tags: {4} link: {5} hash:{6} desc:{7}" .format(ID, title, authors, path, [t["name"] for t in tags], link, md5_hash, desc)) d = {"title": title, "path": path, "id": ID, "link": link, "hash": md5_hash, "desc": desc} # TODO implement try/except for database errors and rollback! if self.get_art_by_path(path) is None: if ID is None: self.db_cursor.execute( "INSERT INTO art (path, title, link, md5_hash, description) " "VALUES (%(path)s, %(title)s, %(link)s, %(hash)s, %(desc)s)", d) self.db.commit() ID = self.get_art_by_path(path)["ID"] d["id"] = ID elif self.get_tag_by_ID(ID) is None: self.db_cursor.execute( "INSERT INTO art (path, title, link, md5_hash, description) " "VALUES (%(path)s, %(title)s, %(link)s, %(hash)s, %(desc)s)", d) self.db.commit() else: self.db_cursor.execute("UPDATE art SET path = %(path)s, title = %(title)s, link = %(link)s, " + "md5_hash = %(hash)s, description = %(desc)s WHERE id = %(id)s", d) self.db.commit() if ID is None: ID = self.get_art_by_path(path)["ID"] assert (ID != None) # was checked before, should never fail old_tags = [self.get_tag_by_ID(x)["name"].strip() for x in self.get_art_tags_by_ID(ID)] for tag in tags: if tag["name"] in old_tags: # already present continue d = {"id": ID, "tag": tag["id"]} try: self.db_cursor.execute("INSERT INTO art_to_tag (art_id, tag_ID) VALUES (%(id)s, %(tag)s)", d) self.db.commit() except psycopg2.Error as e: if e.pgcode == UNIQUE_VIOLATION: # tag was already set logging.debug(e) logging.info("Skipping Unique Violation ...") self.db.rollback() else: raise e for old_tag in old_tags: is_in_tags = False for t in tags: if t["name"] == old_tag: is_in_tags = True if not is_in_tags: # need to remove old tag self.remove_tag_from_image(art_ID=ID, tag_ID=self.get_tag_ID(old_tag)) old_authors = self.get_authors_of_art_by_ID(ID) for author in authors: if author in old_authors: continue d = {"id": ID, "author_name": author[0], "author_domain": author[1]} self.db_cursor.execute("INSERT INTO art_to_presence (presence_name, presence_domain, art_ID) " + "VALUES (%(author_name)s, %(author_domain)s, %(id)s)", d) self.db.commit() for old_author_name, old_author_domain in old_authors: if (old_author_name, old_author_domain) not in authors: # need to remove old author self.remove_artist_from_image(art_ID=ID, presence_name=old_author_name, presence_domain=old_author_domain) self.db.commit() def remove_image(self, hash: str): """ Removes an image completely from the database. Does not affect tags or artists but removes art-tag, art-artist relations :param hash: md5 hash of the image to be deleted :return: """ image_data = self.get_art_by_hash(hash) art_ID = image_data["ID"] logging.debug(f"Deleting image #{art_ID} {image_data['title']}") tags = self.get_art_tags_by_ID(art_ID) # for tag_ID in tags: # pass # self.remove_tag_from_image(art_ID=art_ID, tag_ID=tag_ID) authors = self.get_authors_of_art_by_ID(art_ID) # for presence_name, presence_domain in authors: # pass # self.remove_artist_from_image(art_ID=art_ID, presence_name=presence_name, presence_domain=presence_domain) # self.db_cursor.execute("DELETE FROM art WHERE md5_hash = %(hash)s", {"hash": hash}) def remove_artist_from_image(self, art_ID, presence_name, presence_domain): """ Removes the relationship between an artist and the image :param art_ID: :param presence_name: :param presence_domain: :return: """ d = {"name": presence_name, "domain": presence_domain, "id": art_ID} self.db_cursor.execute("DELETE FROM art_to_presence " + "WHERE presence_name = %(name)s and " + "presence_domain = %(domain)s and art_ID = %(id)s", d) def remove_tag_from_image(self, art_ID, tag_ID): """ Remove the relationship between a tag and the image :param art_ID: :param tag_ID: :return: """ d = {"id": art_ID, "tag": tag_ID} self.db_cursor.execute("DELETE FROM art_to_tag WHERE art_id = %(id)s and tag_id = %(tag)s", d) def save_presence(self, name: str, domain: str, artist_ID: int, link: str): """ Save the presence entry with the given data. If it doesn't exist a new one will be created. If the artist ID is unknown an exception will be thrown :param name: :param domain: :param artist_ID: :param link: :return: """ logging.debug("Saving Presence {0}:{1} Artist: {2} Link: {3}".format(name, domain, artist_ID, link)) artist = self.get_artist(artist_ID) if artist is None or len(artist) == 0: raise Exception("Unknown Artist to create/update Presence with!") presence = self.get_presence(name, domain) d = {"name": name, "domain": domain, "artist": artist_ID, "link": link} if len(presence) == 0: # presence does not exist yet self.db_cursor.execute("INSERT INTO presence (name, domain, artist_ID, link) " + "VALUES (%(name)s, %(domain)s, %(artist)s, %(link)s)", d) else: # presence exists, update it self.db_cursor.execute("UPDATE presence SET artist_ID = %(artist)s, link = %(link)s" + "WHERE name = %(name)s and domain = %(domain)s", d) self.db.commit() def remove_presence(self, name: str, domain: str): """ Remove a presence from the database :return: """ logging.debug("Removing Presence {0}:{1}".format(name, domain)) d = {"name": name, "domain": domain} self.db_cursor.execute("DELETE FROM presence WHERE name = %(name)s and domain = %(domain)s", d) self.db.commit() def get_presence(self, name: str, domain: str): """ Queries the database for the presence and returns the result :param name: :param domain: :return: """ d = {"name": name, "domain": domain} self.db_cursor.execute("SELECT name, domain, artist_ID, link FROM presence " + "WHERE name = %(name)s and domain = %(domain)s", d) rows = self.db_cursor.fetchall() new_rows = [] for row in rows: new_rows.append((row[0].strip(), row[1].strip(), row[2], row[3])) return new_rows def save_artist(self, ID: int, name: str): """ Save (or update if ID is already taken) an artist to the DB :param ID: :param name: :return: """ logging.debug("Saving artist {0}:{1}".format(ID, name)) d = {"id": ID, "name": name} if ID is None: # no ID given, auto generate it self.db_cursor.execute("INSERT INTO artist (name) VALUES (%(name)s)", d) elif len(self.get_artist(ID)) != 0: # artist exists already: self.db_cursor.execute("UPDATE artist SET name = %(name)s WHERE id = %(id)s", d) else: # artist needs to be created self.db_cursor.execute("INSERT INTO artist (id, name) VALUES (%(id)s, %(name)s)", d) self.db.commit() def remove_artist(self, ID: int): """ Deletes the artist from the DB :param ID: :return: """ logging.debug("Deleting artist {0}".format(ID)) d = {"id": ID} self.db_cursor.execute("DELETE FROM Artist WHERE ID = %(id)s", d) self.db.commit() def save_category(self, name: str): """ Save the category to the DB. Does not check if the category already exists :param name: :return: """ logging.debug("Saving category {0}!".format(name)) d = {"name": name} self.db_cursor.execute("INSERT INTO tag_category (name) VALUES (%(name)s)", d) self.db.commit() def remove_category(self, name: str): """ Remove the category from the DB. :param name: :return: """ d = {"name": name} self.db_cursor.execute("DELETE FROM tag_category WHERE name = %(name)s", d) self.db.commit() def get_category_by_ID(self, id: int): """ Queries the database for the category by its ID :param id: :return: (id, "name") """ d = {"id": id} self.db_cursor.execute("SELECT category_id, name FROM tag_category WHERE category_id = %(id)s", d) r = self.db_cursor.fetchall() if len(r) == 0: return None nr = [] for i in range(len(r[0])): if isinstance(r[0][i], str): nr.append(r[0][i].strip()) else: nr.append(r[0][i]) return nr def get_category_by_name(self, name: str): """ Queries the database for the category by its name :return: """ d = {"name": name} self.db_cursor.execute("SELECT category_id, name FROM tag_category WHERE name = %(name)s", d) r = self.db_cursor.fetchall() if len(r) == 0: return None return r[0] def get_artist(self, ID: int): """ Queries the database for the artist (not presence) and returns the result :param ID: :return: """ d = {"id": ID} self.db_cursor.execute("SELECT id, name FROM artist WHERE id = %(id)s", d) return self.db_cursor.fetchall() def get_artist_presences(self, id: int): """ Queries the databse for all presences associated with the artist and returns the result :param id: :return: """ d = {"id": id} self.db_cursor.execute("SELECT name, domain, artist_id FROM Presence WHERE artist_id = %(id)s", d) return self.db_cursor.fetchall() def get_all_artists(self): """ Lists all available artists (not presences) and returns the result :return: """ self.db_cursor.execute("SELECT id, name FROM artist") def get_art_by_ID(self, ID: int): """ Queries the database for the art via its ID and returns it if available. """ d = {"ID": ID} self.db_cursor.execute("SELECT id, path, title, link, md5_hash, description FROM art WHERE id = %(ID)s", d) result = self.db_cursor.fetchall() if len(result) == 0: return None else: result = result[0] image_data = { "ID": result[0], "path": result[1], "title": result[2], "link": result[3], "md5_hash": result[4], "description": result[5], } return image_data def get_art_by_hash(self, file_hash: str) -> dict: """ Queries the database for the art via its hash and returns it if available. Currently uses an md5 hash :param file_hash: :return: """ d = {"hash": file_hash} self.db_cursor.execute("SELECT id, path, title, link, md5_hash, description FROM art where md5_hash = %(hash)s", d) result = self.db_cursor.fetchall() if len(result) == 0: return None else: result = result[0] image_data = { "ID": result[0], "path": result[1], "title": result[2], "link": result[3], "md5_hash": result[4], "description": result[5], } return image_data def get_art_by_path(self, path: str) -> dict: """ Queries the database for the art via its path and returns it if available. Otherwise None :param path: :return: """ d = {"path": path} self.db_cursor.execute("SELECT id, path, title, link, md5_hash FROM art where path = %(path)s", d) result = self.db_cursor.fetchall() if len(result) == 0: return None else: result = result[0] image_data = { "ID": result[0], "path": result[1], "title": result[2], "link": result[3], "md5_hash": result[4], } return image_data @DeprecationWarning def get_authors_of_art(self, path: str): """ Get the authors (presence and author) of the given art. :param path: :return: """ art_data = self.get_art_by_path(path) art_id: int = None if art_data is not None: art_id = art_data["ID"] if art_id is None: return None d = {"id": art_id} self.db_cursor.execute("SELECT presence_name, presence_domain FROM art_to_presence WHERE art_ID = %(id)s", d) presences = self.db_cursor.fetchall() result = [] for name, domain in presences: result.append((name.strip(), domain.strip())) return result def get_authors_of_art_by_ID(self, id: int): """ Get the authors (presence and author) of the given art. :param id: :return: """ d = {"id": id} self.db_cursor.execute("SELECT presence_name, presence_domain FROM art_to_presence WHERE art_ID = %(id)s", d) presences = self.db_cursor.fetchall() result = [] for name, domain in presences: result.append((name.strip(), domain.strip())) return result def get_art_tags_by_ID(self, art_ID: int): """ Query the database for all tags associated with a given art ID :param art_ID: :return: """ d = {"id": art_ID} self.db_cursor.execute("SELECT tag_ID FROM art_to_tag WHERE art_id = %(id)s", d) rows = self.db_cursor.fetchall() new_rows = [] for row in rows: new_rows.append(row[0]) return new_rows def search_art(self, tags: list, presences: list, resolve_tags: bool = True): """ Search with the tags and given presences for fitting art. :param tags: List[int] list of tag ids that are to be present in the art. See resolve_tags for implication. :param presences: List[Tuple(str, str)] list of presences to be present in the art. :param resolve_tags: flag if tag implications should be resolved fully. """ d = {"tags": tags, "presences": presences} l = list() for i in range(len(presences)): # converting for psycopg2 name, domain = presences[i] casted = [name, domain] l.append(casted) d["presences"] = l search_query = "SELECT art.id FROM art " \ "INNER JOIN art_to_tag ON art_to_tag.art_id = art.id " \ "INNER JOIN tag ON art_to_tag.tag_id = tag.id " \ "INNER JOIN art_to_presence as AtP ON art.id = AtP.art_id " \ "GROUP BY art.id " \ "HAVING array_agg(tag.name) @> %(tags)s::varchar[] AND " \ "array_agg(array[AtP.presence_name, AtP.presence_domain]) @> %(presences)s::varchar[]" # 1. Join all related tables # 2. Group them over art.id # 3. check which rows contain the searched tags or presences in the value arrays self.db_cursor.execute(search_query, d) result = self.db_cursor.fetchall() return result def search_fuzzy_presence(self, name: str, domain: str, all_if_empty: bool = False): """ Search a list of presences fitting the (name, domain) fuzzy. :param name: :param domain: :param all_if_empty: return all presences (or a large selection) if an empty name and domain is given :return: """ if all_if_empty and (name is None or len(name) == 0) and (domain is None or len(domain) == 0): self.db_cursor.execute("SELECT name, domain, artist_id FROM presence") else: d = {"name": "%" + name + "%", "domain": "%" + domain + "%"} self.db_cursor.execute("SELECT name, domain, artist_id FROM presence WHERE LOWER(name) LIKE " "LOWER(%(name)s) AND domain LIKE %(domain)s", d) rows = self.db_cursor.fetchall() result = [] for row in rows: result.append((row[0].strip(), row[1].strip())) return result def get_presences_art(self, name, domain): """ Query a list of all art that includes the given presence as their author :param name: :param domain: :return: """ d = {"name": name, "domain": domain} self.db_cursor.execute( "SELECT art_ID FROM Art_Author WHERE presence_name = %(name)s and presence_domain = %(domain)s", d) return self.db_cursor.fetchall() def get_all_categories(self) -> list: """ Return all categories in the database. :return: """ self.db_cursor.execute("SELECT name FROM tag_category") rows = [] for row in self.db_cursor.fetchall(): rows.append(row[0].strip()) return rows def search_fuzzy_categories(self, search: str) -> list: """ Search a list of categories fitting search fuzzy. :param search: :return: """ d = {"search": "%" + search + "%"} self.db_cursor.execute("SELECT name FROM tag_category WHERE LOWER(name) LIKE LOWER(%(search)s)", d) rows = [] for row in self.db_cursor.fetchall(): rows.append(row[0]) return rows def search_fuzzy_tag(self, name: str, all_if_empty: bool = False) -> list: """ Search a list of tags fitting name fuzzy. :param name: :param all_if_empty: :return: """ if all_if_empty and len(name) == 0: self.db_cursor.execute("SELECT name, description, category_id, ID FROM tag") else: d = {"name": "%" + name + "%"} self.db_cursor.execute( "SELECT name, description, category_id, ID FROM tag WHERE LOWER(name) LIKE LOWER(%(name)s)", d) rows = self.db_cursor.fetchall() new_rows = [] for i in range(len(rows)): new_rows.append({"name": rows[i][0].strip(), "description": rows[i][1].strip(), "category": int(rows[i][2]), "id": int(rows[i][3])}) return new_rows def search_fuzzy_artists(self, ID: int, name: str, all_if_empty: bool = False): """ Search a list of fitting artists fuzzy. If ID is None it will search using the description :param ID: :param name: :param all_if_empty: :return: """ if ID is not None: self.db_cursor.execute("SELECT id, name FROM artist WHERE id = %(id)s", {"id": ID}) elif all_if_empty and ID is None and len(name) == 0: self.db_cursor.execute("SELECT id, name FROM artist") else: d = {"name": "%" + name + "%"} self.db_cursor.execute("SELECT id, name FROM artist WHERE LOWER(name) LIKE LOWER(%(name)s)", d) return self.db_cursor.fetchall() def create_tag(self, name: str, description: str, aliases: list, implications: list, category: str = None): name = name.strip() if aliases is None: aliases = [] if implications is None: implications = [] category_id = self.get_category_by_name(category)[0] d = {"name": name, "description": description, "category_id": category_id} self.db_cursor.execute("INSERT INTO tag (name, description, category_id) " + "VALUES (LOWER(%(name)s), %(description)s, %(category_id)s)", d) for alias in aliases: self.add_alias_by_name(name, alias['name']) for implicant in implications: self.add_implication_by_ID(self.get_tag_ID(name), implicant['id']) self.db.commit() def edit_tag(self, tag_id: int, name: str, description: str, aliases: list = None, implications: list = None, category_id: int = None): """ Edit a tag with the new given data :param tag_id: tag id to uniquely identify the tag :param name: unique tag name to apply edits to (distinct from old_tag) :param description: new description to apply, set to None to omit :param aliases: list of tag names to be the alias of this tag, set to None to omit :param implications: list of tag names to be implied by this tag, set to None to omit :param category_id: :return: """ name = name.strip().lower() logging.debug(f"Editing Tag name={name}, desc={description}, aliases={aliases}, implications={implications}, category={category_id}") #implications = [self.get_tag_ID(x) for x in implications] #aliases = [self.get_tag_ID(x) for x in aliases] d = { "id": tag_id, "name": name, "description": description, "alias": aliases, "implications": implications, "category_id": category_id, } self.db_cursor.execute( "UPDATE tag SET description = %(description)s, category_id = %(category_id)s, name = %(name)s " "WHERE ID = %(id)s", d) if aliases is not None: old_aliases = self.get_tag_aliases_by_ID(tag_id) for alias in aliases: if alias["id"] in old_aliases: # is this already set? continue self.add_alias_by_ID(tag_id, alias["id"]) for old_alias in old_aliases: if old_alias not in [a["id"] for a in aliases]: # got to delete an alias? self.remove_alias_by_ID(tag_id, old_alias) if implications is not None: old_implicants = self.get_tag_implications_by_ID(tag_id) for implicant in implications: if implicant["id"] in old_implicants: # is this already set? continue self.add_implication_by_ID(tag_id, implicant["id"]) for old_implicant in old_implicants: if old_implicant not in [i["id"] for i in implications]: # got to delete an implicant? self.remove_implication_by_ID(tag_id, old_implicant) def add_alias_by_name(self, name: str, alias: str): """ Add the alias pair to the database :param name: :param alias: :return: """ self.add_alias_by_ID(self.get_tag_ID(name), self.get_tag_ID(alias)) def add_alias_by_ID(self, tag1: int, tag2: int): """ Add the alias pair to the database :return: """ d = { "tag1": tag1, "tag2": tag2 } self.db_cursor.execute("INSERT INTO tag_alias (tag1, tag2) VALUES (%(tag1)s, %(tag2)s)", d) self.db.commit() def remove_alias_by_ID(self, tag1: int, tag2: int): """ Remove alias pair from the database :param tag1: :param tag2: :return: """ d = { "tag1": tag1, "tag2": tag2 } self.db_cursor.execute("DELETE FROM tag_alias WHERE (tag1 = %(tag1)s and tag2 = %(tag2)s) or " + "(tag1 = %(tag2)s and tag2 = %(tag1)s)", d) self.db.commit() def remove_alias_by_name(self, name: str, alias: str): """ Remove alias pair from the database :param name: :param alias: :return: """ self.remove_alias_by_ID(self.get_tag_ID(name), self.get_tag_ID(alias)) def add_implication_by_ID(self, tag_id: int, implicant: int): """ Add the implication to the database :param tag_id: :param implicant: :return: """ d = { "tag": tag_id, "implicant": implicant } self.db_cursor.execute("INSERT INTO tag_implication (root_tag, implicate) VALUES (%(tag)s, %(implicant)s)", d) self.db.commit() @DeprecationWarning def add_implication_by_name(self, name: str, implicant: str): """ Add the implication to the database :param name: :param implicant: :return: """ logging.warning("This method is deprecated! Please resolve the name and use add_implication_by_id() instead.") print(f"Name: {name} ({self.get_tag_ID(name)}), implcant: {implicant} ({self.get_tag_ID(implicant)})") self.add_implication_by_ID(self.get_tag_ID(name), self.get_tag_ID(implicant)) def remove_implication_by_ID(self, tag: int, implicant: int): """ Remove the implication pair from the database :param tag: :param implicant: :return: """ d = { "tag": tag, "implicant": implicant } self.db_cursor.execute("DELETE FROM tag_implication WHERE root_tag = %(tag)s " + "and implicate = %(implicant)s", d) self.db.commit() def remove_implication_by_name(self, name: str, implicant: str): """ Remove the implication pair from the database :param name: :param implicant: :return: """ self.remove_implication_by_ID(self.get_tag_ID(name), self.get_tag_ID(implicant)) def remove_tag_by_ID(self, tag: int): """ Remove the given tag from the database :param tag: :return: """ d = {"tag": tag} self.db_cursor.execute("DELETE FROM tag WHERE id = %(tag)s", d) self.db.commit() def remove_tag_by_name(self, name: str): """ Remove the given tag from the database :param name: :return: """ self.remove_tag_by_ID(self.get_tag_ID(name)) def get_tag_ID(self, name: str) -> int: """ Get the tag ID by querying it by its unique name :param name: :return: """ d = {"name": name} try: self.db_cursor.execute("SELECT ID FROM tag where LOWER(name) = LOWER(%(name)s)", d) rows = [] for row in self.db_cursor.fetchall(): rows.append(row[0]) except psycopg2.DatabaseError as e: return None if len(rows) > 1: raise Exception("Found multiple tags by the same name!") elif len(rows) == 0: return None return rows[0] def get_tag_by_name(self, name: str) -> dict: """ Search the tag in the DB via its name. Note: name is constrained to be unique by the DB :param name: :return: Returns the tag's name, description, category, tag ID """ d = {"name": name} self.db_cursor.execute( "SELECT name, description, category_id, ID FROM tag where LOWER(name) = LOWER(%(name)s)", d) new_row = [] row = self.db_cursor.fetchall() if len(row) == 0: return None row = row[0] for value in row: if value is None: new_row.append("") elif type(value) == str: new_row.append(value.strip()) else: new_row.append(value) return {"name": new_row[0], "description": new_row[1], "category": new_row[2], "id": new_row[3]} def get_tag_by_ID(self, ID: int) -> dict: """ Search the tag in the DB via its ID. Note: name is constrained to be unique by the DB :param ID: :return: Returns the tag's ID, name, description, category """ d = {"ID": ID} self.db_cursor.execute("SELECT ID, name, description, category_id FROM tag where ID = %(ID)s", d) new_row = [] rows = self.db_cursor.fetchall() if len(rows) != 1: raise Exception("Something went terribly wrong!") row = rows[0] for value in row: if value is None: new_row.append("") elif type(value) == str: new_row.append(value.strip()) else: new_row.append(value) return {"name": new_row[1], "description": new_row[2], "category": new_row[3], "id": new_row[0]} def get_tag_aliases_by_name(self, name: str) -> List[dict]: """ Search for the tag's aliases and the alias's aliases :param name: :return: List of tag Names that are aliases of this one """ aliases = self.get_tag_aliases_by_ID(self.get_tag_ID(name)) result = [] for alias in aliases: tag_data = self.get_tag_by_ID(alias) if len(tag_data) > 0: # tag exists result.append(tag_data) return result def get_tag_aliases_by_ID(self, tag_ID: int) -> list: """ Search for the tag's aliases and the tag's aliases :param tag_ID: :return: List of tag IDs that are aliases of this one """ marked = [] to_search = [tag_ID] while len(to_search) != 0: curr_alias = to_search.pop() found_aliases = self.__get_tag_aliases_no_recurse(curr_alias) marked.append(curr_alias) for found_alias in found_aliases: if found_alias not in marked: to_search.append(found_alias) marked.remove(tag_ID) return marked def get_all_tag_implications_by_name(self, name: str) -> list: """ Search for the tag's implications and those that are implied by them. :param name: :return: """ root_ID = self.get_tag_ID(name) collected_tags = self.get_tag_implications_by_ID(root_ID) collected_tags.append(root_ID) to_search = self.get_tag_implications_by_ID(root_ID) while len(to_search) != 0: curr_tag = to_search.pop() found_implications = self.get_tag_implications_by_ID(curr_tag) for found in found_implications: if found in collected_tags: continue else: collected_tags.append(found) to_search.append(found) if root_ID in collected_tags: collected_tags.remove(root_ID) result = [] for tag_ID in collected_tags: tag_data = self.get_tag_by_ID(tag_ID) if len(tag_data) != 0: result.append(tag_data) return result def get_all_tag_impliers_by_ID(self, ID: int) -> list: """ Search for all tags that imply this one or imply a tag that implies this one. :param ID: :return: """ collected_tags = self.get_tag_impliers_by_ID(ID) to_search = self.get_tag_impliers_by_ID(ID) collected_tags.append(ID) while len(to_search) != 0: curr_tag = to_search.pop() found_impliers = self.get_tag_impliers_by_ID(curr_tag) for found in found_impliers: if found in collected_tags: continue else: collected_tags.append(found) to_search.append(found) if ID in collected_tags: collected_tags.remove(ID) return collected_tags def get_tag_implications_by_ID(self, ID: int) -> list: """ Search for the tag's implications :param ID: ID of the tag :return: returns a list of IDs that are implied """ d = {"ID": ID} if d["ID"] is None: return [] self.db_cursor.execute("SELECT root_tag, implicate FROM Tag_Implication WHERE root_tag = %(ID)s", d) rows = self.db_cursor.fetchall() if len(rows) == 0: return [] r = [] for row in rows: r.append(row[1]) return r def get_tag_implications(self, name: str) -> list: """ Search for the tag's implications :param name: :return: List of tag ids """ d = {"ID": self.get_tag_ID(name)} if d["ID"] is None: return [] self.db_cursor.execute("SELECT root_tag, implicate FROM Tag_Implication WHERE root_tag = %(ID)s", d) rows = self.db_cursor.fetchall() if len(rows) == 0: return [] r = [] for row in rows: r.append(row[1]) result = [self.get_tag_by_ID(v) for v in r] return result def get_tag_impliers_by_ID(self, ID: int): """ Search for tags that imply this one. :param ID: :return: """ d = {"ID": ID} self.db_cursor.execute("SELECT root_tag FROM Tag_Implication WHERE implicate = %(ID)s", d) rows = self.db_cursor.fetchall() return rows def get_tag_impliers_by_name(self, name: str) -> list: """ Search for tags that imply this one. :param name: :return: """ d = {"ID": self.get_tag_ID(name)} self.db_cursor.execute("SELECT root_tag FROM Tag_Implication WHERE implicate = %(ID)s", d) rows = self.db_cursor.fetchall() return rows def __get_tag_aliases_no_recurse(self, tag_id: int) -> list: """ Search for the tag's aliases :param tag_id: :return: """ d = {"ID": tag_id} if d["ID"] is None: return [] self.db_cursor.execute("SELECT tag1, tag2 FROM Tag_Alias " "WHERE tag1 = %(ID)s or tag2 = %(ID)s", d) rows = self.db_cursor.fetchall() r = [] for row in rows: if row[0] == tag_id: r.append(row[1]) elif row[1] == tag_id: r.append(row[0]) else: raise Exception("Something went terribly wrong!") return r def create_collection(self, collection_name: str, description: str) -> int: """ Create the collection with name and description as given. """ d = {"name": collection_name, "description": description} try: self.db_cursor.execute("INSERT INTO art_collection (name, description) VALUES (%(name)s, %(description)s) " "RETURNING id", d) self.db.commit() except psycopg2.DatabaseError as e: logging.error(f"Encountered database error on create_collection() with collection_name: {collection_name} " f"description: {description}. Rolling it back now ...") logging.error(e) self.db.rollback() return None return self.db_cursor.fetchall()[0] def delete_collection(self, collection_id: int): """ Delete the collection given by the id. """ d = {"ID": collection_id} try: self.db_cursor.execute("DELETE FROM art_collection WHERE ID = %(ID)s", d) self.db.commit() except psycopg2.DatabaseError as e: logging.error(f"Encountered database error on delete_collection() ID: {collection_id}. " f"Rolling it back now ...") logging.error(e) self.db.rollback() def get_collections_of_art_by_ID(self, art_id: int): """ Queries the database for collections that include the given art_id :return: [(ID, name, description)] """ d = {"art_id": art_id} self.db_cursor.execute("SELECT art_collection.id, art_collection.name, art_collection.description " "FROM art_collection " "INNER JOIN art_to_art_collection as atac ON art_collection.id = atac.collection_id " "WHERE atac.art_id = %(art_id)s", d) return self.db_cursor.fetchall() def get_collection(self, collection_id: int): """ Returns the collection given by the id. """ d = {"ID": collection_id} self.db_cursor.execute("SELECT collection.id, collection.name, collection.description, array_agg(art.id), " "array_agg(art.title), array_agg(art.path), array_agg(atac.ranking) " "FROM art_collection as collection " "LEFT OUTER JOIN art_to_art_collection as atac ON atac.collection_id = collection.id " "LEFT OUTER JOIN art ON atac.art_id = art.id " "WHERE collection.ID = %(ID)s " "GROUP BY collection.id", d) r = self.db_cursor.fetchall() if len(r) > 0: r = r[0] collection = dict() collection["id"] = r[0] collection["name"] = r[1] collection["description"] = r[2] collection["entries"] = [] for i in range(len(r[3])): art_data = {"id": r[3][i], "title": r[4][i], "path": r[5][i], "ranking": r[6][i]} if art_data["id"] is None: # skipping non-existing art entries continue # might happen once when the collection has no entries collection["entries"].append(art_data) return collection def search_collections_by_name(self, name: str) -> List[Tuple[int, str]]: """ Returns the collection by the given name """ d = {"name": name} try: self.db_cursor.execute("SELECT art_collection.id, art_collection.name " "FROM art_collection " "WHERE art_collection.name = %(name)s", d) except psycopg2.DatabaseError as e: logging.error(f"Encountered database error on get_collection_by_name() name: {name}." f"Rolling it back now ...") logging.error(e) self.db.rollback() return None return self.db_cursor.fetchall() def search_collections(self, collection_name: str = None, presence_name: str = None, presence_domain: str = None, art_name: str = None): """ Search for collections that contain the given parameters. Search is always fuzzy. :param collection_name: :param presence_name: :param presence_domain: :param art_name: Search for collections containing art with this title :result: List[collection_id, collection_name, collection_desc, List[art_id]] """ d = {"collection_name": "%"+collection_name+"%", "presence_name": "%"+presence_name+"%", "presence_domain": "%"+presence_domain+"%", "art_name": "%"+art_name+"%"} try: self.db_cursor.execute("SELECT " "(art_collection.id) as collection_id, " "(art_collection.name) as collection_name, " "(art_collection.description) as collection_desc, " "array_agg(art.id) as art_ID, array_agg(art.title) as art_titles, " "array_agg(art_to_presence.presence_name) as presence_names, " "array_agg(art_to_presence.presence_domain) as presence_domains " "FROM art_collection " "LEFT OUTER JOIN art_to_art_collection On art_to_art_collection.collection_id = art_collection.id " "LEFT OUTER JOIN art ON art_to_art_collection.art_id = art.id " "LEFT OUTER JOIN art_to_presence ON art_to_presence.art_ID = art.id " "WHERE LOWER(art_collection.name) LIKE LOWER(%(collection_name)s) " "GROUP BY art_collection.id " "HAVING LOWER(array_to_string(array_agg(art_to_presence.presence_name), '|', '')) LIKE LOWER(%(presence_name)s) " "AND LOWER(array_to_string(array_agg(art_to_presence.presence_domain), '|', '')) LIKE LOWER(%(presence_domain)s) " "AND LOWER(array_to_string(array_agg(art.title), '|', '')) LIKE LOWER(%(art_name)s);", d) except psycopg2.DatabaseError as e: logging.error(f"Encountered database error on search_collections(): {e}") self.db.rollback() raise e return self.db_cursor.fetchall()