diff --git a/src/main/resources/default.properties b/src/main/resources/default.properties index 975c5cb..c04eead 100644 --- a/src/main/resources/default.properties +++ b/src/main/resources/default.properties @@ -9,8 +9,8 @@ app.log.info=false app.log.error=true #Concurrency Settings -app.engine.concurrent.similarity.limit=20 -app.engine.concurrent.processing.limit=10 +app.engine.concurrent.similarity.limit=15 +app.engine.concurrent.processing.limit=15 #Default Image Settings #images must be 90% similar diff --git a/src/main/scala/com/sothr/imagetools/ConcurrentEngine.scala b/src/main/scala/com/sothr/imagetools/ConcurrentEngine.scala index 71ba775..f8c5527 100644 --- a/src/main/scala/com/sothr/imagetools/ConcurrentEngine.scala +++ b/src/main/scala/com/sothr/imagetools/ConcurrentEngine.scala @@ -3,55 +3,36 @@ package com.sothr.imagetools import java.io.File import akka.actor.{Actor, ActorSystem, Props, ActorLogging} import akka.routing.{Broadcast, SmallestMailboxRouter} -import akka.event.Logging import akka.pattern.ask import akka.util.Timeout import java.util.concurrent.TimeUnit -import scala.collection.mutable.{MutableList, HashSet} -import com.sothr.imagetools.image.{SimilarImages, ImageFilter, Image} +import com.sothr.imagetools.image.{SimilarImages, Image} import com.sothr.imagetools.util.{PropertiesEnum, PropertiesService} -import scala.concurrent.{Await, blocking, Future} +import scala.concurrent.Await import java.lang.Thread import scala.concurrent.ExecutionContext.Implicits.global - -// exeternal cases // -// processing files into images -case class EngineProcessFile(file:File) -case object EngineNoMoreFiles -case object EngineIsProcessingFinished -case object EngineGetProcessingResults -//finding similarities between images -case class EngineFindSimilarities(images:List[Image]) -case object EngineIsSimilarityFinished -case object EngineGetSimilarityResults - -//internal cases -case class EngineFileProcessed(image:Image) -case object EngineActorProcessingFinished -case class EngineCompareImages(image1:Image,image2:Image) -case object EngineActorCompareImagesRoundFinished -case object EngineActorCompareImagesFinished -case object EngineActorReactivate +import scala.collection.mutable class ConcurrentEngine extends Engine with grizzled.slf4j.Logging { val system = ActorSystem("EngineActorSystem") - val engineController = system.actorOf(Props[ConcurrentEngineController], name = "EngineController") - implicit val timeout = Timeout(5, TimeUnit.SECONDS) + val engineProcessingController = system.actorOf(Props[ConcurrentEngineProcessingController], name = "EngineProcessingController") + val engineSimilarityController = system.actorOf(Props[ConcurrentEngineSimilarityController], name = "EngineSimilarityController") + implicit val timeout = Timeout(30, TimeUnit.SECONDS) def getImagesForDirectory(directoryPath:String):List[Image] = { debug(s"Looking for images in directory: $directoryPath") val directory:File = new File(directoryPath) - val images:MutableList[Image] = new MutableList[Image]() + val images:mutable.MutableList[Image] = new mutable.MutableList[Image]() if (directory.isDirectory) { val files = directory.listFiles(imageFilter) info(s"Found ${files.length} files that are images in directory: $directoryPath") for (file <- files) { - engineController ! EngineProcessFile(file) + engineProcessingController ! EngineProcessFile(file) } - engineController ! EngineNoMoreFiles + engineProcessingController ! EngineNoMoreFiles var doneProcessing = false while(!doneProcessing) { - val f = engineController ? EngineIsProcessingFinished + val f = engineProcessingController ? EngineIsProcessingFinished val result = Await.result(f, timeout.duration).asInstanceOf[Boolean] result match { case true => @@ -64,7 +45,7 @@ class ConcurrentEngine extends Engine with grizzled.slf4j.Logging { //val future = Future { blocking(Thread.sleep(5000L)); "done" } } } - val f = engineController ? EngineGetProcessingResults + val f = engineProcessingController ? EngineGetProcessingResults val result = Await.result(f, timeout.duration).asInstanceOf[List[Image]] images ++= result } else { @@ -78,56 +59,65 @@ class ConcurrentEngine extends Engine with grizzled.slf4j.Logging { debug(s"Looking for similar images in directory: $directoryPath") val images = getImagesForDirectory(directoryPath) info(s"Searching ${images.length} images for similarities") - val ignoreSet = new HashSet[Image]() - val allSimilarImages = new MutableList[SimilarImages]() - var processedCount = 0 - var similarCount = 0 + val allSimilarImages = new mutable.MutableList[SimilarImages]() for (rootImage <- images) { - if (!ignoreSet.contains(rootImage)) { - if (processedCount % 25 == 0) { - info(s"Processed ${processedCount}/${images.length - ignoreSet.size} About ${images.length - processedCount} images to go") - } - debug(s"Looking for images similar to: ${rootImage.imagePath}") - ignoreSet += rootImage - val similarImages = new MutableList[Image]() - for (image <- images) { - if (!ignoreSet.contains(image)) { - if (rootImage.isSimilarTo(image)) { - debug(s"Image: ${image.imagePath} is similar") - similarImages += image - ignoreSet += image - similarCount += 1 - } - } - } - if (similarImages.length > 1) { - val similar = new SimilarImages(rootImage, similarImages.toList) - debug(s"Found similar images: ${similar.toString}") - allSimilarImages += similar - } - processedCount += 1 + debug(s"Looking for images similar to: ${rootImage.imagePath}") + engineSimilarityController ! EngineCompareImages(rootImage, images, null) + } + //tell the comparison engine there's nothing left to compare + engineSimilarityController ! EngineNoMoreComparisons + var doneProcessing = false + while(!doneProcessing) { + val f = engineSimilarityController ? EngineIsSimilarityFinished + val result = Await.result(f, timeout.duration).asInstanceOf[Boolean] + result match { + case true => + doneProcessing = true + debug("Processing Complete") + case false => + debug("Still Processing") + //sleep thread + Thread.sleep(5000L) + //val future = Future { blocking(Thread.sleep(5000L)); "done" } } } + val f = engineSimilarityController ? EngineGetSimilarityResults + val result = Await.result(f, timeout.duration).asInstanceOf[List[SimilarImages]] + allSimilarImages ++= result + + var similarCount = 0 + for (similarImage <- allSimilarImages) { + similarCount += 1 + similarImage.similarImages.size + } + info(s"Finished processing ${images.size} images. Found $similarCount similar images") allSimilarImages.toList } } -class ConcurrentEngineController extends Actor with ActorLogging { +// exeternal cases // +// processing files into images +case class EngineProcessFile(file:File) +case object EngineNoMoreFiles +case object EngineIsProcessingFinished +case object EngineGetProcessingResults + +//internal cases +case class EngineFileProcessed(image:Image) +case object EngineActorProcessingFinished +case object EngineActorReactivate + +class ConcurrentEngineProcessingController extends Actor with ActorLogging { val imageCache = AppConfig.cacheManager.getCache("images") val numOfRouters = PropertiesService.get(PropertiesEnum.ConcurrentProcessingLimit.toString).toInt - val router = context.actorOf(Props[ConcurrentEngineActor].withRouter(SmallestMailboxRouter(nrOfInstances = numOfRouters))) + val router = context.actorOf(Props[ConcurrentEngineProcessingActor].withRouter(SmallestMailboxRouter(nrOfInstances = numOfRouters))) - var images:MutableList[Image] = new MutableList[Image]() + var images:mutable.MutableList[Image] = new mutable.MutableList[Image]() var toProcess = 0 var processed = 0 var processorsFinished = 0 - override def preStart() = { - // initialization code - } - override def receive = { case command:EngineProcessFile => processFile(command) case command:EngineFileProcessed => fileProcessed(command) @@ -151,7 +141,7 @@ class ConcurrentEngineController extends Actor with ActorLogging { def fileProcessed(command:EngineFileProcessed) = { processed += 1 - if (processed % 25 == 0) log.info(s"Processed ${processed}/${toProcess}") + if (processed % 25 == 0) log.info(s"Processed $processed/$toProcess") if (command.image != null) { log.debug(s"processed image: ${command.image.imagePath}") images += command.image @@ -200,7 +190,7 @@ class ConcurrentEngineController extends Actor with ActorLogging { } } -class ConcurrentEngineActor extends Actor with ActorLogging { +class ConcurrentEngineProcessingActor extends Actor with ActorLogging { var ignoreMessages = false override def receive = { case command:EngineProcessFile => processFile(command) @@ -226,4 +216,137 @@ class ConcurrentEngineActor extends Actor with ActorLogging { sender ! EngineActorProcessingFinished } } +} + +//finding similarities between images +case class EngineCompareImages(image1:Image,images:List[Image],ignoreList:Set[Image]) +case class EngineCompareImagesComplete(similarImages:SimilarImages) +case object EngineNoMoreComparisons +case object EngineIsSimilarityFinished +case object EngineGetSimilarityResults +case object EngineActorCompareImagesFinished + +class ConcurrentEngineSimilarityController extends Actor with ActorLogging { + val imageCache = AppConfig.cacheManager.getCache("images") + val numOfRouters = PropertiesService.get(PropertiesEnum.ConcurrentSimiliartyLimit.toString).toInt + val router = context.actorOf(Props[ConcurrentEngineSimilarityActor].withRouter(SmallestMailboxRouter(nrOfInstances = numOfRouters))) + + val allSimilarImages = new mutable.MutableList[SimilarImages] + val ignoreList = new mutable.HashSet[Image]() + var toProcess = 0 + var processed = 0 + + var processorsFinished = 0 + + override def receive = { + case command:EngineCompareImages => findSimilarities(command) + case command:EngineCompareImagesComplete => similarityProcessed(command) + case EngineNoMoreComparisons => requestWrapup() + case EngineActorCompareImagesFinished => actorProcessingFinished() + case EngineIsSimilarityFinished => isProcessingFinished() + case EngineGetSimilarityResults => getResults() + case _ => log.info("received unknown message") + } + + def findSimilarities(command:EngineCompareImages) = { + log.debug(s"Finding similarities between ${command.image1.imagePath} and ${command.images.length} images") + toProcess += 1 + //just relay the command to our workers + router ! EngineCompareImages(command.image1, command.images, ignoreList.toSet[Image]) + } + + def similarityProcessed(command:EngineCompareImagesComplete) = { + processed += 1 + if (processed % 25 == 0) log.info(s"Processed $processed/$toProcess") + if (command.similarImages != null) { + if (!ignoreList.contains(command.similarImages.rootImage)) { + log.debug(s"Found similar images: ${command.similarImages}") + allSimilarImages += command.similarImages + //add the similar images to the ignore list so we don't re-process them constantly + ignoreList += command.similarImages.rootImage + ignoreList ++= command.similarImages.similarImages + } + } + } + + def requestWrapup() = { + router ! Broadcast(EngineNoMoreComparisons) + } + + /* + * Record that a processor is done + */ + def actorProcessingFinished() = { + processorsFinished += 1 + } + + /* + * Check if processing is done + */ + def isProcessingFinished() = { + try { + if (processorsFinished >= numOfRouters) sender ! true else sender ! false + } catch { + case e: Exception ⇒ + sender ! akka.actor.Status.Failure(e) + throw e + } + } + + /* + * Get the results of the processing + */ + def getResults() = { + try { + processorsFinished = 0 + toProcess = 0 + processed = 0 + sender ! allSimilarImages.toList + allSimilarImages.clear() + } catch { + case e: Exception ⇒ + sender ! akka.actor.Status.Failure(e) + throw e + } + } +} + +class ConcurrentEngineSimilarityActor extends Actor with ActorLogging { + var ignoreMessages = false + override def receive = { + case command:EngineCompareImages => compareImages(command) + case EngineNoMoreComparisons => finishedComparisons() + case EngineActorReactivate => ignoreMessages = false + case _ => log.info("received unknown message") + } + + def compareImages(command:EngineCompareImages) = { + if (!ignoreMessages) { + val similarImages = new mutable.MutableList[Image]() + for (image <- command.images) { + if (!command.ignoreList.contains(image) && command.image1 != image) { + if (command.image1.isSimilarTo(image)) { + similarImages += image + var ignoreMessages = false + } + } + } + //only send a message if we find similar images + if (similarImages.length >= 1) { + val similarImage = new SimilarImages(command.image1, similarImages.toList) + log.debug(s"Found ${similarImage.similarImages.length} similar images to ${similarImage.rootImage}") + sender ! EngineCompareImagesComplete(similarImage) + } else { + log.debug(s"Found no similar images to ${command.image1}") + sender ! EngineCompareImagesComplete(null) + } + } + } + + def finishedComparisons() = { + if (!ignoreMessages) { + ignoreMessages = true + sender ! EngineActorCompareImagesFinished + } + } } \ No newline at end of file