Improved Esper EventBus

by Frank Sauer on February 24th, 2014

In my previous post on integrating Esper into an Akka event bus, I pointed out two issues I wasn’t too happy with:

  1. The calls to registerEventType
  2. The sample EPL could have been better

I’m happy to report that there has been some progress on both issues:

Reflecting on union types

I found a partial solution to the first issue. The EsperClassification trait now registers the types at the right time:

  def esperEventTypes:Union[EsperEvents] // abstract! should be concrete val = new Union[EsperEvents]

  val esperConfig = new Configuration()

  // types are now automagically registered when we need them, but it would be even cooler
  // if we did not have to ask the application programmer to instantiate the Union...
  // (the TypeTag is missing if we attempt to do it here)
  // NOTE: the first argument was lost in this listing; the simple type name is an assumption
  esperEventTypes.typeMembers() foreach(t => registerEventType(t.typeSymbol.name.toString, m.runtimeClass(t)))

  // these now no longer have to be lazy, since types are guaranteed to be registered at this point
  val epService = EPServiceProviderManager.getDefaultProvider(esperConfig)
  val epRuntime = epService.getEPRuntime

Applications no longer have to make these calls themselves in the right order (before creating EPL statements or submitting events). The trait registers the types itself by reflecting on the Union type provided by esperEventTypes, an abstract method returning a value of type Union[EsperEvents]. It is totally silly that actual event buses should have to override this, since the implementation will always be the same:
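As a rough illustration of the reflection involved, the sketch below turns a list of reflected types into name/class pairs, the two things registerEventType needs. It is not the actual trait code: RegistrationSketch, Price, Buy, members and registrations are hypothetical names, the Union and its typeMembers() are replaced by a plain list of types, and using the simple class name as the event type name is an assumption.

```scala
import scala.reflect.runtime.universe._

object RegistrationSketch {
  // Hypothetical event classes standing in for the members of the Union
  case class Price(symbol: String, price: Double)
  case class Buy(symbol: String, price: Double, amount: Long)

  // Runtime mirror used to go from a reflected Type to a java.lang.Class
  private val m = runtimeMirror(getClass.getClassLoader)

  // Stand-in for esperEventTypes.typeMembers(): the types to register
  val members: List[Type] = List(typeOf[Price], typeOf[Buy])

  // Event type name (assumed: simple class name) paired with the runtime class
  val registrations: Map[String, Class[_]] =
    members.map(t => t.typeSymbol.name.toString -> m.runtimeClass(t)).toMap
}
```

Each entry of registrations corresponds to one registerEventType(name, clazz) call in the trait.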

override def esperEventTypes = new Union[EsperEvents]

The issue is that implementing this in the EsperClassification trait itself does not work. The instantiation of Union requires an implicit TypeTag and I can’t seem to figure out how to provide it. It just works when doing it in the subclasses – without doing anything out of the ordinary.
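One likely explanation, assuming EsperEvents is an abstract type member of the trait: the compiler can only materialize a TypeTag for a type it knows concretely, and inside the trait EsperEvents is not yet fixed. The sketch below reproduces that situation with hypothetical stand-ins (Tagged for Union, Classification for EsperClassification, Events for EsperEvents).

```scala
import scala.reflect.runtime.universe._

// Hypothetical stand-in for Union: it needs a TypeTag to inspect T at runtime
class Tagged[T: TypeTag] {
  def tpe: Type = typeOf[T]
}

trait Classification {
  type Events                      // abstract here, fixed by each concrete bus
  def eventTypes: Tagged[Events]   // has to stay abstract, see below

  // Uncommenting the next line fails to compile: inside the trait, Events is
  // still abstract, so no TypeTag[Events] can be materialized.
  // def eventTypes = new Tagged[Events]
}

object PriceBus extends Classification {
  type Events = (Int, String)                   // concrete at this point
  override def eventTypes = new Tagged[Events]  // TypeTag is found without extra work
}
```

This matches the behaviour described above: the same expression compiles in the subclass, where the type is known, and not in the trait.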

So still not ideal, but much better than explicitly having to register all the types manually.

Better EPL for the sample algorithm

The pattern-based EPL was not the best way to implement the simple average algorithm that generates the Buy orders. The rule using pattern matching was hard-coded to a window of length 4, and it did not use the built-in avg function to calculate the average. Esper can do much better, and here’s the result:

val windowSize = 4
val orderSize = 1000

// this will delay the Price stream by windowSize - 1: 
// the price at position (latest - windowSize) will fall out of the window into the Delayed stream
epl(s"insert rstream into Delayed select rstream symbol,price from Price.std:groupwin(symbol).win:length(${windowSize-1})")

// after every windowSize prices for a symbol, the average is inserted into the Averages stream
epl(s"insert into Averages select symbol,avg(price) as price from Price.std:groupwin(symbol).win:length_batch($windowSize) group by symbol")

// the join is only triggered by a new average (since it has the unidirectional keyword),
// which (see above) is only generated after a complete window for a symbol has been seen
epl(
  s"""
      insert into Buy
      select p.symbol, p.price, $orderSize as amount
      from Price.std:unique(symbol) p
      join Delayed.std:unique(symbol) d on d.symbol = p.symbol
      join Averages a unidirectional on a.symbol = p.symbol
      where a.price > d.price
  """)

This solution allows for variable-size windows and it also nicely demonstrates the windowing and stream joining features of Esper. For more details on these features see the Esper EPL reference.
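To make the intent of the three statements easier to follow, here is a plain Scala approximation of what they compute, without Esper. It is only a sketch under assumptions: AverageSketch and buys are hypothetical names, all prices are available up front rather than streamed, and the Delayed stream is assumed to be updated before the new average triggers the join. Under those assumptions, the delayed price seen by the join is the first price of each completed batch.

```scala
object AverageSketch {
  final case class Price(symbol: String, price: Double)
  final case class Buy(symbol: String, price: Double, amount: Int)

  def buys(prices: Seq[Price], windowSize: Int, orderSize: Int): Seq[Buy] =
    prices.groupBy(_.symbol).toSeq.flatMap { case (symbol, forSymbol) =>
      forSymbol.map(_.price)
        .grouped(windowSize)            // like win:length_batch(windowSize)
        .filter(_.size == windowSize)   // ignore a trailing incomplete batch
        .flatMap { batch =>
          val average = batch.sum / windowSize  // avg(price) in Averages
          val delayed = batch.head              // price that fell out of win:length(windowSize - 1)
          val latest  = batch.last              // latest Price for this symbol
          if (average > delayed) Some(Buy(symbol, latest, orderSize)) else None
        }
        .toList
    }
}
```

For example, with windowSize = 4 the prices 10, 11, 12, 13 for one symbol average to 11.5, which is above the delayed price 10, so a single Buy at the latest price 13 is produced.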

Next steps

Next I will look into simplifying the subscription API by removing support for the “removed/*” topics. It turns out that this isn’t really needed, since Esper only ever passes data in the oldEvents parameter of the UpdateListener when the “irstream” keyword is used in the select clause, and I can’t remember the last time I’ve seen an example of that… Stop me now if you need this :-)
