from typing import List import psycopg2 from sqlalchemy import Column, Boolean, Float, String, Integer, ForeignKey, Table, func, ForeignKeyConstraint import sqlalchemy.orm as db import sqlalchemy from sqlalchemy.orm import declarative_base, relationship, load_only from database.models import Art, ArtnoID, Artist, Presence, TagCategory, TagCategorynoID, TagNoID, Tag, \ Topic, TopicNoId, Collection, CollectionNoID, Art2CollRelationNoID, Art2CollRelation # TODO read sensitive data from environment file or similar SQLALCHEMY_DATABASE_URL = "postgresql://artnet_editor:G606Rm9sFEXe6wfTLxVu@[::1]/test_artnet" engine = sqlalchemy.create_engine(SQLALCHEMY_DATABASE_URL, echo=True) SessionLocal = sqlalchemy.orm.sessionmaker(autocommit=False, autoflush=False, bind=engine) Base = sqlalchemy.orm.declarative_base() art_to_tag_table = Table('art_to_tag', Base.metadata, Column('art_id', Integer, ForeignKey('art.id')), Column('tag_id', Integer, ForeignKey('tag.tag_id'))) art_to_art_collection = Table('art_to_art_collection', Base.metadata, Column('collection_id', Integer, ForeignKey('art_collection.id')), Column('art_id', Integer, ForeignKey('art.id')), Column('ranking', String))""" artist_to_topic_table = Table('artist_to_topic', Base.metadata, Column('artist_id', Integer, ForeignKey('artist.id')), Column('topic_id', Integer, ForeignKey('topic.id'))) class DBTagImplication(Base): __tablename__ = 'tag_implication' root_tag = Column(Integer, ForeignKey('tag.tag_id'), primary_key=True) implicate = Column(Integer, ForeignKey('tag.tag_id'), primary_key=True) class DBTagAlias(Base): __tablename__ = 'tag_alias' tag1 = Column(Integer, ForeignKey('tag.tag_id'), primary_key=True) tag2 = Column(Integer, ForeignKey('tag.tag_id'), primary_key=True) class DBPresence(Base): __tablename__ = "presence" name = Column(String(20), primary_key=True) domain = Column(String(20), primary_key=True) link = Column(String, nullable=True) artist_id = Column(Integer, ForeignKey('artist.id', ondelete="CASCADE"), nullable=False) artist = relationship("DBArtist", back_populates="presences") arts = relationship("DBArt", secondary="art_to_presence", back_populates="presences", cascade="delete, all") class DBArt(Base): __tablename__ = 'art' id = Column(Integer, primary_key=True, index=True) md5_hash = Column(String(32), nullable=False, unique=True) path = Column(String, nullable=False, unique=True) title = Column(String, nullable=True) link = Column(String, nullable=True) description = Column(String, nullable=True) tags = relationship("DBTag", secondary=art_to_tag_table, back_populates="art", viewonly=True) collections = relationship("DBCollection", secondary="art_to_art_collection", back_populates="art") presences = relationship("DBPresence", secondary="art_to_presence", back_populates="arts") class DBArt2Presence(Base): __tablename__ = "art_to_presence" presence_name = Column(String(20), primary_key=True) presence_domain = Column(String(20), primary_key=True) art_id = Column(Integer, ForeignKey(DBArt.id, ondelete="CASCADE"), primary_key=True) __table_args__ = ( ForeignKeyConstraint( ("presence_name", "presence_domain"), ["presence.name", "presence.domain"] ), ) class DBArtist(Base): __tablename__ = "artist" id = Column(Integer, primary_key=True) name = Column(String, nullable=False) topics = relationship("DBTopic", secondary="artist_to_topic", back_populates="artists", viewonly=True) presences = relationship("DBPresence", back_populates="artist", cascade="all, delete") class DBCollection(Base): __tablename__ = "art_collection" id = Column(Integer, primary_key=True) name = Column(String, unique=True, nullable=False) description = Column(String) art = relationship("DBArt", secondary="art_to_art_collection", back_populates="collections") class DBCollection2Art(Base): __tablename__ = "art_to_art_collection" collection_id = Column(Integer, ForeignKey(DBCollection.id, ondelete="CASCADE"), primary_key=True) art_id = Column(Integer, ForeignKey(DBArt.id, ondelete="CASCADE"), primary_key=True) ranking = Column(String) class DBTopic(Base): # as of now unused __tablename__ = "topic" id = Column(Integer, primary_key=True) name = Column(String(20), unique=True, nullable=False) description = Column(String) artists = relationship("DBArtist", secondary="artist_to_topic", back_populates="topics", viewonly=True) class DBTagCategory(Base): __tablename__ = "tag_category" category_id = Column(Integer, primary_key=True) name = Column(String(20), nullable=False) tags = relationship("DBTag", back_populates="category", cascade="all, delete") class DBTag(Base): __tablename__ = "tag" tag_id = Column(Integer, primary_key=True) name = Column(String(50), unique=True) description = Column(String) category_id = Column(Integer, ForeignKey('tag_category.category_id')) category = relationship("DBTagCategory", back_populates='tags', foreign_keys=[category_id]) # TODO check if cascade is required art = relationship("DBArt", secondary=art_to_tag_table, back_populates="tags") # TODO check if cascade is required implications = relationship(DBTagImplication, backref="implied_by", primaryjoin=tag_id == DBTagImplication.root_tag) alias = relationship(DBTagAlias, backref="alias", primaryjoin=(tag_id == DBTagAlias.tag1) or (tag_id == DBTagAlias.tag2)) Base.metadata.create_all(bind=engine) class Database: __db: sqlalchemy.orm.Session = None def __get_db(self) -> sqlalchemy.orm.Session: Database.__db = SessionLocal() if Database.__db is None else Database.__db try: return Database.__db finally: pass def __del__(self): Database.__db.close() Database.__db = None # Art def get_art_list(self) -> List[DBArt]: return self.__get_db().query(DBArt).all() # TODO FIX StackOverflow def get_art_by_id(self, art_id: int) -> DBArt: return self.__get_db().query(DBArt).where(DBArt.id == art_id).first() def get_art_by_hash(self, md5_hash: str) -> DBArt: return self.__get_db().query(DBArt).filter(func.lower(DBArt.md5_hash) == md5_hash.lower()).first() def create_art_by_model(self, art: ArtnoID): if not (isinstance(art.path, str) and len(art.path) > 0): raise ValueError("New Art must contain a path!") db_art = DBArt(title=art.title, md5_hash=art.hash, path=art.path, link=art.link, description=art.description) if self.get_art_by_hash(art.hash) is not None: raise ValueError("New Art must not contain already known hashes! Is this really new?") db = self.__get_db() db.add(db_art) db.commit() if art.presences is not None: for presence in art.presences: if self.get_presence(presence.name, presence.domain) is None: raise ValueError("Used presence did not exist!") db_art_presence = DBArt2Presence(presence_name=presence.name, presence_domain=presence.domain, art_id=db_art.id) db.add(db_art_presence) db.commit() return db_art @DeprecationWarning def update_art_by_id(self, art_id: int, title: str, path: str, tags: list, link: str, md5_hash: str, authors: list = None): raise NotImplementedError @DeprecationWarning def update_art_by_hash(self, md5_hash: str, title: str, path: str, tags: list, link: str, authors: list = None): raise NotImplementedError def update_art_by_model(self, art: Art): db_art: DBArt = self.get_art_by_id(art.id) db_art.title = art.title if art.title is not None else db_art.title db_art.link = art.link if art.link is not None else db_art.link db_art.md5_hash = art.hash if art.hash is not None else db_art.md5_hash db_art.path = art.path if art.path is not None else db_art.path db_art.description = art.description if art.description is not None else db_art.description if art.presences is not None: for presence in art.presences: db_art.presences.append(self.get_presence(presence.name, presence.domain)) if art.collections is not None: for collection in art.collections: db_art.collections.append(self.get_collection_by_id(collection.id)) self.__get_db().commit() def delete_art_by_id(self, art_id: int): db_art: DBArt = self.get_art_by_id(art_id) self.__get_db().delete(db_art) self.__get_db().commit() @DeprecationWarning def delete_art_by_hash(self, md5_hash: str): raise NotImplementedError # Art -> Presences def get_art_presences_by_id(self, art_id: int) -> List[DBArt2Presence]: result = self.__get_db().query(DBArt2Presence).filter(DBArt2Presence.art_id == art_id).all() return result def get_art_presences_by_hash(self, md5_hash: str): result = self.__get_db().query(DBArt2Presence).join(DBArt).filter(DBArt.md5_hash == md5_hash).all() return result @DeprecationWarning # is this actually needed? superceeded by updating the art.presence field on art def update_art_presences_by_id(self, art_id: int, presences: List[Presence]): # presences = [("name", "domain"),(...)] """ Creates an art-presence relation for every presence listed :param art_id: :param presences: :return: """ db = self.__get_db() for presence in presences: art2presence = DBArt2Presence() art2presence.art_id = art_id art2presence.presence_name = presence.name art2presence.presence_domain = presence.domain db.add(art2presence) db.commit() def update_art_presences_by_hash(self, md5_hash: int, presences: list): # presences = [("name", "domain"),(...)] raise NotImplementedError def delete_art_presences_by_id(self, art_id: int, presence_name: str, presence_domain: str): art2presences = self.get_art_presences_by_id(art_id) art2presence = None for rel in art2presences: if rel.presence_name.strip() == presence_name.strip() and \ rel.presence_domain.strip() == presence_domain.strip(): art2presence = rel if art2presence is None: raise ValueError("Unknown art-presence relation") self.__get_db().delete(art2presence) self.__get_db().commit() def delete_art_presences_by_hash(self, md5_hash: str, presence_name: str, presence_domain: str): raise NotImplementedError # Presence def get_presence_list(self) -> List[DBPresence]: return self.__get_db().query(DBPresence).all() # TODO fix StackOverflow def get_presence(self, name: str, domain: str) -> DBPresence: result = self.__get_db().query(DBPresence) \ .filter(func.lower(DBPresence.name) == name.lower() and func.lower(DBPresence.domain) == domain.lower()).first() return result def create_presence(self, name: str, domain: str, artist_id: int, link: str) -> DBPresence: if not (len(name) > 0 and len(domain) > 0): print(f"Name: \"{name}\" Domain: \"{domain}\"") raise ValueError("New Presence must have some name and domain!") db_presence = DBPresence(name=name, domain=domain, artist_id=artist_id, link=link) db = self.__get_db() db.add(db_presence) db.commit() return db_presence def update_presence(self, name: str, domain: str, artist_id: int, link: str): db_presence = self.get_presence(name=name, domain=domain) db_presence.link = link db_presence.artist_id = artist_id self.__get_db().commit() def delete_presence(self, name: str, domain: str): db_presence = self.get_presence(name=name, domain=domain) self.__get_db().delete(db_presence) self.__get_db().commit() # Artist -> Presence def get_artist_presences(self, artist_id: int): result = self.__get_db().query(DBPresence).filter(DBPresence.artist_id == artist_id).all() return result # Artist def get_artist_list(self): return self.__get_db().query(DBArtist).all() # TODO fix StackOverflow def get_artist(self, artist_id: int) -> DBArtist: result = self.__get_db().query(DBArtist).where(DBArtist.id == artist_id).first() return result def create_artist(self, name: str, topics: List[int]) -> DBArtist: db_artist = DBArtist(name=name, topics=topics) db = self.__get_db() db.add(db_artist) db.commit() return db_artist def search_artist(self, name: str) -> Artist: result = self.__get_db().query(DBArtist).where(DBArtist.description == name).all() return result def update_artist(self, artist_id: int, name: str = None, topics: List[int] = None): db_artist = self.get_artist(artist_id) if name is not None: db_artist.description = name if topics is not None: db_artist.topics = topics self.__get_db().commit() def delete_artist(self, artist_id: int): db_artist = self.get_artist(artist_id) self.__get_db().delete(db_artist) self.__get_db().commit() # TODO FIX sqlalchemy.exc.IntegrityError: (psycopg2.errors.ForeignKeyViolation) # TODO update or delete on table "artist" violates foreign key constraint "presence_artist_id_fkey" on table "presence" # TODO DETAIL: Key (id)=(83) is still referenced from table "presence". # Topic def get_topic_list(self): return self.__get_db().query(DBTopic).all() # TODO fix StackOverflow def get_topic_by_id(self, id: int) -> DBTopic: result = self.__get_db().query(DBTopic).filter(DBTopic.id == id).first() return result def get_topic_by_name(self, name: str) -> DBTopic: result = self.__get_db().query(DBTopic).filter(func.lower(DBTopic.name) == name.lower()).first() return result def create_topic_by_model(self, topic: TopicNoId): if not isinstance(topic.name, str) and len(topic.name) > 2: raise ValueError("Topic name must be at least 2 letters long!") if self.get_topic_by_name(topic.name) is not None: raise ValueError(f"Topic name must be unique! '{topic.name}' already exists!") db_topic = DBTopic(name=topic.name, description=topic.description) db = self.__get_db() db.add(db_topic) db.commit() return db_topic def update_topic_by_model(self, topic: Topic): db_topic = self.get_topic_by_id(topic.id) db_topic.name = topic.name db_topic.description = topic.description self.__get_db().commit() return db_topic def delete_topic_by_id(self, id: int): db_topic = self.get_topic_by_id(id) self.__get_db().delete(db_topic) self.__get_db().commit() def delete_topic_by_name(self, name: str): raise NotImplementedError # Artist -> Topic def get_artist_topics(self, artist_id: int): result = self.__get_db().query(DBTopic).filter(DBTopic.artists.any(id=artist_id)).all() return result def update_artist_topics(self, artist_id: int, topic_ids: list): raise NotImplementedError def delete_artist_topics(self, artist_id: int, topic_ids: list): raise NotImplementedError # Topic -> Artist def get_topic_artists(self, topic_id: int): result = self.__get_db().query(DBArtist).filter(DBArtist.topics.any(id=topic_id)).all() return result def update_topic_artists(self, topic_id: int, artists: list): raise NotImplementedError def delete_topic_artists(self, topic_id: int): # deletes only the connections, not the artists raise NotImplementedError # Tag def get_tag_list(self): return self.__get_db().query(DBTag).all() # TODO fix StackOverflow def get_tag_by_id(self, tag_id: int) -> DBTag: result = self.__get_db().query(DBTag).where(tag_id == DBTag.tag_id).first() return result def get_tag_by_name(self, name: str) -> DBTag: result = self.__get_db().query(DBTag).filter(func.lower(DBTag.name) == name.lower()).first() return result def create_tag_by_model(self, tag: TagNoID): db_tag = self.get_tag_by_name(tag.name) if db_tag is not None: raise ValueError("Tried to create new tag with a name that is already taken!") if tag.category_id is None: raise Exception("Tried to create tag with no category!") if tag.implications is None: tag.implications = [] if tag.aliases is None: tag.aliases = [] db_tag = DBTag(name=tag.name, description=tag.description, category_id=tag.category_id, implications=tag.implications, alias=tag.aliases) db = self.__get_db() db.add(db_tag) db.commit() # TODO check that alias is bidirectional automatically # e.g. Alias tag1 -> tag2 needs to also turn up as tag2 -> tag1 return db_tag def update_tag_by_model(self, tag: Tag): db_tag = self.get_tag_by_id(tag.tag_id) db_categ = self.get_category_by_id(tag.category_id) if db_categ is None: raise ValueError("The given category id was not found!") db_tag.name = tag.name if tag.name is not None else db_tag.name db_tag.description = tag.description if tag.description is not None else db_tag.description db_tag.category_id = tag.category_id if tag.category_id is not None else db_tag.category_id db_tag.alias = tag.aliases if tag.aliases is not None else db_tag.alias db_tag.implications = tag.implications if tag.implications is not None else db_tag.implications self.__get_db().commit() # TODO give feedback if alias, implication does not exist def delete_tag_by_id(self, tag_id: int): db_tag = self.get_tag_by_id(tag_id) if db_tag is None: raise ValueError("Could not find the tag to delete!") db = self.__get_db() db.delete(db_tag) db.commit() def delete_tag_by_model(self, tag: Tag): db_tag = self.get_tag_by_id(tag.tag_id) if db_tag is None: raise ValueError("Could not find the tag to delete!") db = self.__get_db() db.delete(db_tag) db.commit() def search_tag_by_name_fuzzy(self, search: str) -> list: # return a list of tags fitting the fuzzy name search result = self.__get_db().query(DBTag).filter(DBTag.name.ilike("%{}%".format(search))) \ .options(load_only("tag_id", "name")).all() return result # Tag -> Art def get_tag_art(self, tag_id: int): result = self.__get_db().query(DBArt).filter(DBArt.tags.any(tag_id=tag_id)).all() return result def update_tag_art(self, tag_id: int): # is this useful? raise NotImplementedError def delete_tag_art(self): # deletes only the connections, not the art raise NotImplementedError # Category -> Tag def get_category_tags(self, category_id: int): result = self.__get_db().query(DBTag).where(DBTag.category_id == category_id).all() return result def update_category_tags(self, category_id: int, tags: list): raise NotImplementedError def delete_category_tags(self, category_id: int): raise NotImplementedError # Category def get_category_list(self) -> List[DBTagCategory]: result = self.__get_db().query(DBTagCategory).all() # TODO fix StackOverflow return result def get_category_by_id(self, category_id: int) -> DBTagCategory: result = self.__get_db().query(DBTagCategory).where(DBTagCategory.category_id == category_id).first() return result def get_category_by_name(self, name: str) -> DBTagCategory: result = self.__get_db().query(DBTagCategory).filter(func.lower(DBTagCategory.name) == name.lower()).first() return result def create_category_by_model(self, category: TagCategorynoID) -> DBTagCategory: if not (isinstance(category.name, str) and len(category.name) > 0): raise ValueError("New Category must bear a name!") if self.get_category_by_name(category.name) is not None: raise ValueError("New Tag Category must not contain an already used name! Is this a duplicate?") db_category = DBTagCategory(name=category.name) db = self.__get_db() db.add(db_category) db.commit() return db_category def update_category_by_model(self, category: TagCategory): db_category: DBTagCategory = self.get_category_by_id(category_id=category.category_id) if self.get_category_by_name(category.name) is not None: raise ValueError("New Tag Category must not contain an already used name! " f"Is this ({category.name}) a duplicate?") db_category.name = category.name self.__get_db().commit() def delete_category_by_id(self, category_id: int): db_category: DBTagCategory = self.get_category_by_id(category_id) if db_category is None: raise ValueError(f"Tried to delete unknown category. ID ({category_id}) was not found!") try: self.__get_db().delete(db_category) self.__get_db().commit() except psycopg2.IntegrityError as e: print(e) # Collection def get_collection_list(self) -> List[DBCollection]: result = self.__get_db().query(DBCollection).all() # TODO fix StackOverflow return result def get_collection_by_id(self, id: int): result = self.__get_db().query(DBCollection).where(DBCollection.id == id).first() return result def get_collections_by_name(self, name: str): result = self.__get_db().query(DBCollection).where(DBCollection.name == name) return result def create_collection_by_model(self, coll: CollectionNoID): db_collection = DBCollection(name=coll.name, description=coll.description) self.__get_db().add(db_collection) self.__get_db().commit() return db_collection def update_collection_by_model(self, coll: Collection): db_collection = self.get_collection_by_id(coll.id) if db_collection is None: raise ValueError("Could not find a collection with this ID!") db_collection.name = coll.name db_collection.description = coll.description self.__get_db().commit() def delete_collection_by_id(self, id: int): db_collection = self.get_collection_by_id(id) if db_collection is None: raise ValueError("Could not find a collection with this ID!") self.__get_db().delete(db_collection) self.__get_db().commit() # Art -> Collection def get_collection_art(self, collection_id: int): return self.__get_db().query(DBCollection2Art).filter(DBCollection2Art.collection_id == collection_id).all() # Art <-> Collection def get_art_collection_relation(self, art_id: int, collection_id: int) -> DBCollection2Art: return self.__get_db().query(DBCollection2Art).filter(DBCollection2Art.collection_id == collection_id, DBCollection2Art.art_id == art_id).first() def create_art_collection_relation(self, art2collection: Art2CollRelation): db_art2collection = DBCollection2Art(art_id=art2collection.art_id, collection_id=art2collection.collection_id, ranking=art2collection.ranking) self.__get_db().add(db_art2collection) self.__get_db().commit() def update_art_collection_relation_by_model(self, art_collection_relation: Art2CollRelation): db_art2collection = self.get_art_collection_relation(art_id=art_collection_relation.art_id, collection_id=art_collection_relation.collection_id) db_art2collection.ranking = art_collection_relation.ranking self.__get_db().commit() def delete_art_collection_relation(self, art_id: int, collection_id: int): db_art2collection = self.get_art_collection_relation(art_id=art_id, collection_id=collection_id) if db_art2collection is None: raise ValueError("Could not find art-collection relation with these IDs!") self.__get_db().delete(db_art2collection) self.__get_db().commit() # Collection -> Art def get_art_collections(self, art_id: int): return self.__get_db().query(DBCollection2Art).filter( DBCollection2Art.art_id == art_id).all() # TODO throws error # TODO SAWarning: SELECT statement has a cartesian product between FROM element(s) "art_to_art_collection" and FROM element "art_collection". Apply join condition(s) between each element to resolve. # return self.__get_db().query(DBCollection).filter(DBCollection2Art.art_id == art_id).all()