You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
451 lines
19 KiB
Python
451 lines
19 KiB
Python
import shutil
|
|
import sys
|
|
import os
|
|
import logging
|
|
import datetime
|
|
from hashlib import md5
|
|
|
|
from PyQt5.QtWidgets import QApplication
|
|
|
|
from ArtNet.db.db_adapter import DBAdapter
|
|
from ArtNet.file.file_reader import FileReader
|
|
from ArtNet.file.config_reader import ConfigReader
|
|
from ArtNet.gui.importer_window import ImporterWindow
|
|
from ArtNet.gui.browse_window import BrowseWindow
|
|
from ArtNet.gui.dialogs.db_connection_dialog.db_dialog import DBDialog
|
|
from ArtNet.web.link_generator import LinkGenerator
|
|
|
|
|
|
class ArtNetManager:
    """
    Central controller of the ArtNet client.

    Sets up logging, loads the config, opens the database connection and owns the importer and the browser
    window. It keeps the list of all image paths found below the ArtNet root (``self.all_images``) and the
    index of the image currently shown (``self.curr_image_index``).
    """

    # folder (relative to the working directory) the daily log files are written to
    LOG_FOLDER = "log"

    def __init__(self, config_location: str = "."):
        """
        Set up logging, config, database connection and both windows.

        When the database connection can not be established a dialog asks for new connection details. This is
        repeated until a connection works or the dialog is closed.

        :param config_location: location handed to the ConfigReader
        """
        os.makedirs(ArtNetManager.LOG_FOLDER, exist_ok=True)
        file_name = "artnet-" + datetime.datetime.now().strftime("%Y-%m-%d") + ".log"
        logging.basicConfig(filename=os.path.join(ArtNetManager.LOG_FOLDER, file_name), encoding='utf-8',
                            level=logging.DEBUG)
        log_formatter = logging.Formatter(fmt="%(asctime)s.%(msecs)03d [%(threadName)-12.12s] [%(levelname)-5.5s] "
                                              "%(message)s",
                                          datefmt="%Y-%m-%d %H:%M:%S")

        # mirror the log on stdout and use the same format for every handler of the root logger
        root_logger = logging.getLogger()
        root_logger.addHandler(logging.StreamHandler(sys.stdout))
        for handler in root_logger.handlers:
            handler.setFormatter(log_formatter)
        logging.info("Starting ArtNet client ...")

        self.known_image_amount = None
        # NOTE(review): hard-coded second argument, presumably a placeholder password - confirm with ConfigReader
        self.config = ConfigReader(config_location, "somePassword")

        if self.config.data["version"] != self.config.CONFIG_VERSION:
            logging.warning("Loaded config version is unequal to expected version! %s (current) != %s (expected)",
                            self.config.data["version"], self.config.CONFIG_VERSION)

        # create all state up front so callbacks triggered during the window setup find the attributes
        self.db_connection = None
        self.curr_active_window = None
        self.curr_image_index: int = None
        self.all_images: list = None
        self.__app = QApplication(sys.argv)

        while self.db_connection is None:
            db_config = self.get_db_connection_details()
            try:
                self.db_connection = self.create_db_connection(user=db_config["user"],
                                                               password=db_config["password"],
                                                               host=db_config["host"],
                                                               port=db_config["port"],
                                                               database=db_config["database"])
            except ValueError as e:  # db connection didn't work
                logging.error(e)
                db_data = self._ask_for_db_connection_details()
                if db_data is None:
                    logging.info("Dialog was closed without result! Exiting ...")
                    sys.exit(0)
                if not db_data:
                    # NOTE(review): leaves the manager without db connection and windows, as the original did
                    return
                # only store the new details; the next loop iteration tries to connect with them, so another
                # failure ends up in this handler again instead of raising out of it
                self._store_db_connection_details(host=db_data["host"], port=db_data["port"],
                                                  user=db_data["user"], password=db_data["password"],
                                                  database=db_data["database"])

        self.import_window = ImporterWindow(self)
        self.browse_window = BrowseWindow(self)

        file_root = self.config.data["file_root"]
        if len(file_root) == 0 or not os.path.isdir(file_root):  # no file_root given by config or invalid
            logging.info("Querying for new file root due to lack of valid one ...")
            self.import_window.on_artnet_root_change_clicked()
        self.__file_reader = FileReader(self.config.data["file_root"])

        self.update_all_images_list()

        self.import_window.set_temporary_status_message("Hello o7", 5000)

    def _ask_for_db_connection_details(self):
        """
        Show the db connection dialog prefilled with the connection details of the config.

        :return: whatever the dialog returns on exec_(); the caller treats None as "dialog closed"
        """
        dialog = DBDialog()
        prev_db_data = self.get_db_connection_details()
        dialog.ui.user_line_edit.setText(prev_db_data["user"])
        dialog.ui.password_line_edit.setText(prev_db_data["password"])
        dialog.ui.host_line_edit.setText(prev_db_data["host"])
        dialog.ui.database_line_edit.setText(prev_db_data["database"])
        dialog.ui.port_line_edit.setText(str(prev_db_data["port"]))
        return dialog.exec_()

    def _store_db_connection_details(self, host: str, port: int, database: str, user: str, password: str):
        """
        Write the given connection details into the config and persist it. Does not connect.
        """
        db_config = self.config.data["db"]
        db_config["user"] = user
        db_config["password"] = password
        db_config["host"] = host
        db_config["database"] = database
        db_config["port"] = str(port)

        self.config.update_config()

    def run(self):
        """
        Show the importer window with the first image and run the Qt event loop until the application quits.

        Does not return: the process exits with the status of the event loop.

        :raises FileNotFoundError: if the image list is empty
        """
        if not self.all_images:
            raise FileNotFoundError("No files or folders were in artnet root!")
        self.curr_image_index = 0
        self.curr_active_window = self.import_window
        self.refresh_shown_image()

        self.import_window.show()

        status = self.__app.exec_()
        logging.info(f"Shutting client down with status: {status}")
        sys.exit(status)

    def _switch_window(self, target_window, displayed_image_index: int = None):
        """
        Hide the active window, show the target window and make it the active one.

        :param target_window: window to show
        :param displayed_image_index: if given, becomes the current image index before switching
        """
        if displayed_image_index is not None:
            self.curr_image_index = displayed_image_index
            # NOTE(review): refreshes the window that is about to be hidden (original order) - confirm intent
            self.refresh_shown_image()
        self.curr_active_window.hide()

        target_window.show()
        self.curr_active_window = target_window

    def switch_to_browser(self, displayed_image_index: int = None):
        """
        Switch from the active window to the browser window.

        :param displayed_image_index: optional index into self.all_images to select before switching
        """
        logging.debug("Switching to browser window ...")
        self._switch_window(self.browse_window, displayed_image_index)

    def switch_to_importer(self, displayed_image_index: int = None):
        """
        Switch from the active window to the importer window.

        :param displayed_image_index: optional index into self.all_images to select before switching
        """
        logging.debug("Switching to importer window ...")
        # the importer becomes the active window (the browser window was recorded here before)
        self._switch_window(self.import_window, displayed_image_index)

    def update_all_images_list(self):
        """
        Updates the internal list of all images in the artnet root

        Also recounts how many of these images the database knows by path and adjusts the maximum of the
        image number spin box of the importer window.
        :return:
        """
        artist_list = self.__file_reader.list_artists()
        artist_list.sort()
        images = []
        for artist in artist_list:
            images.extend(self.__file_reader.get_files(artist))
        self.all_images = images

        self.known_image_amount = sum(1 for image in images
                                      if self.db_connection.get_art_by_path(image) is not None)

        self.import_window.ui.imageNumberSpinBox.setMaximum(len(images))

    def scrape_tags(self, file_name: str, art_ID: int, url: str = None):
        """
        Scrape the tags from the given url and return which one's are new

        A scraped tag is dropped when it is already applied to the art, when one of its implier tags was
        scraped as well or when the database returns aliases for it.

        :param file_name: file name handed to the LinkGenerator (also used to predict the domain)
        :param art_ID: ID of the art whose applied tags are compared against
        :param url: url to scrape
        :return: None if no url was given or nothing was scraped, else (importable_tags, importable_artists)
        """
        if url is None:
            return None

        link_generator = LinkGenerator.get_instance()
        tags = link_generator.scrape_tags(url=url, file_name=file_name,
                                          domain=link_generator.predict_domain(file_name))
        if tags is None:
            return None

        # names of the applied tags; built as a new collection so the list of the db adapter stays untouched
        already_applied_tags = {self.db_connection.get_tag_by_ID(tag_id)[0][1].strip()
                                for tag_id in self.db_connection.get_art_tags_by_ID(art_ID)}

        scraped_tags = []
        for tag_group in tags.values():
            scraped_tags.extend(tag_group)
        scraped_tag_set = set(scraped_tags)
        artist_tags = tags.get('artists', ())  # the scraper result may lack the artists group

        importable_tags = []
        importable_artists = []
        for tag in scraped_tags:
            target = importable_artists if tag in artist_tags else importable_tags

            if not self.db_connection.get_tag_by_name(tag):  # tag does not exist yet
                target.append(tag)
                continue

            if tag in already_applied_tags:  # tag is already applied
                continue

            implier_ids = self.db_connection.get_tag_impliers(tag) or ()
            if any(self.db_connection.get_tag_by_ID(implier_id)[0][1].strip() in scraped_tag_set
                   for implier_id in implier_ids):
                continue  # skipping the current tag as it is implied by another scraped tag

            if self.db_connection.get_tag_aliases_by_name(tag):  # tag is already tagged by an alias
                continue

            target.append(tag)  # tag must be known and not tagged yet

        return importable_tags, importable_artists

    def import_tags(self, tags):
        """
        Add the given tags to the tag list of the importer window.

        Tags unknown to the database are offered to the user via the edit tag dialog first. Tags whose
        creation is aborted, or which are still unknown afterwards, are skipped.

        :param tags: iterable of tag names
        :return:
        """
        for tag in tags:
            if not self.db_connection.get_tag_by_name(tag):  # tag doesn't exist yet
                result = self.import_window.force_edit_tag_dialog(name=tag)

                if result is None:  # tag creation was aborted
                    continue

                tag = result['name']  # overwrite with possibly new tag name

            tag_rows = self.db_connection.get_tag_by_name(tag)
            if not tag_rows:
                logging.warning(f"Tag {tag} is still unknown to the database and will be skipped")
                continue

            self.import_window.curr_tags.append(tag_rows[0][0])
            self.import_window.data_changed = True

    def get_md5_of_image(self, path: str):
        """
        Calculate the md5 hash of the image given by the path.

        :param path: assumed to be relative e.g. artist_A/image.jpg
        :return: hex digest, None if the file does not exist (the image list is refreshed in that case)
        """
        md5_hash = md5()
        try:
            # read file part by part because of potential memory limits
            with open(os.path.join(self.get_root(), path), "rb") as file:
                for chunk in iter(lambda: file.read(65536), b""):
                    md5_hash.update(chunk)
        except FileNotFoundError:
            self.update_all_images_list()
            return None

        return md5_hash.hexdigest()

    def delete_image(self, path: str, delete_instead_of_move: bool = False, trash_bin_folder_path: str = "trash"):
        """
        Moves the image from the root folder into the trash bin folder, keeping its relative path

        :param path: image path relative to the artnet root
        :param delete_instead_of_move: real deletion is disabled for now, the image is moved regardless
        :param trash_bin_folder_path: folder the image is moved into, relative or absolute
        :return:
        """
        full_art_path = os.path.join(self.get_root(), path)

        if delete_instead_of_move:
            logging.warning(f"Deleting the actual file {full_art_path} is disabled for now")

        trash_dst = os.path.join(trash_bin_folder_path, path)
        trash_dir = os.path.dirname(trash_dst)
        if trash_dir and not os.path.exists(trash_dir):
            logging.info(f"{trash_dir} did not exist and will be created!")
            # create exactly the parent of the destination, this also works for absolute trash folders
            os.makedirs(trash_dir, exist_ok=True)
        logging.info(f"Moving image {full_art_path} to {trash_dst}")
        shutil.move(full_art_path, trash_dst)

        self.update_all_images_list()
        # keep the index on the last image if the deleted image was at the end of the list
        if self.curr_image_index is not None:
            self.curr_image_index = min(self.curr_image_index, len(self.all_images) - 1)

    def recalculate_hash_for_known_images(self):
        """
        Calculate and write the md5 hash for every image known to the database.

        Important: For migration purposes only

        Deprecated: emits a DeprecationWarning when called.
        :return:
        """
        import warnings  # local import, only this deprecated migration helper needs it

        warnings.warn("recalculate_hash_for_known_images is deprecated and for migration purposes only",
                      DeprecationWarning, stacklevel=2)

        # iterate over a copy: get_md5_of_image replaces self.all_images when a file is missing
        for image_path in list(self.all_images):
            image_db_result = self.db_connection.get_art_by_path(image_path)
            if image_db_result is None:  # unknown image, skip
                continue

            hash_digest = self.get_md5_of_image(image_path)
            if hash_digest is None:  # file vanished, nothing to hash
                continue

            authors = self.db_connection.get_authors_of_art_by_ID(image_db_result["ID"])
            tag_ids = self.db_connection.get_art_tags_by_ID(art_ID=image_db_result["ID"])
            tags = [self.db_connection.get_tag_by_ID(tag_id)[0][1].strip() for tag_id in tag_ids]

            # a missing/empty title or one that only repeats the file name is saved as "no title"
            title = image_db_result["title"]
            if not title or title == os.path.basename(image_path):
                title = None

            self.db_connection.save_image(ID=image_db_result["ID"], path=image_db_result["path"],
                                          title=title,
                                          link=image_db_result["link"], authors=authors, md5_hash=hash_digest,
                                          tags=tags)

    def _is_unknown_image(self, path: str) -> bool:
        """
        Tell if the database knows the image neither by its path nor by its md5 hash.

        A file that can not be hashed because it no longer exists is not reported as unknown.
        """
        if self.db_connection.get_art_by_path(path) is not None:
            return False
        digest = self.get_md5_of_image(path)
        if digest is None:
            return False
        return self.db_connection.get_art_by_hash(digest) is None

    def _find_unknown_image(self, step: int):
        """
        Search all images except the current one for an unknown image, wrapping around at the list ends.

        :param step: +1 to search in ascending order, -1 to search in descending order
        :return: index of the first unknown image found, None if there is none
        """
        images = list(self.all_images or [])  # snapshot, hashing may refresh self.all_images
        total = len(images)
        if total == 0 or self.curr_image_index is None:
            return None

        for offset in range(1, total):
            index = (self.curr_image_index + step * offset) % total
            if self._is_unknown_image(images[index]):
                return index  # may be 0, which is a valid result
        return None

    def get_next_unknown_image(self) -> int:
        """
        Search all images for the next image not known to the database in ascending order.

        :return: index of the next unknown image, None if there is no unknown image
        """
        return self._find_unknown_image(1)

    def get_prev_unknown_image(self) -> int:
        """
        Search all images for the next image not known to the database in descending order.

        :return: index of the next unknown image, None if there is no unknown image
        """
        return self._find_unknown_image(-1)

    def refresh_shown_image(self):
        """
        Refresh the image display to show the most current data according to self.curr_image_index.
        This index points to the image displayed as an index on self.all_images

        The image is looked up in the database by its md5 hash. Images not found are shown with placeholder
        data derived from their path.
        :return:
        """
        window = self.curr_active_window
        # read the path once: hashing a missing file replaces self.all_images
        image_path = self.all_images[self.curr_image_index]
        path_parts = image_path.split(os.path.sep)
        file_name = path_parts[-1]
        folder_author = [(path_parts[0], "(Not in Database)")]

        window.setting_up_data = True
        try:
            image_db_result = self.db_connection.get_art_by_hash(self.get_md5_of_image(image_path))
            if image_db_result is not None:
                image_title = image_db_result["title"]
                if not isinstance(image_title, str) or len(image_title) == 0:
                    image_title = self.import_window.UNKNOWN_TITLE
                image_author = self.db_connection.get_authors_of_art_by_ID(image_db_result["ID"])
                if image_author is None or len(image_author) == 0:
                    image_author = folder_author
                image_link = image_db_result["link"]
                image_description = image_db_result["description"]
                art_ID = image_db_result["ID"]
            else:
                art_ID = None
                image_author = folder_author
                image_title = file_name + " (Not in Database)"
                image_link = "(Unknown)"
                image_description = None

            logging.info(f"Displaying #{self.curr_image_index} \"{image_path}\"")
            window.display_image(image_title, image_author,
                                 os.path.join(self.config.data["file_root"], image_path),
                                 image_path,
                                 art_ID, image_link, file_name=file_name, description=image_description)
            window.set_tag_list(
                [self.db_connection.get_tag_by_ID(x)[0][1].strip()
                 for x in self.db_connection.get_art_tags_by_ID(art_ID)])

            window.data_changed = False
        finally:
            # never leave the window in "setting up" state, even if displaying failed
            window.setting_up_data = False

    def create_db_connection(self, host: str, port: int, database: str, user: str, password: str) -> "DBAdapter":
        """
        Create a new DBAdapter for the given connection details.

        :return: the new adapter; __init__ expects a ValueError when the connection didn't work
        """
        logging.info(f"Changing db connection to {host}:{port} {user}@{database} ...")
        return DBAdapter(user=user, password=password, host=host, port=port, database=database)

    def get_root(self) -> str:
        """
        :return: the artnet root path of the config
        """
        return self.config.data["file_root"]

    def change_root(self, path: str):
        """
        Change the Artnet root path and confirm it is working.

        Rewrite config there

        Exits the process when the given path is empty.
        :param path: new artnet root
        :return:
        """
        if len(path) == 0:
            sys.exit(0)
        logging.info("Changing root to %s", path)
        self.config.data["file_root"] = path
        self.config.update_config()
        self.__file_reader = FileReader(self.config.data["file_root"])
        self.update_all_images_list()

        # nothing to display without an active window (e.g. during startup) or without images
        if getattr(self, "curr_active_window", None) is None or not self.all_images:
            return
        # the old index may point past the end of the new image list
        index = getattr(self, "curr_image_index", None)
        if index is None or not 0 <= index < len(self.all_images):
            self.curr_image_index = 0
        self.refresh_shown_image()

    def get_db_connection_details(self) -> dict:
        """
        :return: the "db" section of the config
        """
        return self.config.data["db"]

    def change_db_connection(self, host: str, port: int, database: str, user: str, password: str):
        """
        Store the given connection details in the config and connect to the database with them.

        The config is written before the connection is created.
        """
        self._store_db_connection_details(host=host, port=port, database=database, user=user, password=password)

        self.db_connection = self.create_db_connection(host, port, database, user, password)
|