Thursday, February 23, 2012

Akka actors versus Scala actors: controlling actor message throughput

In the post following this one, I show how to enhance the example code so that it can process a very large number of messages: "Akka actors : 10 millions messages processed (1s / message) in 75 seconds !".

Scala's default actor library doesn't provide any mechanism to control message throughput between actors, so it is up to you to implement one to avoid an OutOfMemoryError. This matters in particular when actors do not process messages at the same rate: a fast sender keeps filling the mailbox of a slower receiver, and that mailbox grows without limit.
The Akka framework is interesting here because it provides the features needed to control and manage throughput. Let's see how it works through a simple example:
import akka.actor._

sealed trait MyMessage
case class DoItMessage(cmd:String) extends MyMessage

object Dummy {
  def main(args:Array[String]) {
    val system=ActorSystem("DummySystem")
    val processor = system.actorOf(Props[MyMessageProcessor],name="default")
    // Send 10 million messages as fast as possible, far faster than they can be processed
    for(i <- 1 to 10000000) {
      processor ! DoItMessage("Do the job with ID#%d now".format(i))
    }
    println("All jobs sent")
  }
}

class MyMessageProcessor extends Actor {
  def receive = {
    // Simulate a slow job: each message takes 1s and blocks the actor's thread meanwhile
    case _:DoItMessage => Thread.sleep(1000)
  }
}
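Each message is more than 50 bytes, so 10 million queued messages need more than 500 MB. Since the mailbox is unbounded and the sender is far faster than the processor (1 message per second), a JVM with a 512 MB maximum heap size quickly runs out of memory.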

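The good news is that Akka supports bounded actor mailboxes. Just add a configuration file (for instance application.conf on the classpath, which ConfigFactory.load reads by default) with the following content: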
dummy {
  akka {
    loglevel = WARNING
    actor {
      default-dispatcher {
         mailbox-capacity = 10000
      }
    }
  }
}
Then make the actor system use this configuration with the following code change:
    import com.typesafe.config.ConfigFactory
    val system=ActorSystem("DummySystem",ConfigFactory.load.getConfig("dummy"))
This time, if you run the code again, the memory footprint stays stable.
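Akka's bounded mailbox is a configuration switch rather than code you write, but the mechanism behind it can be sketched with the JDK alone. In this sketch (an analogue, not Akka's implementation) a java.util.concurrent.ArrayBlockingQueue plays the mailbox, a thread plays the slow actor, and put() blocks the sender once the capacity is reached, so the backlog, and therefore memory use, never exceeds the capacity. The function runBounded and its parameters are hypothetical names chosen for illustration.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical helper: a fast producer sends `total` messages to a slow consumer
// through a mailbox that holds at most `capacity` messages.
// Returns (messages processed, largest backlog seen by the producer).
def runBounded(capacity: Int, total: Int): (Int, Int) = {
  val mailbox = new ArrayBlockingQueue[String](capacity)
  var processed = 0

  // The slow "actor": takes one message at a time and spends ~1 ms on it.
  val consumer = new Thread(new Runnable {
    def run(): Unit = {
      var i = 0
      while (i < total) {
        mailbox.take()    // waits while the mailbox is empty
        Thread.sleep(1)   // simulated work
        processed += 1
        i += 1
      }
    }
  })
  consumer.start()

  // The fast sender: put() blocks while the mailbox is full,
  // so pending messages never pile up beyond `capacity`.
  var maxBacklog = 0
  var n = 1
  while (n <= total) {
    mailbox.put("Do the job with ID#%d now".format(n))
    maxBacklog = math.max(maxBacklog, mailbox.size())
    n += 1
  }
  consumer.join()   // join() also makes the consumer's count visible here
  (processed, maxBacklog)
}

val (done, peak) = runBounded(capacity = 10, total = 200)
println("processed %d messages, backlog never above %d".format(done, peak))
```

Blocking the sender is only one possible policy for a full bounded queue; rejecting or dropping messages are others, and the consumer thread here deliberately touches only local variables so the sketch can run from a script body.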

Of course, processing everything still takes a long time, since each message requires 1s. Worse, the actor is blocked for that whole second by the Thread.sleep call, which monopolizes a thread and wastes resources. To reduce the total processing time even though each message still takes 1s, we have to load-balance message processing across a large number of actors.
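In Akka this load-balancing is normally delegated to a router (Akka 2.0 ships routers such as RoundRobinRouter). The idea itself can be sketched with plain JDK threads: the sketch below assumes N hypothetical workers, each with its own bounded mailbox, and hands message i to worker i % N. It still uses Thread.sleep to mimic the slow job, so it only shows how the work is spread, not how to avoid blocking threads. roundRobin, workers, total and workMs are illustrative names, not part of any library.

```scala
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.atomic.AtomicIntegerArray

// Hypothetical helper: spreads `total` messages over `workers` workers in
// round-robin order (message i goes to worker i % workers). Each worker has its
// own bounded mailbox and sleeps `workMs` per message to mimic a slow job.
// Returns (messages handled by each worker, elapsed milliseconds).
def roundRobin(workers: Int, total: Int, workMs: Long): (Array[Int], Long) = {
  val mailboxes = Array.fill(workers)(new ArrayBlockingQueue[Int](100))
  val counts = new AtomicIntegerArray(workers)
  val stop = -1  // sentinel telling a worker there is nothing left

  val threads = (0 until workers).map { w =>
    val t = new Thread(new Runnable {
      def run(): Unit = {
        var msg = mailboxes(w).take()
        while (msg != stop) {
          Thread.sleep(workMs)      // simulated 1-message job
          counts.incrementAndGet(w)
          msg = mailboxes(w).take()
        }
      }
    })
    t.start()
    t
  }

  val start = System.nanoTime()
  var i = 0
  while (i < total) {
    mailboxes(i % workers).put(i)   // blocks if that worker's mailbox is full
    i += 1
  }
  mailboxes.foreach(_.put(stop))    // one stop signal per worker
  threads.foreach(_.join())
  val elapsedMs = (System.nanoTime() - start) / 1000000
  (Array.tabulate(workers)(w => counts.get(w)), elapsedMs)
}

val (perWorker, elapsed) = roundRobin(workers = 10, total = 200, workMs = 10L)
println("messages per worker: " + perWorker.mkString(", "))
println("elapsed: %d ms".format(elapsed))
```

With 10 workers, the 200 simulated 10 ms jobs should finish in roughly a tenth of the 2 s a single worker would need; the exact figure depends on the machine.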


CONTEXT : Scala 2.9.1 / SBT 0.11.2 / AKKA 2.0-rc2 / sbteclipse 2.0.0

Revisiting the Git/SVN update script... and making it faster

The following script is an enhancement of the one shown in the previous post. It now detects all git or svn projects in the current directory, then updates and compiles them.

It would have been straightforward to make it faster by just turning the "projects" collection into a parallel one (projects.par), but unfortunately that won't work: the script stores the current working directory in a global mutable variable, so parallel iterations would overwrite each other's directory (a classic trap)! So more work is needed to make it faster.
#!/bin/sh
exec scala -deprecation -savecompiled "$0" "$@"
!#
import java.io.File
import sys.process.Process
import sys.process.ProcessBuilder._

// ======================================================================
case class CurDir(cwd:File)
def file(f:String)=new File(f)
def file(d:File, f:String)=new File(d, f)
implicit def stringToCurDir(d:String) = CurDir(file(d))
implicit def fileToCurDir(d:File) = CurDir(d)
implicit def stringToFile(f:String) = file(f)
implicit def stringToProcess(cmd: String)(implicit curDir:CurDir) = Process(cmd, curDir.cwd)
implicit var cwd:CurDir=file(".").getPath
def cd(dir:File=file(util.Properties.userDir)) = cwd=CurDir(dir)
// ======================================================================

trait Project {
  val dir:File
}
case class GitProject(dir:File) extends Project
case class SvnProject(dir:File) extends Project

// Find GIT or SVN project directories
val projects = file(".").listFiles filter { _.isDirectory} flatMap { d =>
  if (file(d, ".git").exists) Some(GitProject(d))
  else if (file(d, ".svn").exists) Some(SvnProject(d))
  else None
}

// Update compile project (update also eclipse conf files generated by sbteclipse plugin)
for(project<- projects) {
  println(project)
  cd(project.dir)
  project match {
    case GitProject(_) => "git pull" !
    case SvnProject(_) => "svn update" !
  }
  "sbt eclipse compile" !
}
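Why does projects.par break this script? The trap can be reproduced without any thread: the snippet below replays, step by step, one interleaving that two parallel iterations may produce. The commands are stubbed (runCmd only reports the directory it would use), so no git or svn is needed; cwd, cd and runCmd are simplified, hypothetical stand-ins for the script's definitions.

```scala
import java.io.File

// Shared mutable state, like the script's `implicit var cwd`.
var cwd = new File(".")
def cd(dir: File): Unit = { cwd = dir }

// Stand-in for `"git pull" !`: reports the directory the command would run in.
def runCmd(cmd: String): String = cmd + " in " + cwd.getName

// One legal schedule for two parallel iterations A and B:
// A: cd(projA), B: cd(projB), A: run its command, B: run its command
cd(new File("projA"))
cd(new File("projB"))
val outcome = List(runCmd("git pull"), runCmd("svn update"))
println(outcome)  // List(git pull in projB, svn update in projB)
```

Task A's "git pull" ends up running in projB, because B's cd happened between A's cd and A's command.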
Let's now revisit the current directory hack to make it compatible with parallel processing and avoid side effects.
To do so, we modify the "cd" hack to remove the mutable cwd variable:
#!/bin/sh
exec scala -deprecation -savecompiled "$0" "$@"
!#
import java.io.File
import sys.process.Process
import sys.process.ProcessBuilder._

// ======================================================================
case class CurDir(cwd:File)
def file(f:String)=new File(f)
def file(d:File, f:String)=new File(d, f)
implicit def stringToCurDir(d:String) = CurDir(file(d))
implicit def fileToCurDir(d:File) = CurDir(d)
implicit def stringToFile(f:String) = file(f)
implicit def stringToProcess(cmd: String)(implicit curDir:CurDir) = Process(cmd, curDir.cwd)
def cd[A](dir:File=file(util.Properties.userDir)) (indir:(CurDir)=>A) = indir(CurDir(dir))
// ======================================================================

trait Project {
  val dir:File
}
case class GitProject(dir:File) extends Project
case class SvnProject(dir:File) extends Project

// Find GIT or SVN project directories
val projects = file(".").listFiles filter { _.isDirectory} flatMap { d =>
  if (file(d, ".git").exists) Some(GitProject(d))
  else if (file(d, ".svn").exists) Some(SvnProject(d))
  else None
}

// Update compile project (update also eclipse conf files generated by sbteclipse plugin)
for(project<- projects.par) {
  println(project)
  cd(project.dir) { implicit cwd =>
    project match {
      case GitProject(_) => "git pull" !
      case SvnProject(_) => "svn update" !
    }
    "sbt eclipse compile" !
  }
}

println("%d projects updated".format(projects.size))

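So we couldn't keep the classical shell cd behavior, because it implies mutable state. In this script, cd gets a new definition with an extra argument: a function that receives the working directory to use. Declaring that function's parameter implicit (implicit cwd =>) lets the string-to-process conversion pick it up, so each parallel iteration runs its commands in its own directory.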
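The same idea can be checked outside the script with a JDK thread pool instead of projects.par (parallel collections moved to a separate module in Scala 2.13). In this sketch, ParallelUpdate, describe and updateAll are hypothetical names, and the "command" merely reports the directory it would run in, so each task's result shows which working directory it actually received. Because the directory travels with the task instead of living in a shared variable, no interleaving can mix them up.

```scala
import java.io.File
import java.util.concurrent.{Callable, Executors}

// Hypothetical stand-in for the script: the "command" only reports where it
// would run, so the sketch needs neither git, svn nor sbt.
class ParallelUpdate {
  // Same shape as the script's cd: the directory is passed to the body, never stored globally.
  def cd[A](dir: File)(body: File => A): A = body(dir)

  def describe(cmd: String, dir: File): String = cmd + " in " + dir.getName

  def updateAll(names: List[String]): List[String] = {
    val pool = Executors.newFixedThreadPool(4)
    try {
      val pending = names.map { name =>
        pool.submit(new Callable[String] {
          def call(): String = cd(new File(name)) { dir => describe("update", dir) }
        })
      }
      pending.map(_.get())  // wait for every task, keeping the input order
    } finally pool.shutdown()
  }
}

val updated = new ParallelUpdate().updateAll(List("projA", "projB", "projC"))
updated.foreach(println)
```

Wrapping the helpers in a class rather than defining them at the top level of a script avoids a possible deadlock when worker threads call into a script object that is still being initialized.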

In my case (7 projects), using projects.par, the script runs in 10s instead of 33s!

CONTEXT : Scala 2.9.1

Wednesday, February 1, 2012

Small script to update & compile a list of svn projects

Scala script skeleton to update a set of subversion projects.
#!/bin/sh
exec scala -deprecation -savecompiled "$0" "$@"
!#

// ======================================================================
import sys.process.Process
import sys.process.ProcessBuilder._

case class CurDir(cwd:java.io.File)
implicit def stringToCurDir(d:String) = CurDir(new java.io.File(d))
implicit def stringToProcess(cmd: String)(implicit curDir:CurDir) = Process(cmd, curDir.cwd)
implicit def stringSeqToProcess(cmd:Seq[String])(implicit curDir:CurDir) = Process(cmd, curDir.cwd)

implicit var cwd:CurDir=scala.util.Properties.userDir
def cd(dir:String=util.Properties.userDir) = cwd=dir
// ======================================================================

val updateList=List("project1", "project2", "project3")

for(dir<- updateList) {
  println("----------------------------------------")
  println("Processing %s".format(dir))

  cd(dir)

  "svn update" !

  "sbt eclipse compile" !
}