Python program for finding duplicate images in a set based on OpenCV and feature detection
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

122 lines
4.0KB

  1. from threading import Thread
  2. from time import sleep
  3. import numpy as np
  4. import random
  5. from scipy.spatial import KDTree
  6. class ImageCompareManageThread(Thread):
  7. def __init__(self, name: str, candidate_desc: np.array, descriptors: dict, callback,
  8. concurrent_threads: int = 10, samples: int = 100, dist_thresh: float = 80):
  9. super().__init__(name=name)
  10. self.candidate_desc = candidate_desc
  11. self.descriptors = descriptors
  12. self.samples = samples
  13. self.dist_thresh = dist_thresh
  14. self.callback = callback
  15. self.concurrent_threads = concurrent_threads
  16. self.todo = []
  17. self.threads = {}
  18. self.searching = False
  19. def run(self):
  20. print("[{0}] Starting management ...".format(self.name))
  21. self.searching = True
  22. self.todo = list(self.descriptors.keys())
  23. for i in range(self.concurrent_threads):
  24. if len(self.todo) == 0:
  25. break
  26. key = self.todo.pop()
  27. ict = ImageCompareThread(key, self.candidate_desc, self.descriptors[key][1],
  28. self.samples, self.dist_thresh, self.finish_thread)
  29. self.threads[key] = ict
  30. ict.start()
  31. while self.searching:
  32. sleep(2)
  33. def finish_thread(self, name: str, hits: int):
  34. self.callback(name, hits)
  35. print("[{0}] finished with {1}".format(name, hits))
  36. print("{0} jobs left ...".format(len(self.todo)))
  37. if len(self.todo) > 0 and self.searching: # still work to do, start another thread
  38. key = self.todo.pop()
  39. ict = ImageCompareThread(key, self.candidate_desc, self.descriptors[key][1],
  40. self.samples, self.dist_thresh, self.finish_thread)
  41. self.threads[key] = ict
  42. ict.start()
  43. else:
  44. self.searching = False
  45. class ImageCompareThread(Thread):
  46. def __init__(self, name: str, candidate_desc: np.array, db_desc: np.array,
  47. sample_size: int, dist_thresh: float, callback):
  48. super().__init__(name=name)
  49. self.candidate_desc = candidate_desc
  50. self.db_desc = db_desc
  51. self.samples = sample_size
  52. self.dist_thresh = dist_thresh
  53. self.callback = callback
  54. def run(self):
  55. print("[{0}] starting ...".format(self.name))
  56. hits = self.find_matching_keypoints(self.candidate_desc, self.db_desc,
  57. self.samples, self.dist_thresh)
  58. self.callback(self.name, hits)
  59. def find_matching_keypoints(self, keypoints1: list, keypoints2: list,
  60. sample_size: int, dist_thresh: float) -> int:
  61. """
  62. Find nearest neighbours for each point in keypoints1 in keypoints2.
  63. Returns number of sufficiently matching keypoints
  64. :param keypoints1:
  65. :param keypoints2:
  66. :param sample_size:
  67. :param dist_thresh:
  68. :return:
  69. """
  70. hits, sum = 0, 0
  71. for i in range(len(keypoints1)):
  72. hit, dist = self.has_matching_keypoint(keypoints1[i], keypoints2, dist_thresh)
  73. if hit:
  74. hits += 1
  75. sum += dist
  76. return hits
  77. def __get_random_selection(self, l: list, num: int) -> list:
  78. result = []
  79. for k in range(num):
  80. result.append(random.choice(l))
  81. return result
  82. def has_matching_keypoint(self, point: np.ndarray, points: list, max_dist: float) -> tuple:
  83. """
  84. Find nearest neighbour for point in points.
  85. :param point:
  86. :param points:
  87. :param max_dist:
  88. :return:
  89. """
  90. tree = KDTree(points)
  91. dist, ind = tree.query([point], k=2)
  92. dist = dist[0] # just resolving nested lists
  93. #print("Distances:{0} Indexes:{1} MaxDist:{2}".format(dist, ind[0], max_dist))
  94. if dist[0] <= max_dist: # second neighbour is found, valid hit
  95. return True, dist[0]
  96. else:
  97. return False, dist[0]