|
|
@ -20,50 +20,43 @@ class ConcurrentEngine extends Engine with grizzled.slf4j.Logging { |
|
|
|
val engineSimilarityController = system.actorOf(Props[ConcurrentEngineSimilarityController], name = "EngineSimilarityController") |
|
|
|
implicit val timeout = Timeout(30, TimeUnit.SECONDS) |
|
|
|
|
|
|
|
def getImagesForDirectory(directoryPath:String):List[Image] = { |
|
|
|
def getImagesForDirectory(directoryPath:String, recursive:Boolean=false, recursiveDepth:Int=500):List[Image] = { |
|
|
|
debug(s"Looking for images in directory: $directoryPath") |
|
|
|
val directory:File = new File(directoryPath) |
|
|
|
val imageFiles = getAllImageFiles(directoryPath, recursive, recursiveDepth) |
|
|
|
val images:mutable.MutableList[Image] = new mutable.MutableList[Image]() |
|
|
|
if (directory.isDirectory) { |
|
|
|
val files = directory.listFiles(imageFilter) |
|
|
|
info(s"Found ${files.length} files that are images in directory: $directoryPath") |
|
|
|
for (file <- files) { |
|
|
|
engineProcessingController ! EngineProcessFile(file) |
|
|
|
} |
|
|
|
engineProcessingController ! EngineNoMoreFiles |
|
|
|
var doneProcessing = false |
|
|
|
while(!doneProcessing) { |
|
|
|
val f = engineProcessingController ? EngineIsProcessingFinished |
|
|
|
val result = Await.result(f, timeout.duration).asInstanceOf[Boolean] |
|
|
|
result match { |
|
|
|
case true => |
|
|
|
doneProcessing = true |
|
|
|
debug("Processing Complete") |
|
|
|
case false => |
|
|
|
debug("Still Processing") |
|
|
|
//sleep thread |
|
|
|
Thread.sleep(5000L) |
|
|
|
//val future = Future { blocking(Thread.sleep(5000L)); "done" } |
|
|
|
} |
|
|
|
for (file <- imageFiles) { |
|
|
|
engineProcessingController ! EngineProcessFile(file) |
|
|
|
} |
|
|
|
engineProcessingController ! EngineNoMoreFiles |
|
|
|
var doneProcessing = false |
|
|
|
while(!doneProcessing) { |
|
|
|
val f = engineProcessingController ? EngineIsProcessingFinished |
|
|
|
val result = Await.result(f, timeout.duration).asInstanceOf[Boolean] |
|
|
|
result match { |
|
|
|
case true => |
|
|
|
doneProcessing = true |
|
|
|
debug("Processing Complete") |
|
|
|
case false => |
|
|
|
debug("Still Processing") |
|
|
|
//sleep thread |
|
|
|
Thread.sleep(5000L) |
|
|
|
//val future = Future { blocking(Thread.sleep(5000L)); "done" } |
|
|
|
} |
|
|
|
val f = engineProcessingController ? EngineGetProcessingResults |
|
|
|
val result = Await.result(f, timeout.duration).asInstanceOf[List[Image]] |
|
|
|
images ++= result |
|
|
|
} else { |
|
|
|
error(s"Provided path: $directoryPath is not a directory") |
|
|
|
} |
|
|
|
val f = engineProcessingController ? EngineGetProcessingResults |
|
|
|
val result = Await.result(f, timeout.duration).asInstanceOf[List[Image]] |
|
|
|
images ++= result |
|
|
|
images.toList |
|
|
|
} |
|
|
|
|
|
|
|
//needs to be rebuilt |
|
|
|
def getSimilarImagesForDirectory(directoryPath:String):List[SimilarImages] = { |
|
|
|
def getSimilarImagesForDirectory(directoryPath:String, recursive:Boolean=false, recursiveDepth:Int=500):List[SimilarImages] = { |
|
|
|
debug(s"Looking for similar images in directory: $directoryPath") |
|
|
|
val images = getImagesForDirectory(directoryPath) |
|
|
|
engineSimilarityController ! EngineCompareSetImages(images) |
|
|
|
val images = getImagesForDirectory(directoryPath, recursive, recursiveDepth) |
|
|
|
info(s"Searching ${images.length} images for similarities") |
|
|
|
for (rootImage <- images) { |
|
|
|
debug(s"Looking for images similar to: ${rootImage.imagePath}") |
|
|
|
engineSimilarityController ! EngineCompareImages(rootImage) |
|
|
|
engineSimilarityController ! EngineCompareImages(rootImage, images) |
|
|
|
} |
|
|
|
//tell the comparison engine there's nothing left to compare |
|
|
|
engineSimilarityController ! EngineNoMoreComparisons |
|
|
@ -90,7 +83,7 @@ class ConcurrentEngine extends Engine with grizzled.slf4j.Logging { |
|
|
|
val ignoreSet = new mutable.HashSet[Image]() |
|
|
|
for (similarImages <- result) { |
|
|
|
count += 1 |
|
|
|
if (count % 25 == 0 || count == result.length) debug(s"Cleaning similar image $count/$result.length ${result.length-count} left to clean") |
|
|
|
if (count % 25 == 0 || count == result.length) debug(s"Cleaning similar image $count/${result.length} ${result.length - count} left to clean") |
|
|
|
if (!ignoreSet.contains(similarImages.rootImage)) { |
|
|
|
cleanedSimilarImages += similarImages |
|
|
|
ignoreSet += similarImages.rootImage |
|
|
@ -124,7 +117,6 @@ case object EngineActorProcessingFinished |
|
|
|
case object EngineActorReactivate |
|
|
|
|
|
|
|
class ConcurrentEngineProcessingController extends Actor with ActorLogging { |
|
|
|
val imageCache = AppConfig.cacheManager.getCache("images") |
|
|
|
val numOfRouters = { |
|
|
|
val max = PropertiesService.get(PropertiesEnum.ConcurrentProcessingLimit.toString).toInt |
|
|
|
val processors = Runtime.getRuntime.availableProcessors() |
|
|
@ -158,12 +150,7 @@ class ConcurrentEngineProcessingController extends Actor with ActorLogging { |
|
|
|
def processFile(command:EngineProcessFile) = { |
|
|
|
log.debug(s"Started evaluating ${command.file.getAbsolutePath}") |
|
|
|
toProcess += 1 |
|
|
|
if (imageCache.isKeyInCache(command.file.getAbsolutePath)) { |
|
|
|
log.debug(s"${command.file.getAbsolutePath} was already processed") |
|
|
|
self ! EngineFileProcessed(imageCache.get(command.file.getAbsolutePath).getObjectValue.asInstanceOf[Image]) |
|
|
|
} else { |
|
|
|
router ! command |
|
|
|
} |
|
|
|
router ! command |
|
|
|
} |
|
|
|
|
|
|
|
def fileProcessed(command:EngineFileProcessed) = { |
|
|
@ -246,9 +233,8 @@ class ConcurrentEngineProcessingActor extends Actor with ActorLogging { |
|
|
|
} |
|
|
|
|
|
|
|
//finding similarities between images |
|
|
|
case class EngineCompareImages(image1:Image) |
|
|
|
case class EngineCompareImages(image1:Image, images:List[Image]) |
|
|
|
case class EngineCompareImagesComplete(similarImages:SimilarImages) |
|
|
|
case class EngineCompareSetImages(images:List[Image]) |
|
|
|
case object EngineNoMoreComparisons |
|
|
|
case object EngineIsSimilarityFinished |
|
|
|
case object EngineGetSimilarityResults |
|
|
@ -265,7 +251,6 @@ class ConcurrentEngineSimilarityController extends Actor with ActorLogging { |
|
|
|
val router = context.actorOf(Props[ConcurrentEngineSimilarityActor].withRouter(RoundRobinRouter(nrOfInstances = numOfRouters))) |
|
|
|
|
|
|
|
val allSimilarImages = new mutable.MutableList[SimilarImages] |
|
|
|
var numImages = 0 |
|
|
|
var toProcess = 0 |
|
|
|
var processed = 0 |
|
|
|
|
|
|
@ -279,7 +264,6 @@ class ConcurrentEngineSimilarityController extends Actor with ActorLogging { |
|
|
|
override def receive = { |
|
|
|
case command:EngineCompareImages => findSimilarities(command) |
|
|
|
case command:EngineCompareImagesComplete => similarityProcessed(command) |
|
|
|
case command:EngineCompareSetImages => setImageList(command) |
|
|
|
case EngineNoMoreComparisons => requestWrapup() |
|
|
|
case EngineActorCompareImagesFinished => actorProcessingFinished() |
|
|
|
case EngineIsSimilarityFinished => isProcessingFinished() |
|
|
@ -287,24 +271,20 @@ class ConcurrentEngineSimilarityController extends Actor with ActorLogging { |
|
|
|
case _ => log.info("received unknown message") |
|
|
|
} |
|
|
|
|
|
|
|
def setImageList(command:EngineCompareSetImages) = { |
|
|
|
numImages = command.images.length |
|
|
|
router ! Broadcast(command) |
|
|
|
} |
|
|
|
|
|
|
|
def findSimilarities(command:EngineCompareImages) = { |
|
|
|
log.debug(s"Finding similarities between ${command.image1.imagePath} and $numImages images") |
|
|
|
log.debug(s"Finding similarities between {} and {} images", command.image1.imagePath, command.images.length) |
|
|
|
toProcess += 1 |
|
|
|
if (toProcess % 250 == 0 || toProcess == numImages) { |
|
|
|
log.info("Sent {}/{} images to be processed for similarites", toProcess, numImages) |
|
|
|
if (toProcess % 250 == 0) { |
|
|
|
log.info("Sent {} images to be processed for similarites", toProcess) |
|
|
|
} |
|
|
|
//just relay the command to our workers |
|
|
|
router ! EngineCompareImages(command.image1) |
|
|
|
router ! command |
|
|
|
} |
|
|
|
|
|
|
|
def similarityProcessed(command:EngineCompareImagesComplete) = { |
|
|
|
processed += 1 |
|
|
|
if (processed % 25 == 0 || processed == numImages) log.info(s"Processed $processed/$toProcess") |
|
|
|
if (processed % 25 == 0 || processed == toProcess) log.info(s"Processed $processed/$toProcess") |
|
|
|
if (command.similarImages != null) { |
|
|
|
log.debug(s"Found similar images: ${command.similarImages}") |
|
|
|
allSimilarImages += command.similarImages |
|
|
@ -357,27 +337,17 @@ class ConcurrentEngineSimilarityController extends Actor with ActorLogging { |
|
|
|
|
|
|
|
class ConcurrentEngineSimilarityActor extends Actor with ActorLogging { |
|
|
|
var ignoreMessages = false |
|
|
|
var imageList = new mutable.MutableList[Image]() |
|
|
|
override def receive = { |
|
|
|
case command:EngineCompareImages => compareImages(command) |
|
|
|
case command:EngineCompareSetImages => cloneAndSetImages(command) |
|
|
|
case EngineNoMoreComparisons => finishedComparisons() |
|
|
|
case EngineActorReactivate => ignoreMessages = false |
|
|
|
case _ => log.info("received unknown message") |
|
|
|
} |
|
|
|
|
|
|
|
def cloneAndSetImages(command:EngineCompareSetImages) = { |
|
|
|
imageList.clear() |
|
|
|
for (image <- command.images) { |
|
|
|
imageList += image.cloneImage |
|
|
|
} |
|
|
|
log.debug("Added {} cloned images to internal list", imageList.length) |
|
|
|
} |
|
|
|
|
|
|
|
def compareImages(command:EngineCompareImages) = { |
|
|
|
if (!ignoreMessages) { |
|
|
|
val similarImages = new mutable.MutableList[Image]() |
|
|
|
for (image <- imageList) { |
|
|
|
for (image <- command.images) { |
|
|
|
if (!command.image1.equals(image)) { |
|
|
|
if (HashService.areImageHashesSimilar(command.image1.hashes, image.hashes)) { |
|
|
|
similarImages += image |
|
|
@ -398,9 +368,7 @@ class ConcurrentEngineSimilarityActor extends Actor with ActorLogging { |
|
|
|
|
|
|
|
def finishedComparisons() = { |
|
|
|
if (!ignoreMessages) { |
|
|
|
log.info("Commanded to finish processing") |
|
|
|
ignoreMessages = true |
|
|
|
imageList.clear() |
|
|
|
log.debug("Finished processing comparisons") |
|
|
|
sender ! EngineActorCompareImagesFinished |
|
|
|
} |
|
|
|