You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

375 lines
13 KiB

  1. package com.sothr.imagetools
  2. import java.io.File
  3. import akka.actor.{Actor, ActorSystem, Props, ActorLogging}
  4. import akka.routing.{Broadcast, RoundRobinRouter, SmallestMailboxRouter}
  5. import akka.pattern.ask
  6. import akka.util.Timeout
  7. import java.util.concurrent.TimeUnit
  8. import com.sothr.imagetools.image.{SimilarImages, Image}
  9. import com.sothr.imagetools.hash.HashService
  10. import com.sothr.imagetools.util.{PropertiesEnum, PropertiesService}
  11. import scala.concurrent.Await
  12. import java.lang.Thread
  13. import scala.concurrent.ExecutionContext.Implicits.global
  14. import scala.collection.mutable
  /**
   * Engine implementation that hands image loading and similarity comparison to Akka actors.
   *
   * Each public method pushes its work to a controller actor, tells it that no more work is
   * coming, polls it every five seconds until it reports completion, and finally asks it for the
   * accumulated results. Both public methods block the calling thread for the whole run.
   */
  class ConcurrentEngine extends Engine with grizzled.slf4j.Logging {
    val system = ActorSystem("EngineActorSystem")
    val engineProcessingController = system.actorOf(Props[ConcurrentEngineProcessingController], name = "EngineProcessingController")
    val engineSimilarityController = system.actorOf(Props[ConcurrentEngineSimilarityController], name = "EngineSimilarityController")
    // Upper bound for every individual ask / Await round trip made below.
    implicit val timeout: Timeout = Timeout(30, TimeUnit.SECONDS)

    /**
     * Loads every image file found under a directory.
     *
     * @param directoryPath  directory to scan
     * @param recursive      passed through to getAllImageFiles
     * @param recursiveDepth passed through to getAllImageFiles
     * @return the images returned by the processing controller
     */
    def getImagesForDirectory(directoryPath:String, recursive:Boolean=false, recursiveDepth:Int=500):List[Image] = {
      debug(s"Looking for images in directory: $directoryPath")
      // getAllImageFiles is not defined in this class; presumably inherited from Engine.
      val imageFiles = getAllImageFiles(directoryPath, recursive, recursiveDepth)
      for (file <- imageFiles) {
        engineProcessingController ! EngineProcessFile(file)
      }
      engineProcessingController ! EngineNoMoreFiles
      awaitControllerFinished(engineProcessingController, EngineIsProcessingFinished)
      val resultsFuture = (engineProcessingController ? EngineGetProcessingResults).mapTo[List[Image]]
      Await.result(resultsFuture, timeout.duration)
    }

    //needs to be rebuilt
    /**
     * Finds groups of similar images under a directory.
     *
     * Every image is compared against the full image list by the similarity controller. The raw
     * groups are then de-duplicated: a group is dropped when its root image already appeared,
     * as a root or as a member, in a group that was kept earlier.
     *
     * @param directoryPath  directory to scan
     * @param recursive      passed through to getImagesForDirectory
     * @param recursiveDepth passed through to getImagesForDirectory
     * @return the de-duplicated groups, in the order the controller returned them
     */
    def getSimilarImagesForDirectory(directoryPath:String, recursive:Boolean=false, recursiveDepth:Int=500):List[SimilarImages] = {
      debug(s"Looking for similar images in directory: $directoryPath")
      val images = getImagesForDirectory(directoryPath, recursive, recursiveDepth)
      info(s"Searching ${images.length} images for similarities")
      for (rootImage <- images) {
        debug(s"Looking for images similar to: ${rootImage.imagePath}")
        engineSimilarityController ! EngineCompareImages(rootImage, images)
      }
      //tell the comparison engine there's nothing left to compare
      engineSimilarityController ! EngineNoMoreComparisons
      awaitControllerFinished(engineSimilarityController, EngineIsSimilarityFinished)
      val resultsFuture = (engineSimilarityController ? EngineGetSimilarityResults).mapTo[List[SimilarImages]]
      val result = Await.result(resultsFuture, timeout.duration)

      //process the result into a list we want in cleanedSimilarImages
      // length on a List is O(n); compute it once instead of on every iteration
      val total = result.length
      val cleanedSimilarImages = new mutable.MutableList[SimilarImages]()
      val ignoreSet = new mutable.HashSet[Image]()
      for ((similarImages, index) <- result.zipWithIndex) {
        val count = index + 1
        if (count % 25 == 0 || count == total) debug(s"Cleaning similar image $count/$total ${total - count} left to clean")
        if (!ignoreSet.contains(similarImages.rootImage)) {
          cleanedSimilarImages += similarImages
          ignoreSet += similarImages.rootImage
          ignoreSet ++= similarImages.similarImages
        }
      }
      // each kept group contributes its root plus all of its members
      val similarCount = cleanedSimilarImages.map(group => 1 + group.similarImages.size).sum
      info(s"Finished processing ${images.size} images. Found $similarCount similar images")
      cleanedSimilarImages.toList
    }

    /**
     * Blocks until the given controller answers `true` to the given "are you finished" query.
     *
     * The controller is asked once per iteration; a `false` answer is followed by a five second
     * sleep before asking again. There is no overall deadline, only the per-ask timeout.
     *
     * @param controller    controller actor to poll
     * @param finishedQuery message the controller answers with a Boolean
     */
    private def awaitControllerFinished(controller: akka.actor.ActorRef, finishedQuery: Any): Unit = {
      var doneProcessing = false
      while (!doneProcessing) {
        doneProcessing = Await.result((controller ? finishedQuery).mapTo[Boolean], timeout.duration)
        if (doneProcessing) {
          debug("Processing Complete")
        } else {
          debug("Still Processing")
          //sleep thread
          Thread.sleep(5000L)
        }
      }
    }
  }
  99. // external cases //
  100. // processing files into images
  /** Asks the processing controller to load one file; the controller relays it to a worker. */
  case class EngineProcessFile(file: File)

  /** Tells the processing controller that no further files will be submitted for this batch. */
  case object EngineNoMoreFiles

  /** Query answered with a Boolean: `true` once every worker has reported that it is finished. */
  case object EngineIsProcessingFinished

  /** Query answered with the accumulated `List[Image]`; the controller also resets its state. */
  case object EngineGetProcessingResults
  105. //internal cases
  /** Worker-to-controller reply carrying the image produced for one file. */
  case class EngineFileProcessed(image: Image)

  /** Worker-to-controller notice that the worker has seen the end-of-files broadcast. */
  case object EngineActorProcessingFinished

  /** Clears a worker's `ignoreMessages` flag so that it handles work again. */
  case object EngineActorReactivate
  /**
   * Controller actor for turning files into images.
   *
   * It relays each EngineProcessFile to a pool of ConcurrentEngineProcessingActor workers behind a
   * SmallestMailboxRouter, collects the EngineFileProcessed replies, and answers the
   * EngineIsProcessingFinished / EngineGetProcessingResults queries sent by the engine.
   */
  class ConcurrentEngineProcessingController extends Actor with ActorLogging {
    // Worker count: the configured limit when the machine has more processors than that,
    // otherwise one fewer than the processor count, and never fewer than one.
    val numOfRouters: Int = {
      val max = PropertiesService.get(PropertiesEnum.ConcurrentProcessingLimit.toString).toInt
      val processors = Runtime.getRuntime.availableProcessors()
      if (processors > max) max
      else if (processors > 1) processors - 1
      else 1
    }
    val router = context.actorOf(Props[ConcurrentEngineProcessingActor].withRouter(SmallestMailboxRouter(nrOfInstances = numOfRouters)))

    var images:mutable.MutableList[Image] = new mutable.MutableList[Image]()
    // files relayed to the workers in the current batch
    var toProcess = 0
    // replies received from the workers in the current batch
    var processed = 0
    // workers that have acknowledged the end-of-files broadcast
    var processorsFinished = 0

    override def preStart(): Unit = {
      log.info("Staring the controller for processing images")
      log.info("Using {} actors to process images", numOfRouters)
    }

    override def receive: Receive = {
      case command:EngineProcessFile => processFile(command)
      case command:EngineFileProcessed => fileProcessed(command)
      case EngineNoMoreFiles => requestWrapup()
      case EngineActorProcessingFinished => actorProcessingFinished()
      case EngineIsProcessingFinished => isProcessingFinished()
      case EngineGetProcessingResults => getResults()
      case _ => log.info("received unknown message")
    }

    /** Counts the file and relays the command to one of the workers. */
    def processFile(command:EngineProcessFile): Unit = {
      log.debug(s"Started evaluating ${command.file.getAbsolutePath}")
      toProcess += 1
      router ! command
    }

    /** Records a worker reply; a reply whose image is null is counted but not stored. */
    def fileProcessed(command:EngineFileProcessed): Unit = {
      processed += 1
      if (processed % 25 == 0 || processed == toProcess) log.info(s"Processed $processed/$toProcess")
      if (command.image != null) {
        log.debug(s"processed image: ${command.image.imagePath}")
        images += command.image
      }
    }

    /** Broadcasts the end-of-files marker to every worker behind the router. */
    def requestWrapup(): Unit = {
      router ! Broadcast(EngineNoMoreFiles)
    }

    /*
     * Record that a processor is done
     */
    def actorProcessingFinished(): Unit = {
      processorsFinished += 1
    }

    /*
     * Check if processing is done
     */
    def isProcessingFinished(): Unit = {
      try {
        if (processorsFinished >= numOfRouters) sender ! true else sender ! false
      } catch {
        case e: Exception =>
          // surface the failure to the asker instead of letting its ask time out
          sender ! akka.actor.Status.Failure(e)
          throw e
      }
    }

    /*
     * Get the results of the processing
     */
    def getResults(): Unit = {
      try {
        processorsFinished = 0
        toProcess = 0
        processed = 0
        sender ! images.toList
        images.clear()
        // Workers set ignoreMessages once they acknowledge the wrap-up broadcast and drop all
        // later work. Reactivate them here so the next batch is processed instead of being
        // discarded, which would leave the caller polling forever.
        router ! Broadcast(EngineActorReactivate)
      } catch {
        case e: Exception =>
          // surface the failure to the asker instead of letting its ask time out
          sender ! akka.actor.Status.Failure(e)
          throw e
      }
    }
  }
  /**
   * Worker actor that loads a single image per EngineProcessFile message.
   *
   * After it has acknowledged EngineNoMoreFiles it ignores further work until it receives
   * EngineActorReactivate.
   */
  class ConcurrentEngineProcessingActor extends Actor with ActorLogging {
    // true from the moment the end-of-files marker was acknowledged until reactivation
    var ignoreMessages = false

    override def receive: Receive = {
      case command:EngineProcessFile => processFile(command)
      case EngineNoMoreFiles => finishedProcessingFiles()
      case EngineActorReactivate => ignoreMessages = false
      case _ => log.info("received unknown message")
    }

    /**
     * Loads the image for the given file and reports the outcome to the sender.
     *
     * A reply is sent even when loading fails, with a null image. The controller counts every
     * reply toward its progress total and skips null images, so staying silent on failure would
     * leave its processed count permanently short of the number of submitted files.
     */
    def processFile(command:EngineProcessFile): Unit = {
      if (!ignoreMessages) {
        val image = ImageService.getImage(command.file)
        if (image == null) {
          log.debug(s"Failed to process image: ${command.file.getAbsolutePath}")
        }
        sender ! EngineFileProcessed(image)
      }
    }

    /** Acknowledges the end-of-files marker once, then starts ignoring further messages. */
    def finishedProcessingFiles(): Unit = {
      if (!ignoreMessages) {
        ignoreMessages = true
        sender ! EngineActorProcessingFinished
      }
    }
  }
  211. //finding similarities between images
  /** Asks for `image1` to be compared against every entry of `images`. */
  case class EngineCompareImages(image1: Image, images: List[Image])

  /** Worker reply for one comparison; `similarImages` is null when nothing similar was found. */
  case class EngineCompareImagesComplete(similarImages: SimilarImages)

  /** Tells the similarity controller that no further comparisons will be submitted. */
  case object EngineNoMoreComparisons

  /** Query answered with a Boolean: `true` once every worker has reported that it is finished. */
  case object EngineIsSimilarityFinished

  /** Query answered with the accumulated `List[SimilarImages]`; the controller also resets its state. */
  case object EngineGetSimilarityResults

  /** Worker-to-controller notice that the worker has seen the end-of-comparisons broadcast. */
  case object EngineActorCompareImagesFinished
  /**
   * Controller actor for finding similar images.
   *
   * It relays each EngineCompareImages to a pool of ConcurrentEngineSimilarityActor workers behind
   * a RoundRobinRouter, collects the EngineCompareImagesComplete replies, and answers the
   * EngineIsSimilarityFinished / EngineGetSimilarityResults queries sent by the engine.
   */
  class ConcurrentEngineSimilarityController extends Actor with ActorLogging {
    // Worker count: the configured limit when the machine has more processors than that,
    // otherwise one fewer than the processor count, and never fewer than one.
    val numOfRouters: Int = {
      val max = PropertiesService.get(PropertiesEnum.ConcurrentSimiliartyLimit.toString).toInt
      val processors = Runtime.getRuntime.availableProcessors()
      if (processors > max) max
      else if (processors > 1) processors - 1
      else 1
    }
    val router = context.actorOf(Props[ConcurrentEngineSimilarityActor].withRouter(RoundRobinRouter(nrOfInstances = numOfRouters)))

    val allSimilarImages = new mutable.MutableList[SimilarImages]
    // comparisons relayed to the workers in the current batch
    var toProcess = 0
    // replies received from the workers in the current batch
    var processed = 0
    // workers that have acknowledged the end-of-comparisons broadcast
    var processorsFinished = 0

    override def preStart(): Unit = {
      log.info("Staring the controller for processing similarites between images")
      log.info("Using {} actors to process image similarites", numOfRouters)
    }

    override def receive: Receive = {
      case command:EngineCompareImages => findSimilarities(command)
      case command:EngineCompareImagesComplete => similarityProcessed(command)
      case EngineNoMoreComparisons => requestWrapup()
      case EngineActorCompareImagesFinished => actorProcessingFinished()
      case EngineIsSimilarityFinished => isProcessingFinished()
      case EngineGetSimilarityResults => getResults()
      case _ => log.info("received unknown message")
    }

    /** Counts the comparison and relays the command to one of the workers. */
    def findSimilarities(command:EngineCompareImages): Unit = {
      // {} placeholders are filled in by the logger, so no string interpolator is needed
      log.debug("Finding similarities between {} and {} images", command.image1.imagePath, command.images.length)
      toProcess += 1
      if (toProcess % 250 == 0) {
        log.info("Sent {} images to be processed for similarites", toProcess)
      }
      //just relay the command to our workers
      router ! command
    }

    /** Records a worker reply; a reply whose payload is null is counted but not stored. */
    def similarityProcessed(command:EngineCompareImagesComplete): Unit = {
      processed += 1
      if (processed % 25 == 0 || processed == toProcess) log.info(s"Processed $processed/$toProcess")
      if (command.similarImages != null) {
        log.debug(s"Found similar images: ${command.similarImages}")
        allSimilarImages += command.similarImages
      }
    }

    /** Broadcasts the end-of-comparisons marker to every worker behind the router. */
    def requestWrapup(): Unit = {
      router ! Broadcast(EngineNoMoreComparisons)
    }

    /*
     * Record that a processor is done
     */
    def actorProcessingFinished(): Unit = {
      processorsFinished += 1
      log.debug("Similarity Processor Reported Finished")
    }

    /*
     * Check if processing is done
     */
    def isProcessingFinished(): Unit = {
      try {
        log.debug("Processors Finished {}/{}", processorsFinished, numOfRouters)
        if (processorsFinished >= numOfRouters) sender ! true else sender ! false
      } catch {
        case e: Exception =>
          // surface the failure to the asker instead of letting its ask time out
          sender ! akka.actor.Status.Failure(e)
          throw e
      }
    }

    /*
     * Get the results of the processing
     */
    def getResults(): Unit = {
      try {
        processorsFinished = 0
        toProcess = 0
        processed = 0
        sender ! allSimilarImages.toList
        allSimilarImages.clear()
        // Workers set ignoreMessages once they acknowledge the wrap-up broadcast and drop all
        // later work. Reactivate them here so the next batch is processed instead of being
        // discarded, which would leave the caller polling forever.
        router ! Broadcast(EngineActorReactivate)
      } catch {
        case e: Exception =>
          // surface the failure to the asker instead of letting its ask time out
          sender ! akka.actor.Status.Failure(e)
          throw e
      }
    }
  }
  /**
   * Worker actor that compares one image against a list of candidates per EngineCompareImages.
   *
   * After it has acknowledged EngineNoMoreComparisons it ignores further work until it receives
   * EngineActorReactivate.
   */
  class ConcurrentEngineSimilarityActor extends Actor with ActorLogging {
    // true from the moment the end-of-comparisons marker was acknowledged until reactivation
    var ignoreMessages = false

    override def receive = {
      case command:EngineCompareImages => compareImages(command)
      case EngineNoMoreComparisons => finishedComparisons()
      case EngineActorReactivate => ignoreMessages = false
      case _ => log.info("received unknown message")
    }

    /**
     * Collects every candidate that is not equal to the root image and whose hashes
     * HashService considers similar, then replies to the sender with the resulting group,
     * or with a null payload when there were no matches.
     */
    def compareImages(command:EngineCompareImages) = {
      if (!ignoreMessages) {
        val root = command.image1
        val matches = command.images.filter { candidate =>
          !root.equals(candidate) && HashService.areImageHashesSimilar(root.hashes, candidate.hashes)
        }
        //only send a populated message if we find similar images
        if (matches.nonEmpty) {
          val group = new SimilarImages(root, matches)
          log.debug(s"Found ${group.similarImages.length} similar images to ${group.rootImage}")
          sender ! EngineCompareImagesComplete(group)
        } else {
          log.debug(s"Found no similar images to ${command.image1}")
          sender ! EngineCompareImagesComplete(null)
        }
      }
    }

    /** Acknowledges the end-of-comparisons marker once, then starts ignoring further messages. */
    def finishedComparisons() = {
      if (!ignoreMessages) {
        ignoreMessages = true
        log.debug("Finished processing comparisons")
        sender ! EngineActorCompareImagesFinished
      }
    }
  }