About the author

J Sawyer is a developer based in Houston, TX who absolutely loves to write code. After spending 9 years at Microsoft, he moved on to other things and is currently the Lead Developer for the RealTime Data Management team at Logica US. He spends his days building Really Cool Things around StreamInsight and having a blast doing it.

He has been involved with HDNUG, one of the oldest and largest .NET-focused user groups in the US, since its inception in 2001 and has watched it grow from 5-10 technologists meeting around a conference table to a thriving community of over 5000 with regular meeting attendance averaging 100 attendees. He currently serves as the Vice President. You can join him at HDNUG on the second Thursday of every month at the Houston Microsoft office.

He also loves to ride his Yamaha FZ1. And sometimes his Ninja 650. And also his Honda XR-400 dirt bike. But he doesn't code and ride at the same time. That would be bad.

StreamInsight User Defined Aggregate: Standard Deviation

May 15, 2011 3:39 PM

Here’s a quick user-defined aggregate (UDA) that calculates the sample standard deviation (n - 1 denominator) of a window. Interestingly, I had to call ToArray() on the IEnumerable<double> source before doing anything else with it. Without that call, the aggregate threw a NullReferenceException, and I haven’t worked out why. It would be nice to pass in the average from the query, since the StreamInsight engine already calculates it, but no dice.

Note: I haven’t done any performance testing; it was copied from MSDN. Use at your own risk.

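using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using Microsoft.ComplexEventProcessing.Extensibility;
using Microsoft.ComplexEventProcessing.Linq;

/// <summary>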
/// Static class with UDA extensions for standard deviation
/// </summary>
public static class StandardDeviation
{
    /// <summary>
    /// Extension method for the UDA.
    /// </summary>
    /// <typeparam name="T">Payload type of the stream.</typeparam>
    /// <param name="window">Window to be passed to the UDA</param>
    /// <param name="map">Mapping from the payload to a double field in the payload.</param>
    /// <returns>Aggregation result.</returns>
    [CepUserDefinedAggregate(typeof(StdDevDouble))]
    public static double StdDev<T>(this CepWindow<T> window, Expression<Func<T, double>> map)
    {
        throw CepUtility.DoNotCall();
    }
}

/// <summary>
/// A UDA to calculate the standard deviation of a window
/// </summary>
public class StdDevDouble : CepAggregate<double, double>
{
    /// <summary>
    /// Computes the aggregation over a window.
    /// </summary>
    /// <param name="source">Set of events contained in the window.</param>
    /// <returns>Aggregation result.</returns>
    public override double GenerateOutput(IEnumerable<double> source)
    {
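        // Materialize the window once; working with source directly threw a NullReferenceException.
        double[] values = source.ToArray();

        // The sample standard deviation (n - 1 denominator) is undefined for fewer
        // than two values. Returning 0.0 here is a convention chosen for this sample.
        if (values.Length < 2)
        {
            return 0.0;
        }

        double mean = values.AsParallel().Average();

        double standardDev = values.AsParallel().Aggregate(
            0.0,
            // Accumulate the squared deviations on each thread.
            (subtotal, item) => subtotal + Math.Pow(item - mean, 2),
            // Combine the per-thread subtotals after all threads are done.
            (total, thisThread) => total + thisThread,
            // Apply the standard deviation calculation to the combined sum.
            finalSum => Math.Sqrt(finalSum / (values.Length - 1))
        );
        return standardDev;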
    }

}
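
To sanity-check the numbers coming out of the UDA, it can help to run the same math outside of StreamInsight. The following is a minimal sketch, not part of the UDA itself: the StdDevCheck class and SampleStdDev method are hypothetical names made up for this example, it runs sequentially rather than through PLINQ, and returning 0.0 for fewer than two values is an assumed convention.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

// Hypothetical helper for checking the UDA's results; not part of StreamInsight.
public static class StdDevCheck
{
    // Sample standard deviation: sqrt( sum((x - mean)^2) / (n - 1) ).
    // Returns 0.0 for fewer than two values (an assumption made for this sketch).
    public static double SampleStdDev(IEnumerable<double> source)
    {
        // Materialize once so the sequence is only enumerated a single time.
        double[] values = source.ToArray();
        if (values.Length < 2)
        {
            return 0.0;
        }

        double mean = values.Average();
        double sumOfSquares = values.Sum(v => (v - mean) * (v - mean));
        return Math.Sqrt(sumOfSquares / (values.Length - 1));
    }

    public static void Main()
    {
        double[] sample = { 2, 4, 4, 4, 5, 5, 7, 9 };

        // Mean is 5, squared deviations sum to 32, so the result is sqrt(32 / 7).
        double result = SampleStdDev(sample);
        Console.WriteLine(result.ToString("F4", CultureInfo.InvariantCulture)); // prints 2.1381
    }
}
```

If a window in a test stream carries those same eight values, the StdDev column from the query should come out at roughly 2.1381 as well.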

Using it in a query is simple: just make sure that you add a using directive for the namespace containing the aggregate’s definition.

var stdDeviation = from e in sourceQueryStream
             group e by e.ItemId into eachGroup
             from window in eachGroup.HoppingWindow(
                TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(1), HoppingWindowOutputPolicy.ClipToWindowEnd)
             select new {
                 ItemId = eachGroup.Key,
                 StdDev = window.StdDev(e => e.Value), 
                 Avg = window.Avg(e => e.Value), 
                 Min = window.Min(e => e.Value),
                 Max = window.Max(e => e.Value),
                 Count = window.Count()
             };

Tags:

.NET Stuff | StreamInsight | Code Sample