Browse Source

Working concurrent engine, however only a few methods are truly concurrent. Needs further work.

master
Drew Short 10 years ago
parent
commit
19cd92850d
  1. 2
      src/main/java/com/sothr/imagetools/AppCLI.java
  2. 92
      src/main/scala/com/sothr/imagetools/ConcurrentEngine.scala
  3. 76
      src/main/scala/com/sothr/imagetools/Engine.scala
  4. 76
      src/main/scala/com/sothr/imagetools/SequentialEngine.scala
  5. 20
      src/test/scala/com/sothr/imagetools/EngineTest.scala

2
src/main/java/com/sothr/imagetools/AppCLI.java

@ -40,7 +40,7 @@ class AppCLI {
private static void process(CommandLine cmd) {
//scan a comma separated list of paths to search for image similarities
Engine engine = new Engine();
Engine engine = new ConcurrentEngine();
if (cmd.hasOption('s')) {
String scanList = cmd.getOptionValue('s');
String[] paths = scanList.split(",");

92
src/main/scala/com/sothr/imagetools/ConcurrentEngine.scala

@ -1,11 +1,17 @@
package com.sothr.imagetools
import java.io.File
import akka.actor.{Actor, Props, ActorLogging}
import akka.actor.{Actor, ActorSystem, Props, ActorLogging}
import akka.routing.{Broadcast, RoundRobinRouter}
import akka.event.Logging
import scala.collection.mutable.MutableList
import com.sothr.imagetools.image.Image
import akka.pattern.ask
import akka.util.Timeout
import java.util.concurrent.TimeUnit
import scala.collection.mutable.{MutableList, HashSet}
import com.sothr.imagetools.image.{SimilarImages, ImageFilter, Image}
import scala.concurrent.{Await, blocking, Future}
import java.lang.Thread
import scala.concurrent.ExecutionContext.Implicits.global
//exeternal cases
case class EngineProcessFile(file:File)
@ -18,7 +24,84 @@ case class EngineFileProcessed(image:Image)
case object EngineActorProcessingFinished
case object EngineActorReactivate
class ConcurrentEngine extends Actor with ActorLogging {
class ConcurrentEngine extends Engine with grizzled.slf4j.Logging {
val system = ActorSystem("EngineActorSystem")
val engineController = system.actorOf(Props[ConcurrentEngineController], name = "EngineController")
implicit val timeout = Timeout(5, TimeUnit.SECONDS)
def getImagesForDirectory(directoryPath:String):List[Image] = {
debug(s"Looking for images in directory: $directoryPath")
val directory:File = new File(directoryPath)
val images:MutableList[Image] = new MutableList[Image]()
if (directory.isDirectory) {
val files = directory.listFiles(imageFilter)
info(s"Found ${files.length} files that are images in directory: $directoryPath")
for (file <- files) {
engineController ! EngineProcessFile(file)
}
engineController ! EngineNoMoreFiles
var doneProcessing = false
while(!doneProcessing) {
val f = engineController ? EngineIsProcessingFinished
val result = Await.result(f, timeout.duration).asInstanceOf[Boolean]
result match {
case true =>
doneProcessing = true
debug("Processing Complete")
case false =>
debug("Still Processing")
//sleep thread
val future = Future { blocking(Thread.sleep(5000L)); "done" }
}
}
val f = engineController ? EngineGetProcessingResults
val result = Await.result(f, timeout.duration).asInstanceOf[List[Image]]
images ++= result
} else {
error(s"Provided path: $directoryPath is not a directory")
}
images.toList
}
//needs to be rebuilt as a concurrent capable method
def getSimilarImagesForDirectory(directoryPath:String):List[SimilarImages] = {
debug(s"Looking for similar images in directory: $directoryPath")
val images = getImagesForDirectory(directoryPath)
info(s"Searching ${images.length} images for similarities")
val ignoreSet = new HashSet[Image]()
val allSimilarImages = new MutableList[SimilarImages]()
var processedCount = 0
var similarCount = 0
for (rootImage <- images) {
if (!ignoreSet.contains(rootImage)) {
info(s"Processed ${processedCount}/${images.length - ignoreSet.size} About ${images.length - processedCount} images to go")
debug(s"Looking for images similar to: ${rootImage.imagePath}")
ignoreSet += rootImage
val similarImages = new MutableList[Image]()
for (image <- images) {
if (!ignoreSet.contains(image)) {
if (rootImage.isSimilarTo(image)) {
debug(s"Image: ${image.imagePath} is similar")
similarImages += image
ignoreSet += image
similarCount += 1
}
}
}
if (similarImages.length > 1) {
val similar = new SimilarImages(rootImage, similarImages.toList)
debug(s"Found similar images: ${similar.toString}")
allSimilarImages += similar
}
processedCount += 1
}
}
info(s"Finished processing ${images.size} images. Found $similarCount similar images")
allSimilarImages.toList
}
}
class ConcurrentEngineController extends Actor with ActorLogging {
val imageCache = AppConfig.cacheManager.getCache("images")
val numOfRouters = 10
val router = context.actorOf(Props[ConcurrentEngineActor].withRouter(RoundRobinRouter(nrOfInstances = numOfRouters)))
@ -56,6 +139,7 @@ class ConcurrentEngine extends Actor with ActorLogging {
def fileProcessed(command:EngineFileProcessed) = {
processed += 1
if (processed % 25 == 0) log.info(s"Processed ${processed}/${toProcess}")
if (command.image != null) {
log.debug(s"processed image: ${command.image.imagePath}")
images += command.image

76
src/main/scala/com/sothr/imagetools/Engine.scala

@ -5,78 +5,22 @@ import scala.collection.mutable
import java.io.File
import grizzled.slf4j.Logging
import net.sf.ehcache.Element
import akka.actor.{ActorSystem, Props}
/**
* Created by drew on 1/26/14.
*/
class Engine() extends Logging{
abstract class Engine extends Logging{
val imageFilter:ImageFilter = new ImageFilter()
val imageCache = AppConfig.cacheManager.getCache("images")
val system = ActorSystem("EngineActorSystem")
val engineController = system.actorOf(Props[ConcurrentEngine], name = "EngineController")
def getImagesForDirectory(directoryPath:String):List[Image] = {
debug(s"Looking for images in directory: $directoryPath")
val images:mutable.MutableList[Image] = new mutable.MutableList[Image]()
val directory:File = new File(directoryPath)
var count = 0
if (directory.isDirectory) {
val files = directory.listFiles(imageFilter)
info(s"Found ${files.length} files that are images in directory: $directoryPath")
for (file <- files) {
count += 1
if (count % 25 == 0) info(s"Processed ${count}/${files.size}")
if (imageCache.isKeyInCache(file.getAbsolutePath)) {
images += imageCache.get(file.getAbsolutePath).getObjectValue.asInstanceOf[Image]
} else {
val image = ImageService.getImage(file)
if (image != null) {
imageCache.put(new Element(file.getAbsolutePath, image))
images += image
}
}
}
} else {
error(s"Provided path: $directoryPath is not a directory")
}
images.toList
}
def getSimilarImagesForDirectory(directoryPath:String):List[SimilarImages] = {
debug(s"Looking for similar images in directory: $directoryPath")
val images = getImagesForDirectory(directoryPath)
info(s"Searching ${images.length} images for similarities")
val ignoreSet = new mutable.HashSet[Image]()
val allSimilarImages = new mutable.MutableList[SimilarImages]()
var processedCount = 0
var similarCount = 0
for (rootImage <- images) {
if (!ignoreSet.contains(rootImage)) {
info(s"Processed ${processedCount}/${images.length - ignoreSet.size} About ${images.length - processedCount} images to go")
debug(s"Looking for images similar to: ${rootImage.imagePath}")
ignoreSet += rootImage
val similarImages = new mutable.MutableList[Image]()
for (image <- images) {
if (!ignoreSet.contains(image)) {
if (rootImage.isSimilarTo(image)) {
debug(s"Image: ${image.imagePath} is similar")
similarImages += image
ignoreSet += image
similarCount += 1
}
}
}
if (similarImages.length > 1) {
val similar = new SimilarImages(rootImage, similarImages.toList)
debug(s"Found similar images: ${similar.toString}")
allSimilarImages += similar
}
processedCount += 1
}
}
info(s"Finished processing ${images.size} images. Found $similarCount similar images")
allSimilarImages.toList
}
/**
* Get all images for a directory with hashes
*/
def getImagesForDirectory(directoryPath:String):List[Image];
/**
* Get all similar images for a directory with hashes
*/
def getSimilarImagesForDirectory(directoryPath:String):List[SimilarImages];
}

76
src/main/scala/com/sothr/imagetools/SequentialEngine.scala

@ -0,0 +1,76 @@
package com.sothr.imagetools
import com.sothr.imagetools.image.{SimilarImages, ImageFilter, Image}
import scala.collection.mutable
import java.io.File
import grizzled.slf4j.Logging
import net.sf.ehcache.Element
/**
* Created by drew on 1/26/14.
*/
class SequentialEngine extends Engine with Logging {
def getImagesForDirectory(directoryPath:String):List[Image] = {
debug(s"Looking for images in directory: $directoryPath")
val images:mutable.MutableList[Image] = new mutable.MutableList[Image]()
val directory:File = new File(directoryPath)
var count = 0
if (directory.isDirectory) {
val files = directory.listFiles(imageFilter)
info(s"Found ${files.length} files that are images in directory: $directoryPath")
for (file <- files) {
count += 1
if (count % 25 == 0) info(s"Processed ${count}/${files.size}")
if (imageCache.isKeyInCache(file.getAbsolutePath)) {
images += imageCache.get(file.getAbsolutePath).getObjectValue.asInstanceOf[Image]
} else {
val image = ImageService.getImage(file)
if (image != null) {
imageCache.put(new Element(file.getAbsolutePath, image))
images += image
}
}
}
} else {
error(s"Provided path: $directoryPath is not a directory")
}
images.toList
}
def getSimilarImagesForDirectory(directoryPath:String):List[SimilarImages] = {
debug(s"Looking for similar images in directory: $directoryPath")
val images = getImagesForDirectory(directoryPath)
info(s"Searching ${images.length} images for similarities")
val ignoreSet = new mutable.HashSet[Image]()
val allSimilarImages = new mutable.MutableList[SimilarImages]()
var processedCount = 0
var similarCount = 0
for (rootImage <- images) {
if (!ignoreSet.contains(rootImage)) {
info(s"Processed ${processedCount}/${images.length - ignoreSet.size} About ${images.length - processedCount} images to go")
debug(s"Looking for images similar to: ${rootImage.imagePath}")
ignoreSet += rootImage
val similarImages = new mutable.MutableList[Image]()
for (image <- images) {
if (!ignoreSet.contains(image)) {
if (rootImage.isSimilarTo(image)) {
debug(s"Image: ${image.imagePath} is similar")
similarImages += image
ignoreSet += image
similarCount += 1
}
}
}
if (similarImages.length > 1) {
val similar = new SimilarImages(rootImage, similarImages.toList)
debug(s"Found similar images: ${similar.toString}")
allSimilarImages += similar
}
processedCount += 1
}
}
info(s"Finished processing ${images.size} images. Found $similarCount similar images")
allSimilarImages.toList
}
}

20
src/test/scala/com/sothr/imagetools/EngineTest.scala

@ -4,13 +4,25 @@ package com.sothr.imagetools
* Created by drew on 1/26/14.
*/
class EngineTest extends BaseTest{
test("Test getImagesForDirectory for sample directory") {
val engine:Engine = new Engine()
test("SequentialEngine Test getImagesForDirectory for sample directory") {
val engine:Engine = new SequentialEngine()
assertResult(3) { engine.getImagesForDirectory("sample").length }
}
test("Test getSimilarImagesForDirectory for sample directory") {
val engine = new Engine()
test("SequentialEngine Test getSimilarImagesForDirectory for sample directory") {
val engine = new SequentialEngine()
val similarImages = engine.getSimilarImagesForDirectory("sample")
assertResult(1) { similarImages.length }
assertResult(2) { similarImages(0).similarImages.length }
}
test("ConcurrentEngine Test getImagesForDirectory for sample directory") {
val engine:Engine = new ConcurrentEngine()
assertResult(3) { engine.getImagesForDirectory("sample").length }
}
test("ConcurrentEngine Test getSimilarImagesForDirectory for sample directory") {
val engine = new ConcurrentEngine()
val similarImages = engine.getSimilarImagesForDirectory("sample")
assertResult(1) { similarImages.length }
assertResult(2) { similarImages(0).similarImages.length }

Loading…
Cancel
Save