import os import base64 import copy import secrets import yaml from cryptography.exceptions import InvalidTag from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.ciphers.aead import AESGCM from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC class ConfigReader: kdf = None nonce = None CONFIG_VERSION = 4 def __init__(self, root_folder: str): if root_folder is None: raise Exception("No root folder was defined!") self.__key = None self.__aesgcm = None self.data = {} self.__config_location = os.path.join(root_folder, ".artnet", "artnet.config") self.__config_key_location = os.path.join(root_folder, ".artnet", "artnet_config.key") try: password = self.try_get_config_password() except FileNotFoundError as e: if not os.path.exists(self.__config_location): self.generate_config_key_file() password = self.try_get_config_password() else: raise e ConfigReader.__create_kdf() self.__key = ConfigReader.kdf.derive(bytes(password.encode('utf-8'))) self.__aesgcm = AESGCM(self.__key) if os.path.exists(self.__config_location) and os.path.isfile(self.__config_location): self.read_config() else: if not os.path.exists(os.path.join(root_folder, ".artnet")): os.mkdir(os.path.join(root_folder, ".artnet")) self.create_default_config() self.read_config() def try_get_config_password(self) -> str: """ Attempts to read the config encryption password from the default location """ if os.path.exists(self.__config_key_location) and os.path.isfile(self.__config_key_location): with open(self.__config_key_location, "r") as file: password = file.readline() file.close() return password else: raise FileNotFoundError(f"The config encryption key file could not be found! " f"FileNotFound: {self.__config_key_location}") def generate_config_key_file(self): """ Tries to generate a new pseudo-random generated key file for encrypting the config file. The key is in plain text and therefore not ideal. """ if not os.path.exists(self.__config_key_location) and not os.path.exists(self.__config_location): with open(self.__config_key_location, "w") as file: file.write(secrets.token_hex(100)) file.close() else: raise Exception("Something went terribly wrong! " "Tried to create new config key file although the key file or config file aready exists! " "Perhaps old config?") def update_config(self): """ Update the written config with the local settings :return: """ file = open(self.__config_location, "w") yaml.dump(stream=file, data=self.encrypt_sensitive_data(copy.deepcopy(self.data))) def read_config(self): """ Read the config from file and overwrite local settings :return: """ print(f"Config file: {os.path.join(os.getcwd(), self.__config_location)}") file = open(self.__config_location, "r") data = yaml.safe_load(stream=file) self.data = self.decrypt_sensitive_data(data) def encrypt_sensitive_data(self, data: dict) -> dict: """ Encrypts the sensitive portions of the data :return: """ new_data = data new_data["db"]["password"] = self.encrypt_text(data["db"]["password"]) new_data["db"]["user"] = self.encrypt_text(data["db"]["user"]) new_data["file_root"] = self.encrypt_text(data["file_root"]) return new_data def decrypt_sensitive_data(self, data: dict) -> dict: """ Decrypts the sensitive portions of the data :return: """ new_data = data new_data["db"]["password"] = self.decrypt_text(data["db"]["password"]) new_data["db"]["user"] = self.decrypt_text(data["db"]["user"]) new_data["file_root"] = self.decrypt_text(data["file_root"]) return new_data def create_default_config(self): """ Create a default config, overwrites all settings and generates a new key :return: """ self.data = { "version": ConfigReader.CONFIG_VERSION, "db": { "host": "localhost", "port": 5432, "database": "artnet", "user": "your_user", "password": "enter_password_via_gui" }, "file_root": "", } self.update_config() @staticmethod def __create_kdf(): ConfigReader.kdf = PBKDF2HMAC(algorithm=hashes.SHA512(), length=32, salt=bytes("ArtN3t.WhatElse?".encode('utf-8')), iterations=10000, backend=default_backend() ) ConfigReader.nonce = bytes("qt34nvßn".encode('utf-8')) def encrypt_text(self, text: str) -> bytes: cipher_text_bytes = self.__aesgcm.encrypt(data=text.encode('utf-8'), associated_data=None, nonce=ConfigReader.nonce ) return base64.urlsafe_b64encode(cipher_text_bytes) def decrypt_text(self, cipher: str) -> str: if ConfigReader.kdf is None: ConfigReader.__create_kdf() try: decrypted_cipher_text_bytes = self.__aesgcm.decrypt( nonce=ConfigReader.nonce, data=base64.urlsafe_b64decode(cipher), associated_data=None ) except InvalidTag: raise Exception("Could not decrypt Text! Wrong Password?") return decrypted_cipher_text_bytes.decode('utf-8') if __name__ == "__main__": cr = ConfigReader(password="MySuperDuperKey", root_folder=".") print(cr.data)