Monday, November 26, 2012

Sharing information between SBT build.sbt and your project code...

One way to share data between the SBT build.sbt file and your project code is to generate a class automatically. This is quite easy, as everything is done directly inside the build.sbt file :
import AssemblyKeys._

seq(assemblySettings: _*)

name := "ScalaDummyProject"

version := "0.1.3"

scalaVersion := "2.9.2"

scalacOptions ++= Seq("-unchecked", "-deprecation")

mainClass in assembly := Some("dummy.Dummy")

jarName in assembly := "dummy.jar"

libraryDependencies += "org.scalatest" %% "scalatest" % "1.8" % "test"

libraryDependencies += "junit" % "junit" % "4.10" % "test"


sourceGenerators in Compile <+= 
 (sourceManaged in Compile, version, name, jarName in assembly) map {
  (dir, version, projectname, jarexe) =>
  val file = dir / "dummy" / "MetaInfo.scala"
  IO.write(file,
  """package dummy
    |object MetaInfo { 
    |  val version="%s"
    |  val projectName="%s"
    |  val jarbasename="%s"
    |}
    |""".stripMargin.format(version, projectname, jarexe.split("[.]").head) )
  Seq(file)
}

And that's all. Once this is done, you can write things such as :
package dummy

object Dummy {
  val message = "Hello %s by %s release %s".format(
       util.Properties.userName,
       MetaInfo.projectName,
       MetaInfo.version
  )

  def main(args:Array[String]) {
    println(message)
  }
}

The full example is available on GitHub.
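
To make the generator more concrete, the sketch below reproduces its template outside of SBT and shows the source text it would write for the settings of the build above. The MetaInfoTemplate object and its render method are names introduced here for illustration only; they are not part of SBT or of the example project.

```scala
// Illustration only: MetaInfoTemplate and render are hypothetical names.
object MetaInfoTemplate {
  // Same template and same jar base name computation as in the build.sbt generator.
  def render(version: String, projectName: String, jarName: String): String =
    """package dummy
      |object MetaInfo {
      |  val version="%s"
      |  val projectName="%s"
      |  val jarbasename="%s"
      |}
      |""".stripMargin.format(version, projectName, jarName.split("[.]").head)

  def main(args: Array[String]): Unit =
    // Values taken from the build.sbt shown above.
    print(render("0.1.3", "ScalaDummyProject", "dummy.jar"))
}
```

Because the jar name is split on dots and only the first part is kept, "dummy.jar" gives a jarbasename of "dummy".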

Using custom libraries directly in SBT task code...

Using a custom library directly in SBT task code is quite easy: just add your dependencies to project/plugins.sbt, then your imports to build.sbt, and that's all folks.

The following example shows how to use a custom SSH API (JASSH) to send an executable jar to a remote server :

import AssemblyKeys._
import jassh._

seq(assemblySettings: _*)

name := "MyProject"

version := "0.1"

scalaVersion := "2.10.0-RC2"

mainClass in assembly := Some("com.mycompany.myproject.Main")

jarName in assembly := "myproject.jar"


...


TaskKey[Unit]("export", "Send 2 server")<<= (assembly in assembly) map { jarfile=>
  SSH.once(host="localhost", username="test") { ssh =>
    println("Exporting %s on %s".format(jarfile.getPath, ssh.execute("hostname")))
    ssh.send(jarfile.getPath, jarfile.getName)
  }
}

The content of SBT project/plugins.sbt :
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.0")

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.4")

libraryDependencies += "fr.janalyse" %% "janalyse-ssh" % "0.9.5-b3" % "compile"

resolvers += "JAnalyse repository" at "http://www.janalyse.fr/repository/"

It is worth noting that the same dependency management system is used both for your project and for your build specification.

Tuesday, October 23, 2012

Going far beyond simple configuration files

I'm fed up with the classic configuration approach based on static files in various formats such as properties, JSON, XML, ... It's true that they are easy to implement and use, but they do not allow an application to be extended or personalized in depth.

One use case I have in mind is to allow small application code fragments to be inserted directly inside the configuration file. Once loaded (and of course automatically compiled), the code fragment is executed directly and becomes part of the application ! This brings very good performance, no systematic internal tests, no parsing, simplicity, syntax coherency and type safety.

One could argue that using a programming language directly inside a configuration file in fact turns it into a source file... but by using DSL techniques and on-the-fly compile/load, it is possible to make this transparent, in particular for simple configuration content.

Of course, there are drawbacks. One of them concerns security: this configuration power must only be available to the application administrator, it must not become accessible to anyone else... Another one is that the compiler invocation has a high CPU cost, so it is mandatory to save the compiled classes for future usage.

The approach is the following :
  • the configuration file is the body of a Scala class which inherits from a config DSL dedicated to your requirements.
  • when the application starts, it looks for an already compiled config file; if none is found (or if it is older than the configuration file), the configuration file is compiled and the result is saved
  • then the compiled configuration file is loaded, and you get an instance of your generated configuration class
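
The decision taken in the second step can be sketched in isolation as follows. ConfigCacheCheck and needsCompile are hypothetical names used only for this illustration; the sketch relies on nothing more than java.io.File and compares the modification times of the two files.

```scala
import java.io.File

// Illustration only: ConfigCacheCheck and needsCompile are hypothetical names.
object ConfigCacheCheck {
  // True when the compiled jar is missing or older than the configuration file,
  // i.e. when the configuration has to be compiled (again).
  def needsCompile(configFile: File, compiledJar: File): Boolean =
    !compiledJar.exists || configFile.lastModified > compiledJar.lastModified
}
```

With this rule, editing the configuration file is enough to trigger a new compilation at the next start, and an untouched configuration never pays the compiler cost twice.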


Now let's have a look at my first implementation, following the described approach :
import java.net.URLClassLoader
import java.util.jar.Manifest
import scala.tools.nsc.Global
import scala.tools.nsc.reporters.ConsoleReporter
import scala.tools.nsc.Settings
import scala.tools.nsc.io.VirtualDirectory
import scala.tools.nsc.interpreter.AbstractFileClassLoader
import scala.tools.nsc.io.File
import scala.tools.nsc.io.JarWriter
import scala.tools.nsc.util.BatchSourceFile
import java.util.jar.JarFile
import java.util.jar.JarEntry
import scala.reflect.io._

/**
 * A config base class, a place where to put generic DSL logic (perhaps)
 */
trait ConfigBase {}

/**
 * Config compiler logic
 */
class ConfigCompiler[T <% ConfigBase](val configFilename: String,
                                      val baseclass: Class[T],
                                      val chosenClassNameOption: Option[String]) {

  class CustomConsoleReporter(settings: Settings) extends ConsoleReporter(settings) {
  }

  private def computeConfigClassName(): String = chosenClassNameOption getOrElse {
    "Generated" + (baseclass.getName().split("[.]").map(_.capitalize).mkString(""))
  }

  val chosenClassName = computeConfigClassName()
  val configFile = File(configFilename)
  val backupFile = File(configFilename.split("[.]").init.mkString("", ".", ".jar"))

  /**
   * Compile configuration file
   */
  private def compileConfig(): VirtualDirectory = {
    val settings = new Settings()
    settings.usejavacp.value = true
    settings.deprecation.value = true

    val body = io.Source.fromFile(configFilename)
    val code =
      """class %s extends %s {
        |%s
        |}""".stripMargin.format(
        chosenClassName,
        baseclass.getName(),
        body.getLines.mkString("\n"))

    val sources = List(new BatchSourceFile(configFilename, code))
    val outdir = new VirtualDirectory("(memory)", None)

    settings.outputDirs.setSingleOutput(outdir)

    val global = new Global(settings, new ConsoleReporter(settings))
    lazy val compiler = new global.Run()

    compiler.compileSources(sources)

    outdir
  }

  /**
   * Backup compiled file
   */
  private def backup(generatedClasses: VirtualDirectory): VirtualDirectory = {
    val storedir = configFile.parent

    val writer = new JarWriter(backupFile, new java.util.jar.Manifest())
    for (f <- generatedClasses) writer.addStream(new JarEntry(f.name), f.input)
    writer.close

    generatedClasses
  }

  /**
   * Build the class loader that will allow the just compiled class to be loaded
   */
  private def buildConfigClassLoader(): ClassLoader = {
    if (backupFile.exists && configFile.lastModified <= backupFile.lastModified)
      new URLClassLoader(Array(backupFile.toURL), getClass.getClassLoader())
    else
      new AbstractFileClassLoader(backup(compileConfig()), getClass.getClassLoader())
  }

  val configClassLoader = buildConfigClassLoader
  val configClass = configClassLoader.loadClass(chosenClassName)
  val config: T = configClass.newInstance().asInstanceOf[T]
}

/**
 * ConfigCompiler Object, the main entry point to get your config
 */

object ConfigCompiler {
  def loadConfig[T <% ConfigBase](
    filename: String,
    baseclass: Class[T],
    chosenClassName: Option[String] = None): T = {
    val compiler = new ConfigCompiler(filename, baseclass, chosenClassName)
    compiler.config
  }
}

Usage

val config =
  ConfigCompiler.loadConfig(
            "conf/dummy.conf", 
            classOf[examples.DummyConfigBase]
  )
println(config.projectName)
println(config.servers)
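
To see what the implementation derives from its arguments, the following sketch extracts the three string computations of ConfigCompiler into standalone functions. ConfigNaming and its method names are hypothetical and exist only for this illustration; the expressions themselves are copied from the class above.

```scala
// Illustration only: ConfigNaming is a hypothetical helper, not part of ConfigCompiler.
object ConfigNaming {
  // Default generated class name, same expression as in computeConfigClassName
  def generatedClassName(baseClassName: String): String =
    "Generated" + baseClassName.split("[.]").map(_.capitalize).mkString("")

  // Jar used to store the compiled classes, same expression as for backupFile
  def backupJarName(configFilename: String): String =
    configFilename.split("[.]").init.mkString("", ".", ".jar")

  // Source text handed to the compiler, same template as in compileConfig
  def wrap(className: String, baseClassName: String, body: String): String =
    """class %s extends %s {
      |%s
      |}""".stripMargin.format(className, baseClassName, body)

  def main(args: Array[String]): Unit = {
    println(generatedClassName("examples.DummyConfigBase")) // GeneratedExamplesDummyConfigBase
    println(backupJarName("conf/dummy.conf"))               // conf/dummy.jar
  }
}
```

Note that a configuration file name without any dot would give a jar named ".jar", so the configuration file is expected to carry an extension.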



Remaining caveats I still need to work on :
  • Modify compiler messages, to have the right line number printed
  • Bring various DSL techniques that help to get simple configuration
  • Example, example, example, ...

Thursday, September 6, 2012

JASERIES (JAnalyseSeries) 1.3.0 released

JASERIES is a Scala API for numeric time series operations. ( ** project page **)

Latest changes :
  • "implicit format:CSVFormat" now correctly used
  • CSVFormat comes with a default implicit format.
  • CSVFormatHints class removed.
  • CSVFormat class and usage refactoring
  • Support for variable columns csv, example with this following csv file :
          date;             A
          20-03-2011 02:00; 2
          20-03-2011 03:00;
          20-03-2011 04:00; 8
          date;             A; B; C
          20-03-2011 05:00; 1; 2; 3
          20-03-2011 06:00; 2;  ; 5
          date;             C; D
          20-03-2011 07:00; 7; 0
      
  • apache commons 1.8.4 dependency added (for bzip2 implementation)
  • gzip, bzip2 compressed csv file now taken into account in fromFile and fromURL methods
        val s0 = CSV2Series.fromURL(
             "http://dnld.crosson.org/modstatus-totalaccesses.csv.bz2")
        val s1 = CSV2Series.fromFile("samples/modstatus-totalaccesses.jxtcsv.gz")
        val s2 = CSV2Series.fromFile("samples/modstatus-totalaccesses.jxtcsv.bz2")
      
  • some TimeModel refactoring + adding a new TimeModel implementation : TimeModelCustom
    However it would require some refactoring; I think there is a potential performance problem.
  • package series object added : provides helper functions + align : align some series together on common times they share (using a time normalizer)
  • now supports any order for series computation with numbers : "10 + s" or "s + 10"
    (thanks to an implicit conversion defined in the series package object)
  • more test cases
  • series now overrides equals & hashCode (the series alias field is not taken into account - alias is a user friendly series name).
  • series times and values methods return IndexedSeq collection instead of Iterable

Apache total accesses counter to hit rate

The following example demonstrates how to convert a global http hit counter, into a hitrate series:
import fr.janalyse.series._
val csv = CSV2Series.fromURL("http://dnld.crosson.org/modstatus-totalaccesses.csv")
val totalaccesses=csv.values.head

val hitrate=
  totalaccesses
    .delta
    .toSeries[AddCell]
    .sample("10m")
    .toRate()
    .rename("http hit rate")

CSV2Series.toFile(hitrate, "hitrate.csv")
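
As a rough illustration of the idea behind this pipeline, independent of JASERIES, the following plain Scala sketch turns an ever increasing counter into a per-second rate by dividing the difference between consecutive samples by the elapsed time. The CounterToRate object, the millisecond time unit and the choice of attaching each rate to the start of its interval are assumptions made for this sketch; the library's delta, sample and toRate methods may differ in their details.

```scala
// Illustration only: a library independent sketch, not the JASERIES implementation.
object CounterToRate {
  // samples: (time in milliseconds, counter value), sorted by time
  // result : (start time of the interval, counter increase per second)
  def rates(samples: Seq[(Long, Double)]): Seq[(Long, Double)] =
    samples.sliding(2).collect {
      case Seq((t1, v1), (t2, v2)) if t2 > t1 =>
        t1 -> (v2 - v1) / ((t2 - t1) / 1000d)
    }.toSeq

  def main(args: Array[String]): Unit = {
    // 50 hits during the first 10 seconds, then 100 hits during the next 10 seconds
    val counter = Seq(0L -> 100d, 10000L -> 150d, 20000L -> 250d)
    rates(counter).foreach { case (t, r) => println(t + " " + r) }
  }
}
```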

JVM processing time to JVM cpu usage

The following example demonstrates how to convert a java process cpu time counter (mbean : java.lang:type=OperatingSystem__ProcessCpuTime) into a cpu usage metric (here I didn't take the number of CPUs into account, so max value = cpu count * 100):
import fr.janalyse.series._
val csv = CSV2Series.fromURL("http://dnld.crosson.org/processcputime.csv")
val cputime=csv.values.head

val cpucells = 
   for(Seq(a,b) <- (cputime / 1000d / 1000d).sliding(2).toIterable ) 
   yield a.time->(b.value-a.value)/(b.time-a.time)*100d

val cpuusage = Series[StatCell]("cpu usage percent", "5m") <<< cpucells

CSV2Series.toFile(cpuusage, "cpuusage.csv")
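
The arithmetic of this conversion can be checked by hand with the small library independent sketch below. It assumes, as the code above implies, that the counter is expressed in nanoseconds and the sample times in milliseconds. The CpuUsage object, the Sample class and the optional cpuCount parameter (which normalizes the result to a 0-100 range) are additions made for this illustration only.

```scala
// Illustration only: a library independent sketch of the cpu usage computation.
object CpuUsage {
  // timeMs: sample time in milliseconds, cpuTimeNs: cumulated process cpu time in nanoseconds
  case class Sample(timeMs: Long, cpuTimeNs: Double)

  // Percentage of cpu used between two samples; with cpuCount = 1 the result
  // can go up to (number of CPUs * 100), as in the example above.
  def percent(a: Sample, b: Sample, cpuCount: Int = 1): Double = {
    val cpuMs = (b.cpuTimeNs - a.cpuTimeNs) / 1000d / 1000d
    cpuMs / (b.timeMs - a.timeMs) * 100d / cpuCount
  }

  def main(args: Array[String]): Unit = {
    // 500 ms of cpu time consumed during 1000 ms of wall clock time
    println(percent(Sample(0L, 0d), Sample(1000L, 500000000d)))
  }
}
```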

Monday, August 6, 2012

JASERIES (JAnalyseSeries) 1.2.0 released

JASERIES is a Scala API for numeric time series operations. ( ** project page **)

Latest changes :
  • now using sbt 0.12
  • now using sbteclipse 2.1.0
  • now using sbt-assembly 0.8.3
  • now using scalatest 1.8
  • Series factories enhancements
  • showLegend boolean added to chartConfig.
  • Add support for QuoteCell (for stock series processing)
  • Various Code cleanup and enhancements.
  • Add automatic date support for google finance historical data (although the API will be shut down ~ October 2012)
  • CSV2Series now supports direct parsing of Quote Series ( CSV2Series.quoteFromURL )
  • Automatic support for CSV with all cells in quotes
  • new test cases (quotes)
  • duration based takeRight and dropRight method added
  • chart classes refactoring
  • stacked chart type added : StackedChart class (BUT still work in progress as TableXYDataSet implies the same number of cells in each series !!!)
  • Series sample method now preserves cell type !! (was previously returning a StatCell!!)
  • statSample method added to Series to allow sampling using StatCell
  • toSeries method added to Series to allow cell type conversions
  • Series count2Rate method renamed to toRate

Apache total accesses counter to hit rate

The following example demonstrates how to convert a global http hit counter, into a hitrate series:
import fr.janalyse.series._
val csv = CSV2Series.fromURL("http://dnld.crosson.org/modstatus-totalaccesses.csv")
val totalaccesses=csv.values.head

val hitrate=
  totalaccesses
    .delta
    .toSeries[AddCell]
    .sample("10m")
    .toRate()
    .rename("http hit rate")

CSV2Series.toFile(hitrate, "hitrate.csv")

JVM processing time to JVM cpu usage

The following example demonstrates how to convert a java process cpu time counter (mbean : java.lang:type=OperatingSystem__ProcessCpuTime) into a cpu usage metric (here I didn't take the number of CPUs into account, so max value = cpu count * 100):
import fr.janalyse.series._
val csv = CSV2Series.fromURL("http://dnld.crosson.org/processcputime.csv")
val cputime=csv.values.head

val cpucells = 
   for(Seq(a,b) <- (cputime / 1000d / 1000d).sliding(2).toIterable ) 
   yield a.time->(b.value-a.value)/(b.time-a.time)*100d

val cpuusage = Series[StatCell]("cpu usage percent", "5m") <<< cpucells

CSV2Series.toFile(cpuusage, "cpuusage.csv")

Wednesday, August 1, 2012

JAJMX (JAnalyseJMX) 0.5.0 released

JAJMX is a high level Scala JMX abstraction layer. The goal is to simplify JMX operations as much as possible. ( ** project page **)

Latest changes :
  • cleanup, refactoring & general enhancements
  • Support for both generic host/port and jmxservice url approaches (although hidden by default)
  • automatic jmxurl lookups using known formats (external jars may be required, as for example jboss-client.jar)
  • application server type detection (whoami method) : jboss, tomcat, jonas, webmethod, jetty taken into account
  • straightforward threads dumps method added (with automatic support of dumpAllThreads starting from Java 6)
  • JMXOptions can now be built from a JMXServiceURL + credentials
  • Performance enhancements (mbeaninfos cached)
  • README file added
  • JMX.apply now with optional username & password as arguments
  • new methods & fields for RichMBean : "attributeNames():List[String]" and "domain"

jvm force gc script

#!/bin/sh
JAVA_OPTS=""
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote"
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote.port=9999"
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote.authenticate=false"
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote.ssl=false"
exec java $JAVA_OPTS -jar jajmx.jar -nocompdaemon -usejavacp -savecompiled "$0" "$@"
!#

jajmx.JMX.once("127.0.0.1", 9999) { jmx =>
  jmx.memory map { _ set("Verbose", true)}  // Activate Verbose GC, to see on stdout next call effect 
  jmx.memory map {_ call "gc"}              // Force GC
}
usage example :
$ ./gcforce
Usage   : jmxgrep host port
  no args given so now let's connecting to myself, and force a gc...
[GC 33835K->800K(245952K), 0.0036500 secs]
[Full GC 800K->673K(245952K), 0.0443690 secs]
[GC 7455K->929K(245952K), 0.0007100 secs]
[Full GC 929K->774K(245952K), 0.0448880 secs]
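
For comparison, the two operations the script performs through JAJMX correspond to operations of the standard java.lang.management MemoryMXBean. The sketch below applies them to the local JVM only (no remote connection) and does not use JAJMX at all; LocalForceGc is a name chosen for this illustration.

```scala
import java.lang.management.ManagementFactory

// Illustration only: same operations on the local JVM, using the standard platform MXBean.
object LocalForceGc {
  // Activates verbose GC, forces a GC and returns the used heap size in bytes
  def forceGc(): Long = {
    val memory = ManagementFactory.getMemoryMXBean
    memory.setVerbose(true) // same effect as setting the "Verbose" attribute
    memory.gc()             // same effect as calling the "gc" operation
    memory.getHeapMemoryUsage.getUsed
  }

  def main(args: Array[String]): Unit =
    println("heap used after gc : " + forceGc() + " bytes")
}
```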

lsthreads script

#!/bin/sh
JAVA_OPTS=""
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote"
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote.port=9999"
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote.authenticate=false"
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote.ssl=false"
exec java $JAVA_OPTS -jar jajmx.jar -nocompdaemon -usejavacp -savecompiled "$0" "$@"
!#
import jajmx._

val host = if (args.size>=1) args(0) else "localhost" 
val port = if (args.size>=2) args(1).toInt else 9999

JMX.once(host,port) { jmx =>
  for (dump <- jmx.threadsDump(0)) {
    val threads = dump.threads.toList
    val countByState = threads groupBy {_.status} map { case (s,l) => s -> l.size}
    val countByStateStr = countByState map {case (s,l) => s+":"+l} mkString " "
    println("Total %d  %s".format(threads.size, countByStateStr))
    for ( ti <- threads sortBy {_.id } ) {
      println("%d - %s - %s".format(ti.id, ti.status, ti.name) )
    }
  }
}
usage example :
$ ./lsthreads 
Total 14  TIMED_WAITING:5 WAITING:2 RUNNABLE:7
1 - RUNNABLE - main
2 - WAITING - Reference Handler
3 - WAITING - Finalizer
4 - RUNNABLE - Signal Dispatcher
9 - RUNNABLE - RMI TCP Accept-0
10 - RUNNABLE - RMI TCP Accept-9999
11 - RUNNABLE - RMI TCP Accept-0
13 - RUNNABLE - RMI TCP Connection(1)-127.0.0.1
14 - TIMED_WAITING - RMI Scheduler(0)
15 - TIMED_WAITING - RMI RenewClean-[127.0.0.1:40993]
16 - TIMED_WAITING - GC Daemon
17 - RUNNABLE - RMI TCP Connection(2)-127.0.0.1
20 - TIMED_WAITING - JMX server connection timeout 20
21 - TIMED_WAITING - Thread-4
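
The per-state summary printed on the first line can also be computed for the local JVM with the standard ThreadMXBean, without JAJMX. LocalThreadStates is a name chosen for this sketch, and the thread counts naturally depend on the JVM it runs in.

```scala
import java.lang.management.ManagementFactory

// Illustration only: thread count by state for the local JVM, using the standard platform MXBean.
object LocalThreadStates {
  def countByState(): Map[Thread.State, Int] = {
    val mx = ManagementFactory.getThreadMXBean
    // getThreadInfo returns null entries for threads that died in the meantime
    val infos = mx.getThreadInfo(mx.getAllThreadIds, 0).filter(_ != null)
    infos.groupBy(_.getThreadState).map { case (state, l) => state -> l.length }
  }

  def main(args: Array[String]): Unit = {
    val counts = countByState()
    val countsStr = counts.map { case (s, n) => s.toString + ":" + n }.mkString(" ")
    println("Total %d  %s".format(counts.values.sum, countsStr))
  }
}
```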

jmx grep script

#!/bin/sh
JAVA_OPTS=""
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote"
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote.port=9999"
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote.authenticate=false"
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote.ssl=false"
exec java $JAVA_OPTS -jar jajmx.jar -nocompdaemon -usejavacp -savecompiled "$0" "$@"
!#

import jajmx._

if (args.size == 0) {
  println("Usage   : jmxgrep host port searchMask1 ... searchMaskN")
  println("  no args given so now let's connecting to myself, and list my mbeans...") 
}
val host  = if (args.size>0) args(0) else "localhost"
val port  = if (args.size>1) args(1).toInt else 9999
val masks = if (args.size>2) args.toList.drop(2) map {s=>("(?i)"+s).r} else List.empty[util.matching.Regex]


def truncate(str:String, n:Int=60) = {
  val nonl=str.replaceAll("\n", " ").replaceAll("\r", "")
  if (nonl.size>n) nonl.take(n)+"..." else nonl
}

JMX.once(host, port) { jmx =>
  for(mbean <- jmx.mbeans ; attr <- mbean.attributes; value <- mbean.getString(attr)) {
    val found = List(mbean.name, attr.name, value) exists { item => masks exists {_.findFirstIn(item).isDefined } }
    if (masks.isEmpty || found) println("%s - %s = %s".format(mbean.name, attr.name, truncate(value)))
  }
}
usage example :
$ ./jmxgrep localhost 9999 version
java.lang:type=Runtime - SpecVersion = 1.0
java.lang:type=Runtime - ManagementSpecVersion = 1.2
java.lang:type=Runtime - VmVersion = 20.8-b03
java.lang:type=Runtime - SystemProperties = javax.management.openmbean.TabularDataSupport(tabularType=ja...
java.lang:type=OperatingSystem - Version = 3.2.1-gentoo-r2
JMImplementation:type=MBeanServerDelegate - ImplementationVersion = 1.6.0_33-b03
JMImplementation:type=MBeanServerDelegate - SpecificationVersion = 1.4
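
The filtering rule of the script can be tried without any JMX connection. In the sketch below the mask construction, the truncate function and the matching test are copied from the script; only the MaskMatching object and the masksFrom and keep method names are introduced for the illustration.

```scala
import scala.util.matching.Regex

// Illustration only: the filtering logic of jmxgrep, isolated from JMX.
object MaskMatching {
  // Each search mask becomes a case insensitive regular expression
  def masksFrom(args: Seq[String]): List[Regex] =
    args.toList.map(s => ("(?i)" + s).r)

  // Removes line breaks and keeps at most n characters
  def truncate(str: String, n: Int = 60): String = {
    val nonl = str.replaceAll("\n", " ").replaceAll("\r", "")
    if (nonl.size > n) nonl.take(n) + "..." else nonl
  }

  // A line is kept when no mask is given, or when any mask is found in
  // the mbean name, the attribute name or the value
  def keep(items: List[String], masks: List[Regex]): Boolean =
    masks.isEmpty || items.exists(item => masks.exists(_.findFirstIn(item).isDefined))

  def main(args: Array[String]): Unit = {
    val masks = masksFrom(Seq("version"))
    println(keep(List("java.lang:type=Runtime", "VmVersion", "20.8-b03"), masks))
    println(keep(List("java.lang:type=Memory", "Verbose", "false"), masks))
  }
}
```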

Friday, July 13, 2012

JAJMX (JAnalyseJMX) 0.4.0 released

JAJMX is a high level Scala JMX API. The goal is to simplify JMX operations as much as possible. ( ** project page **)

Latest changes :
  • ConnectException taken into account in RichMBean genericGetter in order to propagate connection failures and allow API users to restart...
  • CompositeData & TabularData support enhancements
  • API CHANGES !! getOption disappears, the get method now returns an Option. The previous get method is replaced by apply.
  • more test cases
  • jmx mbeans queries were broken, now the query is built with an object name instead of a string
  • findMBeanServers made public and moved to JMX object
  • checkServiceURL added to JMXObject : checks if a jmx service url works.
  • getInt added to mbean
  • now using sbt 0.11.3
  • now using sbt-assembly 0.8.1
  • now using sbteclipse 2.1.0-RC1
  • now using scalatest 1.8
  • trap exceptions while building attributes map in RichMBean
  • various example script fixes

jvm force gc script

#!/bin/sh
JAVA_OPTS=""
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote"
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote.port=9999"
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote.authenticate=false"
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote.ssl=false"
exec java $JAVA_OPTS -jar jajmx.jar -nocompdaemon -usejavacp -savecompiled "$0" "$@"
!#

jajmx.JMX.once("127.0.0.1", 9999) { jmx =>
  jmx.memory map { _ set("Verbose", true)}  // Activate Verbose GC, to see on stdout next call effect 
  jmx.memory map {_ call "gc"}              // Force GC
}

jmx grep script

#!/bin/sh
JAVA_OPTS=""
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote"
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote.port=9999"
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote.authenticate=false"
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote.ssl=false"
exec java $JAVA_OPTS -jar jajmx.jar -nocompdaemon -usejavacp -savecompiled "$0" "$@"
!#

import jajmx._

if (args.size == 0) {
  println("Usage   : jmxgrep host port searchMask1 ... searchMaskN")
  println("  no args given so now let's connecting to myself, and list my mbeans...") 
}
val host  = if (args.size>0) args(0) else "localhost"
val port  = if (args.size>1) args(1).toInt else 9999
val masks = if (args.size>2) args.toList.drop(2) map {s=>("(?i)"+s).r} else List.empty[util.matching.Regex]


def truncate(str:String, n:Int=60) = {
  val nonl=str.replaceAll("\n", " ").replaceAll("\r", "")
  if (nonl.size>n) nonl.take(n)+"..." else nonl
}

JMX.once(host, port) { jmx =>
  for(mbean <- jmx.mbeans ; attr <- mbean.attributes; value <- mbean.getString(attr)) {
    val found = List(mbean.name, attr.name, value) exists { item => masks exists {_.findFirstIn(item).isDefined } }
    if (masks.isEmpty || found) println("%s - %s = %s".format(mbean.name, attr.name, truncate(value)))
  }
}

Thursday, July 12, 2012

Parallel remote command ssh execution script

The following script executes the given command on the specified remote hosts in parallel. Notice that the actors corePoolSize parameter has been set to a higher value than the default one: cpu usage for this script will remain low, as actors will mostly wait for remote server results. (jassh.jar executable jar is available here - JASSH project page)
#!/bin/sh
exec java -jar jassh.jar -deprecation -savecompiled -usejavacp -nocompdaemon "$0" "$@"
!#

import fr.janalyse.ssh._
import util.Properties

if (args.size<2) {
  println("""usage : rexec.scala command [user[:password]@]host[:port] ...""")
  println("""  Of course prefer public key authentication, default behavior if no password is provided """)
  println("""  example : rexec.scala "hostname" 192.168.1.10 toto@192.168.1.11 toto@192.168.1.12:22""")
  System.exit(0)
}

// host | host:port | username@host |username:password@host | username@host:port | ... 
val serverRE="""(?:(\w+)(?:[:](.*))?@)?((?:(?:\d+[.]){3}\d+)|(?:\w+))(?:[:](\d+))?""".r

val cmd2exec=args.head
val servers = args.tail map {
  case serverRE(user, password, host, port) => 
    SSHOptions(
      host     = host,
      username = Option(user).getOrElse(Properties.userName),
      password = SSHPassword(Option(password)),
      port     = Option(port).map(_.toInt).getOrElse(22)
    )
  case notUnderstood => 
    throw new RuntimeException("Couldn't understand remote host description : "+notUnderstood)
}


def rexec(server:SSHOptions, cmd2exec:String):String = 
  SSH.once(server)(_.execute(cmd2exec))
     .split("\n")
     .map("%8s@%-16s: %s".format(server.username, server.host, _))
     .mkString("\n")
 
  
var sys=new scala.sys.SystemProperties()
sys+="actors.corePoolSize"->"25"


import actors.Actor._
val caller=self
for(server <- servers) actor { caller ! rexec(server, cmd2exec) }
for(_ <- servers) receive {case msg => println(msg) }

usage example :

$ ./rexec.scala "uname -r" test@127.0.0.1  test:testtest@192.168.1.200 
    test@127.0.0.1       : 3.2.1-gentoo-r2
    test@192.168.1.200   : 3.2.0-2-486
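
The host description syntax accepted by the script can be explored without any SSH connection. The sketch below reuses the regular expression of the script as is; the ServerSpecDemo object, the Spec case class and the parse method are hypothetical names that replace SSHOptions for the sake of a self-contained example.

```scala
// Illustration only: the host description parsing of rexec.scala, isolated from JASSH.
object ServerSpecDemo {
  // Same regular expression as in the script
  val serverRE = """(?:(\w+)(?:[:](.*))?@)?((?:(?:\d+[.]){3}\d+)|(?:\w+))(?:[:](\d+))?""".r

  // Hypothetical stand-in for SSHOptions
  case class Spec(user: Option[String], password: Option[String], host: String, port: Int)

  def parse(description: String): Option[Spec] = description match {
    case serverRE(user, password, host, port) =>
      // Unmatched optional groups are null, hence the Option(...) wrapping
      Some(Spec(Option(user), Option(password), host, Option(port).map(_.toInt).getOrElse(22)))
    case _ => None
  }

  def main(args: Array[String]): Unit =
    List("192.168.1.10", "toto@192.168.1.12:22", "test:testtest@192.168.1.200", "server.example.com")
      .foreach(d => println(d + " => " + parse(d)))
}
```

The last description is not accepted: besides dotted IPv4 addresses, the host part only matches word characters, so host names containing dots or dashes are rejected by this expression.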

From what I've read, Scala 2.10 will provide better solutions to tune thread pools; I will update this post later.
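
As a possible direction, here is a minimal sketch based on the scala.concurrent futures introduced with Scala 2.10, where the size of the thread pool is chosen explicitly instead of going through a system property. It is not the script's implementation: ParallelExecSketch and runAll are hypothetical names, and the work function stands for a call such as rexec(server, cmd2exec).

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Illustration only: parallel execution with futures on a dedicated, explicitly sized thread pool.
object ParallelExecSketch {
  def runAll[A, B](items: Seq[A], poolSize: Int = 25)(work: A => B): Seq[B] = {
    val pool = Executors.newFixedThreadPool(poolSize)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // One future per item; the results are collected in the order of the items
      val futures = items.map(item => Future(work(item)))
      Await.result(Future.sequence(futures), 1.minute)
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit =
    // The function given here is a placeholder for a remote command execution
    runAll(Seq("host1", "host2", "host3"))(host => "done on " + host).foreach(println)
}
```

Unlike the actor version, which prints each result as soon as it arrives, this sketch waits for all the results and returns them in the order of the hosts.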

Tuesday, July 10, 2012

Find the common basename (prefix) in a collection of String

Non tail recursive implementation

  def basename(names:Iterable[String]):Option[String] = {
    if (names.exists(_.size==0) || names.size<=1) None
    else {
      val firsts = names.map(_.head)
      if (firsts.toSet.size>1) None
      else {
        Some(firsts.head.toString+basename(names.map(_.tail)).getOrElse(""))
      }
    }
  }

Tail recursive implementation

  @annotation.tailrec
  def basename(names:Iterable[String], accu:Option[String]=None):Option[String] = {
    if (names.exists(_.size==0) || names.size<=1) accu
    else {
      val firsts = names.map(_.head)
      if (firsts.toSet.size>1) accu
      else basename(names.map(_.tail), Some(accu.getOrElse("")+firsts.head))
    }
  }

Usage example

$ scala
Welcome to Scala version 2.9.2 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_33).
Type in expressions to have them evaluated.
Type :help for more information.

scala> :paste
// Entering paste mode (ctrl-D to finish)

  @annotation.tailrec
  def basename(names:Iterable[String], accu:Option[String]=None):Option[String] = {
    if (names.exists(_.size==0) || names.size<=1) accu
    else {
      val firsts = names.map(_.head)
      if (firsts.toSet.size>1) accu
      else basename(names.map(_.tail), Some(accu.getOrElse("")+firsts.head))
    }
  }

// Exiting paste mode, now interpreting.

basename: (names: Iterable[String], accu: Option[String])Option[String]

scala> basename(List("trucmuche", "trucmoche", "trucduq"))
res0: Option[String] = Some(truc)

scala> basename(List("was1", "was2"))
res1: Option[String] = Some(was)

scala> basename(List("was1"))
res2: Option[String] = None

Of course, if we annotate the non tail recursive implementation with @tailrec, we get the following error :
scala> :paste
// Entering paste mode (ctrl-D to finish)

@annotation.tailrec
def basename(names:Iterable[String]):Option[String] = {
    if (names.exists(_.size==0) || names.size<=1) None
    else {
      val firsts = names.map(_.head)
      if (firsts.toSet.size>1) None
      else {
        Some(firsts.head.toString+basename(names.map(_.tail)).getOrElse(""))
      }
    }
  }

// Exiting paste mode, now interpreting.

<console>:15: error: could not optimize @tailrec annotated method basename: it contains a recursive call not in tail position
               Some(firsts.head.toString+basename(names.map(_.tail)).getOrElse(""))

Thursday, June 28, 2012

SSH intricated tunnels

Let's simulate intricated (nested) tunnels using JASSH (a high level Scala SSH API).

In the following example, we simulate bouncing between 9 SSH hosts, using SSH tunnel intrication :
  • A:22 -> B:22 -> C:22 -> D:22 -> E:22 -> F:22 -> G:22 -> H:22 -> I:22 ; (typical case : a given host is only accessible through the previous one)
  • All "foreign" hosts become directly accessible using new ssh local ports
  • A->10022, B-> 10023, ... I->10030, so now I (and all the others) are directly accessible from the local ssh client host !
    // From host/port, bring back locally remote fhost/fport to local host using tport. 
    case class Sub(host:String, port:Int, fhost:String, fport:Int, tport:Int)
    
    val intricatedPath = Iterable(
        Sub("localhost", 22,    "127.0.0.1", 22, 10022), // A 
        Sub("localhost", 10022, "127.0.0.1", 22, 10023), // B
        Sub("localhost", 10023, "127.0.0.1", 22, 10024), // C
        Sub("localhost", 10024, "127.0.0.1", 22, 10025), // D
        Sub("localhost", 10025, "127.0.0.1", 22, 10026), // E
        Sub("localhost", 10026, "127.0.0.1", 22, 10027), // F
        Sub("localhost", 10027, "127.0.0.1", 22, 10028), // G
        Sub("localhost", 10028, "127.0.0.1", 22, 10029), // H
        Sub("localhost", 10029, "127.0.0.1", 22, 10030)  // I
        )
        
    def intricate[T](path:Iterable[Sub], curSSHPort:Int=22)(proc:(SSH)=>T):T = {
      path.headOption match {
        case Some(curSub) =>
          SSH.once(curSub.host, "test", port=curSub.port) { ssh =>
            ssh.remote2Local(curSub.tport, curSub.fhost, curSub.fport)
            intricate(path.tail, curSub.tport)(proc)
          }
        case None => 
          SSH.once("localhost", "test", port=curSSHPort) { ssh =>
            proc(ssh)
          }
      }
    }
    
    // Build the intricated tunnels and execute a ssh command on the farthest host (I)
    val result = intricate(intricatedPath) {ssh =>
      ssh.executeAndTrim("echo 'Hello intricated world'")
    }

    println(result)
So now let's automate the construction of complex SSH tunnels, with automatic rebuild on failure and direct integration into tools, for when direct access is not possible...
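
As a small first step in that direction, the nine entries of the path used in the example can be generated instead of being written by hand. The sketch below only builds the list and opens no SSH connection; the TunnelPath object and the chain method are hypothetical names, while Sub is the case class of the example.

```scala
// Illustration only: generates the same path as the hand written intricatedPath.
object TunnelPath {
  // From host/port, bring back locally remote fhost/fport to local host using tport.
  case class Sub(host: String, port: Int, fhost: String, fport: Int, tport: Int)

  // Each hop is reached through the local port opened by the previous hop;
  // the first one is reached through the regular ssh port.
  def chain(hops: Int, firstLocalPort: Int = 10022): List[Sub] =
    (0 until hops).toList.map { i =>
      val entryPort = if (i == 0) 22 else firstLocalPort + i - 1
      Sub("localhost", entryPort, "127.0.0.1", 22, firstLocalPort + i)
    }

  def main(args: Array[String]): Unit =
    chain(9).foreach(println)
}
```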

Wednesday, June 27, 2012

JASSH (JAnalyseSSH) 0.9.3 released

JASSH is a high level scala SSH API for easy and fast operations on remote servers. It is JSCH based.

Latest changes for 0.9.3

  • now using sbt-assembly 0.8.3
  • fixes related to implicit conversions with SSHPassword
  • fixes related to implicit conversion to SSHCommand and SSHBatch
  • For SSHBatch : execute, executeAndTrim, executeAndTrimSplit
    renamed to : executeAll, executeAllAndTrim, executeAllAndTrimSplit
  • Using Iterable instead of List
  • external (package) usage tests completed (ExternalSSHAPITest.scala)
  • small fix about how private key passphrase is taken into account (when pub-key auth is used)

Latest changes for 0.9.2

  • date '+%Y-%m-%d %H:%M:%S %z' : %z and %Z give the same result on AIX, and this result corresponds to the linux %Z. So the code now uses %Z instead of %z. Now using GMT, "date -u '+%Y-%m-%d %H:%M:%S %Z'", so that everything works well in all cases.
  • SSH.once(Option[SSHOptions]) fix linked to Option type result not at the right place
  • New test source file : ExternalSSHAPITest.scala => Testing the API from an external package
  • Fixed : minor problem with scripts when invoking jassh.SSH... or fr.janalyse.ssh.SSH... without imports...

Latest changes for 0.9.1

  • SSH tunneling fix, cleanup, and scaladocumented
  • Intricated SSH tunneling test added (self intrication, to simplify test case)

Latest changes for 0.9.0:

  • now using sbt-assembly 0.8.1
  • now using scalatest 0.8
  • new helper methods (For the list of already available helper methods):
    • test
    • exists
    • isFile
    • isDirectory
    • isExecutable
  • findAfterDate & date helper fix !!
    Shell.date -> remote system time zone is now taken into account
  • Test cases fixes :
    Forcing parallelism to 6 ! for test case "Simultaenous SSH operations"
  • Code factorization :
    • ShellOperations trait added. Inherited by SSH and SSHShell.
    • TransferOperations trait added. Inherited by SSH and SSHFtp.
  • SCP supported; for non-persistent transfer sessions, SCP is now used by default (instead of SFTP)
    (e.g. : SSH class transfer operations now use SCP by default).
  • noneCipher switch added to SSHOptions for higher performance SCP transfers (true by default)
    (http://www.psc.edu/index.php/hpn-ssh)
  • transfer (receive) tests added
    Reference time on a local system: 500Mb using 5 SCP commands (100Mb/cmd) takes 8.7s on the same system (~62Mb/s per file)
    file transfer performances (with content loaded in memory)
    • Bytes rate : 38,6Mb/s 500Mb in 12,9s for 5 files - byterates using SCP
    • Bytes rate : 44,9Mb/s 500Mb in 11,1s for 5 files - byterates using SCP (with none cipher)
    • Bytes rate : 38,5Mb/s 500Mb in 13,0s for 5 files - byterates using SFTP
    • Bytes rate : 46,0Mb/s 500Mb in 10,9s for 5 files - byterates using SFTP (with none cipher)
    • Bytes rate : 39,5Mb/s 500Mb in 12,7s for 5 files - byterates using SFTP (session reused)
    • Bytes rate : 46,7Mb/s 500Mb in 10,7s for 5 files - byterates using SFTP (session reused, with none cipher)
    • Bytes rate : 29,5Mb/s 500Mb in 16,9s for 500 files - byterates using SCP
    • Bytes rate : 32,1Mb/s 500Mb in 15,6s for 500 files - byterates using SCP (with none cipher)
    • Bytes rate : 26,7Mb/s 500Mb in 18,7s for 500 files - byterates using SFTP
    • Bytes rate : 29,5Mb/s 500Mb in 16,9s for 500 files - byterates using SFTP (with none cipher)
    • Bytes rate : 37,7Mb/s 500Mb in 13,3s for 500 files - byterates using SFTP (session reused)
    • Bytes rate : 43,7Mb/s 500Mb in 11,4s for 500 files - byterates using SFTP (session reused, with none cipher)
  • Code cleanup & Scaladocumenting
  • SSH compression now supported
  • For easier SSH Tunneling, new methods are now available :
    • def remote2Local(rport:Int, lhost:String, lport:Int)
    • def local2Remote(lport:Int, rhost:String, rport:Int)
  • SSHCommand, SSHBatch methods ! renamed to §§

CAVEATS :

  • Operations on a persisted ssh shell session must be executed within the same thread. Do not span a persisted shell session across several threads, as it may generate exceptions.
    So be careful when using the REPL with its default config, as each "evaluation" is done within a new thread !
    Workaround : start the interpreter (REPL) with the "-Yrepl-sync" option.
    No problem with SBT, as a scala console started from SBT executes all its entries in the same thread !
    No problem in scala scripts.
  • SCP operations can't retrieve special files such as /proc/cpuinfo, because their size is not known !
    Workarounds : use SFTP, or use a command such as "cat /proc/cpuinfo". (The latter is the "best workaround", as it works in all cases)
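
When a thread-bound session has to be shared by code running on several threads, one generic way to respect the first caveat is to funnel every call through a single dedicated thread. The following is only a sketch of that idea using the JDK executor API: ThreadBoundSession and ConfinedSession are hypothetical names invented for the illustration, not JASSH classes.

```scala
import java.util.concurrent.{Callable, Executors}

// Hypothetical stand-in for a persisted shell session: it remembers the thread
// that created it and refuses calls coming from any other thread.
class ThreadBoundSession {
  private val owner = Thread.currentThread
  def execute(cmd: String): String = {
    require(Thread.currentThread eq owner, "session used from a foreign thread")
    "ran: " + cmd
  }
}

// Creates the session on one dedicated thread and runs every later call there.
class ConfinedSession {
  private val executor = Executors.newSingleThreadExecutor()

  // Runs body on the dedicated thread and waits for its result.
  private def onOwnerThread[T](body: => T): T =
    executor.submit(new Callable[T] { def call(): T = body }).get()

  private val session = onOwnerThread(new ThreadBoundSession)

  def execute(cmd: String): String = onOwnerThread(session.execute(cmd))

  // Must be called when done, otherwise the dedicated thread keeps the JVM alive.
  def close(): Unit = executor.shutdown()
}
```

Callers on any thread can then use ConfinedSession.execute, at the cost of serializing all operations.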

hello scala script

#!/bin/sh
exec java -jar jassh.jar -nocompdaemon -usejavacp -savecompiled "$0" "$@"
!#

print(jassh.SSH.shell("localhost", "test", "testtest") {_ execute "echo Hello `hostname`" } )
  
  

remote vmstat scala script

(The following script assumes that the current user already has automatic access to the given remote host using SSH public key authentication.)
#!/bin/sh
exec java -jar jassh.jar -nocompdaemon -usejavacp -savecompiled "$0" "$@"
!#
val host=if (args.size>0) args(0) else "localhost"
val user=if (args.size>1) args(1) else util.Properties.userName
val freq=if (args.size>2) args(2) else ""
val numb=if (args.size>3) args(3) else ""
val vmstatcmd="vmstat %s %s".format(freq, numb)

jassh.SSH.once(host, user)  {
  _.run(vmstatcmd, _.foreach(println _)).waitForEnd
}

smallest script : print remote system name (uname)

#!/bin/sh
exec java -jar jassh.jar -nocompdaemon -usejavacp -savecompiled "$0" "$@"
!#

println(jassh.SSH.shell("localhost", "test", "testtest") { _.uname})
 

Monday, May 28, 2012

JASSH (JAnalyseSSH) 0.8.0 released

JASSH is a high level scala SSH API for easy and fast operations on remote servers. ( ** project page **)

Latest changes :
  • now using sbt 0.11.3
  • now using sbt-assembly 0.8.1
  • now using sbteclipse 2.1.0-RC1
  • Set of new methods to help with common remote commands :
    fileSize, md5sum, sha1sum, uname, ls, pwd, cd(*), hostname, date, findAfterDate (*) of course only for shell sessions
  • JSCH updated to 0.1.48
  • md5sum method added to SSHTools object
  • connect timeout (default = 30s) and general socket timeout (default = 5mn) are now properly managed

hello scala script

#!/bin/sh
exec java -jar jassh.jar -nocompdaemon -usejavacp -savecompiled "$0" "$@"
!#

print(jassh.SSH.shell("localhost", "test", "testtest") {_ execute "echo Hello `hostname`" } )
  
  

remote vmstat scala script

#!/bin/sh
exec java -jar jassh.jar -nocompdaemon -usejavacp -savecompiled "$0" "$@"
!#

jassh.SSH.once("localhost", "test", "testtest")  {
  _.run("vmstat 1 10", l => println(l.getOrElse(""))).waitForEnd
}

smallest script : print remote system name (uname)

#!/bin/sh
exec java -jar jassh.jar -nocompdaemon -usejavacp -savecompiled "$0" "$@"
!#

println(jassh.SSH.shell("localhost", "test", "testtest") { _.uname})
 

Tuesday, May 8, 2012

SCALA TIPS : remove unnecessary tests when working with collections...

Thanks to Scala's Option type and the way collection operations return the right result type, many kinds of checks can be removed. Example :

val m = Map("A" -> 1, "B" ->3)

var l1 = List.empty[Int]
var l2 = List.empty[Int]

// Don't write this :
if (m.contains("A")) l1 ::= m.get("A").get
if (m.contains("Z")) l1 ::= m.get("Z").get
println(l1)

// Instead, just write something like :
l2 ++:= m.get("A")
l2 ++:= m.get("Z")
println(l2)

This works because concatenating a List[Int] with an Option[Int] yields a List[Int]; if the Option[Int] value is None, nothing is added to the list :)
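
The same idea extends to looking up several keys at once. A minimal sketch, where OptionLookups and lookupAll are names made up for the illustration:

```scala
object OptionLookups {
  val m = Map("A" -> 1, "B" -> 3)

  // Each lookup yields an Option[Int]; flatMap drops the None results
  // and unwraps the Some results, so no contains/get test is needed.
  def lookupAll(keys: List[String]): List[Int] =
    keys.flatMap(k => m.get(k))
}
```

Calling OptionLookups.lookupAll(List("A", "Z", "B")) returns List(1, 3): the missing key "Z" simply contributes nothing.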

Tuesday, May 1, 2012

Variance and bounds example

class A
class B extends A
class C extends B
class D extends C

class E

class X[+T] {                        // T is covariant
  def add[U >: T](x: U): X[U] = this // U is lower-bounded by T, so it may appear as a parameter type
}

class Y[+T <: A] {                   // This time with an upper bound A
  def add[U >: T <: A](y: U): Y[U] = this
}

val a = new A
val b = new B
val c = new C
val d = new D

val x = new X[C]
val x2 = x.add(b)    // x2 type becomes X[B]
val x3 = x.add(d)    // x3 remains of type X[C]
val x4 = x.add(10)   // x4 type becomes X[Any]
val x5 = x.add("Z")  // x5 type becomes X[java.lang.Object]

val y = new Y[C]
val y2 = y.add(b)    // y2 type becomes Y[B]
val y3 = y.add(d)    // y3 remains of type Y[C]
//val y4 = y.add(2)  // Won't compile as Y has an upper bound: the argument type must inherit from A


class Z {
  def add[T<%E](e:T) = this  // View bound: an implicit conversion from T to E must exist
}
val z  = new Z
//val z1 = z.add(a)  // Won't compile as there is no implicit conversion from A to E
implicit def a2E(a:A) = new E
val z2 = z.add(a)
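
The covariance plus lower bound combination is what makes immutable containers practical. As a sketch, here is a minimal immutable stack built the same way; Stack, Empty, Cons, Animal, Dog and Puppy are all names invented for this illustration.

```scala
// A minimal immutable stack, covariant in its element type.
sealed trait Stack[+T] {
  // T cannot be used directly as a parameter type in a covariant class,
  // so push accepts any supertype U of T and returns a Stack[U].
  def push[U >: T](x: U): Stack[U] = Cons(x, this)
}
case object Empty extends Stack[Nothing]
case class Cons[+T](head: T, tail: Stack[T]) extends Stack[T]

class Animal
class Dog extends Animal
class Puppy extends Dog

object StackDemo {
  val dogs: Stack[Dog]       = Empty.push(new Dog)
  val more: Stack[Dog]       = dogs.push(new Puppy)   // Puppy <: Dog, still a Stack[Dog]
  val animals: Stack[Animal] = dogs.push(new Animal)  // widens to Stack[Animal]
}
```

Pushing a supertype widens the element type instead of failing, which mirrors what x.add(b) does in the example above.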

Thursday, April 26, 2012

sbt scala drools expert system skeleton project

Just released a simple scala drools project skeleton. The Knowledge base is used to evaluate drools-mvel / scala interaction.
The Knowledge Base :
package dummy;

dialect "mvel"

import java.util.LinkedList;

// --------------------------------------------------------------

rule "who is young"
  when
    Someone($name:name, age < 30)
  then
    System.out.println($name+" is very young")
end

// --------------------------------------------------------------

rule "people cars"
  when
    $someone:Someone($name:name)
    $car : Car(someone == $someone)
  then
    System.out.println("One of "+$name+"'s car is "+$car.color.name+" "+$car.model)
end

// --------------------------------------------------------------

rule "who own at least one red car"
  when
    $someone:Someone($name:name)
    exists Car(someone == $someone, color == Color.red)
  then
    System.out.println($name+" has at least one red car")
end

// --------------------------------------------------------------

rule "who doesn't have a home"
  when
    $someone:Someone($name:name)
    not Home(someone == $someone)
  then
    insert(new InformationRequest($someone, $name+"'s home is unknown"))
end

// --------------------------------------------------------------

rule "who doesn't have an address"
  when
    $someone:Someone($name:name)
    Home(someone == $someone, !address.isDefined)
  then
    insert(new InformationRequest($someone, $name+" address is unknown"))
end

// --------------------------------------------------------------

rule "process information requests"
  when
    InformationRequest($who:someone, $what:message)
  then
    System.out.println("INFORMATION REQUESTED FOR "+$who+" : "+$what)
end

// --------------------------------------------------------------

rule "who owns more than 1 car"
  when
    $someone : Someone($name:name)
    $cars    : LinkedList( size>1 ) from collect(Car(someone == $someone))
  then
    System.out.println($name+" has "+$cars.size+" cars")  
end

// --------------------------------------------------------------

And the startup code :
package dummy

// ----------------------------------------------------------------
// DOMAIN MODEL

case class Someone(name:String, age:Int)

case class Car(someone:Someone, model:String, year:Int, color:Color)

case class Color(name:String)

object Color {
  val red = Color("red")
  val blue = Color("blue")
  val green = Color("green")
  val black = Color("black")
}

case class Address(street:String, town:String, country:String)

case class Home(someone:Someone, address:Option[Address]) 

case class InformationRequest(someone:Someone, message:String)




// ----------------------------------------------------------------


import java.io.FileInputStream
import java.io.InputStreamReader
import org.drools.RuleBaseFactory
import org.drools.audit.WorkingMemoryFileLogger
import org.drools.compiler.PackageBuilder


object Dummy {

  def main(args: Array[String]) {
    System.setProperty("drools.dialect.mvel.strict", "false")
    
    val rulesfilename = "src/main/resources/KBExpertise.drl"
    val source = new InputStreamReader(new FileInputStream(rulesfilename))
    
    val builder = new PackageBuilder()
    builder.addPackageFromDrl(source)
    if (builder.hasErrors()) {
      System.out.println(builder.getErrors().toString())
      throw new RuntimeException("Unable to compile " + rulesfilename + ".")
    }

    val pkg = builder.getPackage()
    val ruleBase = RuleBaseFactory.newRuleBase()
    ruleBase.addPackage(pkg)

    val session = ruleBase.newStatefulSession()
    //session.addEventListener(new org.drools.event.DebugAgendaEventListener())
    //session.addEventListener(new org.drools.event.DebugWorkingMemoryEventListener())
    
    // setup the audit logging
    val logger:WorkingMemoryFileLogger = new WorkingMemoryFileLogger(session)
    logger.setFileName("drools")
    logger.writeToDisk()

    val martine = Someone(name="Martine", age=30)
    val martin  = Someone(name="Martin", age=40)
    val jack    = Someone(name="Jack", age=12)
    val martineCar = Car(martine, "Ford", 2010, Color.blue)
    val martinCar  = Car(martin, "GM", 2010, Color.black)
    val martinCar2 = Car(martin, "Ferrari", 2012, Color.red)
    val martinCar3 = Car(martin, "Porshe", 2011, Color.red)
    
    val martinHome = Home(martin, None)
    val jackHome   = Home(jack, Some(Address("221B Baker Street", "London", "England")))
    
    List(martine, martin, jack, 
        martineCar, martinCar, martinCar2, martinCar3,
        martinHome, jackHome
    ).foreach(session.insert(_)) 

    session.fireAllRules()

    session.dispose()
  }
}
Finally, the build.sbt file used to build and run the project :
import AssemblyKeys._

seq(assemblySettings: _*)

name := "ScalaDroolsDummyProject"

version := "0.0.1"

scalaVersion := "2.9.2"

mainClass in assembly := Some("dummy.Dummy")

jarName in assembly := "dummy.jar"



libraryDependencies += "org.drools" % "drools-compiler" % "5.3.1.Final"

libraryDependencies += "org.drools" % "drools-core" % "5.3.1.Final"

libraryDependencies += "org.drools" % "drools-jsr94"  % "5.3.1.Final"

libraryDependencies += "org.drools" % "drools-decisiontables"  % "5.3.1.Final"

libraryDependencies += "org.drools" % "knowledge-api"  % "5.3.1.Final"



libraryDependencies += "com.thoughtworks.xstream" % "xstream" % "1.4.2"
            


libraryDependencies += "org.scalatest" %% "scalatest" % "1.7.2" % "test"

libraryDependencies += "junit" % "junit" % "4.10" % "test"

resolvers += "JBoss third party releases repository" at "https://repository.jboss.org/nexus/content/repositories/thirdparty-releases"

Test it yourself :
$ sbt run
INFORMATION REQUESTED FOR Someone(Martin,40) : Martin address is unknown
One of Martin's car is red Porshe
Martin has 3 cars
Martin has at least one red car
One of Martin's car is red Ferrari
One of Martin's car is black GM
One of Martine's car is blue Ford
Jack is very young
INFORMATION REQUESTED FOR Someone(Martine,30) : Martine's home is unknown
[success] Total time: 6 s, completed 27 avr. 2012 22:35:37
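
The printed results can be cross-checked without Drools by expressing the same conditions with plain Scala collections. This is only a sanity-check sketch on a simplified copy of the domain model (color and address reduced to strings); RuleCheck and its members are names made up for the illustration.

```scala
case class Someone(name: String, age: Int)
case class Car(someone: Someone, model: String, color: String)
case class Home(someone: Someone, address: Option[String])

object RuleCheck {
  val martine = Someone("Martine", 30)
  val martin  = Someone("Martin", 40)
  val jack    = Someone("Jack", 12)
  val people  = List(martine, martin, jack)
  val cars    = List(
    Car(martine, "Ford", "blue"), Car(martin, "GM", "black"),
    Car(martin, "Ferrari", "red"), Car(martin, "Porshe", "red"))
  val homes   = List(Home(martin, None), Home(jack, Some("221B Baker Street")))

  // "who is young" : age < 30
  val young = people.filter(_.age < 30).map(_.name)
  // "who owns more than 1 car"
  val multiOwners: Map[String, Int] =
    cars.groupBy(_.someone).collect { case (p, cs) if cs.size > 1 => p.name -> cs.size }
  // "who own at least one red car"
  val redOwners = people.filter(p => cars.exists(c => c.someone == p && c.color == "red")).map(_.name)
  // "who doesn't have a home"
  val homeless = people.filterNot(p => homes.exists(_.someone == p)).map(_.name)
  // "who doesn't have an address"
  val noAddress = homes.filter(_.address.isEmpty).map(_.someone.name)
}
```

The values agree with the output above: only Jack is young (Martine is exactly 30), Martin has 3 cars including a red one, Martine has no home and Martin's home has no address.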
And for those who don't know about SBT or GIT :
  • Download SBT jar file
    • You'll get a java jar executable named sbt-launch.jar
  • Download the scala drools skeleton zip archive
    • You'll get a file which will look like : "dacr-scala-drools-dummy-project-6b43143.zip"
    • unzip the content and rename the unzipped directory to scala-drools-dummy-project
test@localhost /tmp/test $ unzip dacr-scala-drools-dummy-project-6b43143.zip 
Archive:  dacr-scala-drools-dummy-project-6b43143.zip
6b43143f369bda03361ca2c5f2376d51ba7d5524
   creating: dacr-scala-drools-dummy-project-6b43143/
  inflating: dacr-scala-drools-dummy-project-6b43143/.gitignore  
  inflating: dacr-scala-drools-dummy-project-6b43143/README  
 extracting: dacr-scala-drools-dummy-project-6b43143/RELEASE-NOTES  
  inflating: dacr-scala-drools-dummy-project-6b43143/build.sbt  
  inflating: dacr-scala-drools-dummy-project-6b43143/cleanup.sh  
   creating: dacr-scala-drools-dummy-project-6b43143/project/
  inflating: dacr-scala-drools-dummy-project-6b43143/project/plugins.sbt  
   creating: dacr-scala-drools-dummy-project-6b43143/src/
   creating: dacr-scala-drools-dummy-project-6b43143/src/main/
   creating: dacr-scala-drools-dummy-project-6b43143/src/main/resources/
  inflating: dacr-scala-drools-dummy-project-6b43143/src/main/resources/KBExpertise.drl  
   creating: dacr-scala-drools-dummy-project-6b43143/src/main/scala/
   creating: dacr-scala-drools-dummy-project-6b43143/src/main/scala/dummy/
  inflating: dacr-scala-drools-dummy-project-6b43143/src/main/scala/dummy/Dummy.scala  
   creating: dacr-scala-drools-dummy-project-6b43143/src/test/
   creating: dacr-scala-drools-dummy-project-6b43143/src/test/scala/
   creating: dacr-scala-drools-dummy-project-6b43143/src/test/scala/dummy/
  inflating: dacr-scala-drools-dummy-project-6b43143/src/test/scala/dummy/DummyTest.scala  
test@localhost /tmp/test $ mv dacr-scala-drools-dummy-project-6b43143 scala-drools-dummy-project
test@localhost /tmp/test $ cd scala-drools-dummy-project
test@localhost /tmp/test/scala-drools-dummy-project $ java -jar ~/Downloads/sbt-launch.jar run
[info] Loading project definition from /tmp/test/scala-drools-dummy-project/project
[info] Set current project to ScalaDroolsDummyProject (in build file:/tmp/test/scala-drools-dummy-project/)
[info] Updating {file:/tmp/test/scala-drools-dummy-project/}default-d556eb...
[info] Resolving org.scala-lang#scala-library;2.9.2 ...
[info] Resolving org.drools#drools-compiler;5.3.1.Final ...
...
[info] Compiling 1 Scala source to /tmp/test/scala-drools-dummy-project/target/scala-2.9.2/classes...
[info] Running dummy.Dummy 
INFORMATION REQUESTED FOR Someone(Martin,40) : Martin address is unknown
One of Martin's car is red Porshe
Martin has 3 cars
Martin has at least one red car
One of Martin's car is red Ferrari
One of Martin's car is black GM
One of Martine's car is blue Ford
Jack is very young
INFORMATION REQUESTED FOR Someone(Martine,30) : Martine's home is unknown
[success] Total time: 8 s, completed 30 avr. 2012 12:59:17

Friday, April 20, 2012

JAJMX scala script to list remote JVM threads using JMX...

A new example script using the JAJMX API. This script, named lsthreads, connects to a remote JVM using just the host address and the JMX port, and then lists all active threads and their current states. Some Java options (JAVA_OPTS) have been added so that the script can be run against itself.
#!/bin/sh
JAVA_OPTS=""
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote"
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote.port=9999"
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote.authenticate=false"
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote.ssl=false"
exec java $JAVA_OPTS -jar jajmx.jar -nocompdaemon -usejavacp -savecompiled "$0" "$@"
!#
import jajmx._
import javax.management.openmbean.CompositeData

val host = if (args.size>=1) args(0) else "localhost" 
val port = if (args.size>=2) args(1).toInt else 9999

case class ThreadLock(name:String, ownerId:Option[Long], ownerName:Option[String])

case class ThreadInfo(name:String, id:Long, state:String,lock:Option[ThreadLock],
                      blockedCount:Long, waitedCount:Long)

object ThreadInfo {
  def apply(ti:CompositeData) :ThreadInfo = {
   val id           = ti.get("threadId").toString.toLong
   val name         = ti.get("threadName").toString
   val state        = ti.get("threadState").toString
   val blockedCount = ti.get("blockedCount").toString.toLong
   val waitedCount  = ti.get("waitedCount").toString.toLong
   val lock = Option(ti.get("lockName")) map {_.toString} filter {_.size>0} collect {
                case lockName =>
                    val lockOwnerId = Option(ti.get("lockOwnerId")) map {_.asInstanceOf[Long]} filterNot {_ == -1}
                    val lockOwnerName = Option(ti.get("lockOwnerName")) map {_.toString.trim}
                    ThreadLock(lockName, lockOwnerId, lockOwnerName)
              }
   ThreadInfo(name, id, state, lock, blockedCount, waitedCount)
  }
}

JMX.once(host,port) { jmx =>
  for (threading <- jmx.threading) {
    val ids = threading.get[Array[Long]]("AllThreadIds")
    val rawThreadsInfos = threading.call[Array[CompositeData]]("getThreadInfo", ids).get
    val threadsInfos = rawThreadsInfos map {ThreadInfo(_)} toList
    val countByState = threadsInfos groupBy {_.state} map { case (s,l) => s -> l.size}
    val countByStateStr = countByState map {case (s,l) => s+":"+l} mkString " "
    println("Total %d  %s".format(threadsInfos.size, countByStateStr))
    for ( ti <- threadsInfos sortBy {_.id } ) {
      println("%d - %s - %s".format(ti.id, ti.state, ti.name) )
    }
  }
}

Usage examples, first against itself and then against a remote Apache Tomcat:
$ lsthreads
Total 13  TIMED_WAITING:4 WAITING:2 RUNNABLE:7
1 - RUNNABLE - main
2 - WAITING - Reference Handler
3 - WAITING - Finalizer
4 - RUNNABLE - Signal Dispatcher
9 - RUNNABLE - RMI TCP Accept-0
10 - RUNNABLE - RMI TCP Accept-9999
11 - RUNNABLE - RMI TCP Accept-0
12 - RUNNABLE - RMI TCP Connection(1)-127.0.0.1
13 - TIMED_WAITING - RMI Scheduler(0)
14 - TIMED_WAITING - RMI RenewClean-[127.0.0.1:45376]
15 - TIMED_WAITING - GC Daemon
16 - RUNNABLE - RMI TCP Connection(2)-127.0.0.1
17 - TIMED_WAITING - JMX server connection timeout 17

$ lsthreads 192.168.1.10 1099
Total 17  TIMED_WAITING:6 WAITING:2 RUNNABLE:9
1 - RUNNABLE - main
2 - WAITING - Reference Handler
3 - WAITING - Finalizer
4 - RUNNABLE - Signal Dispatcher
10 - RUNNABLE - RMI TCP Accept-0
11 - RUNNABLE - RMI TCP Accept-1099
12 - RUNNABLE - RMI TCP Accept-0
13 - TIMED_WAITING - GC Daemon
16 - TIMED_WAITING - ContainerBackgroundProcessor[StandardEngine[Catalina]]
17 - RUNNABLE - http-bio-8080-Acceptor-0
18 - TIMED_WAITING - http-bio-8080-AsyncTimeout
19 - RUNNABLE - ajp-bio-8009-Acceptor-0
20 - TIMED_WAITING - ajp-bio-8009-AsyncTimeout
22 - RUNNABLE - RMI TCP Connection(5)-127.0.0.1
23 - TIMED_WAITING - RMI Scheduler(0)
24 - RUNNABLE - RMI TCP Connection(6)-127.0.0.1
27 - TIMED_WAITING - JMX server connection timeout 27
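
The grouping step of lsthreads can also be tried without JAJMX or a remote connection, by asking the current JVM through the standard java.lang.management API. A minimal sketch (LocalThreads is a name made up for the illustration):

```scala
import java.lang.management.ManagementFactory

object LocalThreads {
  // Thread count per state for the current JVM.
  def countByState(): Map[String, Int] = {
    val threading = ManagementFactory.getThreadMXBean
    // getThreadInfo returns null entries for threads that died in the meantime.
    val infos = threading.getThreadInfo(threading.getAllThreadIds).toList.filter(_ != null)
    infos.groupBy(_.getThreadState.toString).map { case (state, l) => state -> l.size }
  }

  def main(args: Array[String]): Unit = {
    val counts  = countByState()
    val summary = counts.map { case (s, n) => s + ":" + n }.mkString(" ")
    println("Total %d  %s".format(counts.values.sum, summary))
  }
}
```

The summary line has the same shape as the first line of the lsthreads output, but the exact counts depend on the JVM running it.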

Wednesday, April 18, 2012

First nasdaq stock query using scala and JASeries

The following script is a first attempt at querying NASDAQ stocks using JASeries (a scala time series API).
#!/bin/sh
exec java -jar jaseries.jar -nocompdaemon -usejavacp -savecompiled "$0" "$@"
!#

import io.Source
import fr.janalyse.series._

// Get nasdaq codes
val codes = 
  Source.fromURL("http://www.nasdaqtrader.com/dynamic/SymDir/nasdaqlisted.txt")
    .getLines
    .map(_.split("[|]",2).head)
    .toList.tail.init //.take(10)

// Get nasdaq stocks history
val stocks =
  codes.flatMap(code =>
    try {
     Some(code -> CSV2Series.fromURL("http://ichart.finance.yahoo.com/table.csv?s="+code))
    } catch {
      case x:java.io.FileNotFoundException => 
        println("Couldn't find %s stock data".format(code))
        None
    }
  )

// get close series
val closes = stocks flatMap { case (code, allSeries) => 
  allSeries.get("Close").map(_.rename(code+" Close"))
}

val top20incr =
  closes
    .sortBy( _.stat.linearApproximation.slope)
    .reverse
    .take(20)
    
// print the 20 series with the highest slope, but remember that the whole history is taken
// into account, so the last value may not be much higher than the first one.
for(stock <- top20incr) {
  println("%s : %f -> %f".format(stock.name, stock.head.value, stock.last.value))
}

This script is the starting point for upcoming JASeries improvements and new features.
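
To make the ranking criterion concrete, here is a sketch of a slope computed by ordinary least squares. It assumes that linearApproximation is this kind of fit, which the script itself does not confirm; LinearFit is a name made up for the illustration and is not part of JASeries.

```scala
object LinearFit {
  // Ordinary least-squares slope of y over x:
  //   slope = sum((x - meanX) * (y - meanY)) / sum((x - meanX)^2)
  def slope(points: Seq[(Double, Double)]): Double = {
    require(points.size >= 2, "need at least two points")
    val meanX = points.map(_._1).sum / points.size
    val meanY = points.map(_._2).sum / points.size
    val num   = points.map { case (x, y) => (x - meanX) * (y - meanY) }.sum
    val den   = points.map { case (x, _) => (x - meanX) * (x - meanX) }.sum
    require(den != 0.0, "all x values are identical")
    num / den
  }
}
```

For the points (0,1), (1,3), (2,5) the slope is 2.0. A series that rises and then falls back, such as (0,1), (1,5), (2,1), has a slope of about 0 even though it peaked in between, which is why a fit over the whole history says little about the latest value.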

Saturday, April 14, 2012

JAJMX scala script to list all available JVM numerical JMX values

Using JAJMX it becomes very easy to list all available numerical values of a given JVM JMX platform :
#!/bin/sh
JAVA_OPTS=""
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote"
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote.port=9999"
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote.authenticate=false"
JAVA_OPTS=$JAVA_OPTS" -Dcom.sun.management.jmxremote.ssl=false"
exec java $JAVA_OPTS -jar jajmx.jar -nocompdaemon -usejavacp -savecompiled "$0" "$@"
!#

import fr.janalyse.jmx._

val host  = args.headOption.getOrElse("localhost")
val port  = if (args.size>1) args.tail.head.toInt else 9999

JMX.once(host, port) { jmx =>
  for(mbean <- jmx.mbeans ;
      attr  <- mbean.attributes collect {case n:RichNumberAttribute => n};
      value <- mbean.getLong(attr)) {
    println("%s - %s = %d".format(mbean.name, attr.name, value))
  }
}
Some JVM parameters (JAVA_OPTS) have been added to make it possible to test this script against itself. Without any arguments it gives results such as :
$ ./lsnum
java.lang:type=Memory - ObjectPendingFinalizationCount = 0
java.lang:type=MemoryPool,name=PS Eden Space - CollectionUsageThreshold = 0
java.lang:type=MemoryPool,name=PS Eden Space - CollectionUsageThresholdCount = 0
java.lang:type=MemoryPool,name=PS Survivor Space - CollectionUsageThreshold = 0
java.lang:type=MemoryPool,name=PS Survivor Space - CollectionUsageThresholdCount = 0
java.lang:type=MemoryPool,name=Code Cache - UsageThreshold = 0
java.lang:type=MemoryPool,name=Code Cache - UsageThresholdCount = 0
java.lang:type=GarbageCollector,name=PS MarkSweep - CollectionCount = 1
java.lang:type=GarbageCollector,name=PS MarkSweep - CollectionTime = 90
java.lang:type=Runtime - StartTime = 1334354060679
java.lang:type=Runtime - Uptime = 5497
java.lang:type=ClassLoading - LoadedClassCount = 4806
java.lang:type=ClassLoading - UnloadedClassCount = 0
java.lang:type=ClassLoading - TotalLoadedClassCount = 4806
java.lang:type=Threading - DaemonThreadCount = 12
java.lang:type=Threading - PeakThreadCount = 13
java.lang:type=Threading - CurrentThreadCpuTime = 40000000
java.lang:type=Threading - CurrentThreadUserTime = 40000000
java.lang:type=Threading - ThreadCount = 13
java.lang:type=Threading - TotalStartedThreadCount = 13
java.lang:type=Compilation - TotalCompilationTime = 7144
java.lang:type=MemoryPool,name=PS Perm Gen - CollectionUsageThreshold = 0
java.lang:type=MemoryPool,name=PS Perm Gen - CollectionUsageThresholdCount = 0
java.lang:type=MemoryPool,name=PS Perm Gen - UsageThreshold = 0
java.lang:type=MemoryPool,name=PS Perm Gen - UsageThresholdCount = 0
java.lang:type=GarbageCollector,name=PS Scavenge - CollectionCount = 3
java.lang:type=GarbageCollector,name=PS Scavenge - CollectionTime = 57
java.lang:type=OperatingSystem - MaxFileDescriptorCount = 4096
java.lang:type=OperatingSystem - OpenFileDescriptorCount = 19
java.lang:type=OperatingSystem - CommittedVirtualMemorySize = 4598407168
java.lang:type=OperatingSystem - FreePhysicalMemorySize = 10819850240
java.lang:type=OperatingSystem - FreeSwapSpaceSize = 10742177792
java.lang:type=OperatingSystem - ProcessCpuTime = 13030000000
java.lang:type=OperatingSystem - TotalPhysicalMemorySize = 16816541696
java.lang:type=OperatingSystem - TotalSwapSpaceSize = 10742177792
java.lang:type=OperatingSystem - AvailableProcessors = 6
java.lang:type=OperatingSystem - SystemLoadAverage = 0
java.lang:type=MemoryPool,name=PS Old Gen - CollectionUsageThreshold = 0
java.lang:type=MemoryPool,name=PS Old Gen - CollectionUsageThresholdCount = 0
java.lang:type=MemoryPool,name=PS Old Gen - UsageThreshold = 0
java.lang:type=MemoryPool,name=PS Old Gen - UsageThresholdCount = 0
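
For comparison, a similar listing can be produced for the current JVM with the standard javax.management API alone, without JAJMX and without opening a JMX port. This is only a sketch: LocalNumericAttributes is a name made up for the illustration, and converting every number with longValue (which truncates fractional values) is a simplification chosen here.

```scala
import java.lang.management.ManagementFactory
import javax.management.{MBeanServer, ObjectName}
import scala.collection.mutable.ListBuffer

object LocalNumericAttributes {
  // Reads one attribute and keeps it only when it is numeric and readable.
  private def readNumber(server: MBeanServer, name: ObjectName, attr: String): Option[Long] =
    try {
      server.getAttribute(name, attr) match {
        case n: java.lang.Number => Some(n.longValue)
        case _                   => None
      }
    } catch {
      case _: Exception => None // unsupported or unreadable attribute: skip it
    }

  // (mbean name, attribute name, value) for every numeric attribute of this JVM.
  def numericValues(): List[(String, String, Long)] = {
    val server = ManagementFactory.getPlatformMBeanServer
    val names  = ListBuffer.empty[ObjectName]
    val it     = server.queryNames(null, null).iterator()
    while (it.hasNext) names += it.next()
    for {
      name  <- names.toList
      attr  <- server.getMBeanInfo(name).getAttributes.toList
      value <- readNumber(server, name, attr.getName).toList
    } yield (name.toString, attr.getName, value)
  }
}
```

Printing each triple with "%s - %s = %d" gives lines in the same format as the lsnum output, for whatever MBeans the local JVM exposes.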

Friday, April 13, 2012

JASeries (JAnalyseSeries) 1.1.0 released

JASeries is an API for operations on numeric time series. The goal is to make generating time series summaries simple, using sampling and various kinds of cell merging. ( ** project page **)

Latest changes :

  • Statistics : alive added = OpenCell.time - CloseCell.time = series alive period of time
  • realias() without any argument means take the name as the alias.
  • added braces to delta, compact, zeroBased
  • Chart seriesScales method renamed to scales
  • Chart new method : colors which returns a map of [Series[Cell], Color]
  • Added a package object jaseries to define shortcuts to fr.janalyse.series.CSV2Series, fr.janalyse.series.view.Chart classes and objects

Google stock summary catchup scala script

#!/bin/sh
exec java -jar jaseries.jar -nocompdaemon -usejavacp -savecompiled "$0" "$@"
!#

val allSeries = jaseries.CSV2Series.fromURL("http://ichart.finance.yahoo.com/table.csv?s=GOOG")
val closeSeries = allSeries("Close")

println("GOOGLE stock summary")
println("Highest : "+closeSeries.max)
println("Lowest : "+closeSeries.min)
println("Week Trend : "+closeSeries.stat.linearApproximation.daySlope*7)
println("Latest : "+closeSeries.last)

Apple trend chart generation scala script

#!/bin/sh
exec java -jar jaseries.jar -nocompdaemon -usejavacp -savecompiled "$0" "$@"
!#

val allSeries = jaseries.CSV2Series.fromURL("http://ichart.finance.yahoo.com/table.csv?s=AAPL")
val closeSeries = allSeries("Close").realias("Apple stock value")    
jaseries.Chart(closeSeries).toFile("AppleStockTrend.jpg")

JAJMX (JAnalyseJMX) 0.3.0 released

JAJMX is a high level scala JMX API. The goal is to simplify JMX operations as much as possible. ( ** project page **)

Latest changes :
  • JMXOptions now contains an extra field "name" which lets the user give a friendly name to a remote jmx system
  • added a package object jajmx to define shortcuts to fr.janalyse.jmx.JMX class and object
  • JMX.connect renamed to JMX.once
  • scaladoc generation now works fine, following the hack described here : https://github.com/harrah/xsbt/issues/85

jvm force gc script

#!/bin/sh
exec java -jar jajmx.jar -nocompdaemon -usejavacp -savecompiled "$0" "$@"
!#

jajmx.JMX.once("127.0.0.1", 1099) { _.memory map {_ call "gc"} }
 

jmx grep script

#!/bin/sh
exec java -jar jajmx.jar -nocompdaemon -usejavacp -savecompiled "$0" "$@"
!#

import jajmx._

if (args.size < 2) {
  println("Usage   : jmxgrep host port searchMask1 ... searchMaskN")
  System.exit(1)
}
val host  = args(0)
val port  = args(1).toInt
val masks = args.toList.drop(2) map {s=>("(?i)"+s).r}

def truncate(str:String, n:Int=60) = if (str.size>n) str.take(n)+"..." else str

JMX.once(host, port) { jmx =>
  for(mbean <- jmx.mbeans ; attr <- mbean.attributes; value <- mbean.getString(attr)) {
    val found = List(mbean.name, attr.name, value) exists { 
         item => masks exists {_.findFirstIn(item).isDefined }
    }
    if (found) println("%s - %s = %s".format(mbean.name, attr.name, truncate(value)))
  }
}
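
The matching logic of the jmx grep script relies only on the standard Scala regex API, so it can be isolated and tried on its own. A minimal sketch (MaskMatch is a name made up for the illustration):

```scala
import scala.util.matching.Regex

object MaskMatch {
  // Turns each search mask into a case-insensitive regular expression.
  def compile(masks: Seq[String]): Seq[Regex] = masks.map(s => ("(?i)" + s).r)

  // True when at least one item contains a match for at least one mask.
  def matches(items: Seq[String], masks: Seq[String]): Boolean = {
    val regexes = compile(masks)
    items.exists(item => regexes.exists(_.findFirstIn(item).isDefined))
  }
}
```

Note that with an empty mask list nothing ever matches, so running the script with only a host and a port prints nothing.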


Thursday, April 12, 2012

JASSH (JAnalyseSSH) 0.7.3 released

JASSH is a high level scala SSH API for easy and fast operations on remote servers. ( ** project page **)

Latest changes :
  • JSCH updated to release 0.1.47
  • SSHOptions now contains an extra field "name" which lets the user give a friendly name to a remote ssh system
  • SSHOptions password is now of type SSHPassword instead of String. Implicit conversions are provided from String and Option[String]
  • SSHShell batch method renamed to execute

hello scala script

#!/bin/sh
exec java -jar jassh.jar -nocompdaemon -usejavacp -savecompiled "$0" "$@"
!#

print(jassh.SSH.shell("localhost", "test", "testtest") {_ execute "echo Hello `hostname`" } )
  
  

remote vmstat scala script

#!/bin/sh
exec java -jar jassh.jar -nocompdaemon -usejavacp -savecompiled "$0" "$@"
!#

jassh.SSH.once("localhost", "test", "testtest")  {
  _.run("vmstat 1 10", l => println(l.getOrElse(""))).waitForEnd
}

 

Friday, April 6, 2012

JASSH (JAnalyseSSH) 0.7.2 released

JASSH is a high level scala SSH API for easy and fast operations on remote servers. ( ** project page **)

Latest changes :
  • added a package object jassh to define shortcuts to fr.janalyse.ssh.SSH class and object
  • SSHOptions, host parameter is now in first position !

These small changes enable even simpler scripts :

hello scala script

#!/bin/sh
exec java -jar jassh.jar -nocompdaemon -usejavacp -savecompiled "$0" "$@"
!#

println(jassh.SSH.shell("localhost", "test", Some("testtest")) { _ execute "echo -n Hello `hostname`"})

remote vmstat scala script

#!/bin/sh
exec java -jar jassh.jar -nocompdaemon -usejavacp -savecompiled "$0" "$@"
!#

jassh.SSH.once("localhost", "test", password=Some("testtest"))  {
  _.run("vmstat 1 10", l => println(l.getOrElse(""))).waitForEnd
}

Sunday, April 1, 2012

Let it crash, akka will restart it for you !!!

Akka is an awesome tool: even with the default configuration, you no longer have to worry about failure management and recovery details. I was amazed to discover that my new monitoring system prototype (akka/scala based) was able to recover from errors by itself, although I had not implemented any such feature yet !!

To illustrate this behavior, I've written a simple example. The RobustActor actor keeps an ssh connection to a remote system (using the janalyse-ssh scala ssh api), executes the "date" command every 5s and prints its result :
package dummy

import akka.actor.ActorSystem
import akka.actor.Actor
import akka.util.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.Props

import fr.janalyse.ssh._

object DummySSH {
 def main(args:Array[String]) {
      val system=ActorSystem("DummySSHSystem",ConfigFactory.load.getConfig("dummySSH"))
      system.actorOf(
        Props(new RobustActor(system)),
        name="RobustActor")
 }
}


class RobustActor(system:ActorSystem) extends Actor {
  
  val sh = SSH(host="localhost", username="test", password=Some("testtest")).newShell
  
  override def preStart() {
    system.scheduler.schedule(1 seconds, 5 seconds, self, "doit")
  }
  
  override def postStop() {
    sh.close()
  }
    
  def receive = {
    case "doit" => print(sh execute "date")
  }
}
If we start this example code, wait a little, identify the sshd process that manages the established ssh connection and then kill that sshd process, akka will intercept the RobustActor internal exception and restart this actor for us !
$ sbt run
[info] Loading project definition from /home/dcr/dev-new/akka-sandbox/project
[info] Set current project to AkkaSandbox (in build file:/home/dcr/dev-new/akka-sandbox/)
[info] Compiling 1 Scala source to /home/dcr/dev-new/akka-sandbox/target/scala-2.9.1/classes...

Multiple main classes detected, select one to run:

 [1] dummy.Dummy
 [2] dummy.DummySSH

Enter number: 2

[info] Running dummy.DummySSH 
dim. avril  1 22:15:32 CEST 2012
dim. avril  1 22:15:37 CEST 2012
dim. avril  1 22:15:42 CEST 2012
dim. avril  1 22:15:47 CEST 2012
[ERROR] [04/01/2012 22:15:53.92] [DummySSHSystem-akka.actor.default-dispatcher-3] [akka://DummySSHSystem/user/RobustActor] Pipe closed
java.io.IOException: Pipe closed
 at java.io.PipedInputStream.checkStateForReceive(PipedInputStream.java:244)
 at java.io.PipedInputStream.receive(PipedInputStream.java:210)
 at java.io.PipedOutputStream.write(PipedOutputStream.java:132)
 at java.io.OutputStream.write(OutputStream.java:58)
 at fr.janalyse.ssh.SSHShell$Producer.sendCommand(SSHAPI.scala:480)
 at fr.janalyse.ssh.SSHShell.sendCommand(SSHAPI.scala:475)
 at fr.janalyse.ssh.SSHShell.execute(SSHAPI.scala:448)
 at dummy.RobustActor$$anonfun$receive$1.apply(DummySSH.scala:34)
 at dummy.RobustActor$$anonfun$receive$1.apply(DummySSH.scala:33)
 at akka.actor.Actor$class.apply(Actor.scala:290)
 at dummy.RobustActor.apply(DummySSH.scala:21)
 at akka.actor.ActorCell.invoke(ActorCell.scala:617)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:179)
 at akka.dispatch.Mailbox.run(Mailbox.scala:161)
 at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:505)
 at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
 at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:997)
 at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1495)
 at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)

dim. avril  1 22:15:54 CEST 2012
dim. avril  1 22:15:58 CEST 2012


To identify the process that manages our example SSH connection, use the following commands: list all the processes which belong to the test user, then pick the sshd process that is not attached to your current test shell session (its terminal differs from the one reported by tty).
test@lanfeust ~ $ ps -ef | grep "^test"
test      8824  8815  0 11:20 ?        00:00:00 sshd: test@pts/7 
test      8825  8824  0 11:20 pts/7    00:00:00 -bash
test     32468 32458  0 22:15 ?        00:00:00 sshd: test@pts/8 
test     32469 32468  0 22:15 pts/8    00:00:00 -bash
test     32474  8825  0 22:15 pts/7    00:00:00 ps -ef
test     32475  8825  0 22:15 pts/7    00:00:00 grep --colour=auto ^test
test@lanfeust ~ $ tty
/dev/pts/7
test@lanfeust ~ $ kill -9 32468
test@lanfeust ~ $ 

This example source code is available here : akka-sandbox

JASSH (JAnalyseSSH) 0.7.0 released

JAnalyseSSH is a JSch-based Scala SSH API which aims to simplify SSH operations.

The hello world script...

(It requires a local user named "test" with the password "testtest". You can omit the password if your public key has been added to the authorized_keys file of the test user.)
#!/bin/sh
exec java -jar jassh.jar -usejavacp "$0" "$@"
!#
import fr.janalyse.ssh._
SSH.shell(host="localhost", username="test", password=Some("testtest")) { sh =>
  println(sh.execute("""echo -n "Hello World from `hostname`" """))
}

... and the remote vmstat script

#!/bin/sh
exec java -jar jassh.jar -usejavacp "$0" "$@"
!#
import fr.janalyse.ssh._
SSH.once(host="localhost", username="test", password=Some("testtest"))  { ssh =>
  val executor = ssh.run("vmstat 1 10", (l)=> l foreach {println(_)})
  executor.waitForEnd
}


Release notes for 0.7.0

  • SSH : added new methods newShell & newSftp, so that users can manage shell and sftp sessions themselves
  • Some internal changes to the SSHExec class, in order to try to remove the actor dependency. Mixing actor systems looks problematic
  • SSHShell : new implementation, no more actors used, better performance and behavior, ... (throughput : 504 cmd/s using persistency)
  • SSHExec : the last result line is no longer lost
  • SSHOptions : new parameter "prompt", which enables a custom shell or console command to be used.
    The prompt gives SSHShell the way to separate command results
  • SSHOptions : connectionTimeout renamed to timeout
  • Various cleanups and enhancements
  • Tests : compare the throughput of a persistent SSHShell with that of SSHExec commands
  • SSH : added immediate-execution methods which rely on SSHExec, not SSHShell ! (throughput : 93 cmd/s)
    execOnce & execOnceAndTrim
  • SSHExec : does not rely on DaemonActor/Actor anymore
  • SSHShell : removed the init Thread.sleep => better performance (throughput : 37 cmd/s instead of 1 cmd/s)
  • SSH.connect becomes SSH.once
  • Removed apply from the SSH class, as it may encourage bad usage where close is never called

Sunday, March 11, 2012

Dependency injection using scala traits

Consider the following code example, which does not depend on any kind of configuration system. We've just defined a trait, XConfigurable, which describes how to get simple configuration data.
trait XConfigurable {
  def getSetting(key:String):Option[String] = None
}

trait XPlugin extends XConfigurable {
  def domain:String
}

trait XSniffPlugin extends XPlugin {
  val period=getSetting("period") map {_.toLong} getOrElse 5000L
  val periodInS=period/1000
}

abstract class XSSHPlugin extends XSniffPlugin {
  val host = getSetting("host") getOrElse "127.0.0.1"
  val username = getSetting("username") getOrElse util.Properties.userName
  val password = getSetting("password") 
  val port = getSetting("port") map {_.toInt} getOrElse 22
  def cmd:String
}

case class XTopPlugin(domain:String) extends XSSHPlugin {
  val ignoreKeywords = getSetting("ignoreKeywords")
  def cmd="top -c -b -d %d".format(periodInS)
  override def toString = 
    "top each %ds through ssh to %s%s@%s".format(
        period/1000, username, password map {":"+_} getOrElse "", host
        )
}
Now I would like to modify the XTopPlugin behavior so that it takes its configuration from the typesafe config library, BUT without modifying anything in the code above.

The idea here is to create a new trait which inherits from XConfigurable :
object PlayWithTrait {
  import com.typesafe.config.Config
  import com.typesafe.config.ConfigFactory
  import com.typesafe.config.ConfigException.{Missing,WrongType}
    
  private lazy val context = ConfigFactory.parseString("""
        |Defaults {
        |  period=5000
        |}
        |SSHPlugin = ${Defaults}
        |SSHPlugin = {
        |  host:"127.0.0.1"
        |  port:22
        |}
        |TopPlugin = ${SSHPlugin}
        |TopPlugin = {
        |  ignoreKeywords:"tty$"
        |} """.stripMargin).resolve()

  trait XConfigurableUsingConfig extends XConfigurable {
    def config:Config
    override def getSetting(key:String):Option[String] = {
      try {
        Some(config.getString(key))
      } catch {
        case x:Missing => None
        case x:WrongType => None
      }
    }
  }

...
And then we'll take advantage of the Scala trait feature which allows us to stack a new behavior onto an object at instantiation time :
  def main(args: Array[String]) {
    import collection.JavaConversions._
    
    val localCfg = Map(
        "host" -> "192.168.2.1",
        "username" -> "test",
        "password" -> "testtest"
        )
    
    val topConfig = ConfigFactory.parseMap(localCfg).withFallback(context.getConfig("TopPlugin"))
    
    val top = new XTopPlugin("myserver") with XConfigurableUsingConfig {def config=topConfig}
    
    println(top)
  }
So now our XTopPlugin instance takes its configuration through the typesafe "Config" library, using a kind of closure to provide the configuration context (topConfig).
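
The stacking mechanism can be reproduced without any configuration library. The following is only a minimal sketch, assuming a plain Map as the settings source; all names (Configurable, SshTarget, MapConfigurable, TraitInjectionDemo) are hypothetical and not part of the code above. It also shows why the injected source is declared as a def: the values of the class are computed while its constructor runs, before a val of the anonymous subclass would be initialized.

```scala
// Hypothetical names throughout; only the shape mirrors the post.
trait Configurable {
  // Default: no setting is known, so callers fall back to their own defaults.
  def getSetting(key: String): Option[String] = None
}

class SshTarget extends Configurable {
  // Evaluated while the constructor runs, through virtual dispatch.
  val host: String = getSetting("host").getOrElse("127.0.0.1")
  val port: Int = getSetting("port").map(_.toInt).getOrElse(22)
}

trait MapConfigurable extends Configurable {
  // Kept as a def: a plain val in the anonymous subclass would still be
  // null when SshTarget's constructor calls getSetting.
  def settings: Map[String, String]
  override def getSetting(key: String): Option[String] = settings.get(key)
}

object TraitInjectionDemo {
  // No mixin: every value comes from the hard-coded defaults.
  val plain = new SshTarget
  // Mixin stacked at instantiation time: "host" now comes from the Map.
  val injected = new SshTarget with MapConfigurable {
    def settings = Map("host" -> "192.168.2.1")
  }
}
```

SshTarget itself is never modified; only the instantiation site decides where the settings come from.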


One can object that it is less flexible than using, for example, Spring and its XML configuration files, but remember that the Scala compiler can easily be embedded in any application, so you just have to use a Scala configuration file which gets compiled at runtime... It brings you very powerful dependency injection features using pure Scala.

Thursday, March 1, 2012

Akka actors : 10 millions messages processed (1s / message) in 75 seconds !

In the previous post, Akka actors versus Scala actors : control the message throughput of actors, we saw how to control message throughput and avoid OutOfMemory problems, in particular when actors don't have the same message processing rate.

Now we have to find a better way to simulate our 1 second delay, because using Thread.sleep breaks actor management: it monopolizes all the threads of the executor in use. In fact we would like to simulate a non-blocking 1s processing time.

So we should remove the following Thread.sleep call :
class MyMessageProcessor extends Actor {
  def receive = {
    case _:DoItMessage => Thread.sleep(1000)
  }
}

The solution is straightforward, as Akka comes with a scheduler which allows you to send a message after a given delay.
class MySimulator(system:ActorSystem) extends Actor {
  def receive = {
    case _:DoItMessage =>
      system.scheduler.scheduleOnce(1000 milliseconds, sender, "Done") 
  }
}
Using scheduleOnce, we don't monopolize system resources: no thread is held while waiting. In fact we get a fake processing which simulates an asynchronous response that comes 1s later. In the following code, the response is even received asynchronously by a future :
package dummy

import akka.actor._
import akka.util.duration._
import akka.util.Timeout
import akka.pattern.ask
import akka.dispatch.Future

sealed trait MyMessage
case class DoItMessage(cmd:String) extends MyMessage
case object DoneMessage extends MyMessage


object Dummy {
  def main(args:Array[String]) {
    
    val howmanyjob=10*1000000
    
    import com.typesafe.config.ConfigFactory
    implicit val system=ActorSystem("DummySystem",ConfigFactory.load.getConfig("dummy"))
    
    val simu = system.actorOf(
        Props(new MySimulator(system))
        .withDispatcher("simu-dispatcher"),
        name="simulator")

    val appManager = system.actorOf(
        Props(new ApplicationManager(system, howmanyjob))
        .withDispatcher("simu-dispatcher"),
        name="application-manager")

        
    import akka.routing.RoundRobinRouter
    val processor = system.actorOf(
        Props(new MyMessageProcessor(appManager, simu))
        .withDispatcher("workers-dispatcher")
        .withRouter(RoundRobinRouter(10)),
        name="default")
        
    for(i <- 1 to howmanyjob) {
      processor ! DoItMessage("Do the job with ID#%d now".format(i))
    }
    print("All jobs sent")
  }
}

class MyMessageProcessor(appManager:ActorRef, simu:ActorRef) extends Actor {
  def receive = {
    case msg:DoItMessage =>
      implicit val timeout = Timeout(5 minutes)
      val receivedTime = System.currentTimeMillis()
      val future = simu ? msg
      future.onComplete { 
        case result:Either[Throwable, String] =>
          assert(System.currentTimeMillis()-receivedTime >= 1000)
          appManager ! DoneMessage
      }
  }
}

class MySimulator(system:ActorSystem) extends Actor {
  def receive = {
    case _:DoItMessage =>
      // Fake processing, somewhere, the job is executed and we get 
      // the results 1s later asynchronously
      system.scheduler.scheduleOnce(1000 milliseconds, sender, "Done") 
  }
}

class ApplicationManager(system:ActorSystem, howmanyjob:Int) extends Actor {
  val startedTime = System.currentTimeMillis()
  var count=0
  def receive = {
    case DoneMessage => 
      count+=1
      if (count%(howmanyjob/20)==0) println("%d/%d processed".format(count, howmanyjob))
      if (count == howmanyjob) {
        val now=System.currentTimeMillis()
        println("Everything processed in %d seconds".format((now-startedTime)/1000))
        system.shutdown() 
      }
  }
}
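
The non-blocking delay idea used by MySimulator can also be sketched without Akka. The block below is an illustration only: it swaps Akka's scheduler for the JDK ScheduledExecutorService and assumes Scala 2.10 or later for scala.concurrent; DelayedReply, replyAfter and runJobs are hypothetical names.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object DelayedReply {
  // One scheduler thread serves every pending delay.
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  // Completes the returned future after delayMs, without parking a worker thread.
  def replyAfter[A](delayMs: Long)(value: A): Future[A] = {
    val promise = Promise[A]()
    val task = new Runnable {
      def run(): Unit = { promise.success(value); () }
    }
    scheduler.schedule(task, delayMs, TimeUnit.MILLISECONDS)
    promise.future
  }

  // Starts all the simulated jobs at once, waits for every reply and
  // returns (number of replies, elapsed milliseconds).
  def runJobs(jobs: Int, delayMs: Long): (Int, Long) = {
    val started = System.nanoTime()
    val replies = (1 to jobs).toList.map(_ => replyAfter(delayMs)("Done"))
    val all = Await.result(Future.sequence(replies), Duration(30, TimeUnit.SECONDS))
    (all.count(_ == "Done"), (System.nanoTime() - started) / 1000000L)
  }

  // The scheduler thread is not a daemon, so stop it when finished.
  def shutdown(): Unit = scheduler.shutdown()
}
```

Calling DelayedReply.runJobs(1000, 200) followed by DelayedReply.shutdown() starts a thousand simulated jobs whose timers all run concurrently, so the total should stay close to a single delay rather than growing with the number of jobs.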
The application configuration is the following (application.conf file) :
dummy {
  akka {
    loglevel = WARNING
    actor {
      default-dispatcher {
      }
    }
    scheduler {
      tick-duration = 50ms
      ticks-per-wheel = 1000
    }
  }
  
  simu-dispatcher {
    type = Dispatcher
    mailbox-capacity = 100000
  }
  
  workers-dispatcher {
    mailbox-capacity = 10000
    executor = "fork-join-executor"
    fork-join-executor {
      parallelism-min = 0
      parallelism-max = 6000
      parallelism-factor = 3.0      
    }
  }
}
The build.sbt SBT file (To get dependencies, build, test & run) is the following :
name := "AkkaSandbox"

version := "0.1"

scalaVersion := "2.9.1"

libraryDependencies += "com.typesafe.akka" % "akka-actor" % "2.0-RC3"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

Everything is processed in 75 seconds using :
  • linux 3.2.1-gentoo-r2 64bits
  • AMD Phenom(tm) II X6 1090T (6 CPU cores)
  • Java HotSpot(TM) 64-Bit Server 1.6.0.31
CPU usage is ~80% and the Java memory footprint stays below 300 MB; each message requires 50-60 bytes, so the 10 million messages require at least 480 MB in total (but not simultaneously).

Thursday, February 23, 2012

Akka actors versus Scala actors : control the message throughput of actors

In the post following this one, I show how to enhance the example code in order to be able to process a very high number of messages : Akka actors : 10 millions messages processed (1s / message) in 75 seconds !

The default Scala actor system doesn't come with any mechanism to control message throughput between actors, so it is up to you to implement such a feature in order to avoid OutOfMemory problems, in particular when actors are not processing messages at the same rate.
The Akka framework is very interesting because it brings you all the features needed to control and manage throughput. Let's see how it works through a simple example :
import akka.actor._

sealed trait MyMessage
case class DoItMessage(cmd:String) extends MyMessage

object Dummy {
  def main(args:Array[String]) {
    val system=ActorSystem("DummySystem")
    val processor = system.actorOf(Props[MyMessageProcessor],name="default")
    for(i <- 1 to 10000000) {
      processor ! DoItMessage("Do the job with ID#%d now".format(i))
    }
    println("All jobs sent")
  }
}

class MyMessageProcessor extends Actor {
  def receive = {
    case _:DoItMessage => Thread.sleep(1000)
  }
}
Each message sent takes more than 50 bytes, so a JVM with a 512 MB maximum heap size will quickly run out of memory.
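
The arithmetic behind that statement can be checked quickly. This is a rough lower bound only: it takes the 50 bytes per message quoted above at face value and ignores the per-entry overhead of the mailbox itself; MailboxFootprint is a hypothetical name.

```scala
object MailboxFootprint {
  val messages = 10000000L        // number of messages sent by the loop
  val bytesPerMessage = 50L       // lower bound quoted in the post
  val totalBytes = messages * bytesPerMessage
  // Same amount expressed in binary megabytes (1 MiB = 1024 * 1024 bytes).
  val totalMiB = totalBytes / (1024.0 * 1024.0)
}
```

With these figures the pending messages alone come to roughly 477 MiB, which leaves almost nothing of a 512 MB heap for everything else.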

The good news is that Akka supports bounded actor mailboxes; just add a configuration file with the following content :
dummy {
  akka {
    loglevel = WARNING
    actor {
      default-dispatcher {
         mailbox-capacity = 10000
      }
    }
  }
}
And have this configuration file taken into account with the following code update :
    import com.typesafe.config.ConfigFactory
    val system=ActorSystem("DummySystem",ConfigFactory.load.getConfig("dummy"))
This time, if you run the previous code again, you'll get a stable memory footprint.

Of course it will take a long time to process, as each message requires 1s (and actors are blocked for 1s by the Thread.sleep call, which monopolizes a thread for the same duration => waste of resources)... If we want to reduce the processing time even though each message requires 1s to be processed, we'll have to load-balance message processing across a high number of actors.
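
The effect of a bounded mailbox can be illustrated with a plain JDK queue. This is an analogy only, not how Akka implements its mailboxes: BoundedMailboxDemo and simulate are hypothetical names, and the blocking put() of ArrayBlockingQueue stands in for the back-pressure applied to the sender.

```scala
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.atomic.AtomicInteger

object BoundedMailboxDemo {
  // Returns (messages consumed, largest backlog seen by the consumer).
  def simulate(total: Int, capacity: Int): (Int, Int) = {
    val mailbox = new ArrayBlockingQueue[String](capacity)
    val consumed = new AtomicInteger(0)
    val maxBacklog = new AtomicInteger(0)

    val consumer = new Thread(new Runnable {
      def run(): Unit = {
        var i = 0
        while (i < total) {
          // Record the current backlog, then take one message
          // (take() blocks while the queue is empty).
          val backlog = mailbox.size()
          if (backlog > maxBacklog.get()) maxBacklog.set(backlog)
          mailbox.take()
          consumed.incrementAndGet()
          i += 1
        }
      }
    })
    consumer.start()

    // put() blocks while the queue is full, so the producer is throttled
    // to the consumer's pace instead of filling the heap.
    for (i <- 1 to total) mailbox.put("Do the job with ID#%d now".format(i))

    consumer.join()
    (consumed.get(), maxBacklog.get())
  }
}
```

Whatever the total number of messages, the backlog never exceeds the configured capacity, which is what keeps the memory footprint stable.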


CONTEXT : Scala 2.9.1 / SBT 0.11.2 / AKKA 2.0-rc2 / sbteclipse 2.0.0

Revisiting Git/SVN Update script... and let's try to make it faster

The following script is an enhancement of the one shown in the previous post. It now detects all git or svn projects in the current directory, and then updates and compiles them.

It would have been straightforward to make it faster by just turning the "projects" collection into a parallel one (projects.par), but unfortunately that won't work because a global mutable variable is used to store the current working directory (classical trap) ! So more work is needed to make this faster.
#!/bin/sh
exec scala -deprecation -savecompiled "$0" "$@"
!#
import java.io.File
import sys.process.Process
import sys.process.ProcessBuilder._

// ======================================================================
case class CurDir(cwd:File)
def file(f:String)=new File(f)
def file(d:File, f:String)=new File(d, f)
implicit def stringToCurDir(d:String) = CurDir(file(d))
implicit def fileToCurDir(d:File) = CurDir(d)
implicit def stringToFile(f:String) = file(f)
implicit def stringToProcess(cmd: String)(implicit curDir:CurDir) = Process(cmd, curDir.cwd)
implicit var cwd:CurDir=file(".").getPath
def cd(dir:File=file(util.Properties.userDir)) = cwd=CurDir(dir)
// ======================================================================

trait Project {
  val dir:File
}
case class GitProject(dir:File) extends Project
case class SvnProject(dir:File) extends Project

// Find GIT or SVN project directories
val projects = file(".").listFiles filter { _.isDirectory} flatMap { d =>
  if (file(d, ".git").exists) Some(GitProject(d))
  else if (file(d, ".svn").exists) Some(SvnProject(d))
  else None
}

// Update compile project (update also eclipse conf files generated by sbteclipse plugin)
for(project<- projects) {
  println(project)
  cd(project.dir)
  project match {
    case GitProject(_) => "git pull" !
    case SvnProject(_) => "svn update" !
  }
  "sbt eclipse compile" !
}
Let's now revisit the current directory hack to make it compatible with parallel processing and avoid side effects.
Therefore, we have to modify the "cd" command hack in order to remove the mutable cwd variable :
#!/bin/sh
exec scala -deprecation -savecompiled "$0" "$@"
!#
import java.io.File
import sys.process.Process
import sys.process.ProcessBuilder._

// ======================================================================
case class CurDir(cwd:File)
def file(f:String)=new File(f)
def file(d:File, f:String)=new File(d, f)
implicit def stringToCurDir(d:String) = CurDir(file(d))
implicit def fileToCurDir(d:File) = CurDir(d)
implicit def stringToFile(f:String) = file(f)
implicit def stringToProcess(cmd: String)(implicit curDir:CurDir) = Process(cmd, curDir.cwd)
def cd[A](dir:File=file(util.Properties.userDir)) (indir:(CurDir)=>A) = indir(CurDir(dir))
// ======================================================================

trait Project {
  val dir:File
}
case class GitProject(dir:File) extends Project
case class SvnProject(dir:File) extends Project

// Find GIT or SVN project directories
val projects = file(".").listFiles filter { _.isDirectory} flatMap { d =>
  if (file(d, ".git").exists) Some(GitProject(d))
  else if (file(d, ".svn").exists) Some(SvnProject(d))
  else None
}

// Update compile project (update also eclipse conf files generated by sbteclipse plugin)
for(project<- projects.par) {
  println(project)
  cd(project.dir) { implicit cwd =>
    project match {
      case GitProject(_) => "git pull" !
      case SvnProject(_) => "svn update" !
    }
    "sbt eclipse compile" !
  }
}

println("%d project updated".format(projects.size))

So we couldn't keep the classical shell cd behavior, because it implies mutable state. In this script we turn the "classical" cd definition into a new one that takes an extra argument: a function which receives the working directory to use.
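
Stripped of the process launching, the pattern looks like the sketch below. It is only an illustration under stated assumptions: describe is a hypothetical stand-in for an external command (it just reports the directory it would run in), and futures from scala.concurrent (Scala 2.10 or later) replace the parallel collection.

```scala
import java.io.File
import java.util.concurrent.TimeUnit
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object ScopedCwd {
  case class CurDir(cwd: File)

  // Loans a working directory to body; nothing global is mutated.
  def cd[A](dir: File)(body: CurDir => A): A = body(CurDir(dir))

  // Hypothetical stand-in for an external command: it only reports the
  // directory it would run in, taken from the implicit CurDir in scope.
  def describe(cmd: String)(implicit cur: CurDir): String =
    cmd + " in " + cur.cwd.getName

  // Each job gets its own CurDir, so jobs can run concurrently
  // without seeing each other's directory.
  def updateAll(dirs: List[File]): List[String] = {
    val jobs = dirs.map { dir =>
      Future { cd(dir) { implicit cwd => describe("git pull") } }
    }
    Await.result(Future.sequence(jobs), Duration(30, TimeUnit.SECONDS))
  }
}
```

Because the directory travels as an implicit parameter of the function passed to cd, two jobs running at the same time can never overwrite each other's working directory.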

In my case (7 projects), using "projects.par", the script executes in 10s instead of 33s !

CONTEXT : Scala 2.9.1

Wednesday, February 1, 2012

Small script to update & compile a list of svn projects

Scala script skeleton to update a set of subversion projects.
#!/bin/sh
exec scala -deprecation -savecompiled "$0" "$@"
!#

// ======================================================================
import sys.process.Process
import sys.process.ProcessBuilder._

case class CurDir(cwd:java.io.File)
implicit def stringToCurDir(d:String) = CurDir(new java.io.File(d))
implicit def stringToProcess(cmd: String)(implicit curDir:CurDir) = Process(cmd, curDir.cwd)
implicit def stringSeqToProcess(cmd:Seq[String])(implicit curDir:CurDir) = Process(cmd, curDir.cwd)

implicit var cwd:CurDir=scala.util.Properties.userDir
def cd(dir:String=util.Properties.userDir) = cwd=dir
// ======================================================================

val updateList=List("project1", "project2", "project3")

for(dir<- updateList) {
  println("----------------------------------------")
  println("Processing %s".format(dir))

  cd(dir)

  "svn update" !

  "sbt eclipse compile" !
}

Monday, January 30, 2012

Simplifying scala scripts : Adding #include support to your scripts

Test a #include support example by executing the following 5 lines :
$ wget http://dnld.crosson.org/bootstrap.tar.gz
$ tar xvfz bootstrap.tar.gz
$ cd bootstrap
$ sbt assembly
$ ./scripts/test.scala
The test.scala example script is the following :
#!/bin/sh
DIRNAME=`dirname "$0"`
exec java -jar "$DIRNAME"/bootstrap.jar "$0" "$@"
!#

#include "shell.scala"

cd("/etc/")

"ls" #| "grep net" !

It goes to the /etc directory and prints the files which contain the net keyword in their names.

test.scala script includes the following file : scripts/include/shell.scala
import sys.process.Process
import sys.process.ProcessBuilder._
 
case class CurDir(cwd:java.io.File)
implicit def stringToCurDir(d:String) = CurDir(new java.io.File(d))
implicit def stringToProcess(cmd: String)(implicit curDir:CurDir) = Process(cmd, curDir.cwd)
implicit def stringSeqToProcess(cmd:Seq[String])(implicit curDir:CurDir) = Process(cmd, curDir.cwd)

implicit var cwd:CurDir=scala.util.Properties.userDir
def cd(dir:String=util.Properties.userDir) = cwd=dir
This file contains some definitions which make it possible for the user to change the current directory.

All the include mechanism logic is defined as follows :
package fr.janalyse.script

import scala.tools.nsc.ScriptRunner
import scala.tools.nsc.GenericRunnerCommand
import scala.io.Source
import java.io.File

object Bootstrap {
  val defaultOptions = List("-nocompdaemon","-usejavacp","-savecompiled", "-deprecation")
  val defaultExpandedScriptExt = ".pscala"
  
  val includeRE = """\s*#include\s+"(.+)"\s*"""r
  
  def expand(file:File, availableIncludes:List[File]) : List[String] = {
    val content=Source.fromFile(file).getLines().toList
    // First we remove "shell" startup lines, everything between #! and !#
    val cleanedContent = content.indexWhere { _.trim.startsWith("!#") } match {
      case -1 => content
      case i  => content.drop(i+1)
    }
   // Then we expand #include directives
   cleanedContent flatMap {
     case includeRE(filename) =>
       val fileOpt = availableIncludes find {_.getName() == filename}
       fileOpt orElse {
          throw new RuntimeException("%s : Couldn't find include file '%s' ".format(file.getName, filename))
       }
       fileOpt map { file => expand(file, availableIncludes)} getOrElse List.empty[String]  
     case line => line::Nil
   }
  }
    
  def main(cmdargs:Array[String]) {
    val command = new GenericRunnerCommand(defaultOptions ++ cmdargs.toList)
    val scriptDir = new File(cmdargs(0)).getParentFile()
    val includePath = List(new File(scriptDir, "include"), scriptDir)
    val availableIncludes = includePath filter {_.exists()} flatMap {_.listFiles()}
    val scriptname = command.thingToRun 
    val script = new File(scriptname)
    // replaceFirst takes a regex: escape the dot and anchor on the end of the name
    val richerScript = new File(scriptname.replaceFirst("\\.scala$", defaultExpandedScriptExt))
    
    if (script.exists()) {
      val jars = util.Properties.javaClassPath.split(File.pathSeparator) map {new File(_)} collect {
        case f if (f.exists() && f.isFile()) => f 
      }
      val jarsLastModified = (jars map {_.lastModified()} max)
      
      if (!richerScript.exists ||  // -- nothing already available
          (jarsLastModified > richerScript.lastModified) ||   // -- Bootstrap jar is newer
          (script.lastModified > richerScript.lastModified)) { // -- Script has been modified
        val newcontent =  expand(script, availableIncludes).mkString("\n")
        new java.io.FileOutputStream(richerScript) {
          write(newcontent.getBytes())
        }.close()
      }
    }
    ScriptRunner.runScript(command.settings, richerScript.getPath, command.arguments)
  }
}

How does it work :
The principle is to override the standard Scala script startup mechanism by introducing an additional step which expands the script with all the includes it references, and then gives Scala the script resulting from this expansion.
test.scala becomes test.pscala, which in turn generates the saved compiled file test.pscala.jar. No recompilation is required as long as no change occurs in test.scala or in the bootstrap.jar file.
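
The expansion step can be tried in isolation. The following is a simplified sketch, assuming the sources are held in an in-memory Map instead of being read from files; IncludeExpander is a hypothetical name, and there is no protection against cyclic includes.

```scala
object IncludeExpander {
  // Same directive syntax as above: #include "name"
  private val IncludeRE = """\s*#include\s+"(.+)"\s*""".r

  // sources maps a file name to its lines (stand-in for files on disk).
  def expand(name: String, sources: Map[String, List[String]]): List[String] = {
    val lines = sources.getOrElse(name, sys.error("Couldn't find include file '" + name + "'"))
    // Drop the shell startup header, i.e. everything up to and including "!#".
    val body = lines.indexWhere(_.trim.startsWith("!#")) match {
      case -1 => lines
      case i  => lines.drop(i + 1)
    }
    // Replace each #include line by the expanded content of the named file.
    body.flatMap {
      case IncludeRE(included) => expand(included, sources)
      case line                => List(line)
    }
  }
}
```

Included files go through the same function, so nested includes are expanded as well and any shell header they carry is dropped too.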

You should also notice that the script is started using 'exec java -jar "$DIRNAME"/bootstrap.jar "$0" "$@"' and not 'exec scala ...', because bootstrap is an assembly jar which contains everything needed to compile and run Scala scripts, and even more if you want: it can include any third-party libraries you may need, just add library dependencies ! So you only need one file, bootstrap.jar, to run any Scala script: nothing to install, just one file to upload.

SBT build configuration : bootstrap/build.sbt
import AssemblyKeys._

seq(assemblySettings: _*)

name := "bootstrap"

version := "0.1"

scalaVersion := "2.9.1"

libraryDependencies <++=  scalaVersion { sv =>
   ("org.scala-lang" % "scala-swing" % sv) ::
   ("org.scala-lang" % "jline"           % sv  % "compile") ::
   ("org.scala-lang" % "scala-compiler"  % sv  % "compile") ::
   ("org.scala-lang" % "scala-dbc"       % sv  % "compile") ::
   ("org.scala-lang" % "scalap"          % sv  % "compile") ::
   ("org.scala-lang" % "scala-swing"     % sv  % "compile") ::Nil   
}

mainClass in assembly := Some("fr.janalyse.script.Bootstrap")

jarName in assembly := "bootstrap.jar"

SBT Plugins configuration : bootstrap/project/plugins.sbt file
resolvers += Classpaths.typesafeResolver

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.0.0-M3")

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.7.2")

Thursday, January 26, 2012

Simplifying scala scripts

I've already written several dozen Scala scripts (series computations, ssh automation, various jmx operations, remote administration, garbage collector log analysis, ...) and found this language quite interesting for use as a scripting language. There are many reasons for that :
  • Automatic script compilation reduces runtime errors. I am often amazed to get, at the first attempt, a script that works without any runtime error !
  • You benefit from Scala's powerful collections, which make it possible to write "sql"-like operations
  • It becomes straightforward to parallelize tasks using actors; one of my favorite use cases is a short script that triggers an explicit garbage collection on several dozen remote JVMs in a very short time
But I miss some features that would help to make Scala scripts even simpler and more concise :
  • A #include-like feature within scripts
  • A way to modify default imports, to avoid always adding the same imports to all scripts
  • The #! !# shell bootstrap header can become long (and not DRY) once you want to add many external Java dependencies
In fact those missing features are not so difficult to implement, as the following proof-of-concept source code shows. It defines an object, Bootstrap, which can be used to start a Scala script and which brings new imports and definitions to your script.
package fr.janalyse.script

import scala.tools.nsc.ScriptRunner
import scala.tools.nsc.GenericRunnerCommand
import scala.io.Source

object Bootstrap {

  val header = 
"""// WARNING
// Automatically generated file - do not edit !
import sys.process.Process
import sys.process.ProcessBuilder._
 
case class CurDir(cwd:java.io.File)
implicit def stringToCurDir(d:String) = CurDir(new java.io.File(d))
implicit def stringToProcess(cmd: String)(implicit curDir:CurDir) = Process(cmd, curDir.cwd)
implicit def stringSeqToProcess(cmd:Seq[String])(implicit curDir:CurDir) = Process(cmd, curDir.cwd)

implicit var cwd:CurDir=scala.util.Properties.userDir
def cd(dir:String=util.Properties.userDir) = cwd=dir

"""

  val footer = 
"""
"""

  def main(cmdargs:Array[String]) {

    def f(name:String) = new java.io.File(name)
    
    val na = List("-nocompdaemon","-usejavacp","-savecompiled", "-deprecation") ++ cmdargs.toList
    
    val command = new GenericRunnerCommand(na)
    
    import command.settings
    
    val scriptname = command.thingToRun 
    val script = f(scriptname)
    // replaceFirst takes a regex: escape the dot and anchor on the end of the name
    val richerScript = f(scriptname.replaceFirst("\\.scala$", ".scala-plus"))
    
    if (script.exists()) {
      if (!richerScript.exists || (script.lastModified > richerScript.lastModified)) {
        val content=Source.fromFile(script).getLines().toList
        val cleanedContent = content.dropWhile(x => !x.startsWith("!#")).tail.mkString("\n")
        val newcontent =  List(header, cleanedContent, footer).mkString("\n")
        new java.io.FileOutputStream(richerScript) {
          write(newcontent.getBytes())
        }.close()
      }
    }
    
    val args = command.arguments
    
    // Use the full path so the generated script is found from any directory
    ScriptRunner.runScript(settings, richerScript.getPath, args)
  }  
}

Then generate a standalone executable jar with this class and all the needed dependencies, using an SBT build specification such as this one :
import AssemblyKeys._

seq(assemblySettings: _*)

name := "bootstrap"

version := "0.1"

scalaVersion := "2.9.1"

libraryDependencies <++=  scalaVersion { sv =>
   ("org.scala-lang" % "scala-swing" % sv) ::
   ("org.scala-lang" % "jline"           % sv  % "compile") ::
   ("org.scala-lang" % "scala-compiler"  % sv  % "compile") ::
   ("org.scala-lang" % "scala-dbc"       % sv  % "compile") ::
   ("org.scala-lang" % "scalap"          % sv  % "compile") ::
   ("org.scala-lang" % "scala-swing"     % sv  % "compile") ::Nil   
}

mainClass in assembly := Some("fr.janalyse.script.Bootstrap")

jarName in assembly := "bootstrap.jar"

You'll then be able to run any Scala script directly, like this :
#!/bin/sh
exec java -jar bootstrap.jar "$0" "$@"
!#

cd("/etc/")

"ls" #| "grep net" !

Thanks to the assembly SBT plugin, you've generated a standalone executable jar which contains the Scala compiler and our custom script startup mechanism.
In a future post, I'll describe in more detail a new bootstrap implementation that brings a #include feature to Scala scripts.