SQL Server 2008 R2: StreamInsight - User-defined aggregates

I’d briefly played around with user-defined aggregates in StreamInsight with CTP3, but when I started working with the new Count Windows, I found I needed one that actually worked. I learned a few things along the way that I hope will help someone.

The first thing you have to do is define a class:

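using System.Collections.Generic;
using System.Linq;                                    // Any(), Sum(), Count()
using Microsoft.ComplexEventProcessing.Extensibility; // CepAggregate<TInput, TOutput>

public class IntegerAverage : CepAggregate<int, int>
{
    // Called with the int values of all events in the window.
    public override int GenerateOutput(IEnumerable<int> eventData)
    {
        if (!eventData.Any())
        {
            return 0;   // empty window
        }
        // Integer division: the result is truncated.
        return eventData.Sum() / eventData.Count();
    }
}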

In this case, I’ve defined an IntegerAverage class that inherits from CepAggregate. CepAggregate is a generic type: its first type argument (int) is the type of each input value and its second (int) is the output type. The GenerateOutput method is overridden and is called with the input values when it’s time to produce a result for a window. In this case, I’ve just checked whether there are any values, returned zero if not, and returned their average if there are. Because both Sum() and Count() return int here, that’s integer division, so the result is truncated: the average of 1 and 2 comes out as 1.
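Because GenerateOutput is just a method over IEnumerable<int>, the averaging logic can be checked without a StreamInsight server. The sketch below uses a hypothetical helper, IntegerAverageLogic.Compute (not part of StreamInsight), that does the same job in a single pass. It sums into a long on the assumption that a window’s values could add up to more than int.MaxValue, since LINQ’s Sum() over ints throws an OverflowException in that case.

```csharp
using System.Collections.Generic;

// Hypothetical helper holding the same logic as IntegerAverage.GenerateOutput,
// so it can be exercised without a StreamInsight server.
public static class IntegerAverageLogic
{
    public static int Compute(IEnumerable<int> values)
    {
        long sum = 0;    // long accumulator: many ints can sum past int.MaxValue
        long count = 0;
        foreach (int v in values)   // single pass over the window's values
        {
            sum += v;
            count++;
        }
        if (count == 0)
        {
            return 0;    // empty window: same fallback as the original aggregate
        }
        // Integer division truncates toward zero, e.g. {1, 2} averages to 1.
        // The average of ints always fits back into an int.
        return (int)(sum / count);
    }
}
```

Inside the aggregate, GenerateOutput could then simply return IntegerAverageLogic.Compute(eventData), which keeps the arithmetic unit-testable on its own.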

This is fine, but to use the aggregate in a LINQ query (with IntelliSense, and without the compiler complaining), you need to do more: you need an extension method on CepWindow<T> that exposes it. The following code does that:

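using System;
using System.Linq.Expressions;   // Expression<TDelegate>
using Microsoft.ComplexEventProcessing.Extensibility;
using Microsoft.ComplexEventProcessing.Linq;

public static class UDAExtensionMethods
{
    // Ties the query-side name IntAvg to the IntegerAverage class.
    [CepUserDefinedAggregate(typeof(IntegerAverage))]
    public static int IntAvg<T>(this CepWindow<T> window,
                                Expression<Func<T, int>> map)
    {
        // Never meant to run: StreamInsight substitutes the aggregate in the query.
        throw CepUtility.DoNotCall();
    }
}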

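We define the name that we will use in queries (in this case IntAvg), and the CepUserDefinedAggregate attribute ties that name to the IntegerAverage class. The method itself is only a placeholder that StreamInsight replaces with the aggregate, so it should never be called directly; if it ever is, throwing CepUtility.DoNotCall() makes that fail loudly.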
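The attribute-plus-placeholder pattern can be illustrated without StreamInsight. In this sketch, MarkerAggregateAttribute, DemoAverage, DemoAvg and MarkerResolver are all hypothetical names standing in for CepUserDefinedAggregate, IntegerAverage, IntAvg and the query provider. It only shows the idea of finding the implementing class by reading the attribute through reflection; it is not how StreamInsight itself is implemented.

```csharp
using System;
using System.Reflection;

// Hypothetical stand-in for CepUserDefinedAggregateAttribute:
// records which class implements the aggregate.
[AttributeUsage(AttributeTargets.Method)]
public sealed class MarkerAggregateAttribute : Attribute
{
    public MarkerAggregateAttribute(Type implementation)
    {
        Implementation = implementation;
    }

    public Type Implementation { get; }
}

// Hypothetical stand-in for IntegerAverage.
public sealed class DemoAverage { }

public static class DemoExtensions
{
    // Placeholder method: it exists only so a query can name the aggregate.
    [MarkerAggregate(typeof(DemoAverage))]
    public static int DemoAvg(this int[] window)
    {
        // Plays the role of: throw CepUtility.DoNotCall();
        throw new NotSupportedException("DemoAvg is only meaningful inside a query.");
    }
}

public static class MarkerResolver
{
    // What a query provider could do on meeting the placeholder in an
    // expression tree: read the attribute to find the implementing class.
    public static Type Resolve(MethodInfo placeholder)
    {
        MarkerAggregateAttribute attr =
            placeholder.GetCustomAttribute<MarkerAggregateAttribute>();
        return attr == null ? null : attr.Implementation;
    }
}
```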

The part that tricked me at first was this:

Expression<Func<T, int>> map

I had seen it used in the doco in Books Online, but BOL never mentioned where it came from. Expression<TDelegate> lives in the System.Linq.Expressions namespace, so you need to add a using directive to your code:

using System.Linq.Expressions;

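The map parameter describes how each event is mapped to the int value the aggregate works on. If you leave it out of the extension method, a call that passes a lambda won’t compile, and you’ll get an error telling you that the method has no overload taking one argument. With it in place, I can call the aggregate in a query via: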

select new { AverageMilliseconds = w.IntAvg(e => e.MillisecondsToPassSpeedCheckPoint) };
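Because map is typed as Expression<Func<T, int>> rather than Func<T, int>, the lambda in that query arrives as an expression tree, a data structure that can be inspected, rather than as compiled code. A small sketch using only System.Linq.Expressions shows both sides: reading which member the lambda selects, and compiling it back into a delegate. The TollPointEvent class here is a hypothetical cut-down version with just the one field used above, and MapDemo is an illustrative name.

```csharp
using System;
using System.Linq.Expressions;

// Hypothetical minimal event type; the real TollPointEvent has more fields.
public class TollPointEvent
{
    public int MillisecondsToPassSpeedCheckPoint { get; set; }
}

public static class MapDemo
{
    // Returns the name of the member a simple "e => e.Member" lambda selects,
    // or null if the body is not a plain member access.
    public static string SelectedMember<T>(Expression<Func<T, int>> map)
    {
        MemberExpression member = map.Body as MemberExpression;
        return member == null ? null : member.Member.Name;
    }

    // Turns the expression tree back into a delegate and applies it to one event.
    public static int Apply<T>(Expression<Func<T, int>> map, T item)
    {
        Func<T, int> compiled = map.Compile();
        return compiled(item);
    }
}
```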

Passing The Event Instead

An alternative to this would be to build an aggregate that’s specific to the event type. You could then call it via:

select new { AverageMilliseconds = w.SomeAvg() };

but then your user-defined aggregate would have to be defined via:

public class SomeAverage : CepAggregate<TollPointEvent, int>

In this case, TollPointEvent is the name of the event type that the window is created over. You’d then need to access members of the event class inside your aggregate code, like:

public override int GenerateOutput(IEnumerable<TollPointEvent> events)
{
  if (!events.Any())
  {
    return 0;   // empty window
  }
  // Reads the field directly from each event, so no map expression is needed.
  return events.Sum(e => e.MillisecondsToPassSpeedCheckPoint)
       / events.Count();
}

Finally, you’d need to modify your declaration similar to:

public static class UDAExtensionMethods
{
  // Must reference SomeAverage, the event-specific aggregate.
  [CepUserDefinedAggregate(typeof(SomeAverage))]
  public static int SomeAvg(this CepWindow<TollPointEvent> window)
  {
      throw CepUtility.DoNotCall();
  }
}

Note that the Expression<Func<T, int>> map parameter isn’t needed here, because the whole event is passed to the aggregate.

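While you can do this, I’d only do it where the aggregate is very specific to the event class and involves multiple fields from within the event. Otherwise the map-based version is more reusable: the same IntAvg works over any event type that has an int field to average.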

2010-05-08