We now need a better way to simulate our 1-second delay, because Thread.sleep undermines actor management: it monopolizes the threads of the executor in use. What we really want is to simulate a non-blocking 1 s processing time.
So we should remove the following Thread.sleep call:
class MyMessageProcessor extends Actor {
  def receive = {
    case _: DoItMessage => Thread.sleep(1000)
  }
}
The solution is straightforward: Akka comes with a scheduler that lets you send a message after a given delay.
class MySimulator(system: ActorSystem) extends Actor {
  def receive = {
    case _: DoItMessage =>
      system.scheduler.scheduleOnce(1000 milliseconds, sender, "Done")
  }
}
With scheduleOnce we no longer monopolize system resources: no thread is blocked while the delay elapses. What we get is a fake processing step that simulates an asynchronous response arriving 1 s later. In the following code, the response is even received asynchronously, through a future:
package dummy

import akka.actor._
import akka.util.duration._
import akka.util.Timeout
import akka.pattern.ask
import akka.dispatch.Future

sealed trait MyMessage
case class DoItMessage(cmd: String) extends MyMessage
// A message without fields is best modelled as a case object
case object DoneMessage extends MyMessage
object Dummy {
  def main(args: Array[String]) {
    val howmanyjob = 10 * 1000000
    import com.typesafe.config.ConfigFactory
    implicit val system = ActorSystem("DummySystem", ConfigFactory.load.getConfig("dummy"))
    // Simulates the remote 1 s processing without blocking any thread
    val simu = system.actorOf(
      Props(new MySimulator(system))
        .withDispatcher("simu-dispatcher"),
      name = "simulator")
    // Counts finished jobs and shuts the system down at the end
    val appManager = system.actorOf(
      Props(new ApplicationManager(system, howmanyjob))
        .withDispatcher("simu-dispatcher"),
      name = "application-manager")
    import akka.routing.RoundRobinRouter
    // 10 processors behind a round-robin router
    val processor = system.actorOf(
      Props(new MyMessageProcessor(appManager, simu))
        .withDispatcher("workers-dispatcher")
        .withRouter(RoundRobinRouter(10)),
      name = "default")
    for (i <- 1 to howmanyjob) {
      processor ! DoItMessage("Do the job with ID#%d now".format(i))
    }
    println("All jobs sent")
  }
}
class MyMessageProcessor(appManager: ActorRef, simu: ActorRef) extends Actor {
  def receive = {
    case msg: DoItMessage =>
      implicit val timeout = Timeout(5 minutes)
      val receivedTime = System.currentTimeMillis()
      val future = simu ? msg
      future.onComplete {
        case result: Either[Throwable, String] =>
          assert(System.currentTimeMillis() - receivedTime >= 1000)
          appManager ! DoneMessage
      }
  }
}
class MySimulator(system: ActorSystem) extends Actor {
  def receive = {
    case _: DoItMessage =>
      // Fake processing, somewhere, the job is executed and we get
      // the results 1s later asynchronously
      system.scheduler.scheduleOnce(1000 milliseconds, sender, "Done")
  }
}
class ApplicationManager(system: ActorSystem, howmanyjob: Int) extends Actor {
  val startedTime = System.currentTimeMillis()
  var count = 0
  def receive = {
    case DoneMessage =>
      count += 1
      if (count % (howmanyjob / 20) == 0) println("%d/%d processed".format(count, howmanyjob))
      if (count == howmanyjob) {
        val now = System.currentTimeMillis()
        println("Everything processed in %d seconds".format((now - startedTime) / 1000))
        system.shutdown()
      }
  }
}
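To get a feel for why a scheduler scales where Thread.sleep does not, here is a small sketch that does not use Akka at all. It relies on the JDK's ScheduledExecutorService as a stand-in for the Akka scheduler (it is not what Akka does internally), and the object and method names are invented for the example. A single scheduler thread handles 1000 jobs that each "take" 1 s:

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Hypothetical helper, not part of the program above.
object NonBlockingDelaySketch {
  // Schedules `jobs` callbacks, each due `delayMs` after it is scheduled,
  // on ONE scheduler thread. Returns the elapsed wall-clock time in ms.
  def elapsedMillis(jobs: Int, delayMs: Long): Long = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val done = new CountDownLatch(jobs)
    val start = System.nanoTime()
    try {
      for (_ <- 1 to jobs) {
        // schedule() returns immediately: no thread sleeps for the job
        scheduler.schedule(new Runnable {
          def run(): Unit = done.countDown()
        }, delayMs, TimeUnit.MILLISECONDS)
      }
      // Wait for every callback (with a safety margin so we never hang)
      done.await(delayMs + 5000, TimeUnit.MILLISECONDS)
      (System.nanoTime() - start) / 1000000
    } finally {
      scheduler.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    val elapsed = elapsedMillis(jobs = 1000, delayMs = 1000)
    println("1000 delayed jobs finished in " + elapsed + " ms")
  }
}
```

The same 1000 jobs run one after another with Thread.sleep(1000) on a single thread would need about 1000 seconds, because each sleep holds the thread for its whole duration.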
The application configuration is the following (application.conf file):
dummy {
  akka {
    loglevel = WARNING
    actor {
      default-dispatcher {
      }
    }
    scheduler {
      tick-duration = 50ms
      ticks-per-wheel = 1000
    }
  }
  simu-dispatcher {
    type = Dispatcher
    mailbox-capacity = 100000
  }
  workers-dispatcher {
    mailbox-capacity = 10000
    executor = "fork-join-executor"
    fork-join-executor {
      parallelism-min = 0
      parallelism-max = 6000
      parallelism-factor = 3.0
    }
  }
}
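Two of these settings can be sanity-checked with a bit of arithmetic. The sketch below assumes that the fork-join executor sizes its pool as ceil(cores × parallelism-factor), clamped between parallelism-min and parallelism-max, as the Akka dispatcher documentation describes, and that the scheduler works in steps of tick-duration. The object and method names are invented for the example:

```scala
// Hypothetical helper, only illustrates the arithmetic behind the config.
object ConfigArithmeticSketch {
  // Pool size: ceil(cores * factor), clamped to [min, max]
  def parallelism(cores: Int, min: Int, factor: Double, max: Int): Int =
    math.min(math.max(math.ceil(cores * factor).toInt, min), max)

  // Number of scheduler ticks needed to cover a delay (rounded up)
  def ticksFor(delayMs: Long, tickMs: Long): Long =
    (delayMs + tickMs - 1) / tickMs

  def main(args: Array[String]): Unit = {
    // 6 cores with parallelism-factor = 3.0, min = 0, max = 6000
    println(parallelism(cores = 6, min = 0, factor = 3.0, max = 6000)) // prints 18
    // 1000 ms delay with tick-duration = 50ms
    println(ticksFor(delayMs = 1000, tickMs = 50)) // prints 20
  }
}
```

Under these assumptions, parallelism-max = 6000 is never reached on a 6-core machine: the factor yields 18 threads. The 50 ms tick also means the "Done" message is delivered with roughly tick-level precision rather than exactly 1000 ms after the request.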
The build.sbt file (to fetch dependencies, build, test and run) is the following:
name := "AkkaSandbox"

version := "0.1"

scalaVersion := "2.9.1"

libraryDependencies += "com.typesafe.akka" % "akka-actor" % "2.0-RC3"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

Everything is processed in 75 seconds using:
- Linux 3.2.1-gentoo-r2, 64-bit
- AMD Phenom(tm) II X6 1090T (6 CPU cores)
- Java HotSpot(TM) 64-Bit Server 1.6.0.31
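To put that figure in perspective, here is a back-of-the-envelope calculation based only on the numbers above (10 × 1,000,000 jobs, 75 seconds, 1 s of simulated processing per job). The object and method names are invented for the example:

```scala
// Hypothetical helper, only illustrates the arithmetic.
object ThroughputSketch {
  // Average number of jobs completed per second (integer division)
  def jobsPerSecond(jobs: Int, seconds: Int): Int = jobs / seconds

  // With a blocking 1 s sleep, one thread completes at most one job per
  // second, so this many threads would be needed for the same total time.
  def blockingThreadsNeeded(jobs: Int, seconds: Int): Int =
    math.ceil(jobs.toDouble / seconds).toInt

  def main(args: Array[String]): Unit = {
    val jobs = 10 * 1000000
    println(jobsPerSecond(jobs, 75))         // prints 133333
    println(blockingThreadsNeeded(jobs, 75)) // prints 133334
  }
}
```

In other words, matching this run with Thread.sleep would require on the order of 133,000 threads blocked at the same time, which is why the non-blocking scheduler matters here.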
