
Complex Event Processing with Akka and Esper

by Frank Sauer on February 22nd, 2014

Complex Event Processing (CEP) is (surprise!) a complex topic, and this post is not about that. What it is about is one way to integrate a popular open source CEP engine, Esper, into an Akka system of actors. This seemed like a natural fit and I wanted to explore it. My first thought was to integrate Esper as an actor itself, but then it occurred to me that Esper could just as easily be thought of as an event bus: events entering the bus are routed into the CEP engine, and actors subscribing to the bus receive the events that come out of it when rules in the engine fire in reaction to the input events. This post explores how this can be accomplished.

A good explanation of using an Akka event bus can be found on Kotan Code. Please read that first before continuing. The rest of this post builds on it, and I did not feel the need to repeat the basics here.

Before explaining how the integration was accomplished, let’s see how we use the result. The following code creates an Esper event bus with one simple rule that generates Buy orders at the latest price if the average of the last 4 prices is larger than the oldest of those 4 prices.

(all the code for this post is of course available on github)

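import scala.beans.BeanProperty
import akka.event.ActorEventBus
// plus the imports for union/prove from the scalavro utils library and for EsperClassification (see the github project)

case class Price(@BeanProperty symbol: String, @BeanProperty price: Double)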
case class Buy(@BeanProperty symbol: String, @BeanProperty price: Double, @BeanProperty amount: Long)
case class Sell(@BeanProperty symbol: String, @BeanProperty price: Double, @BeanProperty amount: Long)

class EsperEventBusExample extends ActorEventBus with EsperClassification {

  type EsperEvents = union[Price] #or [Sell] #or [Buy]

  // you need to register all types BEFORE adding any statements or publishing any events
  registerEventType("Price", classOf[Price])
  registerEventType("Buy", classOf[Buy])
  registerEventType("Sell", classOf[Sell])

  // generate a Buy order for a quantity of 1000 at the newest price,
  // if the average of the last 4 prices is greater than the oldest of these 4 prices
  // This is just one way you could do this in Esper, not necessarily the best way...
  epl(
    """
      insert into Buy
      select a.symbol as symbol, d.price as price, 1000 as amount
      from pattern[every a=Price -> b=Price(symbol=a.symbol) -> c=Price(symbol=a.symbol) -> d=Price(symbol=a.symbol)]
      where (a.price + b.price + c.price + d.price) > 4*a.price
    """)
}

This defines the complete Esper event bus for our example. You have to mix in the EsperClassification trait, which requires you to define the type EsperEvents since it is abstract in that trait. This is the pattern used by all the other event bus classification traits, so I stuck with it. It’s called EsperEvents (plural!) because you want to be able to submit events of more than one type into the event bus. I did not want to use Any since that’s lame, and there is a much cooler way to keep things nicely typed. I’ll get into that weird union stuff in more detail later. It’s totally worth it, trust me…

Of course, an Esper event bus would not be very useful if it did not have some rules, so we add the simple rule here that creates a Buy order if it sees 4 prices for the same symbol such that the average of these 4 prices is larger than the first. The calls to registerEventType are a nuisance but currently a requirement (I’ll explain why later). They allow us to use the defined case classes as Esper events. The use of @BeanProperty in these case classes is unfortunately also an Esper-related necessity: Esper is really a Java framework, and using POJOs as events requires them to be beans (oversimplified, see the Esper docs for other ways to use Java objects that are not beans). For a (very!) detailed explanation of the Esper language (EPL), once again see the Esper docs.
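To make the @BeanProperty point a bit more concrete, here is a minimal sketch in plain Scala with no Esper involved. The BeanSketch object and the beanGet helper are names I made up for this illustration. The annotation makes the compiler generate Java-style getters (getSymbol, getPrice), and a bean-based Java framework typically finds those by name through reflection, roughly like this:

```scala
import scala.beans.BeanProperty

object BeanSketch {
  // Same shape as the Price event used in this post
  case class Price(@BeanProperty symbol: String, @BeanProperty price: Double)

  // Hypothetical helper: look up a bean getter by property name, the way a
  // Java framework that only knows about beans might do it.
  def beanGet(target: AnyRef, property: String): AnyRef = {
    val getterName = "get" + property.capitalize // "symbol" becomes "getSymbol"
    target.getClass.getMethod(getterName).invoke(target)
  }
}
```

Calling BeanSketch.beanGet(BeanSketch.Price("BP", 7.61), "symbol") returns "BP". Without the annotation the case class would only have a symbol() method and the getMethod lookup would fail with a NoSuchMethodException.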

Now that we have defined our bus, let’s use it. First we create an actor and subscribe it to new Buy orders:

import akka.actor.{Actor, ActorSystem, Props}

class BuyingActor extends Actor {
  def receive = {
    case Buy(sym,price,amt) => println(s"Got a new buy: $amt $sym @ $$$price") // not as expensive as it looks
  }
}

object EsperEventBusApp extends App {
  // set up the event bus and actor(s)
  val system = ActorSystem()
  val evtBus = new EsperEventBusExample
  val buyer = system.actorOf(Props(classOf[BuyingActor]))

  // subscribe to buys
  evtBus.subscribe(buyer, "inserted/Buy")

Then we insert some data and see what happens:

val prices = Array(
    Price("BP", 7.61), Price("RDSA", 2101.00), Price("RDSA", 2209.00),
    Price("BP",7.66), Price("BP", 7.64), Price("BP", 7.67)
  )

  // feed in the market data
  prices foreach (evtBus.publishEvent(_))

At this point we’ll see the BuyingActor printing this text:

Got a new buy: 1000 BP @ $7.67
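If you want to convince yourself why exactly one Buy comes out, here is a rough plain-Scala approximation of what the rule does for this particular data. It is only a sketch under my own assumptions, not how Esper evaluates patterns: RuleSketch and its members are made-up names, and I model the pattern as "every run of 4 consecutive prices for the same symbol".

```scala
object RuleSketch {
  case class Price(symbol: String, price: Double)
  case class Buy(symbol: String, price: Double, amount: Long)

  // For each symbol, look at every run of 4 consecutive prices (a, b, c, d)
  // and emit a Buy at d's price when the sum exceeds 4 times the oldest price,
  // which is the same as "average of the 4 is greater than the oldest".
  def buys(prices: Seq[Price]): Seq[Buy] =
    prices.groupBy(_.symbol).values.toSeq.flatMap { perSymbol =>
      perSymbol.sliding(4).collect {
        case Seq(a, b, c, d) if a.price + b.price + c.price + d.price > 4 * a.price =>
          Buy(a.symbol, d.price, 1000L)
      }
    }

  val prices = Seq(
    Price("BP", 7.61), Price("RDSA", 2101.00), Price("RDSA", 2209.00),
    Price("BP", 7.66), Price("BP", 7.64), Price("BP", 7.67)
  )
}
```

RDSA only has two prices, so it never completes the pattern. BP has exactly four: 7.61 + 7.66 + 7.64 + 7.67 = 30.58, which is greater than 4 * 7.61 = 30.44, so RuleSketch.buys(RuleSketch.prices) contains a single Buy("BP", 7.67, 1000).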

How?

Before getting into the implementation of the EsperClassification trait, I have to talk about that union business… I can’t hold it back any longer. The scalavro utils library contains a very cool implementation of union types as described by Miles Sabin in this amazing blog post. It lets us define a type as the union of other types, so we can write a function that accepts either a String or an Int as its parameter, but nothing else. Passing any other type results in a compile error. Remember I promised an explanation for why I think the calls to registerEventType are a nuisance? It’s because the union library also has a way to reflect on the members of a union type using Union.typeMembers(), which I could have used to register the types based strictly on the type definition. However, I kept getting a TypeTag not found error and was unable to resolve that. If anybody knows how to fix it (pull requests welcome) I would very much appreciate it!
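For the curious, here is a minimal sketch of the encoding from Miles Sabin's post, written for Scala 2 with ASCII names of my own choosing (Not, NotNot, Or and OneOf are not the names used by the scalavro library, and this is not necessarily how that library implements its union[...] #or [...] syntax). It only illustrates the idea that the compiler can be asked to prove that a type is a member of a union:

```scala
object UnionSketch {
  // Negation, double negation and disjunction encoded with function types
  type Not[A] = A => Nothing
  type NotNot[A] = Not[Not[A]]
  type Or[T, U] = Not[Not[T] with Not[U]]

  // L[X] is the evidence "X is T or X is U", usable as a context bound
  type OneOf[T, U] = { type L[X] = NotNot[X] <:< Or[T, U] }

  // Accepts an Int or a String and nothing else
  def size[T: OneOf[Int, String]#L](t: T): Int = t match {
    case i: Int    => i
    case s: String => s.length
  }
}
```

UnionSketch.size(42) and UnionSketch.size("abc") compile, while something like UnionSketch.size(1.5) is rejected by the compiler because it cannot find the required evidence for Double.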

OK, on to the heart of the beast:

EsperClassification

The EsperClassification trait extends the LookupClassification trait with the Classifier type defined as a String. I also decided that I was only ever going to use this with instances of ActorEventBus so I require that using a self type. This allowed me to use ActorRef subscribers and send messages to them using !.

import akka.event.{ActorEventBus, LookupClassification}
import com.espertech.esper.client._

trait EsperClassification extends LookupClassification {

  this : ActorEventBus =>

  type EsperEvents

  sealed trait InternalEvent
  case class NewEvent (evt: EventBean) extends InternalEvent
  case class RemovedEvent(evt: EventBean) extends InternalEvent

  type Event = InternalEvent
  type Classifier = String

  val esperConfig = new Configuration()

  lazy val epService = EPServiceProviderManager.getDefaultProvider(esperConfig)
  lazy val epRuntime = epService.getEPRuntime

Here we see how EsperClassification extends LookupClassification which requires a value for the types Event and Classifier. We will be creating String classifiers and as far as the LookupClassification is concerned the (internal) event types are just that, InternalEvent. We’ll get to that in a minute.
The Esper engine is created from a Configuration object which is used for many things, but we will only use it to register event types. The epService and epRuntime are used later to create EPL statements and insert events. The reason they are declared lazy is that we have to register the event types with esperConfig before using it to create the epService and this way we postpone instantiating the epService. This is critical and yet another reason I would like to eliminate the calls to registerEventType.
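The initialization-order issue is easy to reproduce without Esper. The following is only an analogy under my own assumptions (Engine, MyBus, register and the two snapshot values are made-up names): the body of a trait runs before the body of the class that mixes it in, so anything the trait computes eagerly cannot see registrations made in the subclass body, while a lazy val is computed on first access.

```scala
import scala.collection.mutable

object LazyInitSketch {
  trait Engine {
    private val registered = mutable.ListBuffer.empty[String]

    def register(name: String): Unit = registered += name

    // Evaluated while the trait is initialized, before any subclass code runs
    val eagerSnapshot: List[String] = registered.toList

    // Evaluated on first access, after the subclass body has registered its types
    lazy val lazySnapshot: List[String] = registered.toList
  }

  class MyBus extends Engine {
    register("Price")
    register("Buy")
  }
}
```

For a new LazyInitSketch.MyBus, eagerSnapshot is empty while lazySnapshot contains "Price" and "Buy". The flip side is the same as in the comment in the example bus: anything registered after the first access to the lazy value is not seen.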

  /**
   * The topic will be "inserted/<event-type>" or "removed/<event-type>"
   * @param event
   * @return
   */
  protected def classify(event: Event): Classifier = event match {
    case NewEvent(evt) => s"inserted/${evt.getEventType.getName}"
    case RemovedEvent(evt) => s"removed/${evt.getEventType.getName}"
  }

  protected def publish(event: Event, subscriber: Subscriber): Unit = {
    event match {
      case NewEvent(evt) => subscriber ! evt.getUnderlying
      case RemovedEvent(evt) => subscriber ! evt.getUnderlying
    }
  }

The classify method is required by LookupClassification, and we use it to construct a topic for the events coming out of the Esper engine: “inserted” or “removed”, followed by a slash and the name of the event type. The publish method is also a LookupClassification requirement and is used to route the events to the subscribers. In our case we send the underlying event of the Esper EventBean to the subscribers. This will usually be an instance of a case class as in the above example, or a Map if the EPL does a projection that does not map cleanly to (all) the attributes of a defined event type. If it does map cleanly, Esper will create an instance of the right class, as in our example.
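To show the routing idea in isolation, here is a toy lookup bus in plain Scala. It is a sketch under my own assumptions and not Akka's LookupClassification implementation: TinyLookupBus is a made-up class, subscribers are plain functions instead of ActorRefs, and the events carry a type name string instead of an Esper EventBean.

```scala
import scala.collection.mutable

object TinyBusSketch {
  sealed trait InternalEvent
  case class NewEvent(typeName: String, underlying: Any) extends InternalEvent
  case class RemovedEvent(typeName: String, underlying: Any) extends InternalEvent

  class TinyLookupBus {
    // topic -> handlers subscribed to exactly that topic
    private val subscribers = mutable.Map.empty[String, List[Any => Unit]]

    def subscribe(handler: Any => Unit, topic: String): Unit =
      subscribers(topic) = handler :: subscribers.getOrElse(topic, Nil)

    // Same topic scheme as in the trait: "inserted/<type>" or "removed/<type>"
    def classify(event: InternalEvent): String = event match {
      case NewEvent(name, _)     => s"inserted/$name"
      case RemovedEvent(name, _) => s"removed/$name"
    }

    // Look up the handlers for the event's topic and hand them the payload only
    def publish(event: InternalEvent): Unit = {
      val payload = event match {
        case NewEvent(_, underlying)     => underlying
        case RemovedEvent(_, underlying) => underlying
      }
      subscribers.getOrElse(classify(event), Nil).foreach(handler => handler(payload))
    }
  }
}
```

A handler subscribed to "inserted/Buy" is called for NewEvent("Buy", ...) but not for RemovedEvent("Buy", ...) or NewEvent("Price", ...), which mirrors why the BuyingActor in the example only ever sees new Buy orders.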

The implementation is now complete with respect to LookupClassification, all we have left to do is add EPL statements and get events into the engine:

  

  def publishEvent[T: prove[EsperEvents]#containsType](evt: T): Unit = {
    epRuntime.sendEvent(evt)
  }

  def epl(epl: String): Unit = {
    def insert(evt: EventBean) = publish(NewEvent(evt))
    def remove(evt: EventBean) = publish(RemovedEvent(evt))
    try {
      val stat = epService.getEPAdministrator.createEPL(epl)
      stat.addListener(new UpdateListener() {
        override def update(newEvents: Array[EventBean], oldEvents: Array[EventBean]): Unit = {
          // Esper may pass null (not an empty array) for either argument, so guard both
          Option(newEvents).foreach(_.foreach(insert))
          Option(oldEvents).foreach(_.foreach(remove))
        }
      })
    } catch {
      case x: EPException => println(x.getLocalizedMessage)
    }
  }

Here we see the other aspect of the union types. The publishEvent function has a pretty impressive type parameter in its formal signature. It roughly translates to ‘any T that the compiler can prove is contained in EsperEvents is acceptable as a type for evt’.
The epl function takes the rule text and uses it to create a statement using the EPAdministrator obtained from epService. If this works (no EPL compilation errors resulting in an EPException) we register an UpdateListener with the statement. Every time the rule fires because it matches events entering or leaving some stream, this listener gets invoked with these arriving and leaving events. We publish the new ones as instances of NewEvent and the old ones as instances of RemovedEvent. Remember? These are the InternalEvent cases registered as the Event type with LookupClassification. There is one little gotcha here, which is the reason for the insert and remove functions. From within the (Java) callback the compiler (or more likely me) was somehow unable to find the publish method. These little helper functions solved that.

That’s it!

We now have an Akka event bus with an embedded Esper engine routing events to subscribers using EPL statements that can get as complicated as you like. Trust me, you can write some seriously complicated EPL to do some very cool pattern matching on live event streams.


2 Comments
  1. Chris Toomey

    Thanks for this blog post, turns out we’re actually exploring this very topic as an option for replacing a very expensive commercial CEP system that we inherited with an externally developed system. This definitely looks like a promising solution.
