You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

389 lines
16 KiB
Python

import shutil
import sys
import os
from hashlib import md5
from PyQt5.QtWidgets import QApplication
from ArtNet.db.db_adapter import DBAdapter
from ArtNet.file.file_reader import FileReader
from ArtNet.file.config_reader import ConfigReader
from ArtNet.gui.window import Window
from ArtNet.gui.dialogs.db_connection_dialog.db_dialog import DBDialog
from ArtNet.web.link_generator import LinkGenerator
class ArtNetManager:
def __init__(self, config_location: str = "."):
self.config = ConfigReader(config_location, "somePassword")
if self.config.data["version"] != self.config.CONFIG_VERSION:
print("Loaded config version is unequal to expected version! {0} (current) != {1} (expected)"
.format(self.config.data["version"], self.config.CONFIG_VERSION))
self.db_connection = None
self.__app = QApplication(sys.argv)
while self.db_connection is None:
try:
self.db_connection = self.create_db_connection(user=self.config.data["db"]["user"],
password=self.config.data["db"]["password"],
host=self.config.data["db"]["host"],
port=self.config.data["db"]["port"],
database=self.config.data["db"]["database"])
except ValueError as e: # db connection didn't work
print(e)
dialog = DBDialog()
prev_db_data = self.get_db_connection_details()
dialog.ui.user_line_edit.setText(prev_db_data["user"])
dialog.ui.password_line_edit.setText(prev_db_data["password"])
dialog.ui.host_line_edit.setText(prev_db_data["host"])
dialog.ui.database_line_edit.setText(prev_db_data["database"])
dialog.ui.port_line_edit.setText(str(prev_db_data["port"]))
db_data: dict = dialog.exec_()
if len(db_data.keys()) == 0:
return
self.change_db_connection(host=db_data["host"], port=db_data["port"],
user=db_data["user"], password=db_data["password"],
database=db_data["database"])
self.window = Window(self)
if len(self.config.data["file_root"]) == 0 or not os.path.isdir(self.config.data["file_root"]): # no file_root given by config or invalid
print("Querying for new file root due to lack of valid one ...")
self.window.on_artnet_root_change_clicked()
self.__file_reader = FileReader(self.config.data["file_root"])
self.curr_image_index: int = None
self.all_images: list = None
self.update_all_images_list()
self.window.set_temporary_status_message("Hello o7", 5000)
def run(self):
if len(self.all_images) > 0:
self.curr_image_index = 0
self.change_image()
self.window.show()
sys.exit(self.__app.exec_())
def update_all_images_list(self):
"""
Updates the internal list of all images in the artnet root
:return:
"""
images = []
artist_list = self.__file_reader.list_artists()
artist_list.sort()
for artist in artist_list:
for image in self.__file_reader.get_files(artist):
images.append(image)
self.all_images = images
self.known_image_amount = 0
for image in self.all_images:
if self.db_connection.get_art_by_path(image) is not None:
self.known_image_amount += 1
def scrape_tags(self, file_name: str, art_ID: int, url: str=None):
"""
Scrape the tags from the given url and return which one's are new
:param file_name:
:param art_ID:
:param url:
:return:
"""
if url is None:
return None
#tags = ArtNet.web.Scrap_Tags.scrap_tags(file_name, url, ArtNet.web.link_generator.predict_domain(file_name))
tags = LinkGenerator.get_instance().scrape_tags(url, LinkGenerator.get_instance().predict_domain(file_name))
if tags is None:
return None
already_applied_tags = self.db_connection.get_art_tags_by_ID(art_ID)
for i in range(len(already_applied_tags)):
already_applied_tags[i] = self.db_connection.get_tag_by_ID(already_applied_tags[i])[0][1].strip()
importable_tags = []
importable_artists = []
scraped_tags = []
for i in tags.values():
scraped_tags += i
for tag in scraped_tags:
result = self.db_connection.get_tag_by_name(tag)
if len(result) == 0: # tag does not exist yet
if tag in tags['artists']:
importable_artists.append(tag)
continue
importable_tags.append(tag)
continue
if tag in already_applied_tags: # tag is already applied
continue
result = self.db_connection.get_tag_impliers(tag)
if len(result) != 0: # tag is implied by some other tag
continue
result = self.db_connection.get_tag_aliases_by_name(tag)
if len(result) != 0: # tag is already tagged by an alias
continue
if tag in tags['artists']:
importable_artists.append(tag)
continue
importable_tags.append(tag) # tag must be known and not tagged yet
return importable_tags, importable_artists
def import_tags(self, art_ID: int, tags):
"""
Add a list of given tags to the specified art_ID
:param art_ID:
:param tags:
:return:
"""
for tag in tags:
if len(self.db_connection.get_tag_by_name(tag)) == 0: # tag doesn't exist yet
result = self.window.force_edit_tag_dialog(name=tag)
if result is None: # tag creation was aborted
continue
tag = result['name'] # overwrite with possibly new tag name
tag = self.db_connection.get_tag_by_name(tag)[0][0]
self.window.curr_tags.append(tag)
self.window.data_changed = True
def get_md5_of_image(self, path: str):
"""
Calculate the md5 hash of the image given by the path.
:param path: assumed to be relative e.g. artist_A/image.jpg
:return:
"""
md5_hash = md5()
try:
with open(self.get_root() + os.path.sep + path,
"rb") as file: # read file part by part because of potential memory limits
for chunk in iter(lambda: file.read(4096), b""):
md5_hash.update(chunk)
return md5_hash.hexdigest()
except FileNotFoundError:
self.update_all_images_list()
return None
def delete_image(self, path: str, delete_instead_of_move: bool = False, trash_bin_folder_path: str = "trash"):
"""
Deletes the image from the root folder
:param path:
:param delete_instead_of_move:
:param trash_bin_folder_path:
:return:
"""
full_art_path = self.get_root() + os.path.sep + path
if delete_instead_of_move:
print(f"Deleting the actual file {full_art_path} is disabled for now")
#os.remove(full_art_path)
#return
trash_dst = trash_bin_folder_path + os.path.sep + path
t_splits = trash_dst.split(os.path.sep)
t_path = ""
for i in range(len(t_splits)-1):
t_path += os.path.sep + t_splits[i]
t_path = t_path[1:]
if not os.path.exists(t_path):
print(f"{t_path} did not exist and will be created!")
os.makedirs("."+os.path.sep+t_path)
print(f"Moving image {full_art_path} to {trash_dst}")
shutil.move(full_art_path, trash_dst)
self.update_all_images_list()
while self.curr_image_index >= len(self.all_images):
self.curr_image_index -= 1
@DeprecationWarning
def recalculate_hash_for_known_images(self):
"""
Calculate and write the md5 hash for every image known to the database.
Important: For migration purposes only
:return:
"""
for i in range(len(self.all_images)):
image_db_result = self.db_connection.get_art_by_path(self.all_images[i])
file_name = os.path.basename(self.all_images[i])
if image_db_result is None: # unknown image, skip
continue
hash_digest = self.get_md5_of_image(self.all_images[i])
authors = self.db_connection.get_authors_of_art_by_ID(image_db_result["ID"])
tag_ids = self.db_connection.get_art_tags_by_ID(art_ID=image_db_result["ID"])
tags = []
for tag_id in tag_ids:
tags.append(self.db_connection.get_tag_by_ID(tag_id)[0][1].strip())
if image_db_result["title"] == file_name or len(image_db_result["title"]) == 0:
image_db_result["title"] = None
self.db_connection.save_image(ID=image_db_result["ID"], path=image_db_result["path"],
title=image_db_result["title"],
link=image_db_result["link"], authors=authors, md5_hash=hash_digest,
tags=tags)
def get_next_unknown_image(self) -> int:
"""
Search all images for the next image not known to the database in ascending order.
:return: index of the next unknown image, None if there is no unknown image
"""
next_unknown: int = None
curr_searched_image_index = self.curr_image_index + 1
finished_loop = False
while next_unknown is None and not finished_loop:
if curr_searched_image_index >= len(self.all_images): # wrap around to the start
curr_searched_image_index = 0
if curr_searched_image_index == self.curr_image_index: # searched all images, nothing unknown
break
image_db_result = self.db_connection.get_art_by_path(self.all_images[curr_searched_image_index])
if image_db_result is None:
image_db_result = self.db_connection.get_art_by_hash(
self.get_md5_of_image(self.all_images[curr_searched_image_index])
)
if image_db_result is None: # image is unknown to database
image_db_result = self.db_connection.get_art_by_hash(
self.get_md5_of_image(self.all_images[curr_searched_image_index]))
if image_db_result is None:
next_unknown = curr_searched_image_index
break
curr_searched_image_index += 1
if next_unknown:
return curr_searched_image_index
else:
return None
def get_prev_unknown_image(self) -> int:
"""
Search all images for the next image not known to the databse in descending order.
:return:index of the next unknown image, None if there is no unknown image
"""
next_unknown: int = None
curr_searched_image_index = self.curr_image_index - 1
finished_loop = False
while next_unknown is None and not finished_loop:
if curr_searched_image_index <= 0: # wrap around to the end
curr_searched_image_index = len(self.all_images) - 1
if curr_searched_image_index == self.curr_image_index: # searched all images, nothing unknown
break
image_db_result = self.db_connection.get_art_by_path(self.all_images[curr_searched_image_index])
if image_db_result is None: # image is unknown to database
image_db_result = self.db_connection.get_art_by_hash(
self.get_md5_of_image(self.all_images[curr_searched_image_index]))
if image_db_result is None:
next_unknown = curr_searched_image_index
break
curr_searched_image_index -= 1
if next_unknown:
return curr_searched_image_index
else:
return None
def change_image(self):
"""
Refresh the image display to show the most current data according to the selected image
:return:
"""
self.window.setting_up_data = True
image_db_result = self.db_connection.get_art_by_hash(
self.get_md5_of_image(self.all_images[self.curr_image_index])
)
#image_db_result = self.db_connection.get_art_by_path(self.all_images[self.curr_image_index])
s = self.all_images[self.curr_image_index].split(os.path.sep)
if image_db_result is not None:
image_title = image_db_result["title"] if isinstance(image_db_result["title"], str) and \
len(image_db_result["title"]) > 0 else self.window.UNKNOWN_TITLE
image_author = self.db_connection.get_authors_of_art_by_ID(image_db_result["ID"])
image_link = image_db_result["link"]
art_ID = image_db_result["ID"]
if image_author is None or len(image_author) == 0:
image_author = [(self.all_images[self.curr_image_index].split(os.path.sep)[0], "(Not in Database)")]
else:
art_ID = None
image_author = [(s[0], "(Not in Database)")]
image_title = s[-1] + " (Not in Database)"
image_link = "(Unknown)"
print("Displaying", self.all_images[self.curr_image_index])
self.window.display_image(image_title, image_author,
os.path.join(self.config.data["file_root"], self.all_images[self.curr_image_index]),
self.all_images[self.curr_image_index],
art_ID, image_link, file_name=s[-1])
self.window.set_tag_list([self.db_connection.get_tag_by_ID(x)[0][1].strip() for x in self.db_connection.get_art_tags_by_ID(art_ID)])
self.window.data_changed = False
self.window.setting_up_data = False
def create_db_connection(self, host: str, port: int, database: str, user: str, password: str) -> DBAdapter:
print("Changing db connection to ".format(host, port, user, password))
return DBAdapter(user=user, password=password, host=host, port=port, database=database)
def get_root(self) -> str:
return self.config.data["file_root"]
def change_root(self, path: str):
"""
Change the Artnet root path and confirm it is working.
Rewrite config there
:param path:
:return:
"""
if len(path) == 0:
exit(0)
print("Changing root to", path)
self.config.data["file_root"] = path
self.config.update_config()
def get_db_connection_details(self) -> dict:
return self.config.data["db"]
def change_db_connection(self, host: str, port: int, database: str, user: str, password: str):
self.config.data["db"]["user"] = user
self.config.data["db"]["password"] = password
self.config.data["db"]["host"] = host
self.config.data["db"]["database"] = database
self.config.data["db"]["port"] = str(port)
self.config.update_config()
self.db_connection = self.create_db_connection(host, port, database, user, password)