From 4581de443e68f83e9f6a40aa3e719032e24d5798 Mon Sep 17 00:00:00 2001 From: Drew Short Date: Wed, 29 Jan 2014 12:49:14 -0500 Subject: [PATCH] Started work on making the processing of images concurrent on a per engine basis --- src/main/resources/application.conf | 4 + .../sothr/imagetools/ConcurrentEngine.scala | 136 ++++++++++++++++++ .../scala/com/sothr/imagetools/Engine.scala | 3 + src/test/resources/application.conf | 4 + 4 files changed, 147 insertions(+) create mode 100644 src/main/resources/application.conf create mode 100644 src/main/scala/com/sothr/imagetools/ConcurrentEngine.scala create mode 100644 src/test/resources/application.conf diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf new file mode 100644 index 0000000..abc24e6 --- /dev/null +++ b/src/main/resources/application.conf @@ -0,0 +1,4 @@ +akka { + event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] + loglevel = "DEBUG" +} \ No newline at end of file diff --git a/src/main/scala/com/sothr/imagetools/ConcurrentEngine.scala b/src/main/scala/com/sothr/imagetools/ConcurrentEngine.scala new file mode 100644 index 0000000..5dbf3e9 --- /dev/null +++ b/src/main/scala/com/sothr/imagetools/ConcurrentEngine.scala @@ -0,0 +1,136 @@ +package com.sothr.imagetools + +import java.io.File +import akka.actor.{Actor, Props, ActorLogging} +import akka.routing.{Broadcast, RoundRobinRouter} +import akka.event.Logging +import scala.collection.mutable.MutableList +import com.sothr.imagetools.image.Image + +//exeternal cases +case class EngineProcessFile(file:File) +case object EngineNoMoreFiles +case object EngineIsProcessingFinished +case object EngineGetProcessingResults + +//internal cases +case class EngineFileProcessed(image:Image) +case object EngineActorProcessingFinished +case object EngineActorReactivate + +class ConcurrentEngine extends Actor with ActorLogging { + val imageCache = AppConfig.cacheManager.getCache("images") + val numOfRouters = 10 + val router = context.actorOf(Props[ConcurrentEngineActor].withRouter(RoundRobinRouter(nrOfInstances = numOfRouters))) + + var images:MutableList[Image] = new MutableList[Image]() + var toProcess = 0 + var processed = 0 + + var processorsFinished = 0 + + override def preStart() = { + // initialization code + } + + override def receive = { + case command:EngineProcessFile => processFile(command) + case command:EngineFileProcessed => fileProcessed(command) + case EngineNoMoreFiles => requestWrapup() + case EngineActorProcessingFinished => actorProcessingFinished() + case EngineIsProcessingFinished => isProcessingFinished() + case EngineGetProcessingResults => getResults() + case _ => log.info("received unknown message") + } + + def processFile(command:EngineProcessFile) = { + log.debug(s"Started evaluating ${command.file.getAbsolutePath}") + toProcess += 1 + if (imageCache.isKeyInCache(command.file.getAbsolutePath)) { + log.debug(s"${command.file.getAbsolutePath} was already processed") + self ! EngineFileProcessed(imageCache.get(command.file.getAbsolutePath).getObjectValue.asInstanceOf[Image]) + } else { + router ! command + } + } + + def fileProcessed(command:EngineFileProcessed) = { + processed += 1 + if (command.image != null) { + log.debug(s"processed image: ${command.image.imagePath}") + images += command.image + } + } + + /* + * + */ + def requestWrapup() = { + router ! Broadcast(EngineNoMoreFiles) + } + + /* + * Record that a processor is done + */ + def actorProcessingFinished() = { + processorsFinished += 1 + } + + /* + * Check if processing is done + */ + def isProcessingFinished() = { + try { + if (processorsFinished >= numOfRouters) sender ! true else sender ! false + } catch { + case e: Exception ⇒ + sender ! akka.actor.Status.Failure(e) + throw e + } + } + + /* + * Get the results of the processing + */ + def getResults() = { + try { + processorsFinished = 0 + toProcess = 0 + processed = 0 + sender ! images.toList + images.clear() + } catch { + case e: Exception ⇒ + sender ! akka.actor.Status.Failure(e) + throw e + } + } +} + +class ConcurrentEngineActor extends Actor with ActorLogging { + var ignoreMessages = false + override def receive = { + case command:EngineProcessFile => processFile(command) + case EngineNoMoreFiles => finishedProcessingFiles() + case EngineActorReactivate => ignoreMessages = false + case _ => log.info("received unknown message") + } + + def processFile(command:EngineProcessFile) = { + if (!ignoreMessages) { + val image = ImageService.getImage(command.file) + if (image != null) { + sender ! EngineFileProcessed(image) + } else { + log.debug(s"Failed to process image: ${command.file.getAbsolutePath}") + } + } + } + + def finishedProcessingFiles() = { + if (!ignoreMessages) { + ignoreMessages = true + sender ! EngineActorProcessingFinished + } + } +} \ No newline at end of file diff --git a/src/main/scala/com/sothr/imagetools/Engine.scala b/src/main/scala/com/sothr/imagetools/Engine.scala index c8f0825..b656ecb 100644 --- a/src/main/scala/com/sothr/imagetools/Engine.scala +++ b/src/main/scala/com/sothr/imagetools/Engine.scala @@ -5,6 +5,7 @@ import scala.collection.mutable import java.io.File import grizzled.slf4j.Logging import net.sf.ehcache.Element +import akka.actor.{ActorSystem, Props} /** * Created by drew on 1/26/14. @@ -13,6 +14,8 @@ class Engine() extends Logging{ val imageFilter:ImageFilter = new ImageFilter() val imageCache = AppConfig.cacheManager.getCache("images") + val system = ActorSystem("EngineActorSystem") + val engineController = system.actorOf(Props[ConcurrentEngine], name = "EngineController") def getImagesForDirectory(directoryPath:String):List[Image] = { debug(s"Looking for images in directory: $directoryPath") diff --git a/src/test/resources/application.conf b/src/test/resources/application.conf new file mode 100644 index 0000000..abc24e6 --- /dev/null +++ b/src/test/resources/application.conf @@ -0,0 +1,4 @@ +akka { + event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] + loglevel = "DEBUG" +} \ No newline at end of file