From b9faa78f430e18d20b9705da8707c764ef1f7b26 Mon Sep 17 00:00:00 2001 From: Drew Short Date: Tue, 22 Apr 2014 21:00:42 -0500 Subject: [PATCH] Started work on abstracting out the messaging in the engine. Moving over to dedicated actor(s) for handling messages and progress. --- .../java/com/sothr/imagetools/AppCLI.java | 79 +++++++++++++++---- .../java/com/sothr/imagetools/AppConfig.java | 8 ++ .../sothr/imagetools/ConcurrentEngine.scala | 74 +++++++++++++---- .../scala/com/sothr/imagetools/Engine.scala | 68 +++++++++++++++- .../sothr/imagetools/SequentialEngine.scala | 23 +++++- 5 files changed, 218 insertions(+), 34 deletions(-) diff --git a/src/main/java/com/sothr/imagetools/AppCLI.java b/src/main/java/com/sothr/imagetools/AppCLI.java index 44e64fe..e18f27d 100644 --- a/src/main/java/com/sothr/imagetools/AppCLI.java +++ b/src/main/java/com/sothr/imagetools/AppCLI.java @@ -1,5 +1,8 @@ package com.sothr.imagetools; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; import com.sothr.imagetools.image.SimilarImages; import org.apache.commons.cli.*; import org.slf4j.Logger; @@ -40,31 +43,73 @@ class AppCLI { private static Options getOptions() { //scan a list of directories Options options = new Options(); - options.addOption(new Option("s", true, "scan directories for a list of similar images")); + + //show help + Option helpOption = OptionBuilder.create('h'); + helpOption.setLongOpt("help"); + helpOption.setDescription("Display this help dialog"); + options.addOption(helpOption); + + //scan directories + Option scanOption = OptionBuilder.create('s'); + scanOption.setLongOpt("scan"); + scanOption.setDescription("Scan directories for a list of similar images"); + scanOption.setArgs(1); + scanOption.setArgName("DIRECTORY"); + options.addOption(scanOption); + //scan directories in a recursive manner - options.addOption(new Option("r", false, "scan directories recursively")); + Option recursiveOption = OptionBuilder.create('r'); + recursiveOption.setLongOpt("recursive"); + recursiveOption.setDescription("Scan directories recursively"); + options.addOption(recursiveOption); + + //depth limit + Option depthOption = OptionBuilder.create('d'); + depthOption.setLongOpt("depth"); + depthOption.setDescription("Limit the maximum depth of the recursive search"); + depthOption.setArgs(1); + depthOption.setArgName("INTEGER"); + options.addOption(depthOption); return options; } private static void process(CommandLine cmd) { //scan a comma separated list of paths to search for image similarities - Engine engine = new ConcurrentEngine(); - if (cmd.hasOption('s')) { - Boolean recursive = false; - Integer recursiveDepth = 500; - if (cmd.hasOption('r')) { - recursive = true; - } - String scanList = cmd.getOptionValue('s'); - String[] paths = scanList.split(","); - for (String path : paths) { - List similarImages = engine.getSimilarImagesForDirectory(path, recursive, recursiveDepth); - for (int index = 0; index < similarImages.length(); index++) { - SimilarImages similar = similarImages.apply(index); - System.out.println(similar.toString()); + try { + Engine engine = new ConcurrentEngine(); + + //create the listeners that will be passed onto the actors + ActorSystem system = AppConfig.getAppActorSystem(); + Props cliListenerProps = Props.create(CLIEngineListener.class); + ActorRef cliListener = system.actorOf(cliListenerProps); + + //set the listeners + engine.setProcessedListener(cliListener); + engine.setSimilarityListener(cliListener); + + + if (cmd.hasOption('s')) { + Boolean recursive = false; + Integer recursiveDepth = 500; + if (cmd.hasOption('r')) { + recursive = true; + } + if (cmd.hasOption('d')) { + recursiveDepth = Integer.parseInt(cmd.getOptionValue('d')); + } + String scanList = cmd.getOptionValue('s'); + String[] paths = scanList.split(","); + for (String path : paths) { + List similarImages = engine.getSimilarImagesForDirectory(path, recursive, recursiveDepth); + for (int index = 0; index < similarImages.length(); index++) { + SimilarImages similar = similarImages.apply(index); + System.out.println(similar.toString()); + } } } + } catch (Exception ex) { + throw new IllegalArgumentException("One or more arguments could not be parsed correctly", ex); } } - } diff --git a/src/main/java/com/sothr/imagetools/AppConfig.java b/src/main/java/com/sothr/imagetools/AppConfig.java index 4b48a38..9e89f16 100644 --- a/src/main/java/com/sothr/imagetools/AppConfig.java +++ b/src/main/java/com/sothr/imagetools/AppConfig.java @@ -1,5 +1,6 @@ package com.sothr.imagetools; +import akka.actor.ActorSystem; import com.sothr.imagetools.dao.HibernateUtil; import com.sothr.imagetools.util.ResourceLoader; import com.sothr.imagetools.util.PropertiesService; @@ -34,6 +35,13 @@ public class AppConfig { //Cache defaults private static Boolean configuredCache = false; + // General Akka Actor System + private static ActorSystem appSystem = ActorSystem.create("ITActorSystem"); + + public static ActorSystem getAppActorSystem() { + return appSystem; + } + public static void configureApp() { logger = (Logger)LoggerFactory.getLogger(AppConfig.class); loadProperties(); diff --git a/src/main/scala/com/sothr/imagetools/ConcurrentEngine.scala b/src/main/scala/com/sothr/imagetools/ConcurrentEngine.scala index 000a01a..729934f 100644 --- a/src/main/scala/com/sothr/imagetools/ConcurrentEngine.scala +++ b/src/main/scala/com/sothr/imagetools/ConcurrentEngine.scala @@ -1,7 +1,7 @@ package com.sothr.imagetools import java.io.File -import akka.actor.{Actor, ActorSystem, Props, ActorLogging} +import akka.actor._ import akka.routing.{Broadcast, RoundRobinRouter, SmallestMailboxRouter} import akka.pattern.ask import akka.util.Timeout @@ -11,15 +11,22 @@ import com.sothr.imagetools.hash.HashService import com.sothr.imagetools.util.{PropertiesEnum, PropertiesService} import scala.concurrent.Await import java.lang.Thread -import scala.concurrent.ExecutionContext.Implicits.global import scala.collection.mutable +import akka.routing.Broadcast class ConcurrentEngine extends Engine with grizzled.slf4j.Logging { - val system = ActorSystem("EngineActorSystem") - val engineProcessingController = system.actorOf(Props[ConcurrentEngineProcessingController], name = "EngineProcessingController") - val engineSimilarityController = system.actorOf(Props[ConcurrentEngineSimilarityController], name = "EngineSimilarityController") - implicit val timeout = Timeout(30, TimeUnit.SECONDS) - + val engineProcessingController = system.actorOf(Props[ConcurrentEngineProcessingController], name = "EngineProcessingController") + val engineSimilarityController = system.actorOf(Props[ConcurrentEngineSimilarityController], name = "EngineSimilarityController") + implicit val timeout = Timeout(30, TimeUnit.SECONDS) + + override def setProcessedListener(listenerRef: ActorRef) = { + engineProcessingController ! SetNewListener(listenerRef) + } + + override def setSimilarityListener(listenerRef: ActorRef) = { + engineSimilarityController ! SetNewListener(listenerRef) + } + def getImagesForDirectory(directoryPath:String, recursive:Boolean=false, recursiveDepth:Int=500):List[Image] = { debug(s"Looking for images in directory: $directoryPath") val imageFiles = getAllImageFiles(directoryPath, recursive, recursiveDepth) @@ -104,7 +111,9 @@ class ConcurrentEngine extends Engine with grizzled.slf4j.Logging { } -// exeternal cases // +// external cases // +case class SetNewListener(listenerType: ActorRef) + // processing files into images case class EngineProcessFile(file:File) case object EngineNoMoreFiles @@ -131,13 +140,23 @@ class ConcurrentEngineProcessingController extends Actor with ActorLogging { var processed = 0 var processorsFinished = 0 - + var listener = context.actorOf(Props[DefaultLoggingEngineListener], + name = "ProcessedEngineListener") + + def setListener(newListener: ActorRef) = { + //remove the old listener + this.listener ! PoisonPill + //setup the new listener + this.listener = newListener + } + override def preStart() = { log.info("Staring the controller for processing images") log.info("Using {} actors to process images", numOfRouters) } override def receive = { + case command:SetNewListener => setListener(command.listenerType) case command:EngineProcessFile => processFile(command) case command:EngineFileProcessed => fileProcessed(command) case EngineNoMoreFiles => requestWrapup() @@ -146,6 +165,11 @@ class ConcurrentEngineProcessingController extends Actor with ActorLogging { case EngineGetProcessingResults => getResults() case _ => log.info("received unknown message") } + + override def postStop() = { + super.postStop() + this.listener ! PoisonPill + } def processFile(command:EngineProcessFile) = { log.debug(s"Started evaluating ${command.file.getAbsolutePath}") @@ -155,7 +179,10 @@ class ConcurrentEngineProcessingController extends Actor with ActorLogging { def fileProcessed(command:EngineFileProcessed) = { processed += 1 - if (processed % 25 == 0 || processed == toProcess) log.info(s"Processed $processed/$toProcess") + if (processed % 25 == 0 || processed == toProcess) { + //log.info(s"Processed $processed/$toProcess") + listener ! ComparedFileCount(processed,toProcess) + } if (command.image != null) { log.debug(s"processed image: ${command.image.imagePath}") images += command.image @@ -256,12 +283,23 @@ class ConcurrentEngineSimilarityController extends Actor with ActorLogging { var processorsFinished = 0 + var listener = context.actorOf(Props[DefaultLoggingEngineListener], + name = "SimilarityEngineListener") + + def setListener(newListener: ActorRef) = { + //remove the old listener + this.listener ! PoisonPill + //setup the new listener + this.listener = newListener + } + override def preStart() = { - log.info("Staring the controller for processing similarites between images") - log.info("Using {} actors to process image similarites", numOfRouters) + log.info("Staring the controller for processing similarities between images") + log.info("Using {} actors to process image similarities", numOfRouters) } override def receive = { + case command:SetNewListener => setListener(command.listenerType) case command:EngineCompareImages => findSimilarities(command) case command:EngineCompareImagesComplete => similarityProcessed(command) case EngineNoMoreComparisons => requestWrapup() @@ -271,12 +309,17 @@ class ConcurrentEngineSimilarityController extends Actor with ActorLogging { case _ => log.info("received unknown message") } + override def postStop() = { + super.postStop() + this.listener ! PoisonPill + } def findSimilarities(command:EngineCompareImages) = { log.debug(s"Finding similarities between {} and {} images", command.image1.imagePath, command.images.length) toProcess += 1 if (toProcess % 250 == 0) { - log.info("Sent {} images to be processed for similarites", toProcess) + //log.info("Sent {} images to be processed for similarites", toProcess) + listener ! SubmitMessage(s"Sent $toProcess images to be processed for similarites") } //just relay the command to our workers router ! command @@ -284,7 +327,10 @@ class ConcurrentEngineSimilarityController extends Actor with ActorLogging { def similarityProcessed(command:EngineCompareImagesComplete) = { processed += 1 - if (processed % 25 == 0 || processed == toProcess) log.info(s"Processed $processed/$toProcess") + if (processed % 25 == 0 || processed == toProcess) { + //log.info(s"Processed $processed/$toProcess") + listener ! ScannedFileCount(processed,toProcess) + } if (command.similarImages != null) { log.debug(s"Found similar images: ${command.similarImages}") allSimilarImages += command.similarImages diff --git a/src/main/scala/com/sothr/imagetools/Engine.scala b/src/main/scala/com/sothr/imagetools/Engine.scala index 4e3ac8e..86b5936 100644 --- a/src/main/scala/com/sothr/imagetools/Engine.scala +++ b/src/main/scala/com/sothr/imagetools/Engine.scala @@ -5,15 +5,19 @@ import com.sothr.imagetools.util.DirectoryFilter import scala.collection.mutable import java.io.File import grizzled.slf4j.Logging +import akka.actor.{ActorRef, ActorSystem, ActorLogging, Actor} /** * Created by drew on 1/26/14. */ abstract class Engine extends Logging{ - + val system = ActorSystem("EngineActorSystem") val imageFilter:ImageFilter = new ImageFilter() val imageCache = AppConfig.cacheManager.getCache("images") + def setProcessedListener(listenerType: ActorRef) + def setSimilarityListener(listenerType: ActorRef) + def getAllImageFiles(directoryPath:String, recursive:Boolean=false, recursiveDepth:Int=500):List[File] = { val fileList = new mutable.MutableList[File]() if (directoryPath != null && directoryPath != "") { @@ -47,3 +51,65 @@ abstract class Engine extends Logging{ */ def getSimilarImagesForDirectory(directoryPath:String, recursive:Boolean=false, recursiveDepth:Int=500):List[SimilarImages]; } + +case class SubmitMessage(message:String) +case class ScannedFileCount(count:Integer, total:Integer, message:String=null) +case class ComparedFileCount(count:Integer,total:Integer, message:String=null) +abstract class EngineListener extends Actor with ActorLogging { + override def receive: Actor.Receive = { + case command:SubmitMessage => handleMessage(command) + case command:ScannedFileCount => handleScannedFileCount(command) + case command:ComparedFileCount => handleComparedFileCount(command) + case _ => log.info("received unknown message") + } + + def handleMessage(command:SubmitMessage) + def handleScannedFileCount(command:ScannedFileCount) + def handleComparedFileCount(command:ComparedFileCount) +} + +/** + * Actor for logging output information + */ +class DefaultLoggingEngineListener extends EngineListener with ActorLogging { + override def handleComparedFileCount(command: ComparedFileCount): Unit = { + if (command.message != null) { + log.info(command.message) + } + log.info("Processed {}/{}",command.count,command.total) + } + + override def handleScannedFileCount(command: ScannedFileCount): Unit = { + if (command.message != null) { + log.info(command.message) + } + log.info("Scanned {}/{} For Similarities",command.count,command.total) + } + + override def handleMessage(command: SubmitMessage): Unit = { + log.info(command.message) + } +} + +/** + * Actor for writing progress out to the commandline + */ +class CLIEngineListener extends EngineListener with ActorLogging { + override def handleComparedFileCount(command: ComparedFileCount): Unit = { + if (command.message != null) { + System.out.println(command.message) + } + System.out.println(s"Processed ${command.count}/${command.total}") + } + + override def handleScannedFileCount(command: ScannedFileCount): Unit = { + if (command.message != null) { + System.out.println(command.message) + } + System.out.println(s"Scanned ${command.count}/${command.total} For Similarities") + } + + override def handleMessage(command: SubmitMessage): Unit = { + System.out.println(command.message) + } +} \ No newline at end of file diff --git a/src/main/scala/com/sothr/imagetools/SequentialEngine.scala b/src/main/scala/com/sothr/imagetools/SequentialEngine.scala index 1ffb84f..48133a0 100644 --- a/src/main/scala/com/sothr/imagetools/SequentialEngine.scala +++ b/src/main/scala/com/sothr/imagetools/SequentialEngine.scala @@ -4,12 +4,26 @@ import com.sothr.imagetools.image.{SimilarImages, ImageFilter, Image} import scala.collection.mutable import java.io.File import grizzled.slf4j.Logging +import akka.actor.{ActorRef, Props} /** * Created by drew on 1/26/14. */ class SequentialEngine extends Engine with Logging { + var processedListener = system.actorOf(Props[DefaultLoggingEngineListener], + name = "ProcessedEngineListener") + var similarityListener = system.actorOf(Props[DefaultLoggingEngineListener], + name = "SimilarityEngineListener") + + override def setProcessedListener(listenerRef: ActorRef) = { + this.processedListener = listenerRef + } + + override def setSimilarityListener(listenerRef: ActorRef) = { + this.similarityListener = listenerRef + } + def getImagesForDirectory(directoryPath:String, recursive:Boolean=false, recursiveDepth:Int=500):List[Image] = { debug(s"Looking for images in directory: $directoryPath") val images:mutable.MutableList[Image] = new mutable.MutableList[Image]() @@ -18,7 +32,10 @@ class SequentialEngine extends Engine with Logging { var count = 0 for (file <- imageFiles) { count += 1 - if (count % 25 == 0) info(s"Processed ${count}/${imageFiles.size}") + if (count % 25 == 0) { + //info(s"Processed ${count}/${imageFiles.size}") + processedListener ! ScannedFileCount(count,imageFiles.size) + } val image = ImageService.getImage(file) if (image != null) { images += image @@ -38,7 +55,9 @@ class SequentialEngine extends Engine with Logging { for (rootImage <- images) { if (!ignoreSet.contains(rootImage)) { if (processedCount % 25 == 0) { - info(s"Processed ${processedCount}/${images.length - ignoreSet.size} About ${images.length - processedCount} images to go") + //info(s"Processed ${processedCount}/${images.length - ignoreSet.size} About ${images.length - + // processedCount} images to go") + similarityListener ! ScannedFileCount(processedCount,images.length-ignoreSet.size) } debug(s"Looking for images similar to: ${rootImage.imagePath}") ignoreSet += rootImage