Skip to content

Embedding Esper as an Akka Actor

by Frank Sauer on March 15th, 2014

In the previous two posts I explored the idea of using Esper to route messages inside an Akka event bus. I briefly mentioned I thought about wrapping an Esper engine inside an Actor but that an event bus might be a better match. In this post I will explore if that was a good design decision or not. So the goal for this post is to explore wrapping an Esper engine inside a regular Akka actor, which then in turn can participate in any Event bus it wants.

When wrapping an Esper engine inside an actor we want to be able to configure that actor by registering event types and install EPL statements and once configured, we want that actor to route incoming messages into the embedded esper engine. The first design question we need to answer is whether we configure the Esper actor by extension or dynamically by sending messages to it. The second form seems far more flexible, so let’s explore that.  The second question is, once the EPL statements fire and produce events, where do they go?

Before  getting into details I need to talk about some refactorings to the code of the earlier posts in order to reuse some of it for this new Esper Actor.

Sidebar: Refactoring the Esper Engine into its own trait

In the previous posts, the Esper engine was embedded directly into the EsperClassification trait. That’s unfortunate because that way we cannot reuse the same code in our new  Actor. Let’s fix that first. To allow this code to be reused in both the EsperClassification trait which applies only to event busses as well as an individual actor, we extract a minimal set of Esper engine specific code into its own trait and mix that into both the EsperClassification trait and our new Actor. Here is the result:

case class EsperEvent(eventType: String, underlying: AnyRef)

trait EsperEngine {

  val esperConfig = new Configuration()

  // these are lazy so esperConfig can be configured before using it
  lazy val epService = EPServiceProviderManager.getDefaultProvider(esperConfig)
  lazy val epRuntime = epService.getEPRuntime

  def registerEventType(name:String, clz: Class[_ <: Any]) {
    esperConfig.addEventType(name, clz.getName)
  }

  def insertEvent(evt:Any) {
    epRuntime.sendEvent(evt)
  }

  def createEPL(epl:String)(notifySubscribers: EsperEvent=>Unit):Try[EPStatement] = {
    try {
      val stat = epService.getEPAdministrator.createEPL(epl)
      stat.addListener(new UpdateListener() {
        override def update(newEvents: Array[EventBean], oldEvents: Array[EventBean]) {
          newEvents foreach (evt => notifySubscribers(EsperEvent(evt.getEventType.getName, evt.getUnderlying)))
        }
      })
      Success(stat)
    } catch {
        case x: EPException => Failure(x)
    }
  }
}

A few things to note here:

  • The  epService and epRuntime are lazy again to allow for an opportunity to configure the Esper engine before the creating the runtime
  • publishEvent was renamed to insertEvent because I thought publishEvent was a confusing name since it consumes an event as opposed to emitting one.
  • insertEvent now takes an AnyRef. The Esper Engine itself accepts any object as events and I see no valid reason to restrict that here. The event bus from the previous posts can still use the Union types to further restrict the events entering the bus before they get to this more generic layer.
  • createEPL now returns a Try[EPStatement]. That way it can return the created EPLStatement if it was successfully created or the error if it was not. Previously the exception was simply logged but not handled or propagated.

Now we can mix an Esper engine into both our existing event bus and our new actor. Check out the refactored code on github, where you’ll also find a refactored version of EsperModule. 

EsperActor

Using our new EsperEngine trait it’s pretty simple to write our new actor wrapper around the engine and it’s pretty tiny too:

package experiments.esperakka

import akka.actor.{ActorRef, Actor}

object EsperActor {
  case object StartProcessing
  // register the given class with the given name with the esper engine
  case class RegisterEventType(name:String, clz: Class[_ <: Any])
  // not all statements will require listeners, they could simply be inserting into other streams
  case class DeployStatement(epl:String, listener: Option[ActorRef])
  // deploy an entire Esper module, send results to the given listeners keyed by event type
  case class DeployModule(text: String, listeners: Map[String,ActorRef])
}

class EsperActor extends Actor with EsperEngine with EsperModule {
  import experiments.esperakka.EsperActor._

  override def receive = {
    case RegisterEventType(name, clz) => esperConfig.addEventType(name, clz.getName)
    case DeployStatement(epl, listener) => createEPL(epl)(evt => listener map ( l => l ! evt))
    case DeployModule(text, listeners) => installModule(text) { evt => listeners.get(evt.eventType) map (_ ! evt)}
    case StartProcessing => context.become(dispatchingToEsper)
  }

  private def dispatchingToEsper():Receive = {
    case evt@_ => epRuntime.sendEvent(evt)
  }
}

This is the entire EsperActor implementation… In its initial state it answers 3 different messages:

  • RegisterEventType(String,Class) will register the given type with the given name with the embedded Esper engine
  • DeployStatement(String,Option[ActorRef]) will install the EPL rule into the engine and when the rule produces events it will send them to the given listener if one was supplied
  • DeployModule(String,Map[String,ActorRef]) will deploy the given text as an EPL module and match listeners with the named statements in the module
  • StartProcessing will move the actor into its next state in which all incoming messages are inserted into the Esper engine

Here’s an example using it (with the EPL from the module in the previous post):

object EsperActorExample extends App {

  val windowSize=4
  val orderSize=1000

  val system = ActorSystem()
  val esperActor = system.actorOf(Props(classOf[EsperActor]))
  val buyer = system.actorOf(Props(classOf[BuyingActor]))
  val debugger = system.actorOf(Props(classOf[Debugger]))

  val statement1 = s"""
    insert rstream into Delayed
      select rstream symbol,price
      from Price.std:groupwin(symbol).win:length(${windowSize-1})
  """
  val statement2 = s"""
    insert into Averages
      select symbol,avg(price) as price
      from Price.std:groupwin(symbol).win:length_batch($windowSize) group by symbol
  """

  val statement3 = s"""
     insert into Buy
      select p.symbol, p.price, $orderSize as amount
      from Price.std:unique(symbol) p
      join Delayed.std:unique(symbol) d on d.symbol = p.symbol
      join Averages a unidirectional on a.symbol = p.symbol
      where a.price > d.price
    """

  esperActor ! RegisterEventType("Price", classOf[Price])
  esperActor ! RegisterEventType("Buy", classOf[Buy])

  esperActor ! DeployStatement(statement1 , None)
  esperActor ! DeployStatement(statement2, None)
  esperActor ! DeployStatement(statement3, Some(buyer))

  esperActor ! StartProcessing

  val prices = Array(
    Price("BP", 7.61), Price("RDSA", 2101.00), Price("RDSA", 2209.00),
    Price("BP",7.66), Price("BP", 7.64), Price("BP", 7.67)
  )

  prices foreach (esperActor ! _)
}

when running this sample, we’ll see the same output as before from the BuyingActor:

Buyer got a new order: 1000 BP @ $7.67

Instead of deploying individual statements as in the above example, you can also deploy an entire Esper module:

   esperActor ! RegisterEventType("Price", classOf[Price])
   esperActor ! RegisterEventType("Buy", classOf[Buy])

   esperActor ! DeployModule(s"""
      module SimpleAverageTrader;

      @Name("Delayed")
      insert rstream into Delayed
      select rstream symbol,price
      from Price.std:groupwin(symbol).win:length(${windowSize-1});

      @Name("Averages")
      insert into Averages
      select symbol,avg(price) as price
      from Price.std:groupwin(symbol).win:length_batch($windowSize) group by symbol;

      @Name("Buy")
      insert into Buy
      select p.symbol, p.price, $orderSize as amount
      from Price.std:unique(symbol) p
      join Delayed.std:unique(symbol) d on d.symbol = p.symbol
      join Averages a unidirectional on a.symbol = p.symbol
      where a.price > d.price;
    """, Map[String,ActorRef]("Buy" -> buyer, "Delayed" -> debugger))

  esperActor ! StartProcessing

Conclusions

The first thing I (re-)learned is that traits should be small and very, very specific. Embedding what is now in EsperEngine into EsperClassification as shown in the earlier posts was conflating concerns and a mistake. Extracting the EsperEngine trait made writing the EsperActor incredibly easy.

The EsperActor as written is completely generic. It accepts messages to register event types and to deploy EPL statements or modules and requires no extension to be fully functional. This is a  different approach than the event bus explored in previous posts since those were configured with types and EPL by extending them. The EsperActor does not require an event bus at all to function, it’s just a regular actor, and like any actor it can still participate in an architecture with an event bus. For example, on the inbound side you could subscribe the EsperActor with an event bus to receive the events to be pumped through the EPL rules. On the outbound side, you could modify the EsperActor to accept an event bus instead of a Map with listeners. It could then send all the events coming out of the Esper engine to the outbound event bus.

Given the flexibility of the EsperActor in that it is useful with or without an event bus, I think I prefer this approach over the event bus specific solution described in the previous posts. As always, all code is available on github.

From → programming, scala

2 Comments
  1. Nader Hadji Ghanbari permalink

    Perfect! I came to exactly same idea and I implemented it in a similar way. I guess it is more than 90% similar to yours!, the logic is exactly the same but I created my own iconic DSL around Esper. For example I used * as brain, i.e. esper engine, ? as ear (Listeners), : as eye (Statements), # as mouth (update listeners), and ! for event insertion.
    Now when I want to add a statement I do it like this :
    *===: “SELECT * FROM Ev1″

    I am trying to implement some additional esper io components for accepting events over websockets (Of course still as a scala wrapper layer around the glorious Esper)

Trackbacks & Pingbacks

  1. Akka: DDD and Esper | Tales from a Trading Desk

Comments are closed.