Browse Source

Concurrency added for the get similar images method. Not convinced it is any faster and my be worse than the synchronous version.

master
Drew Short 11 years ago
parent
commit
b05634e002
  1. 4
      src/main/resources/default.properties
  2. 249
      src/main/scala/com/sothr/imagetools/ConcurrentEngine.scala

4
src/main/resources/default.properties

@ -9,8 +9,8 @@ app.log.info=false
app.log.error=true app.log.error=true
#Concurrency Settings #Concurrency Settings
app.engine.concurrent.similarity.limit=20
app.engine.concurrent.processing.limit=10
app.engine.concurrent.similarity.limit=15
app.engine.concurrent.processing.limit=15
#Default Image Settings #Default Image Settings
#images must be 90% similar #images must be 90% similar

249
src/main/scala/com/sothr/imagetools/ConcurrentEngine.scala

@ -3,55 +3,36 @@ package com.sothr.imagetools
import java.io.File import java.io.File
import akka.actor.{Actor, ActorSystem, Props, ActorLogging} import akka.actor.{Actor, ActorSystem, Props, ActorLogging}
import akka.routing.{Broadcast, SmallestMailboxRouter} import akka.routing.{Broadcast, SmallestMailboxRouter}
import akka.event.Logging
import akka.pattern.ask import akka.pattern.ask
import akka.util.Timeout import akka.util.Timeout
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import scala.collection.mutable.{MutableList, HashSet}
import com.sothr.imagetools.image.{SimilarImages, ImageFilter, Image}
import com.sothr.imagetools.image.{SimilarImages, Image}
import com.sothr.imagetools.util.{PropertiesEnum, PropertiesService} import com.sothr.imagetools.util.{PropertiesEnum, PropertiesService}
import scala.concurrent.{Await, blocking, Future}
import scala.concurrent.Await
import java.lang.Thread import java.lang.Thread
import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.ExecutionContext.Implicits.global
// exeternal cases //
// processing files into images
case class EngineProcessFile(file:File)
case object EngineNoMoreFiles
case object EngineIsProcessingFinished
case object EngineGetProcessingResults
//finding similarities between images
case class EngineFindSimilarities(images:List[Image])
case object EngineIsSimilarityFinished
case object EngineGetSimilarityResults
//internal cases
case class EngineFileProcessed(image:Image)
case object EngineActorProcessingFinished
case class EngineCompareImages(image1:Image,image2:Image)
case object EngineActorCompareImagesRoundFinished
case object EngineActorCompareImagesFinished
case object EngineActorReactivate
import scala.collection.mutable
class ConcurrentEngine extends Engine with grizzled.slf4j.Logging { class ConcurrentEngine extends Engine with grizzled.slf4j.Logging {
val system = ActorSystem("EngineActorSystem") val system = ActorSystem("EngineActorSystem")
val engineController = system.actorOf(Props[ConcurrentEngineController], name = "EngineController")
implicit val timeout = Timeout(5, TimeUnit.SECONDS)
val engineProcessingController = system.actorOf(Props[ConcurrentEngineProcessingController], name = "EngineProcessingController")
val engineSimilarityController = system.actorOf(Props[ConcurrentEngineSimilarityController], name = "EngineSimilarityController")
implicit val timeout = Timeout(30, TimeUnit.SECONDS)
def getImagesForDirectory(directoryPath:String):List[Image] = { def getImagesForDirectory(directoryPath:String):List[Image] = {
debug(s"Looking for images in directory: $directoryPath") debug(s"Looking for images in directory: $directoryPath")
val directory:File = new File(directoryPath) val directory:File = new File(directoryPath)
val images:MutableList[Image] = new MutableList[Image]()
val images:mutable.MutableList[Image] = new mutable.MutableList[Image]()
if (directory.isDirectory) { if (directory.isDirectory) {
val files = directory.listFiles(imageFilter) val files = directory.listFiles(imageFilter)
info(s"Found ${files.length} files that are images in directory: $directoryPath") info(s"Found ${files.length} files that are images in directory: $directoryPath")
for (file <- files) { for (file <- files) {
engineController ! EngineProcessFile(file)
engineProcessingController ! EngineProcessFile(file)
} }
engineController ! EngineNoMoreFiles
engineProcessingController ! EngineNoMoreFiles
var doneProcessing = false var doneProcessing = false
while(!doneProcessing) { while(!doneProcessing) {
val f = engineController ? EngineIsProcessingFinished
val f = engineProcessingController ? EngineIsProcessingFinished
val result = Await.result(f, timeout.duration).asInstanceOf[Boolean] val result = Await.result(f, timeout.duration).asInstanceOf[Boolean]
result match { result match {
case true => case true =>
@ -64,7 +45,7 @@ class ConcurrentEngine extends Engine with grizzled.slf4j.Logging {
//val future = Future { blocking(Thread.sleep(5000L)); "done" } //val future = Future { blocking(Thread.sleep(5000L)); "done" }
} }
} }
val f = engineController ? EngineGetProcessingResults
val f = engineProcessingController ? EngineGetProcessingResults
val result = Await.result(f, timeout.duration).asInstanceOf[List[Image]] val result = Await.result(f, timeout.duration).asInstanceOf[List[Image]]
images ++= result images ++= result
} else { } else {
@ -78,56 +59,65 @@ class ConcurrentEngine extends Engine with grizzled.slf4j.Logging {
debug(s"Looking for similar images in directory: $directoryPath") debug(s"Looking for similar images in directory: $directoryPath")
val images = getImagesForDirectory(directoryPath) val images = getImagesForDirectory(directoryPath)
info(s"Searching ${images.length} images for similarities") info(s"Searching ${images.length} images for similarities")
val ignoreSet = new HashSet[Image]()
val allSimilarImages = new MutableList[SimilarImages]()
var processedCount = 0
var similarCount = 0
val allSimilarImages = new mutable.MutableList[SimilarImages]()
for (rootImage <- images) { for (rootImage <- images) {
if (!ignoreSet.contains(rootImage)) {
if (processedCount % 25 == 0) {
info(s"Processed ${processedCount}/${images.length - ignoreSet.size} About ${images.length - processedCount} images to go")
}
debug(s"Looking for images similar to: ${rootImage.imagePath}") debug(s"Looking for images similar to: ${rootImage.imagePath}")
ignoreSet += rootImage
val similarImages = new MutableList[Image]()
for (image <- images) {
if (!ignoreSet.contains(image)) {
if (rootImage.isSimilarTo(image)) {
debug(s"Image: ${image.imagePath} is similar")
similarImages += image
ignoreSet += image
similarCount += 1
}
engineSimilarityController ! EngineCompareImages(rootImage, images, null)
} }
//tell the comparison engine there's nothing left to compare
engineSimilarityController ! EngineNoMoreComparisons
var doneProcessing = false
while(!doneProcessing) {
val f = engineSimilarityController ? EngineIsSimilarityFinished
val result = Await.result(f, timeout.duration).asInstanceOf[Boolean]
result match {
case true =>
doneProcessing = true
debug("Processing Complete")
case false =>
debug("Still Processing")
//sleep thread
Thread.sleep(5000L)
//val future = Future { blocking(Thread.sleep(5000L)); "done" }
} }
if (similarImages.length > 1) {
val similar = new SimilarImages(rootImage, similarImages.toList)
debug(s"Found similar images: ${similar.toString}")
allSimilarImages += similar
}
processedCount += 1
} }
val f = engineSimilarityController ? EngineGetSimilarityResults
val result = Await.result(f, timeout.duration).asInstanceOf[List[SimilarImages]]
allSimilarImages ++= result
var similarCount = 0
for (similarImage <- allSimilarImages) {
similarCount += 1 + similarImage.similarImages.size
} }
info(s"Finished processing ${images.size} images. Found $similarCount similar images") info(s"Finished processing ${images.size} images. Found $similarCount similar images")
allSimilarImages.toList allSimilarImages.toList
} }
} }
class ConcurrentEngineController extends Actor with ActorLogging {
// exeternal cases //
// processing files into images
case class EngineProcessFile(file:File)
case object EngineNoMoreFiles
case object EngineIsProcessingFinished
case object EngineGetProcessingResults
//internal cases
case class EngineFileProcessed(image:Image)
case object EngineActorProcessingFinished
case object EngineActorReactivate
class ConcurrentEngineProcessingController extends Actor with ActorLogging {
val imageCache = AppConfig.cacheManager.getCache("images") val imageCache = AppConfig.cacheManager.getCache("images")
val numOfRouters = PropertiesService.get(PropertiesEnum.ConcurrentProcessingLimit.toString).toInt val numOfRouters = PropertiesService.get(PropertiesEnum.ConcurrentProcessingLimit.toString).toInt
val router = context.actorOf(Props[ConcurrentEngineActor].withRouter(SmallestMailboxRouter(nrOfInstances = numOfRouters)))
val router = context.actorOf(Props[ConcurrentEngineProcessingActor].withRouter(SmallestMailboxRouter(nrOfInstances = numOfRouters)))
var images:MutableList[Image] = new MutableList[Image]()
var images:mutable.MutableList[Image] = new mutable.MutableList[Image]()
var toProcess = 0 var toProcess = 0
var processed = 0 var processed = 0
var processorsFinished = 0 var processorsFinished = 0
override def preStart() = {
// initialization code
}
override def receive = { override def receive = {
case command:EngineProcessFile => processFile(command) case command:EngineProcessFile => processFile(command)
case command:EngineFileProcessed => fileProcessed(command) case command:EngineFileProcessed => fileProcessed(command)
@ -151,7 +141,7 @@ class ConcurrentEngineController extends Actor with ActorLogging {
def fileProcessed(command:EngineFileProcessed) = { def fileProcessed(command:EngineFileProcessed) = {
processed += 1 processed += 1
if (processed % 25 == 0) log.info(s"Processed ${processed}/${toProcess}")
if (processed % 25 == 0) log.info(s"Processed $processed/$toProcess")
if (command.image != null) { if (command.image != null) {
log.debug(s"processed image: ${command.image.imagePath}") log.debug(s"processed image: ${command.image.imagePath}")
images += command.image images += command.image
@ -200,7 +190,7 @@ class ConcurrentEngineController extends Actor with ActorLogging {
} }
} }
class ConcurrentEngineActor extends Actor with ActorLogging {
class ConcurrentEngineProcessingActor extends Actor with ActorLogging {
var ignoreMessages = false var ignoreMessages = false
override def receive = { override def receive = {
case command:EngineProcessFile => processFile(command) case command:EngineProcessFile => processFile(command)
@ -227,3 +217,136 @@ class ConcurrentEngineActor extends Actor with ActorLogging {
} }
} }
} }
//finding similarities between images
case class EngineCompareImages(image1:Image,images:List[Image],ignoreList:Set[Image])
case class EngineCompareImagesComplete(similarImages:SimilarImages)
case object EngineNoMoreComparisons
case object EngineIsSimilarityFinished
case object EngineGetSimilarityResults
case object EngineActorCompareImagesFinished
class ConcurrentEngineSimilarityController extends Actor with ActorLogging {
val imageCache = AppConfig.cacheManager.getCache("images")
val numOfRouters = PropertiesService.get(PropertiesEnum.ConcurrentSimiliartyLimit.toString).toInt
val router = context.actorOf(Props[ConcurrentEngineSimilarityActor].withRouter(SmallestMailboxRouter(nrOfInstances = numOfRouters)))
val allSimilarImages = new mutable.MutableList[SimilarImages]
val ignoreList = new mutable.HashSet[Image]()
var toProcess = 0
var processed = 0
var processorsFinished = 0
override def receive = {
case command:EngineCompareImages => findSimilarities(command)
case command:EngineCompareImagesComplete => similarityProcessed(command)
case EngineNoMoreComparisons => requestWrapup()
case EngineActorCompareImagesFinished => actorProcessingFinished()
case EngineIsSimilarityFinished => isProcessingFinished()
case EngineGetSimilarityResults => getResults()
case _ => log.info("received unknown message")
}
def findSimilarities(command:EngineCompareImages) = {
log.debug(s"Finding similarities between ${command.image1.imagePath} and ${command.images.length} images")
toProcess += 1
//just relay the command to our workers
router ! EngineCompareImages(command.image1, command.images, ignoreList.toSet[Image])
}
def similarityProcessed(command:EngineCompareImagesComplete) = {
processed += 1
if (processed % 25 == 0) log.info(s"Processed $processed/$toProcess")
if (command.similarImages != null) {
if (!ignoreList.contains(command.similarImages.rootImage)) {
log.debug(s"Found similar images: ${command.similarImages}")
allSimilarImages += command.similarImages
//add the similar images to the ignore list so we don't re-process them constantly
ignoreList += command.similarImages.rootImage
ignoreList ++= command.similarImages.similarImages
}
}
}
def requestWrapup() = {
router ! Broadcast(EngineNoMoreComparisons)
}
/*
* Record that a processor is done
*/
def actorProcessingFinished() = {
processorsFinished += 1
}
/*
* Check if processing is done
*/
def isProcessingFinished() = {
try {
if (processorsFinished >= numOfRouters) sender ! true else sender ! false
} catch {
case e: Exception
sender ! akka.actor.Status.Failure(e)
throw e
}
}
/*
* Get the results of the processing
*/
def getResults() = {
try {
processorsFinished = 0
toProcess = 0
processed = 0
sender ! allSimilarImages.toList
allSimilarImages.clear()
} catch {
case e: Exception
sender ! akka.actor.Status.Failure(e)
throw e
}
}
}
class ConcurrentEngineSimilarityActor extends Actor with ActorLogging {
var ignoreMessages = false
override def receive = {
case command:EngineCompareImages => compareImages(command)
case EngineNoMoreComparisons => finishedComparisons()
case EngineActorReactivate => ignoreMessages = false
case _ => log.info("received unknown message")
}
def compareImages(command:EngineCompareImages) = {
if (!ignoreMessages) {
val similarImages = new mutable.MutableList[Image]()
for (image <- command.images) {
if (!command.ignoreList.contains(image) && command.image1 != image) {
if (command.image1.isSimilarTo(image)) {
similarImages += image
var ignoreMessages = false
}
}
}
//only send a message if we find similar images
if (similarImages.length >= 1) {
val similarImage = new SimilarImages(command.image1, similarImages.toList)
log.debug(s"Found ${similarImage.similarImages.length} similar images to ${similarImage.rootImage}")
sender ! EngineCompareImagesComplete(similarImage)
} else {
log.debug(s"Found no similar images to ${command.image1}")
sender ! EngineCompareImagesComplete(null)
}
}
}
def finishedComparisons() = {
if (!ignoreMessages) {
ignoreMessages = true
sender ! EngineActorCompareImagesFinished
}
}
}
Loading…
Cancel
Save