You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

854 lines
29 KiB
Python

import psycopg2
from psycopg2.errors import *
from psycopg2.errorcodes import UNIQUE_VIOLATION
class DBAdapter:
def __init__(self, user: str, password: str, host: str = "localhost", port: int = 5432, database: str = "artnet"):
self.db = None
self.db_cursor = None
try:
self.db = psycopg2.connect(host=host, port=port, database=database, user=user, password=password)
self.db_cursor = self.db.cursor()
except psycopg2.OperationalError as e:
raise ValueError("Invalid DB credentials!")
print("DB connection to {0}:{1}/{2}".format(host, port, database))
def save_image(self, ID: str, title: str, authors: list, path: str, tags: list, link: str, md5_hash: str):
"""
Updates or saves the given image data to the DB
:param ID:
:param title:
:param authors:
:param path:
:param tags:
:param link:
:param md5_hash: md5 hash as a hex digest
:return:
"""
print("Saving Image {0}:{1} authors: {2} path: {3} tags: {4} link: {5} hash:{6}"
.format(ID, title, authors, path, tags, link, md5_hash))
d = {"title": title, "path": path, "id": ID, "link": link, "hash": md5_hash}
if self.get_art_by_path(path) is None:
if ID is None:
self.db_cursor.execute(
"INSERT INTO art (path, title, link, md5_hash) VALUES (%(path)s, %(title)s, %(link)s, %(hash)s)",
d)
ID = self.get_art_by_path(path)["ID"]
d["id"] = ID
elif self.get_tag_by_ID(ID) is None:
self.db_cursor.execute(
"INSERT INTO art (path, title, link, md5_hash) VALUES (%(path)s, %(title)s, %(link)s, %(hash)s)", d)
else:
self.db_cursor.execute("UPDATE art SET path = %(path)s, title = %(title)s, link = %(link)s, " +
"md5_hash = %(hash)s WHERE id = %(id)s", d)
if ID is None:
ID = self.get_art_by_path(path)["ID"]
assert(ID != None)
old_tags = [self.get_tag_by_ID(x)[0][1].strip() for x in self.get_art_tags_by_ID(ID)]
for tag in tags:
if tag in old_tags: # already present
continue
d = {"id": ID, "tag": self.get_tag_ID(tag)}
try:
self.db_cursor.execute("INSERT INTO art_tag (art_id, tag_ID) VALUES (%(id)s, %(tag)s)", d)
except psycopg2.Error as e:
if e.pgcode == UNIQUE_VIOLATION:
print(e)
print("Skipping Unique Violation ...")
else:
raise e
for old_tag in old_tags:
if old_tag not in tags: # need to remove old tag
self.remove_tag_from_image(art_ID=ID, tag_ID=self.get_tag_ID(old_tag))
old_authors = self.get_authors_of_art_by_ID(ID)
for author in authors:
if author in old_authors:
continue
d = {"id": ID, "author_name": author[0], "author_domain": author[1]}
self.db_cursor.execute("INSERT INTO art_author (presence_name, presence_domain, art_ID) " +
"VALUES (%(author_name)s, %(author_domain)s, %(id)s)", d)
for old_author_name, old_author_domain in old_authors:
if (old_author_name, old_author_domain) not in authors: # need to remove old author
self.remove_artist_from_image(art_ID=ID, presence_name=old_author_name,
presence_domain=old_author_domain)
self.db.commit()
def remove_image(self, hash: str):
"""
Removes an image completely from the database.
Does not affect tags or artists but removes art-tag, art-artist relations
:param hash: md5 hash of the image to be deleted
:return:
"""
image_data = self.get_art_by_hash(hash)
art_ID = image_data["ID"]
print(f"Deleting image #{art_ID} {image_data['title']}")
tags = self.get_art_tags_by_ID(art_ID)
for tag_ID in tags:
pass
#self.remove_tag_from_image(art_ID=art_ID, tag_ID=tag_ID)
authors = self.get_authors_of_art_by_ID(art_ID)
for presence_name, presence_domain in authors:
pass
#self.remove_artist_from_image(art_ID=art_ID, presence_name=presence_name, presence_domain=presence_domain)
#self.db_cursor.execute("DELETE FROM art WHERE md5_hash = %(hash)s", {"hash": hash})
def remove_artist_from_image(self, art_ID, presence_name, presence_domain):
"""
Removes the relationship between an artist and the image
:param art_ID:
:param presence_name:
:param presence_domain:
:return:
"""
d = {"name": presence_name, "domain": presence_domain, "id": art_ID}
self.db_cursor.execute("DELETE FROM art_author " +
"WHERE presence_name = %(name)s and " +
"presence_domain = %(domain)s and art_ID = %(id)s",
d)
def remove_tag_from_image(self, art_ID, tag_ID):
"""
Remove the relationship between a tag and the image
:param art_ID:
:param tag_ID:
:return:
"""
d = {"id": art_ID, "tag": tag_ID}
self.db_cursor.execute("DELETE FROM art_tag WHERE art_id = %(id)s and tag_id = %(tag)s", d)
def save_presence(self, name: str, domain: str, artist_ID: int, link: str):
"""
Save the presence entry with the given data.
If it doesn't exist a new one will be created.
If the artist ID is unknown an exception will be thrown
:param name:
:param domain:
:param artist_ID:
:param link:
:return:
"""
print("Saving Presence {0}:{1} Artist: {2} Link: {3}".format(name, domain, artist_ID, link))
artist = self.get_artist(artist_ID)
if artist is None or len(artist) == 0:
raise Exception("Unknown Artist to create/update Presence with!")
presence = self.get_presence(name, domain)
d = {"name": name, "domain": domain, "artist": artist_ID, "link": link}
if len(presence) == 0: # presence does not exist yet
self.db_cursor.execute("INSERT INTO presence (name, domain, artist, link) " +
"VALUES (%(name)s, %(domain)s, %(artist)s, %(link)s)", d)
else: # presence exists, update it
self.db_cursor.execute("UPDATE presence SET artist = %(artist)s, link = %(link)s" +
"WHERE name = %(name)s and domain = %(domain)s", d)
self.db.commit()
def remove_presence(self, name: str, domain: str):
"""
Remove a presence from the database
:return:
"""
print("Removing Presence {0}:{1}".format(name, domain))
d = {"name": name, "domain": domain}
self.db_cursor.execute("DELETE FROM presence WHERE name = %(name)s and domain = %(domain)s", d)
self.db.commit()
def get_presence(self, name: str, domain: str):
"""
Queries the database for the presence and returns the result
:param name:
:param domain:
:return:
"""
d = {"name": name, "domain": domain}
self.db_cursor.execute("SELECT name, domain, artist, link FROM presence " +
"WHERE name = %(name)s and domain = %(domain)s",
d)
rows = self.db_cursor.fetchall()
new_rows = []
for row in rows:
new_rows.append((row[0].strip(), row[1].strip(), row[2], row[3]))
return new_rows
def save_artist(self, ID: int, description: str):
"""
Save (or update if ID is already taken) an artist to the DB
:param ID:
:param description:
:return:
"""
print("Saving artist {0}:{1}".format(ID, description))
d = {"id": ID, "description": description}
if ID is None: # no ID given, auto generate it
self.db_cursor.execute("INSERT INTO artist (description) VALUES (%(description)s)", d)
elif len(self.get_artist(ID)) != 0: # artist exists already:
self.db_cursor.execute("UPDATE artist SET description = %(description)s WHERE id = %(id)s", d)
else: # artist needs to be created
self.db_cursor.execute("INSERT INTO artist (id, description) VALUES (%(id)s, %(description)s)", d)
self.db.commit()
def remove_artist(self, ID: int):
"""
Deletes the artist from the DB
:param ID:
:return:
"""
print("Deleting artist {0}".format(ID))
d = {"id": ID}
self.db_cursor.execute("DELETE FROM Artist WHERE ID = %(id)s", d)
self.db.commit()
def save_category(self, name: str):
"""
Save the category to the DB.
Does not check if the category already exists
:param name:
:return:
"""
print("Saving category {0}!".format(name))
d = {"name": name}
self.db_cursor.execute("INSERT INTO tag_category (name) VALUES (%(name)s)", d)
self.db.commit()
def remove_category(self, name: str):
"""
Remove the category from the DB.
:param name:
:return:
"""
d = {"name": name}
self.db_cursor.execute("DELETE FROM tag_category WHERE name = %(name)s", d)
self.db.commit()
def get_artist(self, ID: int):
"""
Queries the database for the artist (not presence) and returns the result
:param ID:
:return:
"""
d = {"id": ID}
self.db_cursor.execute("SELECT id, description FROM artist WHERE id = %(id)s", d)
return self.db_cursor.fetchall()
def get_artist_presences(self, id: int):
"""
Queries the databse for all presences associated with the artist and returns the result
:param id:
:return:
"""
d = {"id": id}
self.db_cursor.execute("SELECT name, domain, artist FROM Presence WHERE artist = %(id)s", d)
return self.db_cursor.fetchall()
def get_all_artists(self):
"""
Lists all available artists (not presences) and returns the result
:return:
"""
self.db_cursor.execute("SELECT id, description FROM artist")
def get_art_by_hash(self, file_hash: str) -> dict:
"""
Queries the database for the art via its hash and returns it if available.
Currently uses an md5 hash
:param file_hash:
:return:
"""
d = {"hash": file_hash}
self.db_cursor.execute("SELECT id, path, title, link, md5_hash FROM art where md5_hash = %(hash)s", d)
result = self.db_cursor.fetchall()
if len(result) == 0:
return None
else:
result = result[0]
image_data = {
"ID": result[0],
"path": result[1],
"title": result[2],
"link": result[3],
"md5_hash": result[4],
}
return image_data
def get_art_by_path(self, path: str) -> dict:
"""
Queries the database for the art via its path and returns it if available.
Otherwise None
:param path:
:return:
"""
d = {"path": path}
self.db_cursor.execute("SELECT id, path, title, link, md5_hash FROM art where path = %(path)s", d)
result = self.db_cursor.fetchall()
if len(result) == 0:
return None
else:
result = result[0]
image_data = {
"ID": result[0],
"path": result[1],
"title": result[2],
"link": result[3],
"md5_hash": result[4],
}
return image_data
@DeprecationWarning
def get_authors_of_art(self, path: str):
"""
Get the authors (presence and author) of the given art.
:param path:
:return:
"""
art_data = self.get_art_by_path(path)
art_id: int = None
if art_data is not None:
art_id = art_data["ID"]
if art_id is None:
return None
d = {"id": art_id}
self.db_cursor.execute("SELECT presence_name, presence_domain FROM art_author WHERE art_ID = %(id)s", d)
presences = self.db_cursor.fetchall()
result = []
for name, domain in presences:
result.append((name.strip(), domain.strip()))
return result
def get_authors_of_art_by_ID(self, id: int):
"""
Get the authors (presence and author) of the given art.
:param id:
:return:
"""
d = {"id": id}
self.db_cursor.execute("SELECT presence_name, presence_domain FROM art_author WHERE art_ID = %(id)s", d)
presences = self.db_cursor.fetchall()
result = []
for name, domain in presences:
result.append((name.strip(), domain.strip()))
return result
def get_art_tags_by_ID(self, art_ID: int):
"""
Query the database for all tags associated with a given art ID
:param art_ID:
:return:
"""
d = {"id": art_ID}
self.db_cursor.execute("SELECT tag_ID FROM art_tag WHERE art_id = %(id)s", d)
rows = self.db_cursor.fetchall()
new_rows = []
for row in rows:
new_rows.append(row[0])
return new_rows
def search_fuzzy_presence(self, name: str, domain: str, all_if_empty: bool = False):
"""
Search a list of presences fitting the (name, domain) fuzzy.
:param name:
:param domain:
:param all_if_empty:
:return:
"""
if all_if_empty and (name is None or len(name) == 0) and (domain is None or len(domain) == 0):
self.db_cursor.execute("SELECT name, domain, artist FROM presence")
else:
self.db_cursor.execute("SELECT name, domain, artist FROM presence WHERE LOWER(name) LIKE LOWER('%{0}%')".format(name) +
" AND domain LIKE '%{0}%'".format(domain))
rows = self.db_cursor.fetchall()
result = []
for row in rows:
result.append((row[0].strip(), row[1].strip()))
return result
def get_presences_art(self, name, domain):
"""
Query a list of all art that includes the given presence as their author
:param name:
:param domain:
:return:
"""
d = {"name": name, "domain": domain}
self.db_cursor.execute(
"SELECT art_ID FROM Art_Author WHERE presence_name = %(name)s and presence_domain = %(domain)s", d)
return self.db_cursor.fetchall()
def get_all_categories(self) -> list:
"""
Return all categories in the database.
:return:
"""
self.db_cursor.execute("SELECT name FROM tag_category")
rows = []
for row in self.db_cursor.fetchall():
rows.append(row[0].strip())
return rows
def search_fuzzy_categories(self, search: str) -> list:
"""
Search a list of categories fitting search fuzzy.
:param search:
:return:
"""
self.db_cursor.execute("SELECT name FROM tag_category WHERE LOWER(name) LIKE LOWER('%{0}%')".format(search))
rows = []
for row in self.db_cursor.fetchall():
rows.append(row[0])
return rows
def search_fuzzy_tag(self, name: str, all_if_empty: bool = False) -> list:
"""
Search a list of tags fitting name fuzzy.
:param name:
:param all_if_empty:
:return:
"""
if all_if_empty and len(name) == 0:
self.db_cursor.execute("SELECT name, description, category FROM tag")
else:
self.db_cursor.execute("SELECT name, description, category FROM tag WHERE LOWER(name) LIKE LOWER('%{0}%')".format(name))
rows = self.db_cursor.fetchall()
new_rows = []
for i in range(len(rows)):
new_rows.append([])
for j in range(len(rows[i])):
if rows[i][j] is None:
new_rows[i].append("")
else:
new_rows[i].append(rows[i][j].strip())
return new_rows
def search_fuzzy_artists(self, ID: int, description: str, all_if_empty: bool = False):
"""
Search a list of fitting artists fuzzy.
If ID is None it will search using the description
:param ID:
:param description:
:param all_if_empty:
:return:
"""
if ID is not None:
self.db_cursor.execute("SELECT id, description FROM artist WHERE id = %(id)s", {"id": ID})
elif all_if_empty and ID is None and len(description) == 0:
self.db_cursor.execute("SELECT id, description FROM artist")
else:
self.db_cursor.execute("SELECT id, description FROM artist WHERE LOWER(description) LIKE LOWER('%{0}%')"
.format(description))
return self.db_cursor.fetchall()
def create_tag(self, name: str, description: str, aliases: list, implications: list, category: str = None):
name = name.strip()
if aliases is None:
aliases = []
if implications is None:
implications = []
d = {"name": name, "description": description, "category": category}
self.db_cursor.execute("INSERT INTO tag (name, description, category) " +
"VALUES (LOWER(%(name)s), %(description)s, %(category)s)", d)
for alias in aliases:
self.add_alias(name, alias)
for implicant in implications:
self.add_implication(name, implicant)
self.db.commit()
def edit_tag(self, name: str, description: str, aliases: list=None, implications: list=None, category: str = None,
old_tag: str = None):
"""
Edit a tag with the new given data
:param name: unique tag name to apply edits to (distinct from old_tag)
:param description: new description to apply, set to None to omit
:param aliases: list of tag names to be the alias of this tag, set to None to omit
:param implications: list of tag names to be implied by this tag, set to None to omit
:param category:
:param old_tag: old tag name from which to transfer all data
:return:
"""
name = name.strip().lower()
d = {
"name": name,
"description": description,
"alias": aliases,
"implications": implications,
"category": category,
}
if old_tag is not None and len(old_tag) > 0:
# tag name changed, need to transfer all data and remove old tag
old_tag = self.get_tag_by_name(old_tag)
d["ID"] = old_tag[0][-1]
self.db_cursor.execute("UPDATE tag SET name = %(name)s WHERE tag_ID = %(ID)s", d)
if description is not None:
self.db_cursor.execute("UPDATE tag SET description = %(description)s, category = %(category)s " +
"WHERE name = %(name)s", d)
if aliases is not None:
old_aliases = self.get_tag_aliases(name)
for alias in aliases:
if alias in old_aliases: # is this already set?
continue
self.add_alias(name, alias)
for old_alias in old_aliases:
if old_alias not in aliases: # got to delete an alias?
self.remove_alias(name, old_alias)
if implications is not None:
old_implicants = self.get_tag_implications(name)
for implicant in implications:
if implicant in old_implicants: # is this already set?
continue
self.add_implication(name, implicant)
for old_implicant in old_implicants:
if old_implicant not in implications: # got to delete an implicant?
self.remove_implication(name, old_implicant)
def add_alias(self, name: str, alias: str):
"""
Add the alias pair to the database
:param name:
:param alias:
:return:
"""
d = {
"name": self.get_tag_ID(name),
"alias": self.get_tag_ID(alias)
}
self.db_cursor.execute("INSERT INTO tag_alias (tag1, tag2) VALUES (%(name)s, %(alias)s)", d)
def remove_alias(self, name: str, alias: str):
"""
Remove alias pair from the database
:param name:
:param alias:
:return:
"""
d = {
"ID": self.get_tag_ID(name),
"alias": self.get_tag_ID(alias)
}
self.db_cursor.execute("DELETE FROM tag_alias WHERE (tag1 = %(ID)s and tag2 = %(alias)s) " +
"or (tag1 = %(alias)s and tag2 = %(ID)s)", d)
def add_implication(self, name: str, implicant: str):
"""
Add the implication to the database
:param name:
:param implicant:
:return:
"""
d = {
"name": self.get_tag_ID(name),
"implicant": self.get_tag_ID(implicant)
}
self.db_cursor.execute("INSERT INTO tag_implication (root_tag, implicate) VALUES (%(name)s, %(implicant)s)",
d)
self.db.commit()
def remove_implication(self, name: str, implicant: str):
"""
Remove the implication pair from the database
:param name:
:param implicant:
:return:
"""
d = {"name": self.get_tag_ID(name),
"implicant": self.get_tag_ID(implicant)}
self.db_cursor.execute("DELETE FROM tag_implication WHERE root_tag = %(name)s " +
"and implicate = %(implicant)s", d)
self.db.commit()
def remove_tag(self, name: str):
"""
Remove the given tag from the database
:param str:
:return:
"""
d = {"name": name}
self.db_cursor.execute("DELETE FROM tag WHERE name = %(name)s", d)
self.db.commit()
def get_tag_ID(self, name: str) -> int:
"""
Get the tag ID by querying it by its unique name
:param name:
:return:
"""
d = {"name": name}
self.db_cursor.execute("SELECT tag_ID, name FROM tag where LOWER(name) = LOWER(%(name)s)", d)
rows = []
for row in self.db_cursor.fetchall():
rows.append(row[0])
if len(rows) > 1:
exit(1) # something went horribly horribly wrong!
elif len(rows) == 0:
return None
return rows[0]
def get_tag_by_name(self, name: str) -> list:
"""
Search the tag in the DB via its name.
Note: name is constrained to be unique by the DB
:param name:
:return: Returns the tag's name, description, category, tag ID
"""
d = {"name": name}
self.db_cursor.execute("SELECT name, description, category, tag_ID FROM tag where LOWER(name) = LOWER(%(name)s)", d)
rows = []
for row in self.db_cursor.fetchall():
new_row = []
for value in row:
if value is None:
new_row.append("")
elif type(value) == str:
new_row.append(value.strip())
else:
new_row.append(value)
rows.append(new_row)
return rows
def get_tag_by_ID(self, ID: int) -> list:
"""
Search the tag in the DB via its ID.
Note: name is constrained to be unique by the DB
:param ID:
:return: Returns the tag's ID, name, description, category
"""
d = {"ID": ID}
self.db_cursor.execute("SELECT tag_ID, name, description, category FROM tag where tag_ID = %(ID)s", d)
return self.db_cursor.fetchall()
def get_tag_aliases(self, name: str) -> list:
"""
Search for the tag's aliases and the tag's aliases
:param name:
:return: List of tag Names that are aliases of this one
"""
aliases = self.get_tag_aliases_by_ID(self.get_tag_ID(name))
result = []
for alias in aliases:
tag_data = self.get_tag_by_ID(alias)
if len(tag_data) > 0:
result.append(tag_data[0][1].strip())
return result
def get_tag_aliases_by_ID(self, tag_ID: int) -> list:
"""
Search for the tag's aliases and the tag's aliases
:param name:
:return: List of tag IDs that are aliases of this one
"""
marked = []
to_search = [tag_ID]
while len(to_search) != 0:
curr_alias = to_search.pop()
found_aliases = self.__get_tag_aliases_no_recurse(curr_alias)
marked.append(curr_alias)
for found_alias in found_aliases:
if found_alias not in marked:
to_search.append(found_alias)
marked.remove(tag_ID)
return marked
def get_all_tag_implications(self, name: str) -> list:
"""
Search for the tag's implications and those that are implied by them.
:param name:
:return:
"""
root_ID = self.get_tag_ID(name)
collected_tags = self.get_tag_implications_by_ID(root_ID)
collected_tags.append(root_ID)
to_search = self.get_tag_implications_by_ID(root_ID)
while len(to_search) != 0:
curr_tag = to_search.pop()
found_implications = self.get_tag_implications_by_ID(curr_tag)
for found in found_implications:
if found in collected_tags:
continue
else:
collected_tags.append(found)
to_search.append(found)
collected_tags.remove(root_ID)
result = []
for tag_ID in collected_tags:
tag_data = self.get_tag_by_ID(tag_ID)
if len(tag_data) != 0:
result.append(tag_data[0][1].strip())
return result
def get_tag_implications_by_ID(self, ID: int) -> list:
"""
Search for the tag's implications
:param ID: ID of the tag
:return: returns a list of IDs that are implied
"""
d = {"ID": ID}
if d["ID"] is None:
return []
self.db_cursor.execute("SELECT root_tag, implicate FROM Tag_Implication WHERE root_tag = %(ID)s", d)
rows = self.db_cursor.fetchall()
if len(rows) == 0:
return []
r = []
for row in rows:
r.append(row[1])
return r
def get_tag_implications(self, name: str) -> list:
"""
Search for the tag's implications
:param name:
:param output_ID: Don't resolve the tag ids into names in the resulting list
:return: List of tag names
"""
d = {"ID": self.get_tag_ID(name)}
if d["ID"] is None:
return []
self.db_cursor.execute("SELECT root_tag, implicate FROM Tag_Implication WHERE root_tag = %(ID)s", d)
rows = self.db_cursor.fetchall()
if len(rows) == 0:
return []
r = []
for row in rows:
r.append(row[1])
result = [self.get_tag_by_ID(v) for v in r]
r = []
for value in result:
r.append(value[0][1].strip())
return r
def get_tag_impliers(self, name: str) -> list:
"""
Search for tags that imply this one.
:param name:
:return:
"""
d = {"ID": self.get_tag_ID(name)}
self.db_cursor.execute("SELECT root_tag, implicate FROM Tag_Implication WHERE implicate = %(ID)s", d)
rows = self.db_cursor.fetchall()
r = []
for row in rows:
r.append(row[0])
return r
def __get_tag_aliases_no_recurse(self, tag_id: int) -> list:
"""
Search for the tag's aliases
:param tag_id:
:return:
"""
d = {"ID": tag_id}
if d["ID"] is None:
return []
self.db_cursor.execute("SELECT tag1, tag2 FROM Tag_Alias "
"WHERE tag1 = %(ID)s or tag2 = %(ID)s", d)
rows = self.db_cursor.fetchall()
r = []
for row in rows:
if row[0] == tag_id:
r.append(row[1])
elif row[1] == tag_id:
r.append(row[0])
else:
raise Exception("Something went terribly wrong!")
return r
if __name__ == "__main__":
db = DBAdapter(user="artnet_editor", password="G606Rm9sFEXe6wfTLxVu",
database="artnet", host="localhost", port=5432)
search = 'Ajin/66699b4e41f533982db1766e8f72494a.gif2222'
print("Art search result:", search, db.get_art_by_path('Ajin/66699b4e41f533982db1766e8f72494a.gif'))
search = "tag"
print("Fuzzy Search Result:", search, db.search_fuzzy_tag(search))
search = "tag1"
print("Search Result:", search, db.get_tag_by_name(search))
search = "tag1"
print("Alias Search Result:", search, db.get_tag_aliases(search))
search = "tag1"
print("Implication Search Result:", search, db.get_tag_implications(search))
target = "tag1"
db.edit_tag(target, "new description! ;D", aliases=["tag1_alias"], implications=["tag1_implicated"])
print("Editing:", target, db.get_tag_by_name(target), db.get_tag_aliases(target), db.get_tag_implications(target))