from threading import Event, Lock, Thread
from time import sleep
import numpy as np
import random
from scipy.spatial import KDTree


class ImageCompareManageThread(Thread):
    """
    Manager thread that compares one candidate descriptor set against every
    entry of ``descriptors`` using a bounded number of worker threads.

    For every key in ``descriptors`` an :class:`ImageCompareThread` is started
    with ``descriptors[key][1]`` as the database descriptors. Each finished
    worker reports through :meth:`finish_thread`, which forwards the result to
    ``callback(name, hits)`` and starts the next queued job.

    :param name: thread name of the manager
    :param candidate_desc: descriptors of the candidate image
    :param descriptors: mapping ``key -> sequence``; element ``[1]`` of each
        value is passed to the worker as database descriptors
    :param callback: callable invoked as ``callback(name, hits)`` per job
    :param concurrent_threads: maximum number of workers started by ``run``
    :param samples: forwarded to the workers as ``sample_size``
    :param dist_thresh: forwarded to the workers as distance threshold
    """

    def __init__(self, name: str, candidate_desc: np.ndarray, descriptors: dict, callback,
                 concurrent_threads: int = 10, samples: int = 100, dist_thresh: float = 80):
        super().__init__(name=name)
        self.candidate_desc = candidate_desc
        self.descriptors = descriptors
        self.samples = samples
        self.dist_thresh = dist_thresh
        self.callback = callback
        self.concurrent_threads = concurrent_threads
        self.todo = []
        self.threads = {}
        self.searching = False
        # finish_thread runs on the worker threads, so the job queue and the
        # worker counter must be guarded against concurrent access.
        self._todo_lock = Lock()
        self._running_workers = 0
        # Set once the last worker has reported, to wake run() immediately.
        self._all_done = Event()

    def _start_next_worker(self):
        """
        Pop one job from ``todo`` and start a worker for it.

        Must be called with ``_todo_lock`` held and a non-empty ``todo``.
        """
        key = self.todo.pop()
        worker = ImageCompareThread(key, self.candidate_desc, self.descriptors[key][1],
                                    self.samples, self.dist_thresh, self.finish_thread)
        self.threads[key] = worker
        worker.start()
        # Counted only after a successful start so a failed start cannot
        # leave the manager waiting for a worker that never existed.
        self._running_workers += 1

    def run(self):
        """
        Start up to ``concurrent_threads`` workers and block until every
        started worker has reported, or until ``searching`` is set to False
        from outside. Returns immediately if no worker could be started
        (empty ``descriptors`` or ``concurrent_threads`` below 1).
        """
        print("[{0}] Starting management ...".format(self.name))
        with self._todo_lock:
            self._all_done.clear()
            self.searching = True
            self.todo = list(self.descriptors.keys())
            for _ in range(self.concurrent_threads):
                if not self.todo:
                    break
                self._start_next_worker()
            if self._running_workers == 0:
                # Nothing was started, so nobody would ever end the search.
                self.searching = False
        while self.searching:
            # Wakes as soon as the last worker reports; the timeout keeps the
            # loop responsive to ``searching`` being cleared externally.
            self._all_done.wait(timeout=2)

    def finish_thread(self, name: str, hits: int):
        """
        Completion hook called by a worker (on the worker's own thread).

        Forwards the result to ``callback``, then either starts the next
        queued job or, when this was the last running worker and nothing more
        is to be started, ends the search.

        :param name: name of the finished worker (the job key)
        :param hits: number of matching keypoints the worker found
        """
        self.callback(name, hits)
        print("[{0}] finished with {1}".format(name, hits))
        with self._todo_lock:
            self._running_workers -= 1
            print("{0} jobs left ...".format(len(self.todo)))
            if self.todo and self.searching:
                # still work to do, start another thread
                self._start_next_worker()
            elif self._running_workers <= 0:
                # Only the last worker may end the search; otherwise the
                # manager would exit while other comparisons are still running.
                self.searching = False
                self._all_done.set()


class ImageCompareThread(Thread):
    """
    Worker thread that counts how many candidate descriptors have a nearest
    neighbour in one database descriptor set within ``dist_thresh`` and
    reports the count via ``callback(name, hits)``.

    :param name: thread name; also passed back to ``callback``
    :param candidate_desc: descriptors of the candidate image
    :param db_desc: descriptors to search in
    :param sample_size: stored as ``samples`` and passed through to
        ``find_matching_keypoints``
    :param dist_thresh: maximum nearest-neighbour distance counted as a hit
    :param callback: callable invoked as ``callback(name, hits)`` when done
    """

    def __init__(self, name: str, candidate_desc: np.ndarray, db_desc: np.ndarray,
                 sample_size: int, dist_thresh: float, callback):
        super().__init__(name=name)
        self.candidate_desc = candidate_desc
        self.db_desc = db_desc
        self.samples = sample_size
        self.dist_thresh = dist_thresh
        self.callback = callback

    def run(self):
        """Run the comparison and hand the hit count to ``callback``."""
        print("[{0}] starting ...".format(self.name))
        hits = self.find_matching_keypoints(self.candidate_desc, self.db_desc,
                                            self.samples, self.dist_thresh)
        self.callback(self.name, hits)

    def find_matching_keypoints(self, keypoints1: list, keypoints2: list,
                                sample_size: int, dist_thresh: float) -> int:
        """
        Find nearest neighbours for each point in keypoints1 in keypoints2.
        Returns number of sufficiently matching keypoints

        :param keypoints1: points to look up
        :param keypoints2: points to search in
        :param sample_size: accepted for interface compatibility; not used by
            the current implementation
        :param dist_thresh: maximum nearest-neighbour distance counted as a hit
        :return: number of points in keypoints1 whose nearest neighbour in
            keypoints2 is at most dist_thresh away
        """
        # Without points on either side there is nothing to match; returning 0
        # keeps the worker from raising before it can report its result.
        if keypoints1 is None or keypoints2 is None:
            return 0
        if len(keypoints1) == 0 or len(keypoints2) == 0:
            return 0
        # Build the tree once and query all points in a single batch instead
        # of rebuilding it for every single point.
        tree = KDTree(keypoints2)
        distances, _ = tree.query(keypoints1, k=1)
        return int(np.count_nonzero(distances <= dist_thresh))

    def __get_random_selection(self, l: list, num: int) -> list:
        """
        Draw ``num`` elements from ``l`` with replacement.

        :param l: sequence to draw from
        :param num: number of elements to draw
        :return: list of drawn elements
        """
        return [random.choice(l) for _ in range(num)]

    def has_matching_keypoint(self, point: np.ndarray, points: list, max_dist: float) -> tuple:
        """
        Find nearest neighbour for point in points.

        :param point: point to look up
        :param points: points to search in
        :param max_dist: maximum distance counted as a hit
        :return: tuple ``(hit, distance)`` where ``distance`` is the distance
            to the nearest neighbour and ``hit`` tells whether it is at most
            ``max_dist``
        """
        tree = KDTree(points)
        distances, _ = tree.query([point], k=1)
        nearest = distances[0]  # just resolving nested lists
        if nearest <= max_dist:
            # nearest neighbour is close enough, valid hit
            return True, nearest
        return False, nearest