|
|
@ -2,7 +2,7 @@ package com.sothr.imagetools |
|
|
|
|
|
|
|
import java.io.File |
|
|
|
import akka.actor.{Actor, ActorSystem, Props, ActorLogging} |
|
|
|
import akka.routing.{Broadcast, SmallestMailboxRouter} |
|
|
|
import akka.routing.{Broadcast, RoundRobinRouter, SmallestMailboxRouter} |
|
|
|
import akka.pattern.ask |
|
|
|
import akka.util.Timeout |
|
|
|
import java.util.concurrent.TimeUnit |
|
|
@ -55,55 +55,15 @@ class ConcurrentEngine extends Engine with grizzled.slf4j.Logging { |
|
|
|
images.toList |
|
|
|
} |
|
|
|
|
|
|
|
//copied from the sequential engine as the concurrent version has issues currently |
|
|
|
def getSimilarImagesForDirectory(directoryPath:String):List[SimilarImages] = { |
|
|
|
debug(s"Looking for similar images in directory: $directoryPath") |
|
|
|
val images = getImagesForDirectory(directoryPath) |
|
|
|
info(s"Searching ${images.length} images for similarities") |
|
|
|
val ignoreSet = new mutable.HashSet[Image]() |
|
|
|
val allSimilarImages = new mutable.MutableList[SimilarImages]() |
|
|
|
var processedCount = 0 |
|
|
|
var similarCount = 0 |
|
|
|
for (rootImage <- images) { |
|
|
|
if (!ignoreSet.contains(rootImage)) { |
|
|
|
if (processedCount % 25 == 0) { |
|
|
|
info(s"Processed ${processedCount}/${images.length - ignoreSet.size} About ${images.length - processedCount} images to go") |
|
|
|
} |
|
|
|
debug(s"Looking for images similar to: ${rootImage.imagePath}") |
|
|
|
ignoreSet += rootImage |
|
|
|
val similarImages = new mutable.MutableList[Image]() |
|
|
|
for (image <- images) { |
|
|
|
if (!ignoreSet.contains(image)) { |
|
|
|
if (rootImage.isSimilarTo(image)) { |
|
|
|
debug(s"Image: ${image.imagePath} is similar") |
|
|
|
similarImages += image |
|
|
|
ignoreSet += image |
|
|
|
similarCount += 1 |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
if (similarImages.length > 1) { |
|
|
|
val similar = new SimilarImages(rootImage, similarImages.toList) |
|
|
|
debug(s"Found similar images: ${similar.toString}") |
|
|
|
allSimilarImages += similar |
|
|
|
} |
|
|
|
processedCount += 1 |
|
|
|
} |
|
|
|
} |
|
|
|
info(s"Finished processing ${images.size} images. Found $similarCount similar images") |
|
|
|
allSimilarImages.toList |
|
|
|
} |
|
|
|
|
|
|
|
/* |
|
|
|
//needs to be rebuilt |
|
|
|
def getSimilarImagesForDirectory(directoryPath:String):List[SimilarImages] = { |
|
|
|
debug(s"Looking for similar images in directory: $directoryPath") |
|
|
|
val images = getImagesForDirectory(directoryPath) |
|
|
|
engineSimilarityController ! EngineCompareSetImages(images) |
|
|
|
info(s"Searching ${images.length} images for similarities") |
|
|
|
val allSimilarImages = new mutable.MutableList[SimilarImages]() |
|
|
|
for (rootImage <- images) { |
|
|
|
debug(s"Looking for images similar to: ${rootImage.imagePath}") |
|
|
|
engineSimilarityController ! EngineCompareImages(rootImage, images, null) |
|
|
|
engineSimilarityController ! EngineCompareImages(rootImage) |
|
|
|
} |
|
|
|
//tell the comparison engine there's nothing left to compare |
|
|
|
engineSimilarityController ! EngineNoMoreComparisons |
|
|
@ -124,16 +84,30 @@ class ConcurrentEngine extends Engine with grizzled.slf4j.Logging { |
|
|
|
} |
|
|
|
val f = engineSimilarityController ? EngineGetSimilarityResults |
|
|
|
val result = Await.result(f, timeout.duration).asInstanceOf[List[SimilarImages]] |
|
|
|
allSimilarImages ++= result |
|
|
|
//process the result into a list we want in cleanedSimilarImages |
|
|
|
var count = 0 |
|
|
|
val cleanedSimilarImages = new mutable.MutableList[SimilarImages]() |
|
|
|
val ignoreSet = new mutable.HashSet[Image]() |
|
|
|
for (similarImages <- result) { |
|
|
|
count += 1 |
|
|
|
if (count % 25 == 0 || count == result.length) debug(s"Cleaning similar image $count/$result.length ${result.length-count} left to clean") |
|
|
|
if (!ignoreSet.contains(similarImages.rootImage)) { |
|
|
|
cleanedSimilarImages += similarImages |
|
|
|
ignoreSet += similarImages.rootImage |
|
|
|
for (image <- similarImages.similarImages) { |
|
|
|
ignoreSet += image |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
var similarCount = 0 |
|
|
|
for (similarImage <- allSimilarImages) { |
|
|
|
for (similarImage <- cleanedSimilarImages) { |
|
|
|
similarCount += 1 + similarImage.similarImages.size |
|
|
|
} |
|
|
|
|
|
|
|
info(s"Finished processing ${images.size} images. Found $similarCount similar images") |
|
|
|
allSimilarImages.toList |
|
|
|
}*/ |
|
|
|
cleanedSimilarImages.toList |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -166,6 +140,11 @@ class ConcurrentEngineProcessingController extends Actor with ActorLogging { |
|
|
|
|
|
|
|
var processorsFinished = 0 |
|
|
|
|
|
|
|
override def preStart() = { |
|
|
|
log.info("Staring the controller for processing images") |
|
|
|
log.info("Using {} actors to process images", numOfRouters) |
|
|
|
} |
|
|
|
|
|
|
|
override def receive = { |
|
|
|
case command:EngineProcessFile => processFile(command) |
|
|
|
case command:EngineFileProcessed => fileProcessed(command) |
|
|
@ -189,7 +168,7 @@ class ConcurrentEngineProcessingController extends Actor with ActorLogging { |
|
|
|
|
|
|
|
def fileProcessed(command:EngineFileProcessed) = { |
|
|
|
processed += 1 |
|
|
|
if (processed % 25 == 0) log.info(s"Processed $processed/$toProcess") |
|
|
|
if (processed % 25 == 0 || processed == toProcess) log.info(s"Processed $processed/$toProcess") |
|
|
|
if (command.image != null) { |
|
|
|
log.debug(s"processed image: ${command.image.imagePath}") |
|
|
|
images += command.image |
|
|
@ -267,15 +246,15 @@ class ConcurrentEngineProcessingActor extends Actor with ActorLogging { |
|
|
|
} |
|
|
|
|
|
|
|
//finding similarities between images |
|
|
|
case class EngineCompareImages(image1:Image,images:List[Image],ignoreList:Set[Image]) |
|
|
|
case class EngineCompareImages(image1:Image) |
|
|
|
case class EngineCompareImagesComplete(similarImages:SimilarImages) |
|
|
|
case class EngineCompareSetImages(images:List[Image]) |
|
|
|
case object EngineNoMoreComparisons |
|
|
|
case object EngineIsSimilarityFinished |
|
|
|
case object EngineGetSimilarityResults |
|
|
|
case object EngineActorCompareImagesFinished |
|
|
|
|
|
|
|
class ConcurrentEngineSimilarityController extends Actor with ActorLogging { |
|
|
|
val imageCache = AppConfig.cacheManager.getCache("images") |
|
|
|
val numOfRouters = { |
|
|
|
val max = PropertiesService.get(PropertiesEnum.ConcurrentSimiliartyLimit.toString).toInt |
|
|
|
val processors = Runtime.getRuntime.availableProcessors() |
|
|
@ -283,18 +262,24 @@ class ConcurrentEngineSimilarityController extends Actor with ActorLogging { |
|
|
|
if (processors > max) threads = max else if (processors > 1) threads = processors - 1 else threads = 1 |
|
|
|
threads |
|
|
|
} |
|
|
|
val router = context.actorOf(Props[ConcurrentEngineSimilarityActor].withRouter(SmallestMailboxRouter(nrOfInstances = numOfRouters))) |
|
|
|
val router = context.actorOf(Props[ConcurrentEngineSimilarityActor].withRouter(RoundRobinRouter(nrOfInstances = numOfRouters))) |
|
|
|
|
|
|
|
val allSimilarImages = new mutable.MutableList[SimilarImages] |
|
|
|
val ignoreList = new mutable.HashSet[Image]() |
|
|
|
var numImages = 0 |
|
|
|
var toProcess = 0 |
|
|
|
var processed = 0 |
|
|
|
|
|
|
|
var processorsFinished = 0 |
|
|
|
|
|
|
|
override def preStart() = { |
|
|
|
log.info("Staring the controller for processing similarites between images") |
|
|
|
log.info("Using {} actors to process image similarites", numOfRouters) |
|
|
|
} |
|
|
|
|
|
|
|
override def receive = { |
|
|
|
case command:EngineCompareImages => findSimilarities(command) |
|
|
|
case command:EngineCompareImagesComplete => similarityProcessed(command) |
|
|
|
case command:EngineCompareSetImages => setImageList(command) |
|
|
|
case EngineNoMoreComparisons => requestWrapup() |
|
|
|
case EngineActorCompareImagesFinished => actorProcessingFinished() |
|
|
|
case EngineIsSimilarityFinished => isProcessingFinished() |
|
|
@ -302,24 +287,27 @@ class ConcurrentEngineSimilarityController extends Actor with ActorLogging { |
|
|
|
case _ => log.info("received unknown message") |
|
|
|
} |
|
|
|
|
|
|
|
def setImageList(command:EngineCompareSetImages) = { |
|
|
|
numImages = command.images.length |
|
|
|
router ! Broadcast(command) |
|
|
|
} |
|
|
|
|
|
|
|
def findSimilarities(command:EngineCompareImages) = { |
|
|
|
log.debug(s"Finding similarities between ${command.image1.imagePath} and ${command.images.length} images") |
|
|
|
log.debug(s"Finding similarities between ${command.image1.imagePath} and $numImages images") |
|
|
|
toProcess += 1 |
|
|
|
if (toProcess % 250 == 0 || toProcess == numImages) { |
|
|
|
log.info("Sent {}/{} images to be processed for similarites", toProcess, numImages) |
|
|
|
} |
|
|
|
//just relay the command to our workers |
|
|
|
router ! EngineCompareImages(command.image1, command.images, ignoreList.toSet[Image]) |
|
|
|
router ! EngineCompareImages(command.image1) |
|
|
|
} |
|
|
|
|
|
|
|
def similarityProcessed(command:EngineCompareImagesComplete) = { |
|
|
|
processed += 1 |
|
|
|
if (processed % 25 == 0) log.info(s"Processed $processed/$toProcess") |
|
|
|
if (processed % 25 == 0 || processed == numImages) log.info(s"Processed $processed/$toProcess") |
|
|
|
if (command.similarImages != null) { |
|
|
|
if (!ignoreList.contains(command.similarImages.rootImage)) { |
|
|
|
log.debug(s"Found similar images: ${command.similarImages}") |
|
|
|
allSimilarImages += command.similarImages |
|
|
|
//add the similar images to the ignore list so we don't re-process them constantly |
|
|
|
ignoreList += command.similarImages.rootImage |
|
|
|
ignoreList ++= command.similarImages.similarImages |
|
|
|
} |
|
|
|
log.debug(s"Found similar images: ${command.similarImages}") |
|
|
|
allSimilarImages += command.similarImages |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@ -332,6 +320,7 @@ class ConcurrentEngineSimilarityController extends Actor with ActorLogging { |
|
|
|
*/ |
|
|
|
def actorProcessingFinished() = { |
|
|
|
processorsFinished += 1 |
|
|
|
log.debug("Similarity Processor Reported Finished") |
|
|
|
} |
|
|
|
|
|
|
|
/* |
|
|
@ -339,6 +328,7 @@ class ConcurrentEngineSimilarityController extends Actor with ActorLogging { |
|
|
|
*/ |
|
|
|
def isProcessingFinished() = { |
|
|
|
try { |
|
|
|
log.debug("Processors Finished {}/{}", processorsFinished, numOfRouters) |
|
|
|
if (processorsFinished >= numOfRouters) sender ! true else sender ! false |
|
|
|
} catch { |
|
|
|
case e: Exception ⇒ |
|
|
@ -367,21 +357,30 @@ class ConcurrentEngineSimilarityController extends Actor with ActorLogging { |
|
|
|
|
|
|
|
class ConcurrentEngineSimilarityActor extends Actor with ActorLogging { |
|
|
|
var ignoreMessages = false |
|
|
|
var imageList = new mutable.MutableList[Image]() |
|
|
|
override def receive = { |
|
|
|
case command:EngineCompareImages => compareImages(command) |
|
|
|
case command:EngineCompareSetImages => cloneAndSetImages(command) |
|
|
|
case EngineNoMoreComparisons => finishedComparisons() |
|
|
|
case EngineActorReactivate => ignoreMessages = false |
|
|
|
case _ => log.info("received unknown message") |
|
|
|
} |
|
|
|
|
|
|
|
def cloneAndSetImages(command:EngineCompareSetImages) = { |
|
|
|
imageList.clear() |
|
|
|
for (image <- command.images) { |
|
|
|
imageList += image.cloneImage |
|
|
|
} |
|
|
|
log.debug("Added {} cloned images to internal list", imageList.length) |
|
|
|
} |
|
|
|
|
|
|
|
def compareImages(command:EngineCompareImages) = { |
|
|
|
if (!ignoreMessages) { |
|
|
|
val similarImages = new mutable.MutableList[Image]() |
|
|
|
for (image <- command.images) { |
|
|
|
if (!command.ignoreList.contains(image) && command.image1 != image) { |
|
|
|
for (image <- imageList) { |
|
|
|
if (!command.image1.equals(image)) { |
|
|
|
if (HashService.areImageHashesSimilar(command.image1.hashes, image.hashes)) { |
|
|
|
similarImages += image |
|
|
|
var ignoreMessages = false |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -399,7 +398,10 @@ class ConcurrentEngineSimilarityActor extends Actor with ActorLogging { |
|
|
|
|
|
|
|
def finishedComparisons() = { |
|
|
|
if (!ignoreMessages) { |
|
|
|
log.info("Commanded to finish processing") |
|
|
|
ignoreMessages = true |
|
|
|
imageList.clear() |
|
|
|
log.debug("Finished processing comparisons") |
|
|
|
sender ! EngineActorCompareImagesFinished |
|
|
|
} |
|
|
|
} |