Apr 21 15

Parser combinators creating functions for DSL execution

by Frank Sauer

Last year in August I wrote about creating an Alerting DSL using Scala parser combinators. We are currently in the process of upgrading our metrics collection and alerting system to InfluxDB 0.9, which introduced tags and with them the need for us to compare a set of tag values in order to decide whether to fire an alert or not. Previously all the information needed to make this decision was encoded in the series name, but now series names are very simple: they are just the name of the measurement, for example HeapSize. All the information about where the measurement came from is now encoded in tags like host, process, etc.

Comparing the values in a set of tags with constants written in a rule sounds a lot like something we all know very well: SQL where clauses… So let’s see how we can parse a where clause into a function that we can evaluate on a set of tag values. We would also like to support complex logical expressions using AND and OR as well as the SQL “like” operator, which in our case does a regular expression match.

Let’s start with the test case demonstrating the resulting API:

"AlertRule" should {
    "include a filter when a where clause is present" in new AlertFixture {
      val rule: Seq[AlertRule] = AlertRule("""
          Rule1: 
            when /somemeasurement.*/ > 1000 MB 
            for 30 seconds
            where name="bla" and whatever like /foo.*/ 
            alert (name="boo")
      """)

      rule.length shouldEqual(1) // we specified a single rule in the DSL  
      rule(0).filter match {     // it has a where clause so there must be a filter
        case Some(filter) => // execute the filter on some good and bad data
          filter(Map("name" -> "bla", "whatever"->"fooBar")) shouldEqual(true)
          filter(Map("name" -> "bla", "whatever"->"goober")) shouldEqual(false)
          filter(Map("name" -> "boo", "whatever"->"fooBar")) shouldEqual(false)
        case _ => failure("filter should have been defined with a given where clause")
      }
    }
}

What this test case shows is that if a rule has a where clause (they’re optional), the resulting AlertRule should contain a filter function. This filter takes a Map of tags as its parameter, and applying it yields a Boolean which is true if the set of tags makes the entire where clause evaluate to true. Please refer to the previous post for details on the rest of the DSL and the AlertRule case class and parser.
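
For orientation, the optional where clause can be wired into the rule parser roughly like this (a sketch of mine, not the exact code from the post; disjunction is the top-level where-expression parser defined below):

  // sketch: an optional where clause yields an Option[...] filter, matching AlertRule.filter
  def whereClause: Parser[Option[Map[String,String] => Boolean]] =
    ("where" ~> disjunction).?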

Let’s see how we parse this where clause from the top down, ignoring the where keyword itself. The expression that follows it is defined by this rule:

    // combine where-clause conjunctions with logical or
    def disjunction: Parser[(Map[String,String])=>Boolean] =
      conjunction ~ ("or" ~> conjunction).* ^^ {
        case head ~ tail => tail.foldLeft(head) {
          case (left, right) => (tags: Map[String,String]) => left(tags) || right(tags)
        }
      }

What we see here is that the top-level expression combines one or more conjunctions using a logical ‘or’ operator, and the result is a boolean function that accepts a Map of tags. Note that left and right (the conjunctions) are themselves boolean functions produced by the conjunction parser, so the result is simply a function that takes the tags, propagates them to both left and right, and combines the two results using ||. The foldLeft repeats this for every conjunction in the list.
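
For example (a hypothetical input, not taken from the post), parsing the clause name="a" or name="b" produces a function that behaves like this:

  // conceptually, the parse result of:  name="a" or name="b"
  val filter: Map[String,String] => Boolean =
    tags => tags.get("name").contains("a") || tags.get("name").contains("b")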

Conjunctions look very similar, except they use && to combine left and right:

    // combine where clause relations with and
    def conjunction: Parser[(Map[String,String])=>Boolean] =
      relation ~ ("and" ~> relation).* ^^ {
        case head ~ tail => tail.foldLeft(head) {
          case (left, right) => (tags: Map[String,String]) => left(tags) && right(tags)
        }
      }

Separating the parsing into disjunctions and conjunctions will give the proper precedence to the and and or operators. At the lowest level in our where-clause parse tree we find relations. This is where the actual comparison of tags takes place:

    def relEq:Parser[(Map[String,String])=>Boolean] = ident ~ "=" ~ stringLiteral ^^ {
      case left ~ "=" ~ right =>
        (tags: Map[String,String]) => {  
          val k = tags.get(left)         
          k match {
            // if left-hand side not found in the tags, the result is false, 
            // otherwise v has to be equal to value specified in rule after dropping 
            // leading and trailing double quote we get from using stringLiteral
            case Some(v) => v == right.drop(1).dropRight(1)   
            case _ => false
          }
        }
    }

    // reuse the regex parser we already had back in August
    def relLike:Parser[(Map[String,String])=>Boolean] = ident ~ "like" ~ regex ^^ {
      case left ~ "like" ~ right =>
        (tags: Map[String,String]) => {
          val k = tags.get(left)
          k match {  
            // if left-hand side not found in the tags, the result is false, 
            // otherwise v has to match the regex specified in the rule
            case Some(v) => right.findFirstMatchIn(v).nonEmpty
            case _ => false
          }
        }

    }

    // parses a simple comparison in a where clause (tag = "value" or tag like /regex/)
    def relation: Parser[(Map[String,String])=>Boolean] = relEq | relLike    
    

There are two cases here, one for the simple comparison (relEq) and one for the like operator (relLike). Both also have to deal with the case where the tag key specified in the DSL is not actually found in the set of tags being evaluated. I’m sure there’s a nice refactoring hiding in here somewhere; one possibility is sketched below.
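
One such refactoring (a sketch of mine, not part of the original code) factors the tag lookup into a helper that builds a filter from a tag name and a predicate on its value:

  // both relations become: look up the tag, then apply a predicate to its value
  def tagFilter(tag: String)(p: String => Boolean): Map[String,String] => Boolean =
    tags => tags.get(tag).exists(p)

  def relEq: Parser[Map[String,String] => Boolean] = ident ~ ("=" ~> stringLiteral) ^^ {
    case tag ~ literal => tagFilter(tag)(_ == literal.drop(1).dropRight(1))
  }

  def relLike: Parser[Map[String,String] => Boolean] = ident ~ ("like" ~> regex) ^^ {
    case tag ~ re => tagFilter(tag)(v => re.findFirstMatchIn(v).nonEmpty)
  }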

Once again I hope I’ve demonstrated the incredible power and versatility of the Scala parser combinators. I especially like parsers that build functions as shown here. The way conjunctions and disjunctions combine simple comparisons into bigger and bigger logical expressions that can be evaluated later when we’re streaming measurements through the rules is just the coolest thing ever.

Mar 14 15

Dining with a lost uncle philosopher

by Frank Sauer

For some reason the actor model has always felt natural and somewhat familiar to me and I never really knew why until recently. Even when I was first dabbling in Erlang, which is when I first saw them ‘officially’, it felt familiar even though the language was completely alien.

Recently, I was going through some old stuff from my college days and stumbled on an old 1988 technical report from the Philips Research Laboratories containing a collection of papers on a language called POOL-T (pl and parpl documentation release 3.0, May 1988). I had taken a class back in early 1989, the goal of which was to learn two new programming languages, implement the same project in both and then write a report comparing and contrasting the two languages based on the implementations of that project. I do not remember the reasons why, but I chose Smalltalk and POOL-T. I suspect I took the class as an excuse to learn Smalltalk and I have no idea why I even knew about the existence of POOL-T… They sure knew how to name things though; POOL (Parallel Object oriented Language) was designed to run on DOOM (Decentralized Object oriented Machine)!

As I started to read this report, to my surprise its content again felt very familiar…. Objects in POOL-T feel a lot like what we call actors today, as we’ll see in the examples below. I do not know if the researchers at Philips were aware of Erlang and the OTP at the time they developed it. It is more likely that both POOL-T and the Erlang actor model were at least in part based on the 1973 paper that introduced the actor model: “A Universal Modular Actor Formalism for Artificial Intelligence” by Carl Hewitt, Peter Bishop and Richard Steiger.

However, it is also entirely possible the similarity is coincidental and perhaps POOL-T is not based on actors at all… more on this later.

Let’s look at an example POOL-T program (Excerpt from a larger Dining Philosopher example from the POOL-T User Manual by Lex Augusteijn, Nov 23, 1987) and see if these objects are actually actors:

CLASS Fork

  METHOD pickup () Fork :
    RETURN SELF
  END pickup

  METHOD putdown () Fork :
    RETURN SELF
  END putdown

  ROUTINE new () Fork :
    RETURN NEW 
  END new

  BODY
    DO TRUE THEN
      ANSWER (pickup);
      ANSWER (putdown)
    OD
END Fork

CLASS Room
  VAR occupancy, max_occupancy: Integer

  METHOD init(max : Integer) Room :
    max_occupancy <- max;
    occupancy <- 0;
    RETURN SELF
  END init

  METHOD enter () Room :
    occupancy <- occupancy + 1;
    RETURN SELF
  END enter

  METHOD exit () Room :
    occupancy <- occupancy - 1;
    RETURN SELF
  END exit

  ROUTINE new (max: Integer) Room :
    RETURN NEW ! init(max)
  END new

  BODY
    ANSWER (init);
    DO TRUE THEN
       SEL occupancy > 0             ANSWER(exit)
        OR occupancy < max_occupancy ANSWER(enter)
       LES
    OD
END Room

Let's look at forks first as they are the simplest. They are created using the routine new (think of a routine as a static method). A new fork is assumed to be lying on the table right after creation. The BODY of a POOL-T class describes a process that starts executing when an instance is created. Each POOL-T object has its own active process! Forks can answer two messages, pickup and putdown; however, they have to alternate - a fork cannot be picked up twice in succession without being put down first. This behavior is represented by the infinite loop DO TRUE ... OD and the sequence of ANSWER statements in it. The ANSWER statement states that the object executing it is prepared to answer any message listed in its list of method ids (here it lists only one in each case). When an ANSWER statement is executed, the object selects a message that has arrived and whose method name occurs in the list of the ANSWER statement (you can use ALL to stand for all methods). If no such message has yet arrived, the object keeps waiting until one does arrive (which may take forever). If there is more than one such message, the one that arrived first is selected. This feels very much like an actor; in fact, the Akka actor would look something like this:


case object Pickup
case object Putdown

class Fork extends Actor {

  def receive = onTable

  def onTable: Receive = {
    case Pickup => context become inUse
  }

  def inUse: Receive = {
    case Putdown => context become onTable
  }
}

Let's examine the Room class next. Rooms are also created using the new routine. In this case the new object is sent an init message with the maximum occupancy for the new room. The initialization of rooms is enforced by the process described in the BODY as the first and ONLY message it is willing to answer is the init message. Only after init has been answered can a room be entered, but only if the maximum occupancy has not been reached. Furthermore, a room can be exited, but only if it still has occupants. Note the SEL statement represents the conditional answering of messages. It is executed as follows (from "Definition of the programming language POOL-T", 1986 by Pierre America):

  1. All guards are evaluated in the order in which they are specified in the program text
  2. All commands whose guards evaluated to false are discarded
  3. The set of methods from the ANSWER statement of one of the SEL clauses is determined
  4. The set of all messages that have arrived for this object and whose method identifier is in the above set is determined
  5. If this set of messages is empty, and there are open guarded commands without an ANSWER statement, the textually first of them is selected. Its statement sequence after THEN is executed if present and the SEL terminates
  6. If this set of messages is empty and there is no open guarded command without an ANSWER statement, the object waits until a message that belongs to this set arrives, and then proceeds with this message
  7. If this set of messages is not empty, the message that arrived first is selected. The first guarded command is chosen whose ANSWER statement contains the method named in the message. Of this guarded command, first the ANSWER statement is executed, then the statement sequence after THEN (if present). After that the select statement terminates.

This SEL statement also translates quite naturally:


case class Init(max: Int)
case object Enter
case object Exit

class Room extends Actor {

  def receive = init

  def init: Receive = {
    case Init(max) => context become ready(max, 0)
  }

  def ready(max_occupancy: Int, occupancy: Int): Receive = {
    case Enter if occupancy < max_occupancy => 
      context become ready(max_occupancy, occupancy + 1)
    case Exit if occupancy > 0 =>
      context become ready(max_occupancy, occupancy - 1)
  }
}

So at first glance, all of the above sounds a LOT like actors... Messages are sent using !, they are queued up at the receiver, and they may or may not be answered... The SEL feels a lot like a pattern match with guards used in the receive method of an Akka actor.

So are POOL-T objects in fact actors?

To find out, let's look at the body (just the body for brevity) of the Philosopher class and how it makes use of the rooms and forks and sends messages to these objects:

CLASS Philosopher
 ...
BODY
  ANSWER (init)
  DO TRUE THEN
    think();
    room ! enter();
    lfork ! pickup() ; rfork ! pickup();
    eat();
    lfork ! putdown(); rfork ! putdown();
    room ! exit();
  OD
END Philosopher

It turns out the room and forks are used like locks and this will only work as intended if ! is synchronous and blocks... So, the ! is not asynchronous as in the actor model, but concurrency can be achieved by having statements in a POST block following the RETURN statement in a receiving method. These are executed after the return value is returned to the sender and execute in parallel with the sender. The language spec states this about the send expression:

... Finally the message, consisting of the indicated method identifier and the parameters is sent to the destination object. When it is answered by the destination object, the corresponding method is invoked with the parameters filled in and the result is sent back. This result is then the result of the send expression.

And about the RETURN and POST section:

...After that, the RETURN expression is evaluated and the result of this expression is returned to the object that sent the message. The part of the execution so far is called the "render-vous": the sending object is waiting for the receiving one to process its message. After the rendezvous both objects will execute independently. Finally the post processing section (if present) is executed and then the answer statement terminates. So it is possible for the sender of the message to resume its execution while the receiver is executing the post-processing section.

And there we have it. Even though at first glance POOL-T objects feel like actors, the semantics of ! are in fact the opposite, and it would be hard to consider POOL-T objects actors for this reason alone: asynchronous message passing between actors is a core feature of the actor model. POOL-T is therefore much more like CSP. It would be much harder to implement the Philosopher with actors for the forks and the room, but a quick Google search shows that it can and has in fact been done; see Viktor Klang's post Dining Hakkers. To me, though, that doesn't feel nearly as natural as the synchronous POOL-T implementation.
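
To make the contrast concrete, here is a rough, hypothetical sketch of mine (not from this post or from Dining Hakkers) of what emulating POOL-T's blocking ! would look like in Akka, using the ask pattern and the Fork/Room messages defined above. It assumes the Fork and Room actors are changed to reply (say with a Done message) when they handle a request, and the blocking Await inside an actor is exactly the kind of unnatural fit described above:

  import akka.actor.{Actor, ActorRef}
  import akka.pattern.ask
  import akka.util.Timeout
  import scala.concurrent.Await
  import scala.concurrent.duration._

  case object Done      // hypothetical ack the Fork and Room actors would have to send back
  case object DineOnce  // hypothetical trigger message

  class Philosopher(room: ActorRef, lfork: ActorRef, rfork: ActorRef) extends Actor {
    implicit val timeout = Timeout(5.seconds)

    // plays the role of POOL-T's '!': do not return until the receiver has answered
    private def rendezvous(target: ActorRef, msg: Any): Unit =
      Await.result(target ? msg, timeout.duration)

    def receive = {
      case DineOnce =>
        rendezvous(room, Enter)
        rendezvous(lfork, Pickup); rendezvous(rfork, Pickup)
        // eat()
        rendezvous(lfork, Putdown); rendezvous(rfork, Putdown)
        rendezvous(room, Exit)
    }
  }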

Jan 25 15

Filtering Akka Streams by elapsed time

by Frank Sauer

It has been a while since my last blog post, sometimes the day job gets in the way of the more interesting experiments worth blogging about. However, lately I’ve had some time to play with the latest Akka Streams (1.0-M2) and figure out how to map a common CEP use case involving the use of elapsed time onto it.

Consider a stream of data coming from temperature sensors, say in the form of a Temperature(sensor: String, value: Double, time: Long). Let’s further assume we want to alert on this data if we see a temperature over 100 degrees. However, our sensors are a bit flaky and the data stream will have occasional spikes where one measurement suddenly shoots up for a very brief period of time, perhaps only a single measurement. We don’t want to emit false alarms, so we have to deal with these spikes and filter them out. Or as an alternative use case let’s say the sensors are part of some equipment that can handle a temperature of 100 degrees for 2 minutes but no longer, so we only want to alert if the temperature stays over 100 for more than 2 minutes. I’m sure you can come up with other use cases for the use of time as well, it is a common scenario.

Testing a condition like “temperature over 100” sounds like the standard filter operation, and the Akka Streams documentation for 1.0-M2 has an example of a Filter operator, implemented as a subclass of PushStage (with type A renamed to E for Event):

class Filter[E](p: E => Boolean) extends PushStage[E, E] {
    override def onPush(evt: E, ctx: Context[E]): Directive =
       if (p(evt)) ctx.push(evt)
       else ctx.pull()
}

Let’s use this as our starting point and add elapsed time to its signature by trying this definition:

class FilterFor[E](duration: FiniteDuration)(p: E => Boolean) extends PushStage[E, E] {
    override def onPush(evt: E, ctx: Context[E]): Directive = ???
}

Conceptually this is what we’re looking for, but there are some technicalities that will make this a bit more complicated, caused by the fact that we’re potentially receiving multiple events during the given timeframe for which the predicate evaluates to true. With a regular filter function there is no problem, each event for which the predicate yields true is pushed downstream immediately, but in FilterFor we have to wait for the given duration and compare the inbound events with a previous event to validate that the condition still holds or the elapsed time has exceeded the given duration. This means we need the ability to compare the inbound events, and cache them. So we need a key for the events. The events themselves probably won’t make good keys so let’s provide a key function E=>K that extracts a key from an event.

We also need to decide what to do with all the events received during the time interval. Do we keep the first, the last, all of them? Because this is a filter and the expectation of a filter is that the inbound and outbound type is the same, we need to push an E downstream, but which one? Let’s leave that up to the user as well and expose both the key function as well as this reducer in the API:

class FilterFor[E,K](duration: FiniteDuration)
                  (key: E=>K, reduce: Seq[(Long,E)]=>E)
                  (predicate: E => Boolean) extends PushStage[E, E] {

    var pending: Map[K,Seq[(Long,E)]] = Map.empty

    override def onPush(evt: E, ctx: Context[E]): Directive = ???
}

Before we look at the implementation I want to emphasize one very important rule from the akka streams documentation:

Warning
There is one very important rule to remember when working with a Stage. Exactly one method should be called on the currently passed Context exactly once and as the last statement of the handler where the return type of the called method matches the expected return type of the handler. Any violation of this rule will almost certainly result in unspecified behavior (in other words, it will break in spectacular ways). Exceptions to this rule are the query methods isHolding() and isFinishing()

This is very important and limits the ways we can implement our filterFor behavior. For example, in a first attempt I tried to use timers but this broke this rule (and the Context is not thread-safe) and indeed failed quite spectacularly.

As already shown above, we will keep some internal state in the form of a Map that maps keys to sequences of events and the timestamp at which they were received. We’ll use System.nanoTime() as the timestamp. If this is not accurate enough for your hardware or use case I’d like to talk to you :-)

When a new event arrives we have to deal with 4 cases:

  • The predicate matches and we’ve seen this event before. In this case if the first occurrence was far enough in the past we want to reduce all the events we’ve seen during the time interval and push the result downstream, if not, we simply add it to the sequence of pending events and continue
  • The predicate no longer matches but we have seen this event before. The condition no longer holds so we can simply remove any pending events and continue
  • The predicate matches but we’ve never seen this event before. In this case we store the event in our list of pending events and continue
  • The predicate does not match and we’ve never seen this event before. This is the don’t-care case and we simply continue. However, it is also a good opportunity to eliminate stale pending events. Stale entries are possible, and even likely, because we may see just one or a few events for which the predicate holds but not enough to fill the time frame, and then never see a matching event for which the predicate no longer holds; in that case the pending events would never get cleared out by the second case

In the above description, when I say “and continue” this means invoke pull on the context, indicating to upstream stages that we’re ready to receive more data. Translated to code this looks like this:

class FilterFor[E,K](duration: FiniteDuration)
                  (key: E=>K, reduce: Seq[(Long,E)]=>E)
                  (predicate: E => Boolean) extends PushStage[E, E] {

    var pending: Map[K,Seq[(Long,E)]] = Map.empty
    val nanos = duration.toNanos

    override def onPush(evt: E, ctx: Context[E]): Directive = {
       val k = key(evt)
       val now = System.nanoTime

       pending.get(k) match {
          case Some(previous) if predicate(evt)  => 
               val withNext = previous :+ now->evt
               if (now - previous.head._1 >= nanos) {
                  pending = pending - k
                  ctx.push(reduce(withNext))
               } else {
                  pending = pending.updated(k, withNext)
                  ctx.pull
               }
          case Some(previous) if !predicate(evt) => 
               pending = pending - k
               ctx.pull
          case None if predicate(evt) =>
               pending = pending + (k -> Vector(now->evt))
               ctx.pull
          case _ =>
               // none of the above, good time to remove stale entries from the cache
               for {
                 (k, events) <- pending // key and its pending events, oldest first
                 if events.nonEmpty && (now - events.head._1) > nanos
               } yield {pending = pending - k}
               ctx.pull
       } 
    }
}

That’s it, let’s try it out. To do so we’ll create a stream of mock temperature sensor data and figure out a way to stream it at a fixed rate, say 1 sample every 100 milliseconds, so we can test our FilterFor stage. The latter can be achieved by zipping our test data with a tick stream using the impressive Akka Streams FlowGraph DSL:

  case class Tick(time:Long)

  case class Temperature(sensor:String, value: Double, time:Long = 0)

  val data = Source(List[Temperature](
      Temperature("S1",100), Temperature("S2",100),
      Temperature("S1",101), Temperature("S2",101),
      Temperature("S1",102), Temperature("S2",102),
      Temperature("S1",105), Temperature("S2",99),
      Temperature("S1",100), Temperature("S2",100),
      Temperature("S1",101), Temperature("S2",101),
      Temperature("S1",102), Temperature("S2",102),
      Temperature("S1",103), Temperature("S2",103),
      Temperature("S1",101), Temperature("S2",99),
      Temperature("S1",102), Temperature("S2",102),
      Temperature("S1",100), Temperature("S2",100)
  ))

  // emit a tick every 100 millis
  val ticks = Source(0 second, 100 millis, () => Tick(System.nanoTime()))

  val temps: Source[Temperature] = Source() { implicit b =>
     import FlowGraphImplicits._
     val out = UndefinedSink[Temperature]
     val zip = ZipWith[Tick,Temperature,Temperature]((tick,temperature)=> temperature.copy(time = tick.time))
     ticks ~> zip.left
     data ~> zip.right
     zip.out ~> out
     out
  }

temps is now a Source[Temperature] that emits a Temperature object every 100 milliseconds. Note that in the above we also inject a timestamp into our fake measurements; normally I would assume the measurements already contain such a timestamp. We now need a key function and a reducer. Let’s write a reducer that returns the maximum temperature sample seen during the interval for which the predicate held. The key is simply the sensor id:

  def tempKey (t:Temperature):String = t.sensor
  def tempReduce (ts: Seq[(Long,Temperature)]):Temperature = {
    def compare(t1: (Long,Temperature),t2 : (Long,Temperature)) = if (t1._2.value > t2._2.value) t1 else t2
    ts.reduceLeft(compare)._2
  }

And now, finally, we can write a flow in which we use our filterFor:

  temps
    .filterFor(1 seconds)(t => t.value >= 100)(tempKey,tempReduce)
    .map(t=>s"HOT!!!: $t")
    .to(Sink.foreach(println(_)))
    .run

This prints the following :

HOT!!!: Temperature(S1,105.0,1422202132063859000)

The careful reader will now say “Wait a minute! temps is a Source[Temperature], how can we invoke filterFor on that? We never defined such a thing!” I left out a bit of magic that I thought was so cool I saved it for last… After adding shiny new tools like FilterFor to our toolbox we can add them to the existing DSL using an implicit class, like so:

  // pimp the dsl
  implicit class FilterForInSource[E](s: Source[E]) {
    def filterFor[K](duration:FiniteDuration)(predicate: E=>Boolean)
                 (key: E => K = (evt:E)=>evt.asInstanceOf[K],
                  reduce: Seq[(Long,E)] => E = (evts:Seq[(Long,E)])=>evts.head._2):Source[E] =
      s.transform(() => new FilterFor(duration)(key,reduce)(predicate))
  }

Without this bit of implicit magic, you’d have to use transform in your flows. Note also that I’ve provided defaults for the key function (the identity function) and the reducer (returns the oldest event), so if this works for your use case – in other words, events are themselves usable keys and you want to push the oldest one downstream – you can simply leave the entire last parameter list off and improve the readability of your flows.

If you’ve read this far, you deserve a present: all this code is available on github, enjoy :-)

I want to thank Endre and Konrad from the Akka mailing list for helping me out and the akka team as a whole for coming up with this cool stuff. I can see myself using this often and with pleasure in the future.

Aug 31 14

Parser combinators FTW

by Frank Sauer

In my previous post I introduced the AlertRule case class as one of the domain classes of our performance monitoring application. An AlertRule captures under which conditions to fire alerts when metrics violate a given threshold. In this post I will describe how instances of AlertRule get created by parsing an external DSL with a parser built with Scala’s parser combinators.

Here is a slightly modified version of the AlertRule case class I defined in the previous post:

case class AlertRule(name:String, matcher: Regex, 
                     trigger: (Number,Number)=>Boolean, 
                     threshold: Number, duration: Option[FiniteDuration],
                     msg: String)

To make the parser a bit more interesting, I have added an optional duration property. The goal is to not immediately fire an alert when a matching metric triggers the trigger function, but instead continue to monitor this metric and if it keeps triggering the trigger function for the amount of time specified in duration, then we’ll fire the alert. If at any time during the specified duration the metric stops triggering the trigger, we forget the pending alert and start over. In my next post I will describe an Akka actor that enforces these rules by using Akka schedulers to handle this timing aspect.

Let’s use an example rule to see what a DSL for alert rules like this might look like:

queueSize: 
when /messaging\.queues\..*\.spooled/ > 5000
for 30 seconds
alert "Queue $2 too large. $value messages spooled. Stalled consumer?"

Assuming we collect metrics with a ‘messaging.queues.queuename.spooled’ path name, this rule says that whenever we see a spooled metric on any queue with a value larger than 5000 for at least 30 seconds (without dropping under the 5000 threshold during that time), we fire an alert with the given message. The alert message will undergo some string interpolation to replace $0..$9 with the corresponding part of the metrics path and $value with the actual value of the metric causing the alert. How to do this is left as an exercise for the reader, but a possible sketch follows.
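
For illustration, here is one possible sketch of that interpolation (the function name and details are mine, not taken from the actual implementation):

  import scala.util.matching.Regex

  // replace $0..$9 with parts of the dot-separated metric path and $value with the metric value
  def interpolate(msg: String, path: String, value: Number): String = {
    val parts = path.split("\\.")
    val withParts = "\\$(\\d)".r.replaceAllIn(msg, m => {
      val i = m.group(1).toInt
      val part = if (i < parts.length) parts(i) else m.matched // leave $n alone if out of range
      Regex.quoteReplacement(part) // prevent $ in the replacement from being re-interpreted
    })
    withParts.replace("$value", value.toString)
  }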

At this point I would offer the DSL grammar using some kind of grammar notation like EBNF, but parser implementations using Scala’s parser combinators look so much like such a grammar that I’ll skip this step. Just like in a formal grammar definition language, the idea of parser combinators is that larger constructs can be defined in terms of smaller ones using sequences, choices, optional terms and repetition of terms. The parser combinators correspond exactly to these grammatical constructions, as we will see below.

Scala comes with a few predefined parser traits that can be extended by your own parsers in order to obtain parsers for common language constructs. The JavaTokenParsers trait contains parsers for identifiers, numbers and string literals ready to be used by our DSL parser. Let’s jump right into the deep end of the pool and look at the entire DSL parser. It will turn out to be surprisingly small…

Following the code I’ll explain some of this in more detail, but it helps to know the notation of a few parser combinators up front:

  • X ~ Y : sequential combinator. The parser succeeds if first X succeeds and then Y succeeds as well
  • X ~> Y : same as above, but the result of X is dropped; only the result of Y is kept
  • X | Y : choice combinator. The parser succeeds if either X or Y succeeds
  • X >> Y : into combinator. Passes the result of parsing X into the function that defines the parser for Y
  • X.? : optional combinator. Parsing succeeds if X is absent or parses successfully
  • X.* : expects 0 or more repetitions of X
  • X ^^ {function} : if X parses successfully, transform the result using the given function
  • X ^^^ {value} : change a successful result into the given value

object AlertRuleParser extends JavaTokenParsers {
  
  override protected val whiteSpace = """(\s|#.*)+""".r

  def duration: Parser[FiniteDuration] =
    "for" ~> wholeNumber ~ timeUnit ^^ {
      case value ~ units => new FiniteDuration(value.toInt, units)
  }

  def timeUnit: Parser[TimeUnit] = (
       "s(econd(s)?)?".r ^^^ {TimeUnit.SECONDS}
    |  "m(inute(s)?)?".r ^^^ {TimeUnit.MINUTES}
    |  "h(our(s)?)?".r ^^^ {TimeUnit.HOURS}
  )

  def op: Parser[(Number,Number)=>Boolean] = (
       "<" ^^^ {(x:Number,y:Number) => x.doubleValue < y.doubleValue}
    | "<=" ^^^ {(x:Number,y:Number) => x.doubleValue <= y.doubleValue}
    | "==" ^^^ {(x:Number,y:Number) => x.doubleValue == y.doubleValue}
    | ">=" ^^^ {(x:Number,y:Number) => x.doubleValue >= y.doubleValue}
    |  ">" ^^^ {(x:Number,y:Number) => x.doubleValue > y.doubleValue}
  )

  def regex: Parser[Regex] = "/" ~> "[^/]*".r <~ "/" ^^ {_.r}

  def ruleHeader: Parser[String] = ident <~ ":"

  def ruleBody(name:String): Parser[AlertRule] =
    "when" ~> regex ~ op ~ floatingPointNumber ~
    duration.? ~
    ("alert" ~> stringLiteral) ^^ {
      case expr ~ trigger ~ threshold ~ duration ~ alert => 
           AlertRule(name,expr,trigger,threshold.toDouble, duration, alert)
  }

  def alert: Parser[AlertRule] = ruleHeader >> ruleBody

  def alerts: Parser[List[AlertRule]] = alert.*

  def apply(rules: String):ParseResult[List[AlertRule]] = parseAll(alerts, rules)

}

And there we have it, the entire DSL parser in 42 lines of code. Let’s go through this rule by rule.

  • The definition of whitespace is overridden on line 3 to allow the use of # as a one-line comment in our rule DSL
  • A valid duration clause (lines 5-14) is the keyword “for” followed by a whole number and a time unit, which is either s, second, or seconds, or the equivalent terms for minutes and hours. Notice how the “for” keyword is dropped by using the ~> combinator. By doing so we won’t have to repeat it in the partial function of our transformation following the ^^. Also notice (lines 11-13) how each time unit alternative uses the ^^^ combinator to return an actual instance of TimeUnit. This allows us to very easily create the appropriate FiniteDuration object in the final transformation function, which is a partial function matching the number and units to use in the construction of a FiniteDuration. It is common to see partial functions in the ^^ transformers, and their patterns use the same combinators as the grammar itself, which is a bit confusing at first. It is important to understand that inside the ^^ transformations we combine the results of the parsers (as opposed to the parsers themselves) and bind them to the given names. Here, value will be the actual result of the wholeNumber parser (30 in our example rule) and units will be the actual TimeUnit object (TimeUnit.SECONDS in our example) parsed by the timeUnit parser.
  • On lines 16-22 we see how the comparison operators translate to the actual predicates on two numbers used by the rule to decide if an alert is required or not by comparing the metric value against the threshold using the specified comparator. Parser results can be functions! How cool is that?
  • A regex (line 24) is simply everything between two subsequent ‘/’. The resulting String gets transformed to a regular expression by the transformation function following ^^.
  • To keep the alert rule (line 36) small I split it into a ruleHeader (line 26) and a ruleBody (lines 28-34). (The parentheses can get a bit tricky if you try to use ~> multiple times to eliminate keyword boilerplate.) The ruleHeader simply parses into the name for the alert rule and the ruleBody transforms into the actual AlertRule object. However since the AlertRule constructor requires the name of the rule, how do we get that from the ruleHeader into the ruleBody? This is where the into (>>) combinator comes in. By writing ruleHeader >> ruleBody and defining the ruleBody function with a String parameter (the result of the ruleHeader parser is a String) we get to use the result of the ruleHeader in our ruleBody when we construct the final AlertRule object. Here we see the use of a partial function again matching the exact terms of the grammar minus the parts we left off by using ~>. Note that the AlertRule case class is defined with an Option[FiniteDuration] which is exactly the result of applying the optional combinator (?) to the duration parser.
  • Finally, a parser that can parse a sequence of alert rules is simply defined as alert.* (line 38)
  • The main entry point into our alert rule parser is the apply method (line 40). It invokes the parseAll function inherited indirectly via JavaTokenParsers to parse the given text with the alerts parser. The result is a ParseResult, which in case of success is of type Success[List[AlertRule]]. If parsing fails the result will be an Error or Failure containing an error message. Both can be captured using NoSuccess

Let’s illustrate what happens when we deliberately introduce an error into our rules (days is not a valid time unit):

  def main(args: Array[String]) {

     val text =
       """
         |# rule to catch large queues
         |largeQueue:
         |when /messaging\.queues\..*\.spooled/ > 25000
         |for 30 days
         |alert "Queue $2 too deep. $value messages spooled. Stalled consumer?"
       """.stripMargin

    AlertRuleParser(text) match {
      case Success(alerts,_) => println(s"Got alerts: $alerts")
      case NoSuccess(msg,remaining) => println(s"We have a problem: $msg at: ${remaining.pos}")
    }
  }

This will print: We have a problem: string matching regex `h(our(s)?)?' expected but `d' found at: 5.8. The position is line.column. This error message is a bit strange in that it claims to expect hours, whereas seconds or minutes would also be acceptable. Getting good quality error messages out of parsers is beyond the scope of this post (translation: I haven’t figured this out yet :-)

If we change 30 days to 10 minutes and run the parser again, we see Got alerts: List(AlertRule(largeQueue,messaging\.queues\..*\.spooled,,25000.0,Some(10 minutes),"Queue $2 too deep. $value messages spooled. Stalled consumer?"))

Thanks to parser combinators it is now a breeze to use external DSLs in our applications without requiring any extra tools (like ANTLR) other than our trusted compiler and I will be making good use of this in the future!

Aug 30 14

Right-sizing your domain model

by Frank Sauer

When we implement domain models using Scala case classes it is tempting to stop after writing a bunch of one-liner case class definitions like these:


case class Source(kind : String, keys: (String,String)*)(tags: (String,String)*)

case class Metric(entity: String, columns: (String,Number)*)

case class Measurement(source: Source, time:Long, metrics: List[Metric]) 

We are glad to have freed ourselves from Java’s boilerplate, and case classes are so compact! Awesome! Or is it?

If our entire domain model consists of simple case class definitions like this we get what Martin Fowler calls an Anemic Domain Model, an anti-pattern often found in service-oriented environments, where somehow the design rules came to include that domain objects shall have no behavior – all behavior shall be implemented in services, using the domain model objects as simple bags of data.

Of course, I am not advocating that your entire application should be written as methods in your domain objects, but as shown in my previous post, there is some behavior that makes sense to implement in your domain objects. One of the examples shown there was:


case class Source(kind : String, keys: (String,String)*)(tags: (String,String)*) {
  
   /**
    * Create a new source by appending a new key->value pair at the end
    */
   def + (tuple: Tuple2[String,String]):Source = Source(kind, (keys.toArray ++ Array(tuple)):_*)(tags:_*)
 
}

This is a good example of the kind of behavior that belongs in a domain object. It is completely application independent and simply transforms one domain object into another one (of the same kind in this case, we’ll see different ones in a moment).

Another good example of behavior that belongs directly in the domain model is factory methods to create instances, in this case most likely alternative apply() methods on companion objects to the case classes. An example was shown in the previous post where an apply method with a simplified signature was defined to create instances of Source by parsing a String parameter.

A simple rule of thumb I use to decide what behaviors to add to the domain classes themselves is:

  • Transformations to and from other domain objects or standard Scala types like collections, String, etc.
  • Factory methods

All other behaviors like creating HTML representations or other application specific behaviors do not belong in the domain model, but we’ll see a nifty Scala feature that will allow us to write code in a way that makes it look like it actually does live there…

Sometimes it’s a judgement call and sometimes we get it wrong and that’s OK. Take for example the seriesName method I added to Source in the previous post:

case class Source(kind : String, keys: (String,String)*)(tags: (String,String)*) {
   def asList = List(kind) ++ keys.unzip._2
   def seriesName(sep: String) = asList.mkString(sep)
}

This one is debatable as it could be argued that this is application (or technology) specific behavior (dictated by graphite). If, however, we decide that a dot-separated path for metric names is intrinsic to our domain, then it belongs here.

This Source class is part of a monitoring application and represents the source of a Measurement. A Measurement is a collection of Metrics gathered from the same Source at the same time. Here are some more domain classes for this application (simplified):


case class AlertRule(name:String, matcher: Regex, trigger: (Number,Number)=>Boolean, threshold: Number, msg: String)

case class Alert(time:Long, name:String, source: Source, value:Number, msg:String)

Here is an example of a behavior that – in my opinion – belongs on the domain object even though it appears to be very application specific: it transforms one kind of domain object (Measurement) into another (Alert) given a third (AlertRule):


case class AlertRule(name:String, matcher: Regex, trigger: (Number,Number)=>Boolean, threshold: Number, msg: String) {
   
   def matches(path: String) = matcher.findFirstIn(path).isDefined
   
   def alerts(m: Measurement): List[Alert] = for {
      metric <- m.metrics
      (key,value) <- metric.columns
      path = s"${m.source.seriesName(".")}.${metric.entity}.$key"
      if matches(path) && trigger(value,threshold)
   } yield Alert(m.time, name, m.source, value, msg)
 
}

An AlertRule creates a list of alerts based on the metrics in a measurement for any metric that matches the regular expression in the rule and for which the trigger function returns true based on the metric's value and the rule's threshold. Given the domain, I don't see how this could ever work differently when (re-)used in a different context, hence my decision to write this code in the domain class itself.
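
As a quick, hypothetical usage example (the rule, source and metric below are made up for illustration):

  // a rule watching spooled messages on any queue
  val rules = List(AlertRule("largeQueue", """messaging\.queues\..*\.spooled""".r,
    (value, threshold) => value.doubleValue > threshold.doubleValue,
    5000, "Queue too large"))

  val measurement = Measurement(
    Source("messaging", "group" -> "queues", "queue" -> "orders")(),
    System.currentTimeMillis(),
    List(Metric("spooled", "count" -> 7500)))

  // the path becomes "messaging.queues.orders.spooled.count", which matches the rule
  val firing: List[Alert] = rules.flatMap(_.alerts(measurement))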

So where do we write the code that does not belong here? As an example, let's see how we could add a toHTML method to the Alert class without actually writing it there. The trick is to use an implicit class:

implicit class AlertHTMLNotifications(a:Alert) {
   def toHTML = views.html.alertview(a.name, a.source.seriesName("."), a.value, a.msg)
   def subject = s"${a.name} from ${a.source.seriesName(".")}" 
}

This implicit class is defined in the context of the alerting component of the application. By using an implicit class with a single non-implicit constructor argument the Scala compiler will use it when looking up implicit conversions. This allows us to invoke the toHTML method as-if it was defined on the Alert class itself. (Apple coders: think of this as categories in Objective-C):

class AlertNotifier extends Actor with ActorLogging with Mailer {
   def receive = {
      case a: Alert => mail(a.subject, a.toHTML) 
   }
}

As long as the AlertHTMLNotifications class is in scope for the AlertNotifier actor, it can invoke subject and toHTML on alerts directly. Using implicit classes, each application component or layer can augment the domain objects with as much behavior as they see fit, while at the same time keeping the domain classes themselves pure and context-free.
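
For example (a hypothetical sketch of mine, not from the original application), a Graphite writer component could add its own representation of a Measurement in exactly the same way, without touching the domain class:

  implicit class MeasurementGraphiteOps(m: Measurement) {
    // one "path value timestamp" line per metric column, using the dot-separated series name
    def toGraphiteLines: List[String] = for {
      metric       <- m.metrics
      (key, value) <- metric.columns.toList
    } yield s"${m.source.seriesName(".")}.${metric.entity}.$key $value ${m.time}"
  }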

Note that this application is a Play! Framework application and the toHTML makes use of a Play! template to render the HTML - yet another reason to not have this code in the Alert class itself.

I'm open to feedback and would love to find out if I'm totally off base here. Leave a comment with your feedback.

In the next post I will describe how the AlertRule objects are created by parsing an alert rule DSL with Scala's awesome parser combinators... Stay tuned!

Jul 12 14

I love Scala!

by Frank Sauer

I am currently writing a system to collect application performance metrics and visualize them in various dashboards. The UI part is taken care of by Grafana and I won’t dwell on that in this post. The data collectors are written in Scala using Akka. The systems being measured are hardware messaging appliances and various applications written in Java using a proprietary high performance platform for distributed applications, providing such features as guaranteed messaging, high availability through transactional event sourcing, etc.

Metrics being collected include queue sizes in the messaging appliances, inbound and outbound message rates of the applications, all the JVM stats like heap size, GC stats and CPU stats, as well as numerous other metrics related to the proprietary middleware. All these metrics are received via proprietary java libraries and processed with Actors.

We wanted to evaluate several time-series databases, and there is an amazing number of choices here – the most common choice being Graphite – but others include OpenTSDB, InfluxDB, ElasticSearch (with Kibana as the UI), etc.. Because we wanted to evaluate all of these options, I designed the system around the Akka event stream. The data collectors collect data using the java APIs and publish this data to the Akka event stream using case classes of an internal model for time-series data. For each of the candidate databases we have an actor subscribing to these messages to transform them to whatever format the database desires before shipping it off for storage.

This is where it got interesting. Due to the uncertainty around data stores and the wide variety of storage models for metrics, I needed a flexible internal model. Graphite wants very long dot-separated metric names and each series stores only one numeric value. InfluxDB is much more flexible, it can store multiple columns and these can be of various data types. It looks much more like a traditional relational database in that respect. I wanted my internal model to support both concepts.

The model

I won’t describe the entire model here, the part I want to focus on is the case class describing the source of the metrics being collected. This is the part of the model that in Graphite’s case results in the dot separated metric name, but in Influx’s case could result in a number of named columns being stored. Therefore, it is basically an ordered list of name-value pairs. The first part is just a fixed string, for example “system” or “application” or other very high level classifications of various kinds of metrics (This would be the table name when all keys are stored as columns). The next parts are all name value pairs for things like hostname, server name, thread name, queue name, etc.

Here is what it looks like:


case class Source(kind : String, keys: (String,String)*)(tags: (String,String)*)

The difference between keys and tags is that keys will be used to form graphite-style metrics names, whereas tags are simply additional string values that might be of interest (like the PID for a process) and could be stored by those databases that support it (like openTSDB and InfluxDB) but that do not really need to be part of the series name.

This all seemed to work nicely, but I quickly grew tired of typing long expressions like this all over the place:

val source = Source("appliance","host"->"applianceHost", "vpn"->"myVpnName", "type"->"queue")()

Out of the box, the Source case class does not support any way to add new elements to the keys, which would be handy so we could initially create a Source for a server, then pass it around to other parts of the code that could then add new key elements for whatever metric they are creating. In addition, it turns out that the server names being emitted by the applications follow a naming convention – “serverName@hostName”, which I could parse into separate keys for host and server…

Hmm, let’s see how we can modify the Source class to help us out… While we’re at it, let’s also add some code that formats a source into a graphite-style series name:

case class Source(kind : String, keys: (String,String)*)(tags: (String,String)*) {

   def + (tuple: Tuple2[String,String]):Source = 
         Source(kind, (keys.toArray ++ Array(tuple)):_*)(tags:_*)

   def asList = List(kind) ++ keys.unzip._2
   def seriesName(sep: String) = asList.mkString(sep)
}

The definition for + is a bit tricky, but only because the Source constructor takes a variable number of arguments for the keys. After we concatenate the extra tuple to the existing keys, we need to pass it into the Source constructor as a vararg. This is done by the funny :_* syntax.

The asList function collects the kinds followed by all the values of the keys. seriesName simply turns this list into a string with each element separated from the next by the given separator string.

Now we can do really cool stuff like this:

scala> val s = Source("system", "fooServer@barHost")
s: Source = Source(system,WrappedArray((host,barHost), (server,fooServer)))
scala> val s2 = s + ("a"->"b")
s2: Source = Source(system,WrappedArray((host,barHost), (server,fooServer), (a,b)))
scala> s2.seriesName(".")
res0: String = system.barHost.fooServer.b

Wait a minute, what happened there on the first line? That’s not valid syntax to construct a Source! Or is it? And how did it magically separate the host and server parts? The magic here is a new apply function in the companion object for Source that parses the given server name and creates the Source using the parsed host and server pieces:

object Source {
   val ServerPattern = "([^@]*)@(.*)".r

   def apply(kind: String, serverName: String):Source = serverName match {
      case ServerPattern(server, host) => Source(kind, "host"->host, "server"->server)()
      case _ => Source(kind, "server"->serverName)()
   }
}

I think the pattern matching on the regular expression is amazing. There is a good bit of checking going on here as well; your regular expression MUST define at least one group, and the number of variables specified in the case pattern MUST match the number of groups or the match will simply fail. How cool is that?

I love scala!

May 10 14

CEP using Akka Streams

by Frank Sauer

Ever since I took the Principles of Reactive Programming class on Coursera and learned about Rx I’ve had this lingering thought that streams are a perfect fit for CEP. Not really a surprise given that one of the commercial CEP products is TIBCO StreamBase… After the recent announcement of Akka Streams and watching Roland Kuhn’s webinar on it, I decided to give this a try and see what it would take to implement the same  trading algorithm I blogged about before but this time using Akka Streams. Some of the reasons to try this are:

  • Benefit from the expressiveness and type safety of the Scala language to implement the rules
  • Benefit from the inherent scalability of the Akka platform to multiple cores and even distributed clusters
  • At some point this can probably also benefit from Akka persistence to get high availability

Akka Streams is the Typesafe implementation of the draft Reactive Streams specification, a standard for asynchronous stream processing with non-blocking backpressure. Akka Streams is interoperable with other Reactive Streams implementations. It is implemented as a fluent DSL on top of an actor based execution model. The DSL allows for the creation, filtering, splitting, and merging of streams, all operations crucial to express CEP-like rules “normally” expressed using a SQL-like language.

Let’s see what our algorithm looks like using this DSL. First we start out with the same data as in the previous posts, but we wrap it with a Flow:

  val prices = Array(
    Price("BP", 7.61), Price("RDSA", 2101.00), Price("RDSA", 2209.00),
    Price("BP",7.66), Price("BP", 7.64), Price("BP", 7.67)
  )

  Flow(prices.toVector).

The first step is to split this stream by symbol. The result will be new streams, one per symbol. The groupBy transforms the Flow[Price] to Flow[(String,Producer[Price])]. Each of the streams has to be processed as a new Flow:

 Flow(prices.toVector).
    groupBy(_.symbol).
    foreach {
       case (symbol, producer) => Flow(producer)
          ...
    }

Now that we have multiple streams with equal symbols  we can start grouping them into chunks to calculate the average. The DSL function to do this is called grouped. It transforms the Flow[Price] into Flow[Seq[Price]]:

 Flow(prices.toVector).
    groupBy(_.symbol).
    foreach {
       case (symbol, producer) => 
          Flow(producer).
          grouped(WINDOWSIZE).
          ...
    }

Now that we have chunks, we can calculate the average and implement our rule. The next step filters these chunks so only those matching our rule (an upward trending price) will continue in the flow:

 Flow(prices.toVector).
    groupBy(_.symbol).
    foreach {
       case (symbol, producer) => 
          Flow(producer).
          grouped(WINDOWSIZE).
          filter(window => window.size == WINDOWSIZE && avg(window) > window.head.price).
    }

The first condition throws away incomplete windows and the second compares the average with the first element to determine the trend. Almost there! Now we can generate our BUY orders for everything left and send them to market (well, the console actually). The map function transforms the Flow[Seq[Price]] we still have here to a Flow[Buy]:

 Flow(prices.toVector).
    groupBy(_.symbol).
    foreach {
       case (symbol, producer) => 
          Flow(producer).
          grouped(WINDOWSIZE).
          filter(window => window.size == WINDOWSIZE && avg(window) > window.head.price).
          map(window => Buy(symbol, window.reverse.head.price, ORDERSIZE)).
          foreach(buy => println(s"generated a buy:  $buy"))
    }

There is only one crucially important step left to do. We need to initiate the consumption of data from the flow, otherwise nothing will happen:

 Flow(prices.toVector).
    groupBy(_.symbol).
    foreach {
       case (symbol, producer) => 
          Flow(producer).
          grouped(WINDOWSIZE).
          filter(window => window.size == WINDOWSIZE && avg(window) > window.head.price).
          map(window => Buy(symbol, window.reverse.head.price, ORDERSIZE)).
          foreach(buy => println(s"generated a buy:  $buy")).
          consume(materializer)
    }

The materializer passed to the consume method is a FlowMaterializer, and in this sample it is created using FlowMaterializer(MaterializerSettings()). The FlowMaterializer controls how the transformation steps defined by the flow are mapped to the underlying actor model. Materializer settings include things like buffer sizes and fanout.

OK, I lied, we’re not quite done yet. We also need to trigger our initial flow to start consuming data! The consume shown above only consumes data from each stream created by the groupBy. Below is the entire sample, including the last step to consume from the outer flow and shut down the akka system when done:

object StreamCEP extends App {

  case class Price(symbol: String, price: Double)
  case class Buy(symbol: String, price: Double, amount: Long)
  case class Sell(symbol: String, price: Double, amount: Long)

  implicit val system = ActorSystem("Sys")
  val materializer = FlowMaterializer(MaterializerSettings())

  val prices = Array(
    Price("BP", 7.61), Price("RDSA", 2101.00), Price("RDSA", 2209.00),
    Price("BP",7.66), Price("BP", 7.64), Price("BP", 7.67)
  )

  val WINDOWSIZE = 4
  val ORDERSIZE=1000

  def sum(prices: Seq[Price]):Double = prices.foldLeft(0.0)((partial, price) => partial + price.price)
  def avg(prices: Seq[Price]):Double = sum(prices) / prices.size

  // just use the test data from above, could be an actual (infinite) ticker stream
  Flow(prices.toVector).
    // group by symbol, splitting the stream into separate streams, one per symbol
    groupBy(_.symbol).
    // process them all
    foreach {
       case (symbol, producer) =>
          Flow(producer).
            // chunk the stream by the window size
            grouped(WINDOWSIZE).
            // the actual rule! only let through each complete window for which avg > first
            filter(window => window.size == WINDOWSIZE && avg(window) > window.head.price).
            // for those that pass, generate a Buy
            map(window => Buy(symbol, window.reverse.head.price, ORDERSIZE)).
            // send to market - for now just print them :-)
            foreach(buy => println(s"generated a buy:  $buy")).
            consume(materializer)
    }.
    onComplete(materializer) { _ => system.shutdown()}
}

I must say I’m pretty impressed already! Can’t wait to see this technology mature. The example works like a charm (Check out the activator template from the previous post, it should have been added to that automatically). Can’t wait to try this out on a real stream and run it on a large akka cluster, that should be fun!

P.S. When you run this you’ll notice some dead-letter messages from Akka on shutdown. I don’t know what causes this yet. If anybody knows, please let me know.

UPDATE

Somebody on Twitter asked me how this compares to Esper and that got me thinking. Depending on your definition of CEP, the above may or may not qualify as true CEP… There is no notion of time involved in the above example, no events entering and leaving windows of various kinds, as is the case in Esper-land. For example, in an earlier post this algorithm was implemented by forking the incoming stream into a delayed stream and an average stream, which were then joined later. The delayed stream was used to compare the oldest element of the window with the average of that window. As of now I see no way to do things like that using Akka Streams. I’m not sure whether this can be done on top of it or whether it would require a change to the core implementation. I’m thinking of a delay(n:Int):Flow[T] primitive which would delay the Flow[T] by n elements, in effect a delay queue of size n. Perhaps this can be expressed in terms of the general transform primitive, since that can handle state.
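To make that delay idea a bit more concrete, here is a small, library-agnostic sketch of the state such a primitive would have to carry. It deliberately avoids guessing at the exact transform API; the names delayBy and step are purely illustrative and not part of Akka Streams:

import scala.collection.immutable.Queue

// Illustrative only: a stateful step that buffers the first n elements and,
// from then on, emits for every new element the element that arrived n steps earlier.
def delayBy[T](n: Int): T => Option[T] = {
  var buffer = Queue.empty[T]
  elem => {
    buffer = buffer.enqueue(elem)
    if (buffer.size > n) {
      val (oldest, rest) = buffer.dequeue
      buffer = rest
      Some(oldest)  // this element now lags the live stream by exactly n elements
    } else {
      None          // still filling the delay queue, nothing to emit yet
    }
  }
}

val step = delayBy[Int](3)
println(List(1, 2, 3, 4, 5).flatMap(step)) // List(1, 2): each element delayed by three positions

Zipping such a delayed stream with a stream of window averages would reproduce the fork-and-join shape of the earlier Esper-based example.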

Another prime example of CEP involves checking for patterns of events over time, including the absence of events! This requires one event arming a timer, which in turn generates another event when it expires; depending on which happens first, other events may be generated. Again, I’m not sure how to map this idea to Akka Streams yet.
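For what it’s worth, outside of Akka Streams the absence case is easy enough to express with a plain actor and the scheduler. Below is a rough sketch with hypothetical event types (Order, Fill and Expired are mine, not from the earlier posts): an Order that is not followed by a Fill within 5 seconds raises an alert. It is not meant as a substitute for Esper’s pattern language, just an illustration of the timer-as-event idea:

import akka.actor.Actor
import scala.concurrent.duration._

// Hypothetical event types, purely for illustration
case class Order(id: Long)
case class Fill(orderId: Long)
case class Expired(orderId: Long)

class AbsenceDetector extends Actor {
  import context.dispatcher        // ExecutionContext for the scheduler
  var awaitingFill = Set.empty[Long]

  def receive = {
    case Order(id) =>
      awaitingFill += id
      // the timer is the event generated by the (potential) absence of another event
      context.system.scheduler.scheduleOnce(5.seconds, self, Expired(id))
    case Fill(id) =>
      awaitingFill -= id           // the fill arrived in time, disarm
    case Expired(id) if awaitingFill(id) =>
      println(s"ALERT: order $id was not filled within 5 seconds")
    case Expired(_) =>             // already filled, ignore the stale timer
  }
}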

Mar 29 14

New activator template

by Frank Sauer

The code described in the previous 3 posts is now available as a typesafe activator template.

Contributing templates to the activator is remarkably simple and the integration of the tutorial HTML with the code browser is VERY cool!

Check it out here: http://typesafe.com/activator/template/akka-with-esper

 

Mar 15 14

Embedding Esper as an Akka Actor

by Frank Sauer

In the previous two posts I explored the idea of using Esper to route messages inside an Akka event bus. I briefly mentioned I had thought about wrapping an Esper engine inside an actor, but that an event bus might be a better match. In this post I will explore whether that was a good design decision. So the goal for this post is to wrap an Esper engine inside a regular Akka actor, which can then in turn participate in any event bus it wants.

When wrapping an Esper engine inside an actor we want to be able to configure that actor by registering event types and installing EPL statements, and once configured, we want that actor to route incoming messages into the embedded Esper engine. The first design question we need to answer is whether we configure the Esper actor by extension or dynamically by sending messages to it. The latter seems far more flexible, so let’s explore that. The second question is: once the EPL statements fire and produce events, where do they go?

Before getting into details I need to talk about some refactorings to the code from the earlier posts in order to reuse some of it for this new EsperActor.

Sidebar: Refactoring the Esper Engine into its own trait

In the previous posts, the Esper engine was embedded directly into the EsperClassification trait. That’s unfortunate, because that way we cannot reuse the same code in our new actor. Let’s fix that first. To allow this code to be reused in both the EsperClassification trait (which applies only to event buses) and an individual actor, we extract a minimal set of Esper-engine-specific code into its own trait and mix that into both the EsperClassification trait and our new actor. Here is the result:

case class EsperEvent(eventType: String, underlying: AnyRef)

trait EsperEngine {

  val esperConfig = new Configuration()

  // these are lazy so esperConfig can be configured before using it
  lazy val epService = EPServiceProviderManager.getDefaultProvider(esperConfig)
  lazy val epRuntime = epService.getEPRuntime

  def registerEventType(name:String, clz: Class[_ <: Any]) {
    esperConfig.addEventType(name, clz.getName)
  }

  def insertEvent(evt:Any) {
    epRuntime.sendEvent(evt)
  }

  def createEPL(epl:String)(notifySubscribers: EsperEvent=>Unit):Try[EPStatement] = {
    try {
      val stat = epService.getEPAdministrator.createEPL(epl)
      stat.addListener(new UpdateListener() {
        override def update(newEvents: Array[EventBean], oldEvents: Array[EventBean]) {
          newEvents foreach (evt => notifySubscribers(EsperEvent(evt.getEventType.getName, evt.getUnderlying)))
        }
      })
      Success(stat)
    } catch {
        case x: EPException => Failure(x)
    }
  }
}

A few things to note here:

  • The epService and epRuntime are lazy again to allow the Esper engine to be configured before the runtime is created
  • publishEvent was renamed to insertEvent because I thought publishEvent was a confusing name since it consumes an event as opposed to emitting one.
  • insertEvent now takes an Any. The Esper engine itself accepts any object as an event and I see no valid reason to restrict that here. The event bus from the previous posts can still use the Union types to further restrict the events entering the bus before they get to this more generic layer.
  • createEPL now returns a Try[EPStatement]. That way it can return the created EPStatement if it was successfully created, or the error if it was not. Previously the exception was simply logged but not handled or propagated. A minimal usage sketch follows this list.
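
For illustration, here is a minimal usage sketch of that Try (the statement text is a throwaway placeholder, and the anonymous instantiation only works because EsperEngine has no abstract members):

import scala.util.{Failure, Success}

val engine = new EsperEngine {}                    // anonymous mixin, just for this sketch
engine.registerEventType("Price", classOf[Price])  // the Price case class from the examples

engine.createEPL("select symbol, price from Price") { evt =>
  println(s"statement fired: $evt")
} match {
  case Success(statement) => println(s"deployed: ${statement.getText}")
  case Failure(error)     => println(s"EPL rejected: ${error.getMessage}")
}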

Now we can mix an Esper engine into both our existing event bus and our new actor. Check out the refactored code on GitHub, where you’ll also find a refactored version of EsperModule.

EsperActor

Using our new EsperEngine trait it’s pretty simple to write our new actor wrapper around the engine and it’s pretty tiny too:

package experiments.esperakka

import akka.actor.{ActorRef, Actor}

object EsperActor {
  case object StartProcessing
  // register the given class with the given name with the esper engine
  case class RegisterEventType(name:String, clz: Class[_ <: Any])
  // not all statements will require listeners, they could simply be inserting into other streams
  case class DeployStatement(epl:String, listener: Option[ActorRef])
  // deploy an entire Esper module, send results to the given listeners keyed by event type
  case class DeployModule(text: String, listeners: Map[String,ActorRef])
}

class EsperActor extends Actor with EsperEngine with EsperModule {
  import experiments.esperakka.EsperActor._

  override def receive = {
    case RegisterEventType(name, clz) => esperConfig.addEventType(name, clz.getName)
    case DeployStatement(epl, listener) => createEPL(epl)(evt => listener map ( l => l ! evt))
    case DeployModule(text, listeners) => installModule(text) { evt => listeners.get(evt.eventType) map (_ ! evt)}
    case StartProcessing => context.become(dispatchingToEsper)
  }

  private def dispatchingToEsper():Receive = {
    case evt@_ => epRuntime.sendEvent(evt)
  }
}

This is the entire EsperActor implementation… In its initial state it answers four different messages:

  • RegisterEventType(String,Class) will register the given type with the given name with the embedded Esper engine
  • DeployStatement(String,Option[ActorRef]) will install the EPL rule into the engine and when the rule produces events it will send them to the given listener if one was supplied
  • DeployModule(String,Map[String,ActorRef]) will deploy the given text as an EPL module and match listeners with the named statements in the module
  • StartProcessing will move the actor into its next state in which all incoming messages are inserted into the Esper engine

Here’s an example using it (with the EPL from the module in the previous post):

object EsperActorExample extends App {

  val windowSize=4
  val orderSize=1000

  val system = ActorSystem()
  val esperActor = system.actorOf(Props(classOf[EsperActor]))
  val buyer = system.actorOf(Props(classOf[BuyingActor]))
  val debugger = system.actorOf(Props(classOf[Debugger]))

  val statement1 = s"""
    insert rstream into Delayed
      select rstream symbol,price
      from Price.std:groupwin(symbol).win:length(${windowSize-1})
  """
  val statement2 = s"""
    insert into Averages
      select symbol,avg(price) as price
      from Price.std:groupwin(symbol).win:length_batch($windowSize) group by symbol
  """

  val statement3 = s"""
     insert into Buy
      select p.symbol, p.price, $orderSize as amount
      from Price.std:unique(symbol) p
      join Delayed.std:unique(symbol) d on d.symbol = p.symbol
      join Averages a unidirectional on a.symbol = p.symbol
      where a.price > d.price
    """

  esperActor ! RegisterEventType("Price", classOf[Price])
  esperActor ! RegisterEventType("Buy", classOf[Buy])

  esperActor ! DeployStatement(statement1 , None)
  esperActor ! DeployStatement(statement2, None)
  esperActor ! DeployStatement(statement3, Some(buyer))

  esperActor ! StartProcessing

  val prices = Array(
    Price("BP", 7.61), Price("RDSA", 2101.00), Price("RDSA", 2209.00),
    Price("BP",7.66), Price("BP", 7.64), Price("BP", 7.67)
  )

  prices foreach (esperActor ! _)
}

When running this sample, we’ll see the same output as before from the BuyingActor:

Buyer got a new order: 1000 BP @ $7.67

Instead of deploying individual statements as in the above example, you can also deploy an entire Esper module:

   esperActor ! RegisterEventType("Price", classOf[Price])
   esperActor ! RegisterEventType("Buy", classOf[Buy])

   esperActor ! DeployModule(s"""
      module SimpleAverageTrader;

      @Name("Delayed")
      insert rstream into Delayed
      select rstream symbol,price
      from Price.std:groupwin(symbol).win:length(${windowSize-1});

      @Name("Averages")
      insert into Averages
      select symbol,avg(price) as price
      from Price.std:groupwin(symbol).win:length_batch($windowSize) group by symbol;

      @Name("Buy")
      insert into Buy
      select p.symbol, p.price, $orderSize as amount
      from Price.std:unique(symbol) p
      join Delayed.std:unique(symbol) d on d.symbol = p.symbol
      join Averages a unidirectional on a.symbol = p.symbol
      where a.price > d.price;
    """, Map[String,ActorRef]("Buy" -> buyer, "Delayed" -> debugger))

  esperActor ! StartProcessing

Conclusions

The first thing I (re-)learned is that traits should be small and very, very specific. Embedding what is now in EsperEngine into EsperClassification, as shown in the earlier posts, conflated concerns and was a mistake. Extracting the EsperEngine trait made writing the EsperActor incredibly easy.

The EsperActor as written is completely generic. It accepts messages to register event types and to deploy EPL statements or modules, and requires no extension to be fully functional. This is a different approach from the event bus explored in the previous posts, since those were configured with types and EPL by extending them. The EsperActor does not require an event bus at all to function; it’s just a regular actor, and like any actor it can still participate in an architecture with an event bus. For example, on the inbound side you could subscribe the EsperActor to an event bus to receive the events to be pumped through the EPL rules. On the outbound side, you could modify the EsperActor to accept an event bus instead of a Map with listeners. It could then send all the events coming out of the Esper engine to the outbound event bus.
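
As a concrete (if simplistic) sketch of that inbound side, Akka’s built-in event stream can play the role of the bus; any other EventBus implementation would work the same way. The registration and deployment messages are elided here because they are identical to the example above:

import akka.actor.{ActorSystem, Props}

val system = ActorSystem("esper-eventstream-example") // name is illustrative
val esperActor = system.actorOf(Props(classOf[EsperActor]))

// ... RegisterEventType / DeployStatement / StartProcessing exactly as in the example above ...

// every Price published to the event stream now ends up in the Esper engine
system.eventStream.subscribe(esperActor, classOf[Price])
system.eventStream.publish(Price("BP", 7.61))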

Given that the EsperActor is useful with or without an event bus, I think I prefer this approach over the event-bus-specific solution described in the previous posts. As always, all the code is available on GitHub.

Feb 24 14

Improved Esper EventBus

by Frank Sauer

In my previous post on integrating Esper into an Akka event bus, I pointed out two issues I wasn’t too happy with:

  1. The calls to registerEventType
  2. The sample EPL could have been better

I’m happy to report that there has been some progress on both issues:

Reflecting on union types

I found a partial solution to this issue. The EsperClassification trait now registers the types at the right time:

  
  def esperEventTypes:Union[EsperEvents] // abstract! should be concrete val = new Union[EsperEvents]

  val esperConfig = new Configuration()

  // types are now automagically registered when we need them to be, but it would be even cooler
  // if we did not have to ask the application programmer to instantiate the Union...
  // missing TypeTags if we attempt to do it here
  // (m below is the runtime mirror used for reflection; its definition is not shown in this excerpt)
  esperEventTypes.typeMembers() foreach(t => registerEventType(t.typeSymbol.name.toString, m.runtimeClass(t)))

  // these now no longer have to be lazy, since types are guaranteed to be registered at this point
  val epService = EPServiceProviderManager.getDefaultProvider(esperConfig)
  val epRuntime = epService.getEPRuntime

Now there is no longer an assumption that applications do this in the right order (before creating EPL statements or submitting events). It does this by reflecting on the Union type provided by esperEventTypes. This is an abstract method returning a value of type Union[EsperEvents]. It is totally silly that actual event buses should have to override this, since the implementation will always be the same:

override def esperEventTypes = new Union[EsperEvents]

The issue is that implementing this in the EsperClassification trait itself does not work. The instantiation of Union requires an implicit TypeTag and I can’t seem to figure out how to provide it. It just works when doing it in the subclasses – without doing anything out of the ordinary.

So still not ideal, but much better than explicitly having to register all the types manually.

Better EPL for the sample algorithm

The pattern based EPL was not the best way to implement the simple average algorithm to generate the Buy orders. The rule using pattern matching was hard coded to a window of length 4, and it also did not use the built-in avg function to calculate the average. Esper can do much better, and here’s the result:

val windowSize = 4
val orderSize = 1000

// this will delay the Price stream by windowSize - 1: 
// the price at position (latest - windowSize) will fall out of the window into the Delayed stream
epl(s"insert rstream into Delayed select rstream symbol,price from Price.std:groupwin(symbol).win:length(${windowSize-1})")

// after every windowSize prices for a symbol, the average is inserted into the Averages stream
epl(s"insert into Averages select symbol,avg(price) as price from Price.std:groupwin(symbol).win:length_batch($windowSize) group by symbol")

// the join is only triggered by a new average (since it has the unidirectional keyword), 
// which (see above) is only generated after a complete window for a symbol has been seen
epl(
    s"""
      insert into Buy
      select p.symbol, p.price, $orderSize as amount
      from Price.std:unique(symbol) p
      join Delayed.std:unique(symbol) d on d.symbol = p.symbol
      join Averages a unidirectional on a.symbol = p.symbol
      where a.price > d.price
    """)

This solution allows for variable-size windows and it also nicely demonstrates the windowing and stream-joining features of Esper. For more details on these features see the Esper EPL reference.

Next steps

Next I will look into simplifying the subscription API by removing support for the "removed/*" topics. It turns out this isn’t really needed, since Esper will only ever pass data in the oldEvents parameter of the UpdateListener when the keyword "irstream" is used in the select clause, and I can’t remember the last time I’ve seen an example of that… Stop me now if you need this :-)
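
For reference, a statement like the following (hypothetical, using the same epl helper as above) is the kind that would populate oldEvents: with irstream, events falling out of the length window are delivered to the listener as the remove stream:

// with irstream both streams reach the UpdateListener: new prices arrive in newEvents,
// and the price pushed out of the 4-element window shows up in oldEvents
epl(s"select irstream symbol, price from Price.win:length($windowSize)")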