
Filtering Akka Streams by elapsed time

by Frank Sauer on January 25th, 2015

It has been a while since my last blog post; sometimes the day job gets in the way of the more interesting experiments worth blogging about. However, lately I’ve had some time to play with the latest Akka Streams (1.0-M2) and figure out how to map a common CEP (complex event processing) use case involving elapsed time onto it.

Consider a stream of data coming from temperature sensors, say in the form of Temperature(sensor: String, value: Double, time: Long) events. Let’s further assume we want to alert on this data if we see a temperature over 100 degrees. However, our sensors are a bit flaky, and the data stream will have occasional spikes where one measurement suddenly shoots up for a very brief period of time, perhaps only a single measurement. We don’t want to emit false alarms, so we have to deal with these spikes and filter them out. As an alternative use case, let’s say the sensors are part of some equipment that can handle a temperature of 100 degrees for 2 minutes but no longer, so we only want to alert if the temperature stays over 100 for more than 2 minutes. I’m sure you can come up with other use cases involving time as well; it is a common scenario.

Testing a condition like “temperature over 100” sounds like the standard filter operation, and the Akka Streams documentation for 1.0-M2 has an example of a Filter operator, implemented as a subclass of PushStage (I’ve renamed the type parameter A to E, for Event):

import akka.stream.stage.{Context, Directive, PushStage}

class Filter[E](p: E => Boolean) extends PushStage[E, E] {
    override def onPush(evt: E, ctx: Context[E]): Directive =
       if (p(evt)) ctx.push(evt)
       else ctx.pull()
}
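
A PushStage is not a flow by itself; it gets attached to a stream with transform, which takes a factory so that every materialization receives a fresh stage instance (the same mechanism the implicit class at the end of this post uses). A minimal sketch, with an even-number filter of my own invention:

// Hypothetical usage: keep only the even numbers using the Filter stage above
val evens = Source(List(1, 2, 3, 4, 5, 6)).transform(() => new Filter[Int](_ % 2 == 0))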

Let’s use this as our starting point and add elapsed time to its signature. Let’s try this definition:

class FilterFor[E](duration: FiniteDuration)(p: E => Boolean) extends PushStage[E, E] {
    override def onPush(evt: E, ctx: Context[E]): Directive = ???
}

Conceptually this is what we’re looking for, but some technicalities make it more complicated: we are potentially receiving multiple events during the given timeframe for which the predicate evaluates to true. A regular filter function has no such problem; each event for which the predicate yields true is pushed downstream immediately. In FilterFor, however, we have to wait for the given duration and compare inbound events with earlier ones to validate that the condition still holds or that the elapsed time has exceeded the given duration. This means we need the ability to compare inbound events, and cache them. So we need a key for the events. The events themselves probably won’t make good keys, so let’s provide a key function E => K that extracts a key from an event.

We also need to decide what to do with all the events received during the time interval: do we keep the first, the last, or all of them? Because this is a filter, and the expectation of a filter is that the inbound and outbound types are the same, we need to push an E downstream, but which one? Let’s leave that up to the user as well and expose both the key function and this reducer in the API:

class FilterFor[E,K](duration: FiniteDuration)
                  (key: E=>K, reduce: Seq[(Long,E)]=>E)
                  (predicate: E => Boolean) extends PushStage[E, E] {

    var pending: Map[K,Seq[(Long,E)]] = Map.empty

    override def onPush(evt: E, ctx: Context[E]): Directive = ???
}

Before we look at the implementation I want to emphasize one very important rule from the akka streams documentation:

Warning
There is one very important rule to remember when working with a Stage. Exactly one method should be called on the currently passed Context exactly once and as the last statement of the handler where the return type of the called method matches the expected return type of the handler. Any violation of this rule will almost certainly result in unspecified behavior (in other words, it will break in spectacular ways). Exceptions to this rule are the query methods isHolding() and isFinishing().

This is very important and limits the ways we can implement our filterFor behavior. For example, in a first attempt I tried to use timers, but that broke this rule (the Context is not thread-safe) and indeed failed quite spectacularly.
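
To make the rule concrete, here is a minimal sketch of a handler that obeys it versus one that does not (the pass-through stage itself is hypothetical; the point is the single call on ctx):

class PassThrough[E] extends PushStage[E, E] {
    override def onPush(evt: E, ctx: Context[E]): Directive = {
       // WRONG: a second call on ctx in the same handler invocation violates the rule
       // ctx.push(evt); ctx.pull()

       // RIGHT: exactly one context method, as the last statement
       ctx.push(evt)
    }
}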

As already shown above, we will keep some internal state in the form of a Map from keys to sequences of events paired with the timestamp at which they were received. We’ll use System.nanoTime() as the timestamp; if this is not accurate enough for your hardware or use case, I’d like to talk to you :-)

When a new event arrives we have to deal with 4 cases:

  • The predicate matches and we’ve seen this event before. If the first occurrence was far enough in the past, we reduce all the events seen during the interval and push the result downstream; if not, we simply add the event to the sequence of pending events and continue.
  • The predicate no longer matches, but we have seen this event before. The condition no longer holds, so we can simply remove any pending events and continue.
  • The predicate matches, but we’ve never seen this event before. We store the event in our map of pending events and continue.
  • The predicate does not match and we’ve never seen this event before. This is the don’t-care case, and we simply continue. However, it is also a good opportunity to eliminate stale pending events. Stale entries are possible, and even likely: we may see one or a few events for which the predicate holds, not enough to fill the time frame, and then never see a matching event for which the predicate fails, so those pending events would never be cleared out by the second case.

In the above description, “and continue” means invoking pull on the context, indicating to upstream stages that we’re ready to receive more data. Translated to code, it looks like this:

class FilterFor[E,K](duration: FiniteDuration)
                  (key: E=>K, reduce: Seq[(Long,E)]=>E)
                  (predicate: E => Boolean) extends PushStage[E, E] {

    var pending: Map[K,Seq[(Long,E)]] = Map.empty
    val nanos = duration.toNanos

    override def onPush(evt: E, ctx: Context[E]): Directive = {
       val k = key(evt)
       val now = System.nanoTime

       pending.get(k) match {
          case Some(previous) if predicate(evt) =>
               // case 1: still matching; once the oldest occurrence is older
               // than the requested duration, reduce and push downstream
               val withNext = previous :+ now->evt
               if (now - previous.head._1 >= nanos) {
                  pending = pending - k
                  ctx.push(reduce(withNext))
               } else {
                  pending = pending.updated(k, withNext)
                  ctx.pull()
               }
          case Some(_) if !predicate(evt) =>
               // case 2: the condition no longer holds, drop the pending events
               pending = pending - k
               ctx.pull()
          case None if predicate(evt) =>
               // case 3: first matching occurrence, start tracking it
               pending = pending + (k -> Vector(now->evt))
               ctx.pull()
          case _ =>
               // case 4: none of the above, a good time to evict entries whose
               // oldest timestamp has already aged past the duration
               pending = pending.filter { case (_, evts) => (now - evts.head._1) <= nanos }
               ctx.pull()
       }
    }
}

That’s it. Let’s try it out. To do so, we’ll create a stream of mock temperature sensor data and figure out a way to stream it at a fixed rate, say one sample every 100 milliseconds, so we can test our FilterFor stage. The latter can be achieved by zipping our test data with a tick stream using the impressive Akka Streams FlowGraph DSL:

  case class Tick(time:Long)

  case class Temperature(sensor:String, value: Double, time:Long = 0)

  val data = Source(List[Temperature](
      Temperature("S1",100), Temperature("S2",100),
      Temperature("S1",101), Temperature("S2",101),
      Temperature("S1",102), Temperature("S2",102),
      Temperature("S1",105), Temperature("S2",99),
      Temperature("S1",100), Temperature("S2",100),
      Temperature("S1",101), Temperature("S2",101),
      Temperature("S1",102), Temperature("S2",102),
      Temperature("S1",103), Temperature("S2",103),
      Temperature("S1",101), Temperature("S2",99),
      Temperature("S1",102), Temperature("S2",102),
      Temperature("S1",100), Temperature("S2",100)
  ))

  // emit a tick every 100 millis
  val ticks = Source(0 second, 100 millis, () => Tick(System.nanoTime()))

  val temps: Source[Temperature] = Source() { implicit b =>
     import FlowGraphImplicits._
     val out = UndefinedSink[Temperature]
     val zip = ZipWith[Tick,Temperature,Temperature]((tick,temperature)=> temperature.copy(time = tick.time))
     ticks ~> zip.left
     data ~> zip.right
     zip.out ~> out
     out
  }

temps is now a Source[Temperature] that emits a Temperature object every 100 milliseconds. Note that in the above we also inject a timestamp into our fake measurements; normally I would assume the measurements already contain such a timestamp. We now need a key function and a reducer. Let’s write a reducer that returns the maximum temperature sample seen during the interval for which the predicate held. The key is simply the sensor id:

  def tempKey(t: Temperature): String = t.sensor

  def tempReduce(ts: Seq[(Long, Temperature)]): Temperature = {
    // keep the sample with the highest temperature
    def compare(t1: (Long, Temperature), t2: (Long, Temperature)) =
      if (t1._2.value > t2._2.value) t1 else t2
    ts.reduceLeft(compare)._2
  }
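
As an aside, the same reducer can be written more compactly with maxBy; the behavior is identical except for ties, where maxBy keeps the first maximal sample while the reduceLeft above keeps the last:

  // Alternative reducer: pick the sample with the highest temperature
  def tempReduceMax(ts: Seq[(Long, Temperature)]): Temperature =
    ts.maxBy(_._2.value)._2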

And now, finally, we can write a flow in which we use our filterFor:

  temps
    .filterFor(1 seconds)(t => t.value >= 100)(tempKey,tempReduce)
    .map(t=>s"HOT!!!: $t")
    .to(Sink.foreach(println(_)))
    .run

This prints the following (only S1 triggers an alert: S2 dips to 99 twice, resetting its pending window before a full second elapses, while the maximum in S1’s first full one-second window is 105):

HOT!!!: Temperature(S1,105.0,1422202132063859000)

The careful reader will now say: “Wait a minute! temps is a Source[Temperature]; how can we invoke filterFor on that? We never defined such a thing!” I left out a bit of magic that I thought was so cool I saved it for last. After adding shiny new tools like FilterFor to our toolbox, we can add them to the existing DSL using an implicit class, like so:

  // pimp the dsl
  implicit class FilterForInSource[E](s: Source[E]) {
    def filterFor[K](duration:FiniteDuration)(predicate: E=>Boolean)
                 (key: E => K = (evt:E)=>evt.asInstanceOf[K],
                  reduce: Seq[(Long,E)] => E = (evts:Seq[(Long,E)])=>evts.head._2):Source[E] =
      s.transform(() => new FilterFor(duration)(key,reduce)(predicate))
  }

Without this bit of implicit magic, you’d have to use transform in your flows. Note also that I’ve provided defaults for the key function (the identity function) and the reducer (returns the oldest event), so if this works for your use case (in other words, events are themselves usable keys and you want to push the oldest one downstream) you can simply leave the last parameter list empty and improve the readability of your flows.
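
For example, assuming a stream of strings that act as their own keys, the defaulted form might look like this (an API sketch, not tested against M2; I pass the key type explicitly so that the identity-cast default resolves to the event type, and supply an empty final argument list):

  // Hypothetical: events act as their own keys, and the oldest event wins
  val words: Source[String] = Source(List("HOT", "HOT", "HOT", "cold"))
  val sustained: Source[String] = words.filterFor[String](1 second)(_ == "HOT")()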

If you’ve read this far, you deserve a present: all this code is available on GitHub. Enjoy :-)

I want to thank Endre and Konrad from the Akka mailing list for helping me out, and the Akka team as a whole for coming up with this cool stuff. I can see myself using this often and with pleasure in the future.
