1
0
Fork 0
Du kannst nicht mehr als 25 Themen auswählen. Themen müssen entweder mit einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

122 Zeilen
4.0 KiB
Python

import random
from threading import Lock, Thread
from time import sleep

import numpy as np
from scipy.spatial import KDTree
class ImageCompareManageThread(Thread):
    """
    Manager thread that compares one candidate descriptor set against every
    entry of a descriptor database, using a bounded number of worker threads.

    For every key in ``descriptors`` an ``ImageCompareThread`` is started with
    ``descriptors[key][1]`` as the database descriptors. Whenever a worker
    reports back through ``finish_thread`` the user ``callback(name, hits)`` is
    invoked and, if work is left, the next worker is started.

    ``searching`` is True while the manager is active. It becomes False once
    every started worker has reported back and the queue is empty. Setting it
    to False from outside stops new workers from being scheduled and lets
    ``run`` return at its next poll.
    """

    def __init__(self, name: str, candidate_desc: np.ndarray, descriptors: dict, callback,
                 concurrent_threads: int = 10, samples: int = 100, dist_thresh: float = 80):
        """
        :param name: thread name of the manager
        :param candidate_desc: descriptors of the candidate, handed to every worker
        :param descriptors: mapping key -> indexable entry; index 1 of each entry
                            is passed to the worker as database descriptors
        :param callback: callable invoked as ``callback(name, hits)`` per finished worker
        :param concurrent_threads: maximum number of workers started up front
        :param samples: sample size forwarded to every worker
        :param dist_thresh: distance threshold forwarded to every worker
        """
        super().__init__(name=name)
        self.candidate_desc = candidate_desc
        self.descriptors = descriptors
        self.samples = samples
        self.dist_thresh = dist_thresh
        self.callback = callback
        self.concurrent_threads = concurrent_threads
        self.todo = []
        self.threads = {}
        self.searching = False
        # finish_thread runs on the worker threads, so the queue and the
        # bookkeeping below are shared state and must be guarded.
        self._lock = Lock()
        self._active = 0  # workers started that have not reported back yet

    def _start_next(self) -> bool:
        """
        Start a worker for the next queued key. The caller must hold ``_lock``.

        :return: True if a worker was started, False if the queue is empty or
                 the search has been stopped
        """
        if not self.todo or not self.searching:
            return False
        key = self.todo.pop()
        ict = ImageCompareThread(key, self.candidate_desc, self.descriptors[key][1],
                                 self.samples, self.dist_thresh, self.finish_thread)
        self.threads[key] = ict
        self._active += 1
        ict.start()
        return True

    def run(self):
        """
        Queue all descriptor keys, start the initial workers and block until
        ``searching`` is cleared.
        """
        print("[{0}] Starting management ...".format(self.name))
        with self._lock:
            self.searching = True
            self.todo = list(self.descriptors.keys())
            for _ in range(self.concurrent_threads):
                if not self._start_next():
                    break
            if self._active == 0:
                # Nothing was started (no descriptors or no worker slots):
                # no worker will ever clear the flag, so clear it here
                # instead of polling forever.
                self.searching = False
        while self.searching:
            sleep(2)

    def finish_thread(self, name: str, hits: int):
        """
        Completion hook called by a worker; runs on the worker's thread.

        Forwards the result to the user callback, then schedules the next job
        or, when the last outstanding worker has reported, ends the search.

        :param name: name of the finished worker
        :param hits: number of matching keypoints the worker found
        """
        try:
            self.callback(name, hits)
            print("[{0}] finished with {1}".format(name, hits))
        finally:
            # Bookkeeping must happen even if the callback raised, otherwise
            # the manager would wait forever for this worker.
            with self._lock:
                print("{0} jobs left ...".format(len(self.todo)))
                if self._active > 0:
                    self._active -= 1
                self._start_next()
                if self._active == 0:
                    # Only the last outstanding worker may end the search;
                    # others may still be running when the queue empties.
                    self.searching = False
class ImageCompareThread(Thread):
    """
    Worker thread that counts how many candidate descriptors have a
    sufficiently close nearest neighbour among the database descriptors and
    reports the count through ``callback(name, hits)``.
    """

    def __init__(self, name: str, candidate_desc: np.ndarray, db_desc: np.ndarray,
                 sample_size: int, dist_thresh: float, callback):
        """
        :param name: thread name, passed back to ``callback`` as identifier
        :param candidate_desc: descriptors of the candidate
        :param db_desc: descriptors of the database entry to compare against
        :param sample_size: stored as ``samples``; not used by the matching itself
        :param dist_thresh: maximum nearest-neighbour distance counted as a hit
        :param callback: callable invoked as ``callback(name, hits)`` when done
        """
        super().__init__(name=name)
        self.candidate_desc = candidate_desc
        self.db_desc = db_desc
        self.samples = sample_size
        self.dist_thresh = dist_thresh
        self.callback = callback

    def run(self):
        """Count the matching keypoints and hand the result to the callback."""
        print("[{0}] starting ...".format(self.name))
        hits = self.find_matching_keypoints(self.candidate_desc, self.db_desc,
                                            self.samples, self.dist_thresh)
        self.callback(self.name, hits)

    @staticmethod
    def _is_empty(points) -> bool:
        """Return True if ``points`` is None or contains no entries."""
        return points is None or len(points) == 0

    def find_matching_keypoints(self, keypoints1: list, keypoints2: list,
                                sample_size: int, dist_thresh: float) -> int:
        """
        Find the nearest neighbour in keypoints2 for each point in keypoints1
        and count the points whose nearest neighbour is close enough.

        :param keypoints1: query points (sequence of equal-length vectors)
        :param keypoints2: points to search in (sequence of equal-length vectors)
        :param sample_size: accepted for interface compatibility; currently unused
        :param dist_thresh: maximum distance for a neighbour to count as a hit
        :return: number of points in keypoints1 with a neighbour within dist_thresh;
                 0 if either point set is None or empty
        """
        if self._is_empty(keypoints1) or self._is_empty(keypoints2):
            # Nothing can match; returning 0 (instead of raising) makes sure
            # the worker still reports back to its callback.
            return 0
        # Build the tree once and query all points in a single batch instead
        # of rebuilding it for every query point.
        tree = KDTree(keypoints2)
        nearest, _ = tree.query(np.asarray(keypoints1), k=1)
        return int(np.count_nonzero(nearest <= dist_thresh))

    def __get_random_selection(self, l: list, num: int) -> list:
        """
        Draw ``num`` elements from ``l`` at random, with replacement.

        Not called anywhere in this class.

        :param l: non-empty sequence to draw from
        :param num: number of elements to draw
        :return: list of the drawn elements
        """
        return [random.choice(l) for _ in range(num)]

    def has_matching_keypoint(self, point: np.ndarray, points: list, max_dist: float) -> tuple:
        """
        Find the nearest neighbour for point in points.

        :param point: query vector
        :param points: points to search in (sequence of equal-length vectors)
        :param max_dist: maximum distance for the neighbour to count as a hit
        :return: tuple ``(hit, distance)``; ``hit`` is True if the nearest
                 neighbour is within max_dist, ``distance`` is the distance to
                 it (``inf`` if points is None or empty)
        """
        if self._is_empty(points):
            return False, float("inf")
        tree = KDTree(points)
        # Only the closest neighbour decides the hit, so k=1 is sufficient.
        dist, _ = tree.query([point], k=1)
        nearest = float(dist[0])  # unwrap the single-query result
        return bool(nearest <= max_dist), nearest