You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
445 lines
14 KiB
445 lines
14 KiB
package com.sothr.imagetools.engine
|
|
|
|
import java.io.File
|
|
import java.util.concurrent.TimeUnit
|
|
|
|
import akka.actor.{Actor, ActorLogging, ActorRef, PoisonPill, Props}
|
|
import akka.pattern.ask
|
|
import akka.routing.{Broadcast, RoundRobinPool, SmallestMailboxPool}
|
|
import akka.util.Timeout
|
|
import com.sothr.imagetools.hash.{HashService, ImageHash}
|
|
import com.sothr.imagetools.engine.image.{Image, ImageService, SimilarImages}
|
|
import com.sothr.imagetools.engine.util.{PropertiesService, PropertyEnum}
|
|
|
|
import scala.collection.mutable
|
|
import scala.concurrent.Await
|
|
|
|
class ConcurrentEngine extends Engine with grizzled.slf4j.Logging {
|
|
val engineProcessingController = system.actorOf(Props[ConcurrentEngineProcessingController], name = "EngineProcessingController")
|
|
val engineSimilarityController = system.actorOf(Props[ConcurrentEngineSimilarityController], name = "EngineSimilarityController")
|
|
implicit val timeout = Timeout(30, TimeUnit.SECONDS)
|
|
|
|
override def setSearchedListener(listenerRef: ActorRef) = {
|
|
this.searchedListener = listenerRef
|
|
}
|
|
|
|
override def setProcessedListener(listenerRef: ActorRef) = {
|
|
engineProcessingController ! SetNewListener(listenerRef)
|
|
}
|
|
|
|
override def setSimilarityListener(listenerRef: ActorRef) = {
|
|
engineSimilarityController ! SetNewListener(listenerRef)
|
|
}
|
|
|
|
//needs to be rebuilt
|
|
def getSimilarImagesForDirectory(directoryPath: String, recursive: Boolean = false, recursiveDepth: Int = 500): List[SimilarImages] = {
|
|
debug(s"Looking for similar images in directory: $directoryPath")
|
|
val images = getImagesForDirectory(directoryPath, recursive, recursiveDepth)
|
|
info(s"Searching ${images.length} images for similarities")
|
|
// make sure the engine is listening
|
|
engineSimilarityController ! EngineStart
|
|
for (rootImage <- images) {
|
|
debug(s"Looking for images similar to: ${rootImage.imagePath}")
|
|
engineSimilarityController ! EngineCompareImages(rootImage, images)
|
|
}
|
|
//tell the comparison engine there's nothing left to compare
|
|
engineSimilarityController ! EngineNoMoreComparisons
|
|
var doneProcessing = false
|
|
while (!doneProcessing) {
|
|
val f = engineSimilarityController ? EngineIsSimilarityFinished
|
|
val result = Await.result(f, timeout.duration).asInstanceOf[Boolean]
|
|
result match {
|
|
case true =>
|
|
doneProcessing = true
|
|
debug("Processing Complete")
|
|
case false =>
|
|
debug("Still Processing")
|
|
//sleep thread
|
|
Thread.sleep(5000L)
|
|
//val future = Future { blocking(Thread.sleep(5000L)); "done" }
|
|
}
|
|
}
|
|
val f = engineSimilarityController ? EngineGetSimilarityResults
|
|
val result = Await.result(f, timeout.duration).asInstanceOf[List[SimilarImages]]
|
|
|
|
val cleanedSimilarImages = this.processSimilarities(result)
|
|
var similarCount = 0
|
|
for (similarImage <- cleanedSimilarImages) {
|
|
similarCount += 1 + similarImage.similarImages.size
|
|
}
|
|
|
|
info(s"Finished processing ${images.size} images. Found $similarCount similar images")
|
|
cleanedSimilarImages
|
|
}
|
|
|
|
def getImagesForDirectory(directoryPath: String, recursive: Boolean = false, recursiveDepth: Int = 500): List[Image] = {
|
|
debug(s"Looking for images in directory: $directoryPath")
|
|
val imageFiles = getAllImageFiles(directoryPath, recursive, recursiveDepth)
|
|
val images: mutable.MutableList[Image] = new mutable.MutableList[Image]()
|
|
// make sure the engine is listening
|
|
engineProcessingController ! EngineStart
|
|
for (file <- imageFiles) {
|
|
engineProcessingController ! EngineProcessFile(file)
|
|
}
|
|
engineProcessingController ! EngineNoMoreFiles
|
|
var doneProcessing = false
|
|
while (!doneProcessing) {
|
|
val f = engineProcessingController ? EngineIsProcessingFinished
|
|
val result = Await.result(f, timeout.duration).asInstanceOf[Boolean]
|
|
result match {
|
|
case true =>
|
|
doneProcessing = true
|
|
debug("Processing Complete")
|
|
case false =>
|
|
debug("Still Processing")
|
|
//sleep thread
|
|
Thread.sleep(5000L)
|
|
//val future = Future { blocking(Thread.sleep(5000L)); "done" }
|
|
}
|
|
}
|
|
val f = engineProcessingController ? EngineGetProcessingResults
|
|
val result = Await.result(f, timeout.duration).asInstanceOf[List[Image]]
|
|
images ++= result
|
|
images.toList
|
|
}
|
|
}
|
|
|
|
|
|
// external cases //
|
|
case class SetNewListener(listenerType: ActorRef)
|
|
|
|
case object EngineStart
|
|
|
|
// processing files into images
|
|
case class EngineProcessFile(file: File)
|
|
|
|
case object EngineNoMoreFiles
|
|
|
|
case object EngineIsProcessingFinished
|
|
|
|
case object EngineGetProcessingResults
|
|
|
|
//internal cases
|
|
case class EngineFileProcessed(image: Image)
|
|
|
|
case object EngineActorProcessingFinished
|
|
|
|
case object EngineActorReactivate
|
|
|
|
class ConcurrentEngineProcessingController extends Actor with ActorLogging {
|
|
val numOfRouters = {
|
|
val max = PropertiesService.get(PropertyEnum.ConcurrentProcessingLimit.toString).toInt
|
|
val processors = Runtime.getRuntime.availableProcessors()
|
|
var threads = 0
|
|
if (processors > max) threads = max else if (processors > 1) threads = processors - 1 else threads = 1
|
|
threads
|
|
}
|
|
var router = context.actorOf(Props[ConcurrentEngineProcessingActor].withRouter(SmallestMailboxPool(nrOfInstances = numOfRouters)))
|
|
|
|
var images: mutable.MutableList[Image] = new mutable.MutableList[Image]()
|
|
var toProcess = 0
|
|
var processed = 0
|
|
|
|
var processorsFinished = 0
|
|
var listener = context.actorOf(Props[DefaultLoggingEngineListener],
|
|
name = "ProcessedEngineListener")
|
|
|
|
override def preStart() = {
|
|
log.info("Staring the controller for processing images")
|
|
log.info("Using {} actors to process images", numOfRouters)
|
|
}
|
|
|
|
override def receive = {
|
|
case command: SetNewListener => setListener(command.listenerType)
|
|
case command: EngineProcessFile => processFile(command)
|
|
case command: EngineFileProcessed => fileProcessed(command)
|
|
case EngineStart => startEngine()
|
|
case EngineNoMoreFiles => requestWrapup()
|
|
case EngineActorProcessingFinished => actorProcessingFinished()
|
|
case EngineIsProcessingFinished => checkIfProcessingIsFinished()
|
|
case EngineGetProcessingResults => checkForResults()
|
|
case _ => log.info("received unknown message")
|
|
}
|
|
|
|
def setListener(newListener: ActorRef) = {
|
|
//remove the old listener
|
|
this.listener ! PoisonPill
|
|
//setup the new listener
|
|
this.listener = newListener
|
|
}
|
|
|
|
def startEngine() = {
|
|
router ! Broadcast(EngineActorReactivate)
|
|
}
|
|
|
|
def processFile(command: EngineProcessFile) = {
|
|
log.debug(s"Started evaluating ${command.file.getAbsolutePath}")
|
|
toProcess += 1
|
|
router ! command
|
|
}
|
|
|
|
def fileProcessed(command: EngineFileProcessed) = {
|
|
processed += 1
|
|
if (processed % 25 == 0 || processed == toProcess) {
|
|
//log.info(s"Processed $processed/$toProcess")
|
|
listener ! ComparedFileCount(processed, toProcess)
|
|
}
|
|
if (command.image != null) {
|
|
log.debug(s"processed image: ${command.image.imagePath}")
|
|
images += command.image
|
|
}
|
|
}
|
|
|
|
def requestWrapup() = {
|
|
router ! Broadcast(EngineNoMoreFiles)
|
|
}
|
|
|
|
/*
|
|
* Record that a processor is done
|
|
*/
|
|
def actorProcessingFinished() = {
|
|
processorsFinished += 1
|
|
}
|
|
|
|
/*
|
|
* Check if processing is done
|
|
*/
|
|
def checkIfProcessingIsFinished() = {
|
|
try {
|
|
if (processorsFinished >= numOfRouters) sender ! true else sender ! false
|
|
} catch {
|
|
case e: Exception ⇒
|
|
sender ! akka.actor.Status.Failure(e)
|
|
throw e
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Get the results of the processing
|
|
*/
|
|
def checkForResults() = {
|
|
try {
|
|
listener ! SubmitMessage(s"Finished processing $processed/$processed images")
|
|
processorsFinished = 0
|
|
toProcess = 0
|
|
processed = 0
|
|
sender ! images.toList
|
|
images.clear()
|
|
} catch {
|
|
case e: Exception ⇒
|
|
sender ! akka.actor.Status.Failure(e)
|
|
throw e
|
|
}
|
|
}
|
|
|
|
override def postStop() = {
|
|
super.postStop()
|
|
this.listener ! PoisonPill
|
|
}
|
|
}
|
|
|
|
class ConcurrentEngineProcessingActor extends Actor with ActorLogging {
|
|
var ignoreMessages = false
|
|
|
|
override def receive = {
|
|
case command: EngineProcessFile => processFile(command)
|
|
case EngineNoMoreFiles => finishedProcessingFiles()
|
|
case EngineActorReactivate => ignoreMessages = false
|
|
case _ => log.info("received unknown message")
|
|
}
|
|
|
|
def processFile(command: EngineProcessFile) = {
|
|
if (!ignoreMessages) {
|
|
val image = ImageService.getImage(command.file)
|
|
if (image != null) {
|
|
sender ! EngineFileProcessed(image)
|
|
} else {
|
|
log.debug(s"Failed to process image: ${command.file.getAbsolutePath}")
|
|
}
|
|
}
|
|
}
|
|
|
|
def finishedProcessingFiles() = {
|
|
if (!ignoreMessages) {
|
|
ignoreMessages = true
|
|
sender ! EngineActorProcessingFinished
|
|
}
|
|
}
|
|
}
|
|
|
|
//finding similarities between images
|
|
case class EngineCompareImages(image1: Image, images: List[Image])
|
|
|
|
case class EngineCompareImagesComplete(similarImages: SimilarImages)
|
|
|
|
case object EngineNoMoreComparisons
|
|
|
|
case object EngineIsSimilarityFinished
|
|
|
|
case object EngineGetSimilarityResults
|
|
|
|
case object EngineActorCompareImagesFinished
|
|
|
|
class ConcurrentEngineSimilarityController extends Actor with ActorLogging {
|
|
val numOfRouters = {
|
|
val max = PropertiesService.get(PropertyEnum.ConcurrentSimilarityLimit.toString).toInt
|
|
val processors = Runtime.getRuntime.availableProcessors()
|
|
var threads = 0
|
|
if (processors > max) threads = max else if (processors > 1) threads = processors - 1 else threads = 1
|
|
threads
|
|
}
|
|
val router = context.actorOf(Props[ConcurrentEngineSimilarityActor].withRouter(RoundRobinPool(nrOfInstances = numOfRouters)))
|
|
|
|
val allSimilarImages = new mutable.MutableList[SimilarImages]
|
|
var toProcess = 0
|
|
var processed = 0
|
|
|
|
var processorsFinished = 0
|
|
|
|
var listener = context.actorOf(Props[DefaultLoggingEngineListener],
|
|
name = "SimilarityEngineListener")
|
|
|
|
override def preStart() = {
|
|
log.info("Staring the controller for processing similarities between images")
|
|
log.info("Using {} actors to process image similarities", numOfRouters)
|
|
}
|
|
|
|
override def receive = {
|
|
case command: SetNewListener => setListener(command.listenerType)
|
|
case command: EngineCompareImages => findSimilarities(command)
|
|
case command: EngineCompareImagesComplete => similarityProcessed(command)
|
|
case EngineStart => startEngine()
|
|
case EngineNoMoreComparisons => requestWrapup()
|
|
case EngineActorCompareImagesFinished => actorProcessingFinished()
|
|
case EngineIsSimilarityFinished => checkIfProcessingIsFinished()
|
|
case EngineGetSimilarityResults => checkForResults()
|
|
case _ => log.info("received unknown message")
|
|
}
|
|
|
|
def setListener(newListener: ActorRef) = {
|
|
//remove the old listener
|
|
this.listener ! PoisonPill
|
|
//setup the new listener
|
|
this.listener = newListener
|
|
}
|
|
|
|
def startEngine() = {
|
|
router ! Broadcast(EngineActorReactivate)
|
|
}
|
|
|
|
def findSimilarities(command: EngineCompareImages) = {
|
|
log.debug(s"Finding similarities between {} and {} images", command.image1.imagePath, command.images.length)
|
|
toProcess += 1
|
|
if (toProcess % 250 == 0) {
|
|
//log.info("Sent {} images to be processed for similarites", toProcess)
|
|
listener ! SubmitMessage(s"Sent $toProcess images to be processed for similarites")
|
|
}
|
|
//just relay the command to our workers
|
|
router ! command
|
|
}
|
|
|
|
def similarityProcessed(command: EngineCompareImagesComplete) = {
|
|
processed += 1
|
|
if (processed % 25 == 0 || processed == toProcess) {
|
|
//log.info(s"Processed $processed/$toProcess")
|
|
listener ! ScannedFileCount(processed, toProcess)
|
|
}
|
|
if (command.similarImages != null) {
|
|
log.debug(s"Found similar images: ${command.similarImages}")
|
|
allSimilarImages += command.similarImages
|
|
}
|
|
}
|
|
|
|
def requestWrapup() = {
|
|
router ! Broadcast(EngineNoMoreComparisons)
|
|
}
|
|
|
|
/*
|
|
* Record that a processor is done
|
|
*/
|
|
def actorProcessingFinished() = {
|
|
processorsFinished += 1
|
|
log.debug("Similarity Processor Reported Finished")
|
|
}
|
|
|
|
/*
|
|
* Check if processing is done
|
|
*/
|
|
def checkIfProcessingIsFinished() = {
|
|
try {
|
|
log.debug("Processors finished {}/{}", processorsFinished, numOfRouters)
|
|
if (processorsFinished >= numOfRouters) sender ! true else sender ! false
|
|
} catch {
|
|
case e: Exception ⇒
|
|
sender ! akka.actor.Status.Failure(e)
|
|
throw e
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Get the results of the processing
|
|
*/
|
|
def checkForResults() = {
|
|
try {
|
|
listener ! SubmitMessage(s"Finished scanning $processed/$processed images")
|
|
processorsFinished = 0
|
|
toProcess = 0
|
|
processed = 0
|
|
sender ! allSimilarImages.toList
|
|
allSimilarImages.clear()
|
|
} catch {
|
|
case e: Exception ⇒
|
|
sender ! akka.actor.Status.Failure(e)
|
|
throw e
|
|
}
|
|
}
|
|
|
|
override def postStop() = {
|
|
super.postStop()
|
|
this.listener ! PoisonPill
|
|
}
|
|
}
|
|
|
|
class ConcurrentEngineSimilarityActor extends Actor with ActorLogging {
|
|
var ignoreMessages = false
|
|
|
|
override def receive = {
|
|
case command: EngineCompareImages => compareImages(command)
|
|
case EngineNoMoreComparisons => finishedComparisons()
|
|
case EngineActorReactivate => ignoreMessages = false
|
|
case _ => log.info("received unknown message")
|
|
}
|
|
|
|
def compareImages(command: EngineCompareImages) = {
|
|
if (!ignoreMessages) {
|
|
val similarImages = new mutable.MutableList[Image]()
|
|
for (image <- command.images) {
|
|
if (!command.image1.equals(image)) {
|
|
val image1Hashes = ImageService.convertToImageHashDTO(command.image1.hashes)
|
|
val imageHashes = ImageService.convertToImageHashDTO(image.hashes)
|
|
if (HashService.areImageHashesSimilar(PropertiesService.aHashSettings, PropertiesService.dHashSettings, PropertiesService.pHashSettings, image1Hashes, imageHashes)) {
|
|
similarImages += image
|
|
}
|
|
}
|
|
}
|
|
//only send a message if we find similar images
|
|
if (similarImages.length >= 1) {
|
|
similarImages += command.image1
|
|
val similarImage = new SimilarImages(similarImages.toSet)
|
|
log.debug(s"Found ${similarImage.similarImages.size} similar images to ${command.image1}")
|
|
sender ! EngineCompareImagesComplete(similarImage)
|
|
} else {
|
|
log.debug(s"Found no similar images to ${command.image1}")
|
|
sender ! EngineCompareImagesComplete(null)
|
|
}
|
|
}
|
|
}
|
|
|
|
def finishedComparisons() = {
|
|
if (!ignoreMessages) {
|
|
ignoreMessages = true
|
|
log.debug("Finished processing comparisons")
|
|
sender ! EngineActorCompareImagesFinished
|
|
}
|
|
}
|
|
}
|