import requests import json test_collection_entries = [{"name": "collection_name1", "description": "description1", "id": None}, {"name": "collection_name2", "description": "description2", "id": None}] def list_collections(url: str, port: int): r = requests.get(f"http://{url}:{port}/artnet/metadata/collection") if r.status_code != 200: print(f"Querying collections failed! Status: {r.status_code} Reason: {r.text}") raise Exception("Failed to query for collections!") collections = json.loads(r.text) return collections def create_collection_entries(url: str, port: int): for i in range(len(test_collection_entries)): r = create_collection(url, port, name=test_collection_entries[i]["name"], description=test_collection_entries[i]["description"]) if r.status_code != 200: print(f"Create Collection Entry Test Nr.{i}: failed with {r.status_code} and reason {r.text}") raise Exception("Create Collection Entry Test: FAILED") else: test_collection_entries[i]["id"] = json.loads(r.text)["id"] def update_collection_entries(url: str, port: int): for i in range(len(test_collection_entries)): new_name = test_collection_entries[i]["name"] + "_updated" new_desc = test_collection_entries[i]["description"] + "_updated" r = update_collection(url, port, test_collection_entries[i]["id"], new_name, new_desc) if r.status_code != 200: print(f"Updating Collection Entry Test Nr.{i}: failed with {r.status_code} and reason {r.text}") raise Exception("Update Collection Entry Test: FAILED") new_coll = get_collection_by_id(url, port, test_collection_entries[i]["id"]) if new_coll["name"] != new_name: print(f"Collection name is not matching the update collection! " f"Current: {new_coll['name']} Expected: {new_name}") raise Exception("Update Collection Entry Test: FAILED") if new_coll["description"] != new_desc: print(f"Collection description is not matching the update collection! " f"Current: {new_coll['description']} Expected: {new_desc}") raise Exception("Update Collection Entry Test: FAILED") def delete_collection_entries(url: str, port: int): for i in range(len(test_collection_entries)): r = delete_collection(url, port, id=test_collection_entries[i]["id"]) if r.status_code != 200: print(f"Delete Collection Entry Test Nr.{i}: failed with {r.status_code} and reason {r.text}") raise Exception("Delete Collection Entry Test: FAILED") def delete_all_collections(url: str, port: int): collections = list_collections(url, port) for collection in collections: r = delete_collection(url, port, collection["id"]) if r.status_code != 200: print(f"Failed deleting collections! Status: {r.status_code} Reason: {r.text}") raise Exception("Failed deleting collection!") def get_collection_by_id(url: str, port: int, id: str): r = requests.get(f"http://{url}:{port}/artnet/metadata/collection?id={id}") if r.status_code != 200: raise Exception("Failed querying for collection!") return json.loads(r.text) def get_collection_by_name(url: str, port: int, name: str): r = requests.get(f"http://{url}:{port}/artnet/metadata/collection?name={name}") return r def create_collection(url: str, port: int, name: str, description: str): r = requests.post(f"http://{url}:{port}/artnet/metadata/collection", json={"name": name, "description": description}) return r def update_collection(url: str, port: int, id: str, name: str, description: str): r = requests.post(f"http://{url}:{port}/artnet/metadata/collection?id={id}", json={"name": name, "description": description}) return r def delete_collection(url: str, port: int, id: str): r = requests.delete(f"http://{url}:{port}/artnet/metadata/collection?id={id}") return r def run_collection_tests(url: str, port: int): print() print("----------------") l = len(list_collections(url, port)) print(f"Starting collection test with ({l}) collections!") if l > 0: print("Deleting leftover collections ...") delete_all_collections(url, port) print(f"Creating {len(list_collections(url, port))} collections as a test ...") create_collection_entries(url, port) create_collection_result = False if not len(list_collections(url, port)) else True print(f"Found {len(list_collections(url, port))} collection entries!") print(f"Updating collection entries with new data ...") update_collection_entries(url, port) update_collection_result = True print(f"Deleting the collections again ...") delete_collection_entries(url, port) delete_collection_result = False if not len(list_collections(url, port)) == 0 else True print(f"Found {len(list_collections(url, port))} collection entries!") return create_collection_result, update_collection_result, delete_collection_result