
CEP using Akka Streams

by Frank Sauer on May 10th, 2014

Ever since I took the Principles of Reactive Programming class on Coursera and learned about Rx, I've had this lingering thought that streams are a perfect fit for CEP. Not really a surprise, given that one of the commercial CEP products is TIBCO StreamBase. After the recent announcement of Akka Streams and watching Roland Kuhn's webinar on it, I decided to give this a try and see what it would take to implement the same trading algorithm I blogged about before, this time using Akka Streams. Some of the reasons to try this are:

  • Benefit from the expressiveness and type safety of the Scala language to implement the rules
  • Benefit from the inherent scalability of the Akka platform to multiple cores and even distributed clusters
  • At some point this can probably also benefit from Akka persistence to get high availability

Akka Streams is the Typesafe implementation of the draft Reactive Streams specification, a standard for asynchronous stream processing with non-blocking backpressure, and it is interoperable with other Reactive Streams implementations. It is implemented as a fluent DSL on top of an actor-based execution model. The DSL allows for the creation, filtering, splitting, and merging of streams, all operations crucial for expressing the kinds of CEP rules normally written in a SQL-like language.
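To get a feel for the fluent DSL before diving into the trading rule, here is a minimal flow using the same experimental 0.x calls that appear throughout this post (Flow, filter, map, foreach, consume). Treat it as a sketch against that draft API, which may well change (imports as in the full listing at the end):

  implicit val system = ActorSystem("Demo")
  val materializer = FlowMaterializer(MaterializerSettings())

  Flow(Vector(1, 2, 3, 4)).
    filter(_ % 2 == 0).       // keep only the even numbers
    map(_ * 10).              // transform each element
    foreach(n => println(n)). // side effect per element
    consume(materializer)     // nothing happens until the flow is materialized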

Let’s see what our algorithm looks like using this DSL. First we start out with the same data as in the previous posts, but we wrap it with a Flow:

  val prices = Array(
    Price("BP", 7.61), Price("RDSA", 2101.00), Price("RDSA", 2209.00),
    Price("BP",7.66), Price("BP", 7.64), Price("BP", 7.67)
  )

  Flow(prices.toVector).

The first step is to split this stream by symbol. The result will be new streams, one per symbol. The groupBy transforms the Flow[Price] into a Flow[(String, Producer[Price])]. Each of these streams then has to be processed as a new Flow:

 Flow(prices.toVector).
    groupBy(_.symbol).
    foreach {
       case (symbol, producer) => Flow(producer)
          ...
    }

Now that we have multiple streams, each carrying a single symbol, we can start grouping them into chunks to calculate the average. The DSL function to do this is called grouped. It transforms the Flow[Price] into a Flow[Seq[Price]]:

 Flow(prices.toVector).
    groupBy(_.symbol).
    foreach {
       case (symbol, producer) => 
          Flow(producer).
          grouped(WINDOWSIZE).
          ...
    }

Now that we have chunks, we can calculate the average and implement our rule. The next step filters these chunks so that only those matching our rule (an upward-trending price) continue in the flow:

 Flow(prices.toVector).
    groupBy(_.symbol).
    foreach {
       case (symbol, producer) => 
          Flow(producer).
          grouped(WINDOWSIZE).
          filter(window => window.size == WINDOWSIZE && avg(window) > window.head.price).
    }
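For reference, the avg used in the filter is just a fold over the window; these helpers also appear in the full listing at the end of the post:

  def sum(prices: Seq[Price]): Double = prices.foldLeft(0.0)((partial, price) => partial + price.price)
  def avg(prices: Seq[Price]): Double = sum(prices) / prices.size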

The first condition throws away incomplete windows, and the second compares the average with the first element to determine the trend. Almost there! Now we can generate our BUY orders for everything left and send them to market (well, the console, actually). The map function transforms the Flow[Seq[Price]] we still have here into a Flow[Buy]:

 Flow(prices.toVector).
    groupBy(_.symbol).
    foreach {
       case (symbol, producer) => 
          Flow(producer).
          grouped(WINDOWSIZE).
          filter(window => window.size == WINDOWSIZE && avg(window) > window.head.price).
          map(window => Buy(symbol, window.last.price, ORDERSIZE)).
          foreach(buy => println(s"generated a buy: $buy"))
    }

There is only one crucially important step left to do. We need to initiate the consumption of data from the flow, otherwise nothing will happen:

 Flow(prices.toVector).
    groupBy(_.symbol).
    foreach {
       case (symbol, producer) => 
          Flow(producer).
          grouped(WINDOWSIZE).
          filter(window => window.size == WINDOWSIZE && avg(window) > window.head.price).
          map(window => Buy(symbol, window.last.price, ORDERSIZE)).
          foreach(buy => println(s"generated a buy: $buy")).
          consume(materializer)
    }

The materializer passed to the consume method is a FlowMaterializer, created in this sample with FlowMaterializer(MaterializerSettings()). The FlowMaterializer controls how the transformation steps defined by the flow are mapped onto the underlying actor model. Materializer settings include things like buffer sizes and fan-out behavior.
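For illustration only, tuning those knobs might look roughly like the sketch below. The parameter names on MaterializerSettings have shifted between the experimental releases, so treat them as assumptions to be checked against your akka-stream version, not as the definitive API:

  // Illustrative only: buffer-size and fan-out knobs on the experimental
  // MaterializerSettings; parameter names are assumptions, check your version
  val tuned = MaterializerSettings(
    initialInputBufferSize = 2,
    maximumInputBufferSize = 16,
    initialFanOutBufferSize = 2,
    maxFanOutBufferSize = 16
  )
  val tunedMaterializer = FlowMaterializer(tuned)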

OK, I lied, we're not quite done yet. We also need to trigger our initial flow to start consuming data! The consume shown above only consumes data from each inner stream created by the groupBy. Below is the entire sample, including the last step to consume from the outer flow and shut down the Akka system when done:

// imports for the experimental akka-stream 0.x API
// (package names may differ across early releases)
import akka.actor.ActorSystem
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow

object StreamCEP extends App {

  case class Price(symbol: String, price: Double)
  case class Buy(symbol: String, price: Double, amount: Long)
  case class Sell(symbol: String, price: Double, amount: Long)

  implicit val system = ActorSystem("Sys")
  val materializer = FlowMaterializer(MaterializerSettings())

  val prices = Array(
    Price("BP", 7.61), Price("RDSA", 2101.00), Price("RDSA", 2209.00),
    Price("BP",7.66), Price("BP", 7.64), Price("BP", 7.67)
  )

  val WINDOWSIZE = 4
  val ORDERSIZE = 1000

  def sum(prices: Seq[Price]): Double = prices.foldLeft(0.0)((partial, price) => partial + price.price)
  def avg(prices: Seq[Price]): Double = sum(prices) / prices.size

  // just use the test data from above, could be an actual (infinite) ticker stream
  Flow(prices.toVector).
    // group by symbol, splitting the stream into separate streams, one per symbol
    groupBy(_.symbol).
    // process them all
    foreach {
       case (symbol, producer) =>
          Flow(producer).
            // chunk the stream by the window size
            grouped(WINDOWSIZE).
            // the actual rule! only let through each complete window for which avg > first
            filter(window => window.size == WINDOWSIZE && avg(window) > window.head.price).
            // for those that pass, generate a Buy
            map(window => Buy(symbol, window.last.price, ORDERSIZE)).
            // send to market - for now just print them :-)
            foreach(buy => println(s"generated a buy: $buy")).
            consume(materializer)
    }.
    onComplete(materializer) { _ => system.shutdown()}
}
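Tracing the sample data: only BP ever accumulates a complete window of four prices (7.61, 7.66, 7.64, 7.67, with average 7.645 > 7.61), while RDSA yields just two prices and is dropped by the size check in the filter. Running this should therefore print a single order:

  generated a buy: Buy(BP,7.67,1000)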

I must say I'm pretty impressed already and can't wait to see this technology mature. The example works like a charm (check out the Activator template from the previous post; this example should have been added to it automatically). I also can't wait to try this on a real stream and run it on a large Akka cluster; that should be fun!

P.S. When you run this you’ll notice some dead-letter messages from Akka on shutdown. I don’t know what causes this yet. If anybody knows, please let me know.

UPDATE

Somebody on Twitter asked me how this compares to Esper, and that got me thinking. Depending on your definition of CEP, the above may or may not qualify as true CEP. There is no notion of time involved in this example, no events entering and leaving windows of various kinds, as is the case in Esper-land. For example, in an earlier post this algorithm was implemented by forking the incoming stream into a delayed stream and an average stream, which were later joined. The delayed stream was used to compare the oldest element of the window with the average of that window. As of now I see no way to do things like that using Akka Streams, and I'm not sure whether it can be done on top of it or would require a change to the core implementation. I'm thinking of a delay(n: Int): Flow[T] primitive which would delay the Flow[T] by n elements, in effect representing a delay queue of size n. Perhaps this can be expressed in terms of the general transform primitive, since that can handle state.
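As a rough sketch of the state such a primitive would need, here is a delay queue kept independent of the streams API, so it could be plugged into transform or whatever stateful primitive survives into the final release. DelayBuffer is a hypothetical name, not part of Akka Streams:

  import scala.collection.immutable.Queue

  // Hypothetical helper: emits each element only after n newer ones
  // have arrived, i.e. a delay queue of size n
  final class DelayBuffer[T](n: Int) {
    private var queue = Queue.empty[T]

    // push a new element; returns the element falling out of the
    // delay window once the window is full, otherwise None
    def push(elem: T): Option[T] = {
      queue = queue.enqueue(elem)
      if (queue.size > n) {
        val (oldest, rest) = queue.dequeue
        queue = rest
        Some(oldest)
      } else None
    }
  }

Pairing each live element with the output of push would then give exactly the oldest-versus-average comparison from the earlier post.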

Another prime example of CEP involves checking for patterns of events over time, including the absence of events! This requires one event to trigger a timer, which in turn triggers another event when it expires; depending on which happens first, other events may be generated. Again, I am not sure yet how to map this idea onto Akka Streams.
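Outside the stream DSL, plain Akka actors plus the scheduler can express at least the timer part today. Below is a sketch with hypothetical Order/Confirmation/Expired events; none of this is Akka Streams API, just vanilla akka-actor:

  import scala.concurrent.duration._
  import akka.actor.Actor

  case class Order(id: String)
  case class Confirmation(id: String)
  case class Expired(id: String)

  // raises an alert if no Confirmation arrives within 5 seconds of an Order
  class AbsenceDetector extends Actor {
    import context.dispatcher
    private var pending = Set.empty[String]

    def receive = {
      case Order(id) =>
        pending += id
        // the timer firing first is itself an event
        context.system.scheduler.scheduleOnce(5.seconds, self, Expired(id))
      case Confirmation(id) =>
        pending -= id
      case Expired(id) if pending(id) =>
        pending -= id
        println(s"ALERT: no confirmation for order $id")
      case Expired(_) => // confirmation arrived in time, nothing to do
    }
  }

Whichever event fires first (the confirmation or the timer) determines what happens next, which is exactly the pattern described above.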
