Here’s a quick UDA (user-defined aggregate) to calculate the standard deviation of a window. I found it interesting that I had to take the IEnumerable<double> source and call ToArray() on it. Without that, it threw a NullReferenceException, although why is something of a mystery. It would be nice to pass in the average from the query, since the StreamInsight engine has already calculated it, but no dice.
Note: I haven’t done any performance testing, and the calculation was copied from MSDN. Use at your own risk.
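The cause of the NullReferenceException isn't documented, so the following is only a hypothesis worth ruling out: the engine may hand the UDA a sequence that is not safe to enumerate more than once. The sketch below uses a hypothetical `OnceOnlySequence<T>` (not a StreamInsight type) to show the pattern. Materializing with `ToArray()` and then working only with the array is safe, while calling `source.Count()` afterwards enumerates the source a second time, which is what the original result selector did. The `InvalidOperationException` is this sketch's choice; a real engine sequence may fail differently.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for an engine-supplied sequence that can be
// enumerated only once (an assumption, not StreamInsight's actual type).
public sealed class OnceOnlySequence<T> : IEnumerable<T>
{
    private readonly IEnumerable<T> _items;
    private bool _enumerated;

    public OnceOnlySequence(IEnumerable<T> items) => _items = items;

    public IEnumerator<T> GetEnumerator()
    {
        // A second call simulates a sequence that cannot be replayed.
        if (_enumerated)
            throw new InvalidOperationException("Sequence already enumerated.");
        _enumerated = true;
        return _items.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

public static class MaterializeDemo
{
    // Safe: enumerate the source once, then use only the array.
    public static (double Mean, int Count) MeanAndCount(IEnumerable<double> source)
    {
        double[] values = source.ToArray();
        return (values.Average(), values.Length);
    }

    // Unsafe: ToArray() enumerates once, then Count() enumerates the source again.
    public static int CountAfterToArray(IEnumerable<double> source)
    {
        _ = source.ToArray();
        return source.Count();
    }
}
```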
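using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Extensibility;
using Microsoft.ComplexEventProcessing.Linq;

/// <summary>
/// Static class with UDA extensions for standard deviation.
/// </summary>
public static class StandardDeviation
{
    /// <summary>
    /// Extension method for the UDA. It only marks the call for the LINQ
    /// provider, which maps it to StdDevDouble; it must never run directly.
    /// </summary>
    /// <typeparam name="T">Payload type of the stream.</typeparam>
    /// <param name="window">Window to be passed to the UDA.</param>
    /// <param name="map">Mapping from the payload to a double field in the payload.</param>
    /// <returns>Aggregation result.</returns>
    [CepUserDefinedAggregate(typeof(StdDevDouble))]
    public static double StdDev<T>(this CepWindow<T> window, Expression<Func<T, double>> map)
    {
        throw CepUtility.DoNotCall();
    }
}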
/// <summary>
/// A UDA to calculate the sample standard deviation of a window.
/// </summary>
public class StdDevDouble : CepAggregate<double, double>
{
    /// <summary>
    /// Computes the aggregation over a window.
    /// </summary>
    /// <param name="source">Set of events contained in the window.</param>
    /// <returns>Aggregation result.</returns>
    public override double GenerateOutput(IEnumerable<double> source)
    {
        // Materialize the window once; every pass below uses the array only.
        double[] values = source.ToArray();

        // The n - 1 denominator needs at least two values (and Average()
        // throws on an empty sequence), so return 0 in that case.
        if (values.Length < 2)
        {
            return 0.0;
        }

        double mean = values.AsParallel().Average();
        double standardDev = values.AsParallel().Aggregate(
            0.0,
            // do this on each partition
            (subtotal, item) => subtotal + Math.Pow(item - mean, 2),
            // combine the partial sums after all partitions are done
            (total, thisThread) => total + thisThread,
            // perform the standard deviation calc on the combined sum;
            // use values.Length instead of re-enumerating source with Count()
            finalSum => Math.Sqrt(finalSum / (values.Length - 1))
        );
        return standardDev;
    }
}
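To check the arithmetic outside StreamInsight, here is a minimal standalone sketch of the same calculation, assuming a plain `double[]` in place of the window. It uses the same PLINQ `Aggregate` overload (seed, per-partition accumulator, combiner, result selector) and divides by `values.Length - 1`, which gives the sample standard deviation. `SampleStdDev.Compute` is a hypothetical name, and returning 0 for fewer than two values is an assumed convention.

```csharp
using System;
using System.Linq;

public static class SampleStdDev
{
    // Sample standard deviation (n - 1 denominator) using the PLINQ
    // Aggregate overload: seed, accumulator, combiner, result selector.
    public static double Compute(double[] values)
    {
        // Assumed convention: fewer than two points yields 0 rather than NaN.
        if (values.Length < 2)
            return 0.0;

        double mean = values.AsParallel().Average();
        return values.AsParallel().Aggregate(
            0.0,                                                        // seed for each partition
            (subtotal, item) => subtotal + (item - mean) * (item - mean), // per-partition sum of squares
            (total, partial) => total + partial,                         // combine partition sums
            finalSum => Math.Sqrt(finalSum / (values.Length - 1)));      // final result
    }
}
```

For the values 2, 4, 4, 4, 5, 5, 7, 9 the mean is 5 and the squared deviations sum to 32, so the result is √(32 / 7) ≈ 2.138. For small windows the sequential LINQ operators may well be just as fast, since PLINQ's partitioning has its own overhead; that is worth measuring given the note about performance.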
Using it in a query is simple: just make sure that you add a using directive for the namespace containing the aggregate’s definition.
var stdDeviation = from e in sourceQueryStream
                   group e by e.ItemId into eachGroup
                   from window in eachGroup.HoppingWindow(
                       TimeSpan.FromSeconds(10),
                       TimeSpan.FromSeconds(1),
                       HoppingWindowOutputPolicy.ClipToWindowEnd)
                   select new
                   {
                       ItemId = eachGroup.Key,
                       StdDev = window.StdDev(e => e.Value),
                       Avg = window.Avg(e => e.Value),
                       Min = window.Min(e => e.Value),
                       Max = window.Max(e => e.Value),
                       Count = window.Count()
                   };
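The query's per-item statistics can also be sanity-checked offline. The sketch below is a LINQ-to-Objects analogue, assuming a hypothetical `Reading` payload with the `ItemId` and `Value` fields the query uses. It has no hopping windows, so each group stands in for the contents of a single window; it is a test aid, not a replacement for the StreamInsight query.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical payload mirroring the fields used by the query.
public sealed record Reading(string ItemId, double Value);

// Hypothetical result type matching the query's projection.
public sealed record ItemStats(string ItemId, double StdDev, double Avg, double Min, double Max, int Count);

public static class OfflineCheck
{
    // Groups an in-memory batch by ItemId and computes the same statistics
    // the StreamInsight query selects (one group = one "window").
    public static List<ItemStats> Summarize(IEnumerable<Reading> readings) =>
        (from r in readings
         group r by r.ItemId into eachGroup
         let values = eachGroup.Select(r => r.Value).ToArray()
         select new ItemStats(
             eachGroup.Key,
             SampleStdDev(values),
             values.Average(),
             values.Min(),
             values.Max(),
             values.Length))
        .ToList();

    // Sequential sample standard deviation (n - 1); 0 for fewer than two values.
    private static double SampleStdDev(double[] values)
    {
        if (values.Length < 2) return 0.0;
        double mean = values.Average();
        double sumSq = values.Sum(v => (v - mean) * (v - mean));
        return Math.Sqrt(sumSq / (values.Length - 1));
    }
}
```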