Feb 22 14

Complex Event Processing with Akka and Esper

by Frank Sauer

Complex Event Processing (CEP) is a – surprise – complex topic, and this post is not about that. What it is about is one possible way to integrate a popular open source CEP engine – Esper – into an Akka system of actors. This seemed like a natural fit and I wanted to explore it. My first thought was to implement Esper as an actor itself, but then it occurred to me that Esper could just as easily be thought of as an event bus: events entering the bus are routed into the CEP engine, and actors subscribing to the bus receive the events the engine emits when its rules fire in response to the input events. This post explores how this can be accomplished.

A good explanation of using an Akka Event bus can be found on Kotan Code. Please read that first before continuing; the rest of this blog builds on it, and I won't repeat the basics here.

Before explaining how the integration was accomplished, let's see how we use the result. The following code creates an Esper event bus with one simple rule that generates Buy orders at the latest price if the average of the last 4 prices is larger than the oldest of those 4 prices.

(all the code for this post is of course available on github)

import scala.beans.BeanProperty

case class Price(@BeanProperty symbol: String, @BeanProperty price: Double)
case class Buy(@BeanProperty symbol: String, @BeanProperty price: Double, @BeanProperty amount: Long)
case class Sell(@BeanProperty symbol: String, @BeanProperty price: Double, @BeanProperty amount: Long)

class EsperEventBusExample extends ActorEventBus with EsperClassification {

  type EsperEvents = union[Price] #or [Sell] #or [Buy]

  // you need to register all types BEFORE adding any statements or publishing any events
  registerEventType("Price", classOf[Price])
  registerEventType("Buy", classOf[Buy])
  registerEventType("Sell", classOf[Buy])

  // generate a Buy order for a quantity of 1000 at the newest price,
  // if the average of the last 4 prices is greater than the oldest of these 4 prices
  // This is just one way you could do this in Esper, not necessarily the best way...
  epl(
    """
      insert into Buy
      select a.symbol as symbol, d.price as price, 1000 as amount
      from pattern[every a=Price -> b=Price(symbol=a.symbol) -> c=Price(symbol=a.symbol) -> d=Price(symbol=a.symbol)]
      where (a.price + b.price + c.price + d.price) > 4*a.price
    """)
}

This defines the complete Esper event bus for our example. You have to mix in the EsperClassification trait, which requires you to define the type EsperEvents since it is abstract in that trait. This is the design pattern used by all the other EventBus traits, so I stuck with it. It's called EsperEvents (plural!) because you want to be able to submit events of more than one type into the event bus, and I did not want to use Any since that's lame and there is a much cooler way to keep things nicely typed. I'll get into that weird union stuff later in more detail; it's totally worth it, trust me…

Of course, an Esper event bus would not be very useful if it did not have some rules, so we add a simple rule that creates a Buy order when it sees 4 prices for the same symbol such that the average of these 4 prices is larger than the first. The calls to registerEventType are a nuisance but currently a requirement; I'll explain why later. They allow us to use the defined case classes as Esper events. The use of @BeanProperty in these case classes is also, unfortunately, an Esper-related necessity: Esper is really a Java framework, and using POJOs as events requires them to be beans (oversimplified – see the Esper docs for other ways to use Java objects that are not beans). For a (very!) detailed explanation of the Esper language (EPL), once again see the Esper docs.
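To make that concrete, here is roughly what @BeanProperty buys us; the expansion shown in the comments is conceptual, not actual generated source:

case class Price(@BeanProperty symbol: String, @BeanProperty price: Double)
// conceptually equivalent to:
// class Price(val symbol: String, val price: Double) {
//   def getSymbol: String = symbol  // the JavaBean getter Esper's reflection looks for
//   def getPrice: Double = price
// }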

Now that we have defined our bus, let’s use it. First we create an actor and subscribe it to new Buy orders:

import akka.actor.{Actor, ActorSystem, Props}

class BuyingActor extends Actor {
  def receive = {
    case Buy(sym,price,amt) => println(s"Got a new buy: $amt $sym @ $$$price") // not as expensive as it looks
  }
}

object EsperEventBusApp extends App {
  // set up the event bus and actor(s)
  val system = ActorSystem()
  val evtBus = new EsperEventBusExample
  val buyer = system.actorOf(Props(classOf[BuyingActor]))

  // subscribe to buys
  evtBus.subscribe(buyer, "inserted/Buy")

Then we insert some data and see what happens:

  val prices = Array(
    Price("BP", 7.61), Price("RDSA", 2101.00), Price("RDSA", 2209.00),
    Price("BP", 7.66), Price("BP", 7.64), Price("BP", 7.67)
  )

  // feed in the market data
  prices foreach (evtBus.publishEvent(_))
}

At this point we’ll see the BuyingActor printing this text:

Got a new buy: 1000 BP @ $7.67

How?

Before getting into the implementation of the EsperClassification trait, I have to talk about that union business… I can't hold it back any longer. The scalavro utils library contains a very cool implementation of union types as described by Miles Sabin in this amazing blog post. This allows us to define a type as the union of other types, so we can write a function that accepts, say, both Strings and Ints as a parameter but nothing else; passing any other type results in a compile error. Remember I promised an explanation for why I think the calls to registerEventType are a nuisance? It's because the union library also has a way to reflect on the members of a union type using Union.typeMembers(), which I could have used to register the types based strictly on the type definition. However, I kept getting a TypeTag not found error and was unable to resolve it. If anybody knows how to fix that (pull requests welcome), I would very much appreciate it!
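As a standalone illustration of what union types buy you, here is a minimal sketch, assuming scalavro-util is on the classpath; StringOrInt and describe are names I made up for this demo:

import com.gensler.scalavro.util.Union._

object UnionDemo {
  type StringOrInt = union[String] #or [Int]

  // accepts any T the compiler can prove is a member of StringOrInt
  def describe[T: prove[StringOrInt]#containsType](value: T): String = value.toString

  describe("hello")  // compiles
  describe(42)       // compiles
  // describe(3.14)  // would not compile: Double is not in the union
}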

OK, on to the heart of the beast:

EsperClassification

The EsperClassification trait extends the LookupClassification trait with the Classifier type defined as String. I also decided that I was only ever going to use this with instances of ActorEventBus, so I require that using a self type. This allows me to use ActorRef subscribers and send messages to them using !.

import akka.event.{ActorEventBus, LookupClassification}
import com.espertech.esper.client._

abstract trait EsperClassification extends LookupClassification {

  this : ActorEventBus =>

  type EsperEvents

  sealed trait InternalEvent
  case class NewEvent (evt: EventBean) extends InternalEvent
  case class RemovedEvent(evt: EventBean) extends InternalEvent

  type Event = InternalEvent
  type Classifier = String

  val esperConfig = new Configuration()

  lazy val epService = EPServiceProviderManager.getDefaultProvider(esperConfig)
  lazy val epRuntime = epService.getEPRuntime

Here we see how EsperClassification extends LookupClassification, which requires definitions for the abstract types Event and Classifier. We will be creating String classifiers, and as far as the LookupClassification is concerned the (internal) event type is just that, InternalEvent. We'll get to that in a minute.
The Esper engine is created from a Configuration object which can be used for many things, but we only use it to register event types. The epService and epRuntime are used later to create EPL statements and insert events. They are declared lazy because we have to register the event types with esperConfig before using it to create the epService; the lazy vals postpone instantiating the epService until after registration. This ordering is critical, and yet another reason I would like to eliminate the calls to registerEventType.
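registerEventType itself can be as thin as a delegation to the Esper Configuration. A minimal sketch of what it could look like, assuming Esper's Configuration.addEventType(name, class) overload:

  // must run before epService is first accessed (hence the lazy vals above)
  def registerEventType(name: String, clazz: Class[_]) {
    esperConfig.addEventType(name, clazz)
  }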

  /**
   * The topic will be "inserted/<event-type>" or "removed/<event-type>"
   * @param event the internal event coming out of the Esper engine
   * @return the topic under which to publish this event
   */
  protected def classify(event: Event): Classifier = event match {
    case NewEvent(evt) => s"inserted/${evt.getEventType.getName}"
    case RemovedEvent(evt) => s"removed/${evt.getEventType.getName}"
  }

  protected def publish(event: Event, subscriber: Subscriber): Unit = {
    event match {
      case NewEvent(evt) => subscriber ! evt.getUnderlying
      case RemovedEvent(evt) => subscriber ! evt.getUnderlying
    }
  }

The classify method is required by the LookupClassification, and we use it to construct a topic for the events coming out of the Esper engine: "inserted" or "removed", followed by the name of the event type. The publish method is also a LookupClassification requirement and is used to route the events to the subscribers. In our case we send the underlying event of the Esper EventBean to the subscribers. This will usually be an instance of a case class, as in the above example, or a Map if the EPL does a projection that does not map cleanly to (all) the attributes of a defined event type (if it does, Esper will create an instance of the right class, as in our example).
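The topic scheme also lets a subscriber listen for events leaving a stream, for example events expiring from a time or length window. A hypothetical usage sketch (the PriceWatchingActor is an assumption, not part of the example above):

// hypothetical: get notified whenever a Price event leaves a window used in an EPL statement
val watcher = system.actorOf(Props(classOf[PriceWatchingActor]))
evtBus.subscribe(watcher, "removed/Price")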

The implementation is now complete with respect to LookupClassification; all that's left is to add EPL statements and get events into the engine:


  def publishEvent[T: prove[EsperEvents]#containsType](evt: T) {
    epRuntime.sendEvent(evt)
  }

  def epl(epl: String) {
    def insert(evt: EventBean) = publish(NewEvent(evt))
    def remove(evt: EventBean) = publish(RemovedEvent(evt))
    try {
      val stat = epService.getEPAdministrator.createEPL(epl)
      stat.addListener(new UpdateListener() {
        override def update(newEvents: Array[EventBean], oldEvents: Array[EventBean]) {
          // Esper passes null rather than an empty array when no events enter or leave
          Option(newEvents).getOrElse(Array.empty[EventBean]) foreach (insert(_))
          Option(oldEvents).getOrElse(Array.empty[EventBean]) foreach (remove(_))
        }
      })
    } catch {
      case x: EPException => println(x.getLocalizedMessage)
    }
  }

Here we see the other aspect of the union types. The publishEvent function has a pretty impressive type parameter in its formal signature. It roughly translates to 'any T that the compiler can prove is a member of EsperEvents is acceptable as the type of evt'.
The epl function takes the rule text and uses it to create a statement via the EPAdministrator obtained from epService. If this works (no EPL compilation errors resulting in an EPException), we register an UpdateListener with the statement. Every time the rule fires because it matches entering or leaving events in some stream, this listener is invoked with those arriving and leaving events. We publish the new ones as instances of NewEvent and the old ones as instances of RemovedEvent. Remember? These are the InternalEvent cases registered as the Event type with LookupClassification. There is one little gotcha here, which explains the insert and remove helper functions: from within the (Java) callback the compiler (or more likely me) was somehow unable to find the publish method, and these little helpers solved that.
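Nothing stops us from registering more than one statement. For instance, a hypothetical mirror-image rule (a sketch following the same pattern as the Buy rule, not code from the repo) would generate Sell orders on a falling average:

epl(
  """
    insert into Sell
    select a.symbol as symbol, d.price as price, 1000 as amount
    from pattern[every a=Price -> b=Price(symbol=a.symbol) -> c=Price(symbol=a.symbol) -> d=Price(symbol=a.symbol)]
    where (a.price + b.price + c.price + d.price) < 4*a.price
  """)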

That’s it!

We now have an Akka event bus with an embedded Esper engine routing events to subscribers using EPL statements that can get as complicated as you like, and trust me, you can write some seriously complicated EPL to do some very cool pattern matching on live event streams.

Feb 15 14

Mock your Cake and eat it too

by Frank Sauer

In this post I'll examine the use of the Cake pattern in my Scala client for InfluxDB. When I started this project I wanted the ability to vary the two core dependencies of this code – an HTTP client and a JSON parsing library – depending on the environment in which the client is used. For example, in a standalone environment you might want to use the asynchronous AsyncHttpClient and the json4s library, but in the context of a Play! framework application you probably want to use its provided WS service to make the REST calls and its JSON parsing macros to process the results. I had read about the Cake pattern and decided to give it a try. This turned out very well and, as an added bonus, also solved the age-old question of how to unit test a database client without actually connecting to a real database.

Declaring the dependencies

All communication with InfluxDB occurs via HTTP calls to a REST service. Both the requests (well, some of them) and the responses contain JSON content. This means that our InfluxDB client needs a way to communicate over HTTP and a way to parse JSON. The first step in using the Cake pattern is declaring your dependencies; in our case we want to state that we need an HTTP service and a JSON converter. What these are we'll see next, but first here's how you use a self type in the main InfluxDB client to indicate that it requires both an HTTPServiceComponent and a JsonConverterComponent:

class InfluxDB(hostName: String, port: Int, user: String, pwd: String, db: String) {
  // require a HTTPServiceComponent and a JsonConverterComponent
  self: HTTPServiceComponent with JsonConverterComponent =>

What this means is that ultimately, when an instance of InfluxDB is materialized, it is guaranteed to also contain implementations of the HTTPServiceComponent and JsonConverterComponent traits. The crux of the Cake pattern is that these are abstract traits that can be implemented in many different ways, and that the mixing in is deferred as long as possible. Let's take a look at the HTTPServiceComponent trait first.

HTTPServiceComponent

abstract trait HTTPServiceComponent {

  val httpService: HTTPService // abstract; implementations must provide a value

  trait HTTPService {

    def GET(url: String): Future[String]
    def POST(url: String, body: String, contentType: String): Future[Unit]
    def PUT(url: String, body: String, contentType: String): Future[Unit]
    def DELETE(url: String): Future[Unit]
  }
}

The nested trait is what we'd expect to see for an HTTP service even without baking it into a Cake. Please note that this is not a fully generic HTTP service interface; it is strictly the functionality needed by the InfluxDB client. A generic HTTP client would not have a Future[Unit] return type for POST; it just so happens that none of the POST requests made by the InfluxDB client return anything interesting. The mechanics of Scala's Future are sufficient to let us deal with failure situations.
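For instance, a failed POST simply surfaces as a failed Future; a sketch in the same style as listDatabases below (the URL, the json value and the logger are assumptions):

httpService.POST("http://localhost:8086/db/testing/series", json, "application/json") onComplete {
  case Success(_)     => LOG.debug("write accepted")
  case Failure(error) => LOG.error(s"write failed: ${error.getMessage}")
}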

The more interesting aspect is what happens in the outer HTTPServiceComponent trait. It has an abstract val httpService of type HTTPService, the inner trait. Because the InfluxDB class was defined with the self type shown above, we can use this val in the InfluxDB client code even though it is abstract here. The fact that it is abstract simply means that ultimately we need to mix in a concrete sub-trait of HTTPServiceComponent that assigns it a real value. We'll get to that; first let's see how the InfluxDB client code can use this trait:

  def listDatabases: Future[List[DBInfo]] = {
    val p = Promise[List[DBInfo]]
    val url = s"$baseUrl?u=${user.urlEncoded}&p=${pwd.urlEncoded}"
    LOG.debug(s"Getting databases from $url")
    httpService.GET(url) onComplete {
      case Success(response) => p.success(jsonConverter.jsonToDBInfo(response))
      case Failure(error) => p.failure(error)
    }
    p.future
  }

Here you see how the implementation of listDatabases accesses httpService. This compiles thanks to the self-type declaration: it is as if we had written "with HTTPServiceComponent", except that we get to decide later which concrete implementation to mix in. This snippet also uses jsonConverter, which brings us to the other dependency, also resolved using the Cake pattern:

JsonConverterComponent

Similar to the HTTPServiceComponent, the JsonConverterComponent consists of a trait with an abstract val and a nested service definition trait:

abstract trait JsonConverterComponent {

  val jsonConverter: JsonConverter  // abstract; implementations must provide a value

  trait JsonConverter {

    val LOG = LoggerFactory.getLogger("org.influxdb.scala.JsonConverter")

    def jsonToSeries(response: String, precision: Precision): Try[QueryResult]
    def seriesToJson(s: Series): String
    def jsonToDBInfo(response: String): List[DBInfo]

    /**
     * Combine the keys for all points into a single list of column names.
     * Points in the sequence may have different columns, and the values in the points array
     * for missing columns will be null.
     */
    def allColumns(points: Seq[DataPoint]): List[String] =
      points.foldLeft(Set[String]())((acc, p) => acc ++ p.keys.toSet).toList

  }
}

The idea is identical to before, but here we see that the nested service definition doesn't have to be completely without implementation. The allColumns function is needed to implement seriesToJson, yet it does not depend on any particular JSON parser, so it can be shared by all implementations, which is why I chose to implement it here. Don't worry about the implementation of allColumns; in case you're wondering what's going on, you just need to know that DataPoint is a type alias for Map[String,Any].
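A quick worked example of allColumns (the sample points are mine, for illustration only):

// DataPoint is Map[String, Any], so a point is just a map of column name to value
val points: Seq[DataPoint] = Seq(
  Map("time" -> 100, "foo" -> 1),  // has foo but no bar
  Map("time" -> 200, "bar" -> 2)   // has bar but no foo
)
// allColumns(points) folds all key sets together:
// the result contains "time", "foo" and "bar" (element order depends on the Set)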

Let’s get real

OK, now we have seen how the InfluxDB client uses two abstract traits. Next, let's see how we can provide concrete implementations for these traits and how we can get them mixed in. First, let's examine the HTTPServiceComponent, and let's assume we are writing the standalone implementation without any assumptions about other frameworks or (application) server environments. I chose the AsyncHttpClient because I signed the reactive manifesto and want everything to be as asynchronous as possible, which also explains why all functions of the HTTPService return a Future. Below you'll find part of the standalone implementation of the HTTPServiceComponent. To keep our eyes on the Cake I've left out most of the actual implementation and formatted it to fit your screen (see github for the gory details):

trait AsyncHttpClientComponent extends HTTPServiceComponent {

  implicit val pool = Executors.newSingleThreadExecutor()
  val httpService = new AsyncHttpClientImpl()

  class AsyncHttpClientImpl(implicit pool:ExecutorService) extends HTTPService {

    // details, details

    def GET(url: String): Future[String] =
        getOrDelete[String](client.prepareGet(url)) { result => result }
    def POST(url: String, body: String, contentType: String): Future[Unit] =
        postOrPut(client.preparePost(url), body, contentType)
    def PUT(url: String, body: String, contentType: String): Future[Unit] =
        postOrPut(client.preparePut(url), body, contentType)
    def DELETE(url: String): Future[Unit] =
        getOrDelete[Unit](client.prepareDelete(url)) { result => }

  }
}

Here we can see that AsyncHttpClientComponent extends HTTPServiceComponent and assigns a concrete value of type AsyncHttpClientImpl to the httpService val. This means that if we write "new InfluxDB(…) with AsyncHttpClientComponent", the httpService used throughout the InfluxDB client code will be the AsyncHttpClientImpl assigned here.
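The compiler enforces the self type at instantiation time; a quick sketch (Json4sJsonConverterComponent is covered in the next section):

// new InfluxDB("localhost", 8086, "root", "root", "testing")
//   => compile error: self type requires HTTPServiceComponent with JsonConverterComponent

val ok = new InfluxDB("localhost", 8086, "root", "root", "testing")
             with AsyncHttpClientComponent with Json4sJsonConverterComponent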

Welcome to Cake, you have now mastered the Cake pattern! But wait, there is more…

Standard Configurations

The actual JsonConverterComponent implementation using json4s is accomplished in exactly the same manner; see github. Since these two dependencies are most likely used together when we want to use InfluxDB in a standalone configuration, let's make this painfully simple:

trait StandaloneConfig extends AsyncHttpClientComponent 
                       with Json4sJsonConverterComponent

To instantiate an InfluxDB client in a standalone environment we can now write the following, and we'll get a happy InfluxDB client with all dependencies materialized into concrete implementations!

  val client = new InfluxDB("localhost", 8086, "root", "root", "testing") 
                   with StandaloneConfig

But wait, there is one more thing… Let’s get to the title of this blog post…

Unit testing the InfluxDB client with mocks

We have seen how we can use the Cake pattern to inject concrete implementations of an HTTP service and a JSON converter into the InfluxDB client. Could we not use the same pattern to come up with a fake HTTP service, so we can unit test the client code without ever talking to a real InfluxDB database? Well, yes, of course! Here's how.

I want to use ScalaTest and a mock http service to write something like this:

import org.scalatest.{FlatSpec, Matchers}

import scala.concurrent.Await
import scala.concurrent.duration._

class InfluxDBCoreSpec extends FlatSpec with Matchers {

  val db = new InfluxDB("localhost", 8086, "root", "root", "testing") with MockHttpService with Json4sJsonConverterComponent

  "db.listDatabases" should "return a list of databases" in {
    val dbsF = db.listDatabases
    val dbs = Await.result(dbsF, 1 second) // DO NOT do this in real code, keep it non-blocking
    dbs.length should be (2)
    dbs(0).name should be ("dbOne")
    dbs(0).replicationFactor should be (1)
    dbs(1).name should be ("dbTwo")
    dbs(1).replicationFactor should be (2)
  }

  "db.query" should "return a query result" in {
    val qF = db.query("select * from testing", MILLIS)
    val result = Await.result(qF, 1 second) // DO NOT do this in real code, keep it non-blocking
    val t0 = new Date(100)
    result.length should be (1)
    val series = result(0)
    series.name should be ("testing")
    series.time_precision should be (MILLIS)
    series.data.length should be (3)
    series.data(0)("time") should be (t0)
    series.data(0)("sequence_number") should be (Some(1))
    series.data(0)("bar") should be (Some(200))
    series.data(0)("baz") should be (None)
    series.data(0)("foo") should be (Some(100))
    series.data(2)("baz") should be (Some(300))
  }
}

Notice the "with MockHttpService" in the declaration of db. We need to write that trait such that the expectations expressed in the tests hold; it therefore has to return hardcoded responses, as if we had made these requests to a real InfluxDB instance. Below I used Mockito to achieve this:

trait MockHttpService extends HTTPServiceComponent {
    import org.mockito.Mockito._
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global // Future.apply needs an ExecutionContext
    override val httpService = mock(classOf[HTTPService])
    private val dblist = "http://localhost:8086/db?u=root&p=root"
    private val query = "http://localhost:8086/db/testing/series?u=root&p=root&time_precision=m&q=select+*+from+testing"

    when(httpService.GET(dblist)).thenReturn(Future[String](
      """
        [
          {"name":"dbOne", "replicationFactor": 1},
          {"name":"dbTwo", "replicationFactor": 2}
        ]
      """
    ))
    when(httpService.GET(query)).thenReturn(Future[String](
      """
        [
          {
            "name":"testing",
            "columns":["time","sequence_number","bar","baz","foo"],
            "points":[
              [100,1,200,null,100],
              [200,2,200,null,100],
              [300,3,100,300,null]
            ]
          }
        ]
      """
    ))
  }

The MockHttpService is another concrete implementation of the HTTPServiceComponent, but instead of actually communicating over HTTP with a real database it uses Mockito to stub out hardcoded responses to the various requests, based on the URL. You can read about Mockito on their site, but I believe the code is pretty self-explanatory: when you see a request with the given URL, return the given response… Doesn't get much easier.
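Beyond stubbing, Mockito can also verify that the client actually made the expected request; a sketch, reaching the mocked httpService through db as declared in the spec above:

import org.mockito.Mockito.verify

verify(db.httpService).GET("http://localhost:8086/db?u=root&p=root")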

Conclusion

The Cake pattern is an excellent way to inject dependencies in a syntactically brilliant way. No frameworks or libraries (like Spring or Guice) are needed; the standard Scala language feature of mixing in traits is all you need. Combinations of dependencies are easily created, as evidenced by the StandaloneConfig trait. As a bonus, it is incredibly easy to write unit tests by selectively mocking one or more of your dependencies.