You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
678 lines
26 KiB
Python
678 lines
26 KiB
Python
from typing import List
|
|
|
|
import psycopg2
|
|
from sqlalchemy import Column, Boolean, Float, String, Integer, ForeignKey, Table, func, ForeignKeyConstraint
|
|
import sqlalchemy.orm as db
|
|
import sqlalchemy
|
|
from sqlalchemy.orm import declarative_base, relationship, load_only
|
|
|
|
from database.models import Art, ArtnoID, Artist, Presence, TagCategory, TagCategorynoID, TagNoID, Tag, \
|
|
Topic, TopicNoId, Collection, CollectionNoID, Art2CollRelationNoID, Art2CollRelation
|
|
|
|
# TODO read sensitive data from environment file or similar
|
|
SQLALCHEMY_DATABASE_URL = "postgresql://artnet_editor:G606Rm9sFEXe6wfTLxVu@[::1]/test_artnet"
|
|
engine = sqlalchemy.create_engine(SQLALCHEMY_DATABASE_URL, echo=True)
|
|
SessionLocal = sqlalchemy.orm.sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
|
|
|
Base = sqlalchemy.orm.declarative_base()
|
|
|
|
art_to_tag_table = Table('art_to_tag', Base.metadata,
|
|
Column('art_id', Integer, ForeignKey('art.id')),
|
|
Column('tag_id', Integer, ForeignKey('tag.id')))
|
|
|
|
"""art_to_art_collection = Table('art_to_art_collection', Base.metadata,
|
|
Column('collection_id', Integer, ForeignKey('art_collection.id')),
|
|
Column('art_id', Integer, ForeignKey('art.id')),
|
|
Column('ranking', String))"""
|
|
|
|
artist_to_topic_table = Table('artist_to_topic', Base.metadata,
|
|
Column('artist_id', Integer, ForeignKey('artist.id')),
|
|
Column('topic_id', Integer, ForeignKey('topic.id')))
|
|
|
|
|
|
class DBTagImplication(Base):
|
|
__tablename__ = 'tag_implication'
|
|
|
|
root_tag = Column(Integer, ForeignKey('tag.id'), primary_key=True)
|
|
implicate = Column(Integer, ForeignKey('tag.id'), primary_key=True)
|
|
|
|
|
|
class DBTagAlias(Base):
|
|
__tablename__ = 'tag_alias'
|
|
|
|
tag1 = Column(Integer, ForeignKey('tag.id'), primary_key=True)
|
|
tag2 = Column(Integer, ForeignKey('tag.id'), primary_key=True)
|
|
|
|
|
|
class DBPresence(Base):
|
|
__tablename__ = "presence"
|
|
|
|
name = Column(String(20), primary_key=True)
|
|
domain = Column(String(20), primary_key=True)
|
|
link = Column(String, nullable=True)
|
|
artist_id = Column(Integer, ForeignKey('artist.id', ondelete="CASCADE"), nullable=False)
|
|
|
|
artist = relationship("DBArtist", back_populates="presences")
|
|
arts = relationship("DBArt", secondary="art_to_presence", back_populates="presences",
|
|
cascade="delete, all")
|
|
|
|
|
|
class DBArt(Base):
|
|
__tablename__ = 'art'
|
|
|
|
id = Column(Integer, primary_key=True, index=True)
|
|
md5_hash = Column(String(32), nullable=False, unique=True)
|
|
path = Column(String, nullable=False, unique=True)
|
|
title = Column(String, nullable=True)
|
|
link = Column(String, nullable=True)
|
|
description = Column(String, nullable=True)
|
|
|
|
tags = relationship("DBTag", secondary=art_to_tag_table, back_populates="art", viewonly=True)
|
|
collections = relationship("DBCollection", secondary="art_to_art_collection", back_populates="art")
|
|
presences = relationship("DBPresence", secondary="art_to_presence", back_populates="arts")
|
|
|
|
|
|
class DBArt2Presence(Base):
|
|
__tablename__ = "art_to_presence"
|
|
|
|
presence_name = Column(String(20), primary_key=True)
|
|
presence_domain = Column(String(20), primary_key=True)
|
|
art_id = Column(Integer, ForeignKey(DBArt.id, ondelete="CASCADE"), primary_key=True)
|
|
|
|
__table_args__ = (
|
|
ForeignKeyConstraint(
|
|
("presence_name", "presence_domain"),
|
|
["presence.name", "presence.domain"]
|
|
),
|
|
)
|
|
|
|
|
|
class DBArtist(Base):
|
|
__tablename__ = "artist"
|
|
|
|
id = Column(Integer, primary_key=True)
|
|
name = Column(String, nullable=False)
|
|
|
|
topics = relationship("DBTopic", secondary="artist_to_topic", back_populates="artists", viewonly=True)
|
|
presences = relationship("DBPresence", back_populates="artist", cascade="all, delete")
|
|
|
|
|
|
class DBCollection(Base):
|
|
__tablename__ = "art_collection"
|
|
|
|
id = Column(Integer, primary_key=True)
|
|
name = Column(String, unique=True, nullable=False)
|
|
description = Column(String)
|
|
|
|
art = relationship("DBArt", secondary="art_to_art_collection", back_populates="collections")
|
|
|
|
|
|
class DBCollection2Art(Base):
|
|
__tablename__ = "art_to_art_collection"
|
|
|
|
collection_id = Column(Integer, ForeignKey(DBCollection.id, ondelete="CASCADE"), primary_key=True)
|
|
art_id = Column(Integer, ForeignKey(DBArt.id, ondelete="CASCADE"), primary_key=True)
|
|
ranking = Column(String)
|
|
|
|
|
|
class DBTopic(Base): # as of now unused
|
|
__tablename__ = "topic"
|
|
|
|
id = Column(Integer, primary_key=True)
|
|
name = Column(String(20), unique=True, nullable=False)
|
|
description = Column(String)
|
|
|
|
artists = relationship("DBArtist", secondary="artist_to_topic", back_populates="topics", viewonly=True)
|
|
|
|
|
|
class DBTagCategory(Base):
|
|
__tablename__ = "tag_category"
|
|
|
|
category_id = Column(Integer, primary_key=True)
|
|
name = Column(String(20), nullable=False)
|
|
|
|
tags = relationship("DBTag", back_populates="category", cascade="all, delete, delete-orphan")
|
|
|
|
|
|
class DBTag(Base):
|
|
__tablename__ = "tag"
|
|
|
|
id = Column(Integer, primary_key=True)
|
|
name = Column(String(50), unique=True)
|
|
description = Column(String)
|
|
category_id = Column(Integer, ForeignKey('tag_category.category_id'))
|
|
|
|
category = relationship("DBTagCategory", back_populates='tags', foreign_keys=[category_id])
|
|
# TODO check if cascade is required
|
|
art = relationship("DBArt", secondary=art_to_tag_table, back_populates="tags")
|
|
# TODO check if cascade is required
|
|
implications = relationship(DBTagImplication, backref="implied_by",
|
|
primaryjoin=id == DBTagImplication.root_tag,
|
|
cascade="all, delete, delete-orphan")
|
|
alias = relationship(DBTagAlias, backref="alias",
|
|
primaryjoin=(id == DBTagAlias.tag1) or (id == DBTagAlias.tag2),
|
|
cascade="all, delete, delete-orphan")
|
|
|
|
|
|
Base.metadata.create_all(bind=engine)
|
|
|
|
|
|
class Database:
|
|
__db: sqlalchemy.orm.Session = None
|
|
|
|
def __get_db(self) -> sqlalchemy.orm.Session:
|
|
Database.__db = SessionLocal() if Database.__db is None else Database.__db
|
|
try:
|
|
return Database.__db
|
|
finally:
|
|
pass
|
|
|
|
def __del__(self):
|
|
Database.__db.close()
|
|
Database.__db = None
|
|
|
|
# Art
|
|
|
|
def get_art_list(self) -> List[DBArt]:
|
|
return self.__get_db().query(DBArt).all() # TODO FIX StackOverflow
|
|
|
|
def get_art_by_id(self, art_id: int) -> DBArt:
|
|
return self.__get_db().query(DBArt).where(DBArt.id == art_id).first()
|
|
|
|
def get_art_by_hash(self, md5_hash: str) -> DBArt:
|
|
return self.__get_db().query(DBArt).filter(func.lower(DBArt.md5_hash) == md5_hash.lower()).first()
|
|
|
|
def create_art_by_model(self, art: ArtnoID):
|
|
if not (isinstance(art.path, str) and len(art.path) > 0):
|
|
raise ValueError("New Art must contain a path!")
|
|
|
|
db_art = DBArt(title=art.title, md5_hash=art.hash, path=art.path, link=art.link, description=art.description)
|
|
|
|
if self.get_art_by_hash(art.hash) is not None:
|
|
raise ValueError("New Art must not contain already known hashes! Is this really new?")
|
|
|
|
db = self.__get_db()
|
|
db.add(db_art)
|
|
db.commit()
|
|
|
|
if art.presences is not None:
|
|
for presence in art.presences:
|
|
if self.get_presence(presence.name, presence.domain) is None:
|
|
raise ValueError("Used presence did not exist!")
|
|
db_art_presence = DBArt2Presence(presence_name=presence.name, presence_domain=presence.domain,
|
|
art_id=db_art.id)
|
|
db.add(db_art_presence)
|
|
db.commit()
|
|
|
|
return db_art
|
|
|
|
@DeprecationWarning
|
|
def update_art_by_id(self, art_id: int, title: str, path: str, tags: list, link: str,
|
|
md5_hash: str, authors: list = None):
|
|
raise NotImplementedError
|
|
|
|
@DeprecationWarning
|
|
def update_art_by_hash(self, md5_hash: str, title: str, path: str, tags: list, link: str, authors: list = None):
|
|
raise NotImplementedError
|
|
|
|
def update_art_by_model(self, art: Art):
|
|
db_art: DBArt = self.get_art_by_id(art.id)
|
|
|
|
db_art.title = art.title if art.title is not None else db_art.title
|
|
db_art.link = art.link if art.link is not None else db_art.link
|
|
db_art.md5_hash = art.hash if art.hash is not None else db_art.md5_hash
|
|
db_art.path = art.path if art.path is not None else db_art.path
|
|
db_art.description = art.description if art.description is not None else db_art.description
|
|
|
|
if art.presences is not None:
|
|
for presence in art.presences:
|
|
db_art.presences.append(self.get_presence(presence.name, presence.domain))
|
|
|
|
if art.collections is not None:
|
|
for collection in art.collections:
|
|
db_art.collections.append(self.get_collection_by_id(collection.id))
|
|
|
|
self.__get_db().commit()
|
|
|
|
def delete_art_by_id(self, art_id: int):
|
|
db_art: DBArt = self.get_art_by_id(art_id)
|
|
self.__get_db().delete(db_art)
|
|
self.__get_db().commit()
|
|
|
|
@DeprecationWarning
|
|
def delete_art_by_hash(self, md5_hash: str):
|
|
raise NotImplementedError
|
|
|
|
# Art -> Presences
|
|
|
|
def get_art_presences_by_id(self, art_id: int) -> List[DBArt2Presence]:
|
|
result = self.__get_db().query(DBArt2Presence).filter(DBArt2Presence.art_id == art_id).all()
|
|
return result
|
|
|
|
def get_art_presences_by_hash(self, md5_hash: str):
|
|
result = self.__get_db().query(DBArt2Presence).join(DBArt).filter(DBArt.md5_hash == md5_hash).all()
|
|
return result
|
|
|
|
@DeprecationWarning # is this actually needed? superceeded by updating the art.presence field on art
|
|
def update_art_presences_by_id(self, art_id: int,
|
|
presences: List[Presence]): # presences = [("name", "domain"),(...)]
|
|
"""
|
|
Creates an art-presence relation for every presence listed
|
|
:param art_id:
|
|
:param presences:
|
|
:return:
|
|
"""
|
|
db = self.__get_db()
|
|
|
|
for presence in presences:
|
|
art2presence = DBArt2Presence()
|
|
art2presence.art_id = art_id
|
|
art2presence.presence_name = presence.name
|
|
art2presence.presence_domain = presence.domain
|
|
|
|
db.add(art2presence)
|
|
|
|
db.commit()
|
|
|
|
def update_art_presences_by_hash(self, md5_hash: int,
|
|
presences: list): # presences = [("name", "domain"),(...)]
|
|
raise NotImplementedError
|
|
|
|
def delete_art_presences_by_id(self, art_id: int, presence_name: str, presence_domain: str):
|
|
art2presences = self.get_art_presences_by_id(art_id)
|
|
|
|
art2presence = None
|
|
for rel in art2presences:
|
|
if rel.presence_name.strip() == presence_name.strip() and \
|
|
rel.presence_domain.strip() == presence_domain.strip():
|
|
art2presence = rel
|
|
|
|
if art2presence is None:
|
|
raise ValueError("Unknown art-presence relation")
|
|
|
|
self.__get_db().delete(art2presence)
|
|
self.__get_db().commit()
|
|
|
|
def delete_art_presences_by_hash(self, md5_hash: str, presence_name: str, presence_domain: str):
|
|
raise NotImplementedError
|
|
|
|
# Presence
|
|
|
|
def get_presence_list(self) -> List[DBPresence]:
|
|
return self.__get_db().query(DBPresence).all() # TODO fix StackOverflow
|
|
|
|
def get_presence(self, name: str, domain: str) -> DBPresence:
|
|
result = self.__get_db().query(DBPresence) \
|
|
.filter(func.lower(DBPresence.name) == name.lower() and
|
|
func.lower(DBPresence.domain) == domain.lower()).first()
|
|
return result
|
|
|
|
def create_presence(self, name: str, domain: str, artist_id: int, link: str) -> DBPresence:
|
|
if not (len(name) > 0 and len(domain) > 0):
|
|
print(f"Name: \"{name}\" Domain: \"{domain}\"")
|
|
raise ValueError("New Presence must have some name and domain!")
|
|
|
|
db_presence = DBPresence(name=name, domain=domain, artist_id=artist_id, link=link)
|
|
|
|
db = self.__get_db()
|
|
db.add(db_presence)
|
|
db.commit()
|
|
|
|
return db_presence
|
|
|
|
def update_presence(self, name: str, domain: str, artist_id: int, link: str):
|
|
db_presence = self.get_presence(name=name, domain=domain)
|
|
|
|
db_presence.link = link
|
|
db_presence.artist_id = artist_id
|
|
|
|
self.__get_db().commit()
|
|
|
|
def delete_presence(self, name: str, domain: str):
|
|
db_presence = self.get_presence(name=name, domain=domain)
|
|
self.__get_db().delete(db_presence)
|
|
self.__get_db().commit()
|
|
|
|
# Artist -> Presence
|
|
|
|
def get_artist_presences(self, artist_id: int):
|
|
result = self.__get_db().query(DBPresence).filter(DBPresence.artist_id == artist_id).all()
|
|
return result
|
|
|
|
# Artist
|
|
|
|
def get_artist_list(self):
|
|
return self.__get_db().query(DBArtist).all() # TODO fix StackOverflow
|
|
|
|
def get_artist(self, artist_id: int) -> DBArtist:
|
|
result = self.__get_db().query(DBArtist).where(DBArtist.id == artist_id).first()
|
|
return result
|
|
|
|
def create_artist(self, name: str, topics: List[int]) -> DBArtist:
|
|
db_artist = DBArtist(name=name, topics=topics)
|
|
|
|
db = self.__get_db()
|
|
db.add(db_artist)
|
|
db.commit()
|
|
|
|
return db_artist
|
|
|
|
def search_artist(self, name: str) -> Artist:
|
|
result = self.__get_db().query(DBArtist).where(DBArtist.description == name).all()
|
|
return result
|
|
|
|
def update_artist(self, artist_id: int, name: str = None, topics: List[int] = None):
|
|
db_artist = self.get_artist(artist_id)
|
|
|
|
if name is not None:
|
|
db_artist.description = name
|
|
if topics is not None:
|
|
db_artist.topics = topics
|
|
|
|
self.__get_db().commit()
|
|
|
|
def delete_artist(self, artist_id: int):
|
|
db_artist = self.get_artist(artist_id)
|
|
|
|
self.__get_db().delete(db_artist)
|
|
self.__get_db().commit() # TODO FIX sqlalchemy.exc.IntegrityError: (psycopg2.errors.ForeignKeyViolation)
|
|
# TODO update or delete on table "artist" violates foreign key constraint "presence_artist_id_fkey" on table "presence"
|
|
# TODO DETAIL: Key (id)=(83) is still referenced from table "presence".
|
|
|
|
# Topic
|
|
|
|
def get_topic_list(self):
|
|
return self.__get_db().query(DBTopic).all() # TODO fix StackOverflow
|
|
|
|
def get_topic_by_id(self, id: int) -> DBTopic:
|
|
result = self.__get_db().query(DBTopic).filter(DBTopic.id == id).first()
|
|
return result
|
|
|
|
def get_topic_by_name(self, name: str) -> DBTopic:
|
|
result = self.__get_db().query(DBTopic).filter(func.lower(DBTopic.name) == name.lower()).first()
|
|
return result
|
|
|
|
def create_topic_by_model(self, topic: TopicNoId):
|
|
if not isinstance(topic.name, str) and len(topic.name) > 2:
|
|
raise ValueError("Topic name must be at least 2 letters long!")
|
|
|
|
if self.get_topic_by_name(topic.name) is not None:
|
|
raise ValueError(f"Topic name must be unique! '{topic.name}' already exists!")
|
|
|
|
db_topic = DBTopic(name=topic.name, description=topic.description)
|
|
db = self.__get_db()
|
|
db.add(db_topic)
|
|
db.commit()
|
|
|
|
return db_topic
|
|
|
|
def update_topic_by_model(self, topic: Topic):
|
|
db_topic = self.get_topic_by_id(topic.id)
|
|
|
|
db_topic.name = topic.name
|
|
db_topic.description = topic.description
|
|
|
|
self.__get_db().commit()
|
|
|
|
return db_topic
|
|
|
|
def delete_topic_by_id(self, id: int):
|
|
db_topic = self.get_topic_by_id(id)
|
|
|
|
self.__get_db().delete(db_topic)
|
|
self.__get_db().commit()
|
|
|
|
def delete_topic_by_name(self, name: str):
|
|
raise NotImplementedError
|
|
|
|
# Artist -> Topic
|
|
|
|
def get_artist_topics(self, artist_id: int):
|
|
result = self.__get_db().query(DBTopic).filter(DBTopic.artists.any(id=artist_id)).all()
|
|
return result
|
|
|
|
def update_artist_topics(self, artist_id: int, topic_ids: list):
|
|
raise NotImplementedError
|
|
|
|
def delete_artist_topics(self, artist_id: int, topic_ids: list):
|
|
raise NotImplementedError
|
|
|
|
# Topic -> Artist
|
|
|
|
def get_topic_artists(self, topic_id: int):
|
|
result = self.__get_db().query(DBArtist).filter(DBArtist.topics.any(id=topic_id)).all()
|
|
return result
|
|
|
|
def update_topic_artists(self, topic_id: int, artists: list):
|
|
raise NotImplementedError
|
|
|
|
def delete_topic_artists(self, topic_id: int): # deletes only the connections, not the artists
|
|
raise NotImplementedError
|
|
|
|
# Tag
|
|
|
|
def get_tag_list(self):
|
|
return self.__get_db().query(DBTag).all() # TODO fix StackOverflow
|
|
|
|
def get_tag_by_id(self, tag_id: int) -> DBTag:
|
|
result = self.__get_db().query(DBTag).where(tag_id == DBTag.id).first()
|
|
return result
|
|
|
|
def get_tag_by_name(self, name: str) -> DBTag:
|
|
result = self.__get_db().query(DBTag).filter(func.lower(DBTag.name) == name.lower()).first()
|
|
return result
|
|
|
|
def create_tag_by_model(self, tag: TagNoID):
|
|
db_tag = self.get_tag_by_name(tag.name)
|
|
if db_tag is not None:
|
|
raise ValueError("Tried to create new tag with a name that is already taken!")
|
|
if tag.category_id is None:
|
|
raise Exception("Tried to create tag with no category!")
|
|
|
|
if tag.implications is None:
|
|
tag.implications = []
|
|
if tag.aliases is None:
|
|
tag.aliases = []
|
|
|
|
db_tag = DBTag(name=tag.name, description=tag.description, category_id=tag.category_id,
|
|
implications=tag.implications, alias=tag.aliases)
|
|
|
|
db = self.__get_db()
|
|
db.add(db_tag)
|
|
db.commit()
|
|
|
|
# TODO check that alias is bidirectional automatically
|
|
# e.g. Alias tag1 -> tag2 needs to also turn up as tag2 -> tag1
|
|
|
|
return db_tag
|
|
|
|
def update_tag_by_model(self, tag: Tag):
|
|
db_tag = self.get_tag_by_id(tag.id)
|
|
|
|
db_categ = self.get_category_by_id(tag.category_id)
|
|
if db_categ is None:
|
|
raise ValueError("The given category id was not found!")
|
|
|
|
db_tag.name = tag.name if tag.name is not None else db_tag.name
|
|
db_tag.description = tag.description if tag.description is not None else db_tag.description
|
|
db_tag.category_id = tag.category_id if tag.category_id is not None else db_tag.category_id
|
|
|
|
db_tag.alias = tag.aliases if tag.aliases is not None else db_tag.alias
|
|
db_tag.implications = tag.implications if tag.implications is not None else db_tag.implications
|
|
|
|
self.__get_db().commit() # TODO give feedback if alias, implication does not exist
|
|
|
|
def delete_tag_by_id(self, tag_id: int):
|
|
db_tag = self.get_tag_by_id(tag_id)
|
|
if db_tag is None:
|
|
raise ValueError("Could not find the tag to delete!")
|
|
|
|
db = self.__get_db()
|
|
db.delete(db_tag)
|
|
db.commit()
|
|
|
|
def delete_tag_by_model(self, tag: Tag):
|
|
db_tag = self.get_tag_by_id(tag.tag_id)
|
|
if db_tag is None:
|
|
raise ValueError("Could not find the tag to delete!")
|
|
|
|
db = self.__get_db()
|
|
db.delete(db_tag)
|
|
db.commit()
|
|
|
|
def search_tag_by_name_fuzzy(self, search: str) -> list: # return a list of tags fitting the fuzzy name search
|
|
result = self.__get_db().query(DBTag).filter(DBTag.name.ilike("%{}%".format(search))) \
|
|
.options(load_only("id", "name")).all()
|
|
return result
|
|
|
|
# Tag -> Art
|
|
|
|
def get_tag_art(self, tag_id: int):
|
|
result = self.__get_db().query(DBArt).filter(DBArt.tags.any(id=tag_id)).all()
|
|
return result
|
|
|
|
def update_tag_art(self, tag_id: int): # is this useful?
|
|
raise NotImplementedError
|
|
|
|
def delete_tag_art(self): # deletes only the connections, not the art
|
|
raise NotImplementedError
|
|
|
|
# Category -> Tag
|
|
|
|
def get_category_tags(self, category_id: int):
|
|
result = self.__get_db().query(DBTag).where(DBTag.category_id == category_id).all()
|
|
return result
|
|
|
|
def update_category_tags(self, category_id: int, tags: list):
|
|
raise NotImplementedError
|
|
|
|
def delete_category_tags(self, category_id: int):
|
|
raise NotImplementedError
|
|
|
|
# Category
|
|
def get_category_list(self) -> List[DBTagCategory]:
|
|
result = self.__get_db().query(DBTagCategory).all() # TODO fix StackOverflow
|
|
return result
|
|
|
|
def get_category_by_id(self, category_id: int) -> DBTagCategory:
|
|
result = self.__get_db().query(DBTagCategory).where(DBTagCategory.category_id == category_id).first()
|
|
return result
|
|
|
|
def get_category_by_name(self, name: str) -> DBTagCategory:
|
|
result = self.__get_db().query(DBTagCategory).filter(func.lower(DBTagCategory.name) == name.lower()).first()
|
|
return result
|
|
|
|
def create_category_by_model(self, category: TagCategorynoID) -> DBTagCategory:
|
|
if not (isinstance(category.name, str) and len(category.name) > 0):
|
|
raise ValueError("New Category must bear a name!")
|
|
if self.get_category_by_name(category.name) is not None:
|
|
raise ValueError("New Tag Category must not contain an already used name! Is this a duplicate?")
|
|
|
|
db_category = DBTagCategory(name=category.name)
|
|
|
|
db = self.__get_db()
|
|
db.add(db_category)
|
|
db.commit()
|
|
|
|
return db_category
|
|
|
|
def update_category_by_model(self, category: TagCategory):
|
|
db_category: DBTagCategory = self.get_category_by_id(category_id=category.category_id)
|
|
if self.get_category_by_name(category.name) is not None:
|
|
raise ValueError("New Tag Category must not contain an already used name! "
|
|
f"Is this ({category.name}) a duplicate?")
|
|
|
|
db_category.name = category.name
|
|
|
|
self.__get_db().commit()
|
|
|
|
def delete_category_by_id(self, category_id: int):
|
|
db_category: DBTagCategory = self.get_category_by_id(category_id)
|
|
if db_category is None:
|
|
raise ValueError(f"Tried to delete unknown category. ID ({category_id}) was not found!")
|
|
try:
|
|
self.__get_db().delete(db_category)
|
|
self.__get_db().commit()
|
|
except psycopg2.IntegrityError as e:
|
|
print(e)
|
|
|
|
# Collection
|
|
def get_collection_list(self) -> List[DBCollection]:
|
|
result = self.__get_db().query(DBCollection).all() # TODO fix StackOverflow
|
|
return result
|
|
|
|
def get_collection_by_id(self, id: int):
|
|
result = self.__get_db().query(DBCollection).where(DBCollection.id == id).first()
|
|
return result
|
|
|
|
def get_collections_by_name(self, name: str):
|
|
result = self.__get_db().query(DBCollection).where(DBCollection.name == name)
|
|
return result
|
|
|
|
def create_collection_by_model(self, coll: CollectionNoID):
|
|
db_collection = DBCollection(name=coll.name, description=coll.description)
|
|
|
|
self.__get_db().add(db_collection)
|
|
self.__get_db().commit()
|
|
|
|
return db_collection
|
|
|
|
def update_collection_by_model(self, coll: Collection):
|
|
db_collection = self.get_collection_by_id(coll.id)
|
|
if db_collection is None:
|
|
raise ValueError("Could not find a collection with this ID!")
|
|
|
|
db_collection.name = coll.name
|
|
db_collection.description = coll.description
|
|
|
|
self.__get_db().commit()
|
|
|
|
def delete_collection_by_id(self, id: int):
|
|
db_collection = self.get_collection_by_id(id)
|
|
if db_collection is None:
|
|
raise ValueError("Could not find a collection with this ID!")
|
|
|
|
self.__get_db().delete(db_collection)
|
|
self.__get_db().commit()
|
|
|
|
# Art -> Collection
|
|
def get_collection_art(self, collection_id: int):
|
|
return self.__get_db().query(DBCollection2Art).filter(DBCollection2Art.collection_id == collection_id).all()
|
|
|
|
# Art <-> Collection
|
|
|
|
def get_art_collection_relation(self, art_id: int, collection_id: int) -> DBCollection2Art:
|
|
return self.__get_db().query(DBCollection2Art).filter(DBCollection2Art.collection_id == collection_id,
|
|
DBCollection2Art.art_id == art_id).first()
|
|
|
|
def create_art_collection_relation(self, art2collection: Art2CollRelation):
|
|
db_art2collection = DBCollection2Art(art_id=art2collection.art_id,
|
|
collection_id=art2collection.collection_id,
|
|
ranking=art2collection.ranking)
|
|
|
|
self.__get_db().add(db_art2collection)
|
|
self.__get_db().commit()
|
|
|
|
def update_art_collection_relation_by_model(self, art_collection_relation: Art2CollRelation):
|
|
db_art2collection = self.get_art_collection_relation(art_id=art_collection_relation.art_id,
|
|
collection_id=art_collection_relation.collection_id)
|
|
|
|
db_art2collection.ranking = art_collection_relation.ranking
|
|
self.__get_db().commit()
|
|
|
|
def delete_art_collection_relation(self, art_id: int, collection_id: int):
|
|
db_art2collection = self.get_art_collection_relation(art_id=art_id, collection_id=collection_id)
|
|
if db_art2collection is None:
|
|
raise ValueError("Could not find art-collection relation with these IDs!")
|
|
|
|
self.__get_db().delete(db_art2collection)
|
|
self.__get_db().commit()
|
|
|
|
# Collection -> Art
|
|
def get_art_collections(self, art_id: int):
|
|
return self.__get_db().query(DBCollection2Art).filter(
|
|
DBCollection2Art.art_id == art_id).all()
|
|
# TODO throws error
|
|
# TODO SAWarning: SELECT statement has a cartesian product between FROM element(s) "art_to_art_collection" and FROM element "art_collection". Apply join condition(s) between each element to resolve.
|
|
# return self.__get_db().query(DBCollection).filter(DBCollection2Art.art_id == art_id).all()
|