import os
import shutil
import sys
import warnings
from hashlib import md5
from typing import List, Optional, Tuple

from PyQt5.QtWidgets import QApplication

from ArtNet.db.db_adapter import DBAdapter
from ArtNet.file.file_reader import FileReader
from ArtNet.file.config_reader import ConfigReader
from ArtNet.gui.window import Window
from ArtNet.gui.dialogs.db_connection_dialog.db_dialog import DBDialog
from ArtNet.web.link_generator import LinkGenerator


class ArtNetManager:
    """
    Central controller of the application.

    Ties together the config, the database connection, the file reader for the
    artnet root folder and the main window. It keeps the list of all image
    paths (relative to the root) and the index of the currently shown image.
    """

    def __init__(self, config_location: str = "."):
        """
        Load the config, connect to the database (asking the user for the
        connection details until it works) and set up the main window.

        If the user aborts the database dialog this returns early and the
        manager is left without window and image list.

        :param config_location: folder the config is read from
        """
        # NOTE(review): the password handed to ConfigReader is hard-coded - confirm whether this is intended
        self.config = ConfigReader(config_location, "somePassword")

        if self.config.data["version"] != self.config.CONFIG_VERSION:
            print("Loaded config version is unequal to expected version! {0} (current) != {1} (expected)"
                  .format(self.config.data["version"], self.config.CONFIG_VERSION))

        self.db_connection = None
        # Initialised up front so that methods triggered by the window during setup find defined attributes
        self.curr_image_index: Optional[int] = None
        self.all_images: Optional[list] = None
        self.known_image_amount = 0
        self.__app = QApplication(sys.argv)

        db_config = self.config.data["db"]
        try:
            self.db_connection = self.create_db_connection(user=db_config["user"],
                                                           password=db_config["password"],
                                                           host=db_config["host"],
                                                           port=db_config["port"],
                                                           database=db_config["database"])
        except ValueError as e:  # db connection didn't work
            print(e)

        while self.db_connection is None:
            db_data = self._prompt_for_db_details()
            if not db_data:  # dialog returned no data, give up
                return
            try:
                self.change_db_connection(host=db_data["host"], port=db_data["port"],
                                          user=db_data["user"], password=db_data["password"],
                                          database=db_data["database"])
            except ValueError as e:  # the new details didn't work either, ask again
                print(e)

        self.window = Window(self)

        file_root = self.config.data["file_root"]
        if len(file_root) == 0 or not os.path.isdir(file_root):
            # no file_root given by config or invalid
            print("Querying for new file root due to lack of valid one ...")
            self.window.on_artnet_root_change_clicked()
        self.__file_reader = FileReader(self.config.data["file_root"])

        self.curr_image_index = None
        self.all_images = None
        self.update_all_images_list()

        self.window.set_temporary_status_message("Hello o7", 5000)

    def _prompt_for_db_details(self) -> dict:
        """
        Show the database dialog pre-filled with the connection details of the config.

        :return: whatever the dialog returns, an empty dict is treated as abort by the caller
        """
        dialog = DBDialog()
        prev_db_data = self.get_db_connection_details()
        dialog.ui.user_line_edit.setText(prev_db_data["user"])
        dialog.ui.password_line_edit.setText(prev_db_data["password"])
        dialog.ui.host_line_edit.setText(prev_db_data["host"])
        dialog.ui.database_line_edit.setText(prev_db_data["database"])
        dialog.ui.port_line_edit.setText(str(prev_db_data["port"]))
        return dialog.exec_()

    def run(self):
        """
        Show the first image (if there is any), show the window and enter the Qt event loop.
        Exits the interpreter with the return code of the event loop.

        :return:
        """
        if self.all_images:
            self.curr_image_index = 0
            self.refresh_shown_image()

        self.window.show()
        sys.exit(self.__app.exec_())

    def update_all_images_list(self):
        """
        Updates the internal list of all images in the artnet root and
        recounts how many of them are known to the database by their path.

        :return:
        """
        images = []
        for artist in sorted(self.__file_reader.list_artists()):
            images.extend(self.__file_reader.get_files(artist))
        self.all_images = images

        self.known_image_amount = sum(
            1 for image in images if self.db_connection.get_art_by_path(image) is not None
        )

    def scrape_tags(self, file_name: str, art_ID: int,
                    url: str = None) -> Optional[Tuple[List[str], List[str]]]:
        """
        Scrape the tags from the given url and return which ones could be imported.

        A scraped tag is importable if it does not exist in the database yet, or if it exists
        but is not applied to the art, has no impliers and has no aliases.

        :param file_name: file name of the art, used to predict the domain
        :param art_ID: ID of the art whose already applied tags are skipped
        :param url: url to scrape, without it nothing is scraped
        :return: (importable tags, importable artists) or None if no url was given or nothing was scraped
        """
        if url is None:
            return None

        link_generator = LinkGenerator.get_instance()
        tags = link_generator.scrape_tags(url=url, file_name=file_name,
                                          domain=link_generator.predict_domain(file_name))
        if tags is None:
            return None

        # names of the tags already applied to the art
        already_applied_tags = {
            self.db_connection.get_tag_by_ID(tag_id)[0][1].strip()
            for tag_id in self.db_connection.get_art_tags_by_ID(art_ID)
        }

        importable_tags = []
        importable_artists = []
        for category_tags in tags.values():
            for tag in category_tags:
                if self.db_connection.get_tag_by_name(tag):  # tag exists already
                    if tag in already_applied_tags:  # tag is already applied
                        continue
                    if self.db_connection.get_tag_impliers(tag):  # tag is implied by some other tag
                        continue
                    if self.db_connection.get_tag_aliases_by_name(tag):  # tag is already tagged by an alias
                        continue

                if tag in tags['artists']:
                    importable_artists.append(tag)
                else:
                    importable_tags.append(tag)

        return importable_tags, importable_artists

    def import_tags(self, tags):
        """
        Add a list of given tags to the tags currently selected in the window.
        For tags unknown to the database the tag creation dialog is opened first.

        :param tags: names of the tags to add
        :return:
        """
        for tag in tags:
            if not self.db_connection.get_tag_by_name(tag):  # tag doesn't exist yet
                result = self.window.force_edit_tag_dialog(name=tag)
                if result is None:  # tag creation was aborted
                    continue
                tag = result['name']  # overwrite with possibly new tag name

            tag_rows = self.db_connection.get_tag_by_name(tag)
            if not tag_rows:  # tag still doesn't exist, nothing to add
                continue
            self.window.curr_tags.append(tag_rows[0][0])
            self.window.data_changed = True

    def get_md5_of_image(self, path: str) -> Optional[str]:
        """
        Calculate the md5 hash of the image given by the path.
        If the file does not exist the list of all images is refreshed.

        :param path: assumed to be relative e.g. artist_A/image.jpg
        :return: hex digest of the file, None if the file does not exist
        """
        md5_hash = md5()
        try:
            with open(self.get_root() + os.path.sep + path, "rb") as file:
                # read file part by part because of potential memory limits
                for chunk in iter(lambda: file.read(4096), b""):
                    md5_hash.update(chunk)
            return md5_hash.hexdigest()
        except FileNotFoundError:
            self.update_all_images_list()
            return None

    def delete_image(self, path: str, delete_instead_of_move: bool = False, trash_bin_folder_path: str = "trash"):
        """
        Removes the image from the root folder by moving it into the trash folder.
        Actually deleting the file is disabled for now, so the image is moved in any case.

        :param path: path of the image relative to the root
        :param delete_instead_of_move: currently only prints a notice
        :param trash_bin_folder_path: folder the image is moved to, keeping its relative path
        :return:
        """
        full_art_path = self.get_root() + os.path.sep + path
        if delete_instead_of_move:
            print(f"Deleting the actual file {full_art_path} is disabled for now")

        trash_dst = trash_bin_folder_path + os.path.sep + path
        trash_dir = os.path.dirname(trash_dst)
        if trash_dir and not os.path.exists(trash_dir):
            print(f"{trash_dir} did not exist and will be created!")
            os.makedirs(trash_dir, exist_ok=True)

        print(f"Moving image {full_art_path} to {trash_dst}")
        shutil.move(full_art_path, trash_dst)

        self.update_all_images_list()
        # keep the index inside the (now possibly shorter) list
        self.curr_image_index = min(self.curr_image_index, len(self.all_images) - 1)

    def recalculate_hash_for_known_images(self):
        """
        Calculate and write the md5 hash for every image known to the database.
        Images whose file can not be read are skipped.

        Important: For migration purposes only

        :return:
        """
        warnings.warn("recalculate_hash_for_known_images is deprecated and meant for migration purposes only",
                      DeprecationWarning, stacklevel=2)

        # iterate over a copy because hashing a missing file refreshes self.all_images
        for image_path in list(self.all_images):
            image_db_result = self.db_connection.get_art_by_path(image_path)
            if image_db_result is None:  # unknown image, skip
                continue

            hash_digest = self.get_md5_of_image(image_path)
            if hash_digest is None:  # file vanished, there is no hash to write
                continue

            art_ID = image_db_result["ID"]
            authors = self.db_connection.get_authors_of_art_by_ID(art_ID)
            tags = [self.db_connection.get_tag_by_ID(tag_id)[0][1].strip()
                    for tag_id in self.db_connection.get_art_tags_by_ID(art_ID=art_ID)]

            # a title that is empty or only repeats the file name is not stored
            title = image_db_result["title"]
            if not title or title == os.path.basename(image_path):
                title = None

            self.db_connection.save_image(ID=art_ID, path=image_db_result["path"],
                                          title=title, link=image_db_result["link"],
                                          authors=authors, md5_hash=hash_digest, tags=tags)

    def _is_unknown_image(self, path: str) -> bool:
        """
        Check if the image is found in the database neither by its path nor by its md5 hash.

        :param path: path of the image relative to the root
        :return: True if the image is unknown, False if it is known or its file is missing
        """
        if self.db_connection.get_art_by_path(path) is not None:
            return False
        hash_digest = self.get_md5_of_image(path)
        if hash_digest is None:  # file does not exist anymore
            return False
        return self.db_connection.get_art_by_hash(hash_digest) is None

    def _find_unknown_image(self, step: int) -> Optional[int]:
        """
        Walk through all images starting next to the current one, wrapping around
        at the ends, until an unknown image is found or the current one is reached again.

        :param step: 1 to search in ascending order, -1 for descending order
        :return: index of the found image, None if there is none
        """
        if not self.all_images or self.curr_image_index is None:
            return None

        start = self.curr_image_index
        index = start
        # every other image is visited at most once, so the search always terminates
        for _ in range(len(self.all_images) - 1):
            image_count = len(self.all_images)  # the list may be refreshed while searching
            if image_count == 0:
                return None
            index = (index + step) % image_count
            if index == start:  # searched all images, nothing unknown
                break
            if self._is_unknown_image(self.all_images[index]):
                return index
        return None

    def get_next_unknown_image(self) -> Optional[int]:
        """
        Search all images for the next image not known to the database in ascending order.

        :return: index of the next unknown image, None if there is no unknown image
        """
        return self._find_unknown_image(step=1)

    def get_prev_unknown_image(self) -> Optional[int]:
        """
        Search all images for the next image not known to the database in descending order.

        :return: index of the next unknown image, None if there is no unknown image
        """
        return self._find_unknown_image(step=-1)

    def refresh_shown_image(self):
        """
        Refresh the image display to show the most current data according to the selected image.
        Does nothing if there is no selected image.

        :return:
        """
        if self.curr_image_index is None or not self.all_images:
            return

        self.window.setting_up_data = True
        try:
            image_path = self.all_images[self.curr_image_index]
            image_db_result = self.db_connection.get_art_by_hash(self.get_md5_of_image(image_path))

            path_parts = image_path.split(os.path.sep)
            # fallback author: the first folder of the relative path
            folder_author = [(path_parts[0], "(Not in Database)")]

            if image_db_result is not None:
                title = image_db_result["title"]
                if isinstance(title, str) and len(title) > 0:
                    image_title = title
                else:
                    image_title = self.window.UNKNOWN_TITLE
                art_ID = image_db_result["ID"]
                image_link = image_db_result["link"]
                image_author = self.db_connection.get_authors_of_art_by_ID(art_ID)
                if not image_author:
                    image_author = folder_author
            else:
                art_ID = None
                image_author = folder_author
                image_title = path_parts[-1] + " (Not in Database)"
                image_link = "(Unknown)"

            print("Displaying", image_path)
            self.window.display_image(image_title, image_author,
                                      os.path.join(self.config.data["file_root"], image_path),
                                      image_path, art_ID, image_link, file_name=path_parts[-1])
            self.window.set_tag_list([self.db_connection.get_tag_by_ID(tag_id)[0][1].strip()
                                      for tag_id in self.db_connection.get_art_tags_by_ID(art_ID)])
            self.window.data_changed = False
        finally:
            # never leave the window stuck in setup mode, even if displaying failed
            self.window.setting_up_data = False

    def create_db_connection(self, host: str, port: int, database: str, user: str, password: str) -> DBAdapter:
        """
        Create a new database adapter for the given connection details.

        :param host:
        :param port:
        :param database:
        :param user:
        :param password: deliberately left out of the log output
        :return: the new adapter
        """
        print("Changing db connection to {0}@{1}:{2}/{3}".format(user, host, port, database))
        return DBAdapter(user=user, password=password, host=host, port=port, database=database)

    def get_root(self) -> str:
        """
        :return: the artnet root folder as stored in the config
        """
        return self.config.data["file_root"]

    def change_root(self, path: str):
        """
        Change the Artnet root path, rewrite the config and reload the images from there.
        An empty path exits the application.

        :param path: new root folder
        :return:
        """
        if len(path) == 0:
            sys.exit(0)

        print("Changing root to", path)
        self.config.data["file_root"] = path
        self.config.update_config()
        self.__file_reader = FileReader(self.config.data["file_root"])
        self.update_all_images_list()
        self.refresh_shown_image()

    def get_db_connection_details(self) -> dict:
        """
        :return: the db section of the config
        """
        return self.config.data["db"]

    def change_db_connection(self, host: str, port: int, database: str, user: str, password: str):
        """
        Store the given connection details in the config, rewrite the config and connect with them.
        The config is written before the connection is attempted.

        :param host:
        :param port: stored as string in the config
        :param database:
        :param user:
        :param password:
        :return:
        """
        db_config = self.config.data["db"]
        db_config["user"] = user
        db_config["password"] = password
        db_config["host"] = host
        db_config["database"] = database
        db_config["port"] = str(port)
        self.config.update_config()

        self.db_connection = self.create_db_connection(host, port, database, user, password)