Sunday, March 11, 2012

Dependency injection using Scala traits

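Consider the following code example, which does not depend on any kind of configuration system. We have just defined a trait, XConfigurable, which describes how to get simple configuration data. Its default getSetting returns None for every key, so each plugin falls back to its hard-coded defaults.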
trait XConfigurable {
  def getSetting(key:String):Option[String] = None
}

trait XPlugin extends XConfigurable {
  def domain:String
}

trait XSniffPlugin extends XPlugin {
  val period=getSetting("period") map {_.toLong} getOrElse 5000L
  val periodInS=period/1000
}

abstract class XSSHPlugin extends XSniffPlugin {
  val host = getSetting("host") getOrElse "127.0.0.1"
  val username = getSetting("username") getOrElse util.Properties.userName
  val password = getSetting("password") 
  val port = getSetting("port") map {_.toInt} getOrElse 22
  def cmd:String
}

case class XTopPlugin(domain:String) extends XSSHPlugin {
  val ignoreKeywords = getSetting("ignoreKeywords")
  def cmd="top -c -b -d %d".format(periodInS)
  override def toString = 
    "top each %ds through ssh to %s%s@%s".format(
        period/1000, username, password map {":"+_} getOrElse "", host
        )
}
Now I would like to change the behavior of XTopPlugin so that it takes its configuration from the Typesafe Config library, but without modifying anything in the code above.

The idea here is to create a new trait which inherits from XConfigurable:
object PlayWithTrait {
  import com.typesafe.config.Config
  import com.typesafe.config.ConfigFactory
  import com.typesafe.config.ConfigException.{Missing,WrongType}
    
  private lazy val context = ConfigFactory.parseString("""
        |Defaults {
        |  period=5000
        |}
        |SSHPlugin = ${Defaults}
        |SSHPlugin = {
        |  host:"127.0.0.1"
        |  port:22
        |}
        |TopPlugin = ${SSHPlugin}
        |TopPlugin = {
        |  ignoreKeywords:"tty$"
        |} """.stripMargin).resolve()

  trait XConfigurableUsingConfig extends XConfigurable {
    def config:Config
    override def getSetting(key:String):Option[String] = {
      try {
        Some(config.getString(key))
      } catch {
        case x:Missing => None
        case x:WrongType => None
      }
    }
  }

...
We then take advantage of Scala's trait mixins, which let us stack a new behavior onto an object at instantiation time:
  def main(args: Array[String]) {
    import collection.JavaConversions._
    
    val localCfg = Map(
        "host" -> "192.168.2.1",
        "username" -> "test",
        "password" -> "testtest"
        )
    
    val topConfig = ConfigFactory.parseMap(localCfg).withFallback(context.getConfig("TopPlugin"))
    
    val top = new XTopPlugin("myserver") with XConfigurableUsingConfig {def config=topConfig}
    
    println(top)
  }
So now our XTopPlugin instance takes its configuration from the Typesafe Config library. The anonymous class created at instantiation acts as a kind of closure which supplies the configuration context (topConfig).
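
The same stacking technique can be tried without any external library. The following is only a minimal sketch: Configurable, SniffPlugin and MapConfigurable are hypothetical names chosen for this illustration, and the settings are declared as lazy val (my own choice, not what the code above does) so that they are only read once the object is fully constructed.

```scala
// Same shape as XConfigurable: by default no setting is known.
trait Configurable {
  def getSetting(key: String): Option[String] = None
}

// Hypothetical plugin; lazy vals defer the lookup until first use.
class SniffPlugin(val domain: String) extends Configurable {
  lazy val period: Long = getSetting("period").map(_.toLong).getOrElse(5000L)
  lazy val host: String = getSetting("host").getOrElse("127.0.0.1")
}

// Hypothetical in-memory settings source, stacked on top of Configurable.
trait MapConfigurable extends Configurable {
  def settings: Map[String, String]
  override def getSetting(key: String): Option[String] = settings.get(key)
}

object StackingDemo {
  def main(args: Array[String]): Unit = {
    val local = Map("host" -> "192.168.2.1", "period" -> "2000")
    // Without the mixin, every setting falls back to its default.
    val plain = new SniffPlugin("myserver")
    // With the mixin, the very same class reads from the map.
    val tuned = new SniffPlugin("myserver") with MapConfigurable { def settings = local }
    println(plain.host + " " + plain.period) // 127.0.0.1 5000
    println(tuned.host + " " + tuned.period) // 192.168.2.1 2000
  }
}
```

A map-backed trait like this one can be handy in unit tests, where loading a real configuration file is not desirable.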


One can object that this is less flexible than using, for example, Spring and its XML configuration files. But remember that the Scala compiler can easily be embedded in any application, so you just have to use a Scala configuration file which gets compiled at runtime. It will bring you powerful dependency injection features using pure Scala.

Thursday, March 1, 2012

Akka actors: 10 million messages processed (1 s / message) in 75 seconds!

In the previous post, "Akka actors versus Scala actors : control the message throughput of actors", we saw how to control message throughput and avoid OutOfMemory problems, in particular when actors do not all process messages at the same rate.

Now we have to find a better way to simulate our 1-second delay, because Thread.sleep breaks actor management: it monopolizes all the threads of the executor in use. What we really want is to simulate a non-blocking 1 s processing time.

So we should remove the following Thread.sleep call:
class MyMessageProcessor extends Actor {
  def receive = {
    case _:DoItMessage => Thread.sleep(1000)
  }
}
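
To make the cost of blocking concrete, here is a small sketch which uses a plain JDK fixed thread pool instead of an Akka dispatcher (a deliberate simplification). The names BlockingSleepDemo and runBlocking are hypothetical. Each job holds its thread for the whole sleep, so the total time grows with the number of jobs divided by the number of threads.

```scala
import java.util.concurrent.{CountDownLatch, Executors}

object BlockingSleepDemo {
  // Runs `jobs` tasks which each block for `sleepMs` on a pool of `threads`
  // threads, and returns the total elapsed time in milliseconds.
  def runBlocking(jobs: Int, threads: Int, sleepMs: Long): Long = {
    val pool = Executors.newFixedThreadPool(threads)
    val done = new CountDownLatch(jobs)
    val start = System.currentTimeMillis()
    for (_ <- 1 to jobs) {
      pool.execute(new Runnable {
        def run(): Unit = { Thread.sleep(sleepMs); done.countDown() }
      })
    }
    done.await()    // wait until every job has finished sleeping
    pool.shutdown()
    System.currentTimeMillis() - start
  }

  def main(args: Array[String]): Unit = {
    // 8 jobs of 200 ms on 2 threads need 4 rounds, so roughly 800 ms.
    println("Blocking run took %d ms".format(runBlocking(8, 2, 200L)))
  }
}
```

With 10 million jobs of 1 second each, the same arithmetic quickly becomes impractical for any reasonable number of threads.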

The solution is straightforward, as Akka comes with a scheduler which allows you to send a message after a given delay.
class MySimulator(system:ActorSystem) extends Actor {
  def receive = {
    case _:DoItMessage =>
      system.scheduler.scheduleOnce(1000 milliseconds, sender, "Done") 
  }
}
Using scheduleOnce, we do not monopolize system resources: no thread is blocked while waiting. In effect we get a fake processing step which simulates an asynchronous response arriving 1 s later. In the following code, the response is even received asynchronously through a future:
package dummy

import akka.actor._
import akka.util.duration._
import akka.util.Timeout
import akka.pattern.ask
import akka.dispatch.Future

sealed trait MyMessage
case class DoItMessage(cmd:String) extends MyMessage
// A message without fields is a case object; it is sent and matched as a value.
case object DoneMessage extends MyMessage


object Dummy {
  def main(args:Array[String]) {
    
    val howmanyjob=10*1000000
    
    import com.typesafe.config.ConfigFactory
    implicit val system=ActorSystem("DummySystem",ConfigFactory.load.getConfig("dummy"))
    
    val simu = system.actorOf(
        Props(new MySimulator(system))
        .withDispatcher("simu-dispatcher"),
        name="simulator")

    val appManager = system.actorOf(
        Props(new ApplicationManager(system, howmanyjob))
        .withDispatcher("simu-dispatcher"),
        name="application-manager")

        
    import akka.routing.RoundRobinRouter
    val processor = system.actorOf(
        Props(new MyMessageProcessor(appManager, simu))
        .withDispatcher("workers-dispatcher")
        .withRouter(RoundRobinRouter(10)),
        name="default")
        
    for(i <- 1 to howmanyjob) {
      processor ! DoItMessage("Do the job with ID#%d now".format(i))
    }
    print("All jobs sent")
  }
}

class MyMessageProcessor(appManager:ActorRef, simu:ActorRef) extends Actor {
  def receive = {
    case msg:DoItMessage =>
      implicit val timeout = Timeout(5 minutes)
      val receivedTime = System.currentTimeMillis()
      val future = simu ? msg
      future.onComplete { 
        case result:Either[Throwable, String] =>
          assert(System.currentTimeMillis()-receivedTime >= 1000)
          appManager ! DoneMessage
      }
  }
}

class MySimulator(system:ActorSystem) extends Actor {
  def receive = {
    case _:DoItMessage =>
      // Fake processing, somewhere, the job is executed and we get 
      // the results 1s later asynchronously
      system.scheduler.scheduleOnce(1000 milliseconds, sender, "Done") 
  }
}

class ApplicationManager(system:ActorSystem, howmanyjob:Int) extends Actor {
  val startedTime = System.currentTimeMillis()
  var count=0
  def receive = {
    case DoneMessage => 
      count+=1
      if (count%(howmanyjob/20)==0) println("%d/%d processed".format(count, howmanyjob))
      if (count == howmanyjob) {
        val now=System.currentTimeMillis()
        println("Everything processed in %d seconds".format((now-startedTime)/1000))
        system.shutdown() 
      }
  }
}
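
The reason a scheduled reply scales so well can be seen outside Akka too. The sketch below swaps Akka's scheduler for a JDK ScheduledExecutorService (not the mechanism used in the listing above), and the names NonBlockingDelayDemo and runScheduled are hypothetical. A single thread is enough to complete many delayed jobs in about the delay itself, because nothing sleeps while waiting.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object NonBlockingDelayDemo {
  // Schedules `jobs` fake jobs which each complete `delayMs` later on a single
  // scheduler thread, and returns the total elapsed time in milliseconds.
  def runScheduled(jobs: Int, delayMs: Long): Long = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val done = new CountDownLatch(jobs)
    val start = System.currentTimeMillis()
    for (_ <- 1 to jobs) {
      // The task only runs once the delay has expired; no thread waits for it.
      scheduler.schedule(new Runnable { def run(): Unit = done.countDown() },
        delayMs, TimeUnit.MILLISECONDS)
    }
    done.await()
    scheduler.shutdown()
    System.currentTimeMillis() - start
  }

  def main(args: Array[String]): Unit = {
    val elapsed = runScheduled(1000, 1000L)
    println("1000 delayed jobs completed in %d ms".format(elapsed))
  }
}
```

On an idle machine the elapsed time should stay close to the 1 second delay, whatever the number of jobs, as long as the completion callbacks themselves are cheap.
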
The application configuration (application.conf file) is the following:
dummy {
  akka {
    loglevel = WARNING
    actor {
      default-dispatcher {
      }
    }
    scheduler {
      tick-duration = 50ms
      ticks-per-wheel = 1000
    }
  }
  
  simu-dispatcher {
    type = Dispatcher
    mailbox-capacity = 100000
  }
  
  workers-dispatcher {
    mailbox-capacity = 10000
    executor = "fork-join-executor"
    fork-join-executor {
      parallelism-min = 0
      parallelism-max = 6000
      parallelism-factor = 3.0      
    }
  }
}
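
As a rough guide to what the fork-join-executor settings above mean, the sketch below assumes that Akka sizes the pool as the number of cores multiplied by parallelism-factor, rounded up, then clamped between parallelism-min and parallelism-max. This rule is my reading of the Akka dispatcher documentation and should be checked against the version in use. ForkJoinSizing and parallelism are hypothetical names.

```scala
object ForkJoinSizing {
  // Assumed sizing rule: ceil(cores * factor), clamped to [min, max].
  def parallelism(cores: Int, factor: Double, min: Int, max: Int): Int =
    math.min(math.max(math.ceil(cores * factor).toInt, min), max)

  def main(args: Array[String]): Unit = {
    // Values from workers-dispatcher above, on a 6-core machine.
    println(parallelism(6, 3.0, 0, 6000)) // prints 18
  }
}
```

Under that assumption, the very high parallelism-max never comes into play on a 6-core machine; the factor alone decides the pool size.
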
The build.sbt file (used to fetch dependencies, build, test and run) is the following:
name := "AkkaSandbox"

version := "0.1"

scalaVersion := "2.9.1"

libraryDependencies += "com.typesafe.akka" % "akka-actor" % "2.0-RC3"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

Everything is processed in 75 seconds using:
  • Linux 3.2.1-gentoo-r2, 64-bit
  • AMD Phenom(tm) II X6 1090T (6 CPU cores)
  • Java HotSpot(TM) 64-Bit Server 1.6.0.31
CPU usage is about 80% and the Java memory footprint stays below 300 MB. Each message requires 50-60 bytes, so the 10 million messages require at least 480 MB in total (but not simultaneously).
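
The figures above can be checked with a few lines of arithmetic. This is only a sketch: RunNumbers, throughput and totalMiB are hypothetical names, and the 50-60 bytes per message is the estimate given above, not a measured value.

```scala
object RunNumbers {
  val messages = 10L * 1000 * 1000

  // Average throughput over the whole run, in messages per second.
  def throughput(seconds: Long): Double = messages.toDouble / seconds

  // Cumulative size of all messages, in MiB (1 MiB = 1024 * 1024 bytes).
  def totalMiB(bytesPerMessage: Long): Double =
    messages * bytesPerMessage / (1024.0 * 1024.0)

  def main(args: Array[String]): Unit = {
    println("%.0f messages/s".format(throughput(75)))     // 133333 messages/s
    println("%.0f MiB at 50 bytes".format(totalMiB(50)))  // 477 MiB at 50 bytes
    println("%.0f MiB at 60 bytes".format(totalMiB(60)))  // 572 MiB at 60 bytes
  }
}
```

Since the footprint stays below 300 MB while the cumulative size is well above that, most messages must already have been processed and garbage collected before the last ones are created.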