Ruminations of J.net idle rants and ramblings of a code monkey

AdHoc Query Filters with StreamInsight


It’s been a while – things have been VERY busy getting ready for the big public unveiling at the Microsoft Global Energy Forum. That was yesterday and I do think that it went very well; we had a lot of interesting conversations with customers around StreamInsight, what it can do and, specifically, Logica’s capabilities to deliver a solution on the StreamInsight platform. That said, I have some ideas and thoughts backed up to blog about.

So … here’s your scenario. You have standing queries that you start with StreamInsight and process. With some output adapters, you want all of the results for these queries. However, that’s not always the case, especially when the output adapter is feeding a UI. When you have 7000-8000 individual data points coming in to StreamInsight, you don’t want to be feeding all of this down to the UI. And, since most people can’t process more than a few individual data items at a time, there’s really no point in doing so. (Keep in mind that this is the demo feed; real world scenarios in Oil & Gas can easily have over 50K data points coming in.) Now, you could spend all of your time writing every possible permutation of query filters and, I suppose, if very long term job security is your goal, that might be your path. Personally, I’d find such a thing boring to no end and, since I’m an inherently lazy developer, I’d much prefer to do it once in a flexible manner and move on to more interesting things. So … here it goes.

First, you need to define a common data class for your query result items; you need to know, at the very least, which field you want to filter on and what the resulting datatype is – for typing the stream and query. For a large implementation, you’d want to do this anyway, and part of your analysis at the beginning would be to determine the core datatypes required for your queries. You will want these datatypes to inherit from a core base class that defines the attributes to include in every concrete data class. Here’s the one that we’ll be using in this example:

/// <summary>
/// Base class for all query data points
/// </summary>
public class DataItem
{
    /// <summary>
    /// ID of the source item
    /// </summary>
    public string ItemId { get; set; }

    /// <summary>
    /// Time item was received/calculated
    /// </summary>
    public DateTimeOffset Timestamp;
}

As you can see, it’s pretty basic and, though it’s not marked abstract, its main use is to be inherited and to allow us to write the query filters generically. I didn’t mark it abstract to allow for any potential cases where this would be all we need … though I cannot, for the life of me, think of a single case for this. For a value, we’ll use a derived ValueDataItem class:

/// <summary>
/// Class for data items with a single value
/// </summary>
/// <typeparam name="T">Type of the value</typeparam>
public class ValueDataItem<T> : DataItem
{
    /// <summary>
    /// Single value for this item.
    /// </summary>
    public T Value { get; set; }
}

This class will suffice for most raw point events, especially those that come from sensor readings. And, since it’s a generic type, it can hold any value that we need.

Note: You may be tempted, as I was, to create typed input adapters that use this kind of generic class. DO NOT DO IT. Just say no. Something funky goes on with the way the typed input adapters are handled that will lead you down a rat hole into Oblivion. You will start trying to force it and violate the KISS principle every step of the way into an endless abyss of reflection and dynamic compilation. But that’s a discussion for another time …

To show the flexibility of the methodology, I’m also going to introduce one more DataItem class, the AggregateDataItem.

/// <summary>
/// Data item for aggregate query results
/// </summary>
/// <typeparam name="T">Type of the raw source data values.</typeparam>
class AggregateDataItem<T> : DataItem
{
    /// <summary>
    /// Sum of the aggregate
    /// </summary>
    public T Sum { get; set; }

    /// <summary>
    /// Count of the aggregate
    /// </summary>
    public uint Count { get; set; }

    /// <summary>
    /// Minimum value of the aggregate
    /// </summary>
    public T Min { get; set; }

    /// <summary>
    /// Maximum value of the aggregate
    /// </summary>
    public T Max { get; set; }

    /// <summary>
    /// Average value of the aggregate
    /// </summary>
    public double Average { get; set; }
}

Now that we have the basics down, let’s take a look at what we need to do to evaluate our filter. First, we have a class defining the filter, with properties for the ItemIds to compare (as a string array) and an int for the type of comparison. While it’s tempting (and natural) to create the comparison mode as an enumeration, enums aren’t supported datatypes when calling custom methods/functions from a running StreamInsight query, so I’ve made them old-school constants. Also, while I am using an array for the ItemIds, StreamInsight doesn’t support passing string arrays into a method from a query either, so I have to join the array and then split it later. Ugly, but it works. The methods that do the comparison are below:

// Requires: using System.Linq;

/// <summary>
/// Splits the ';'-joined ItemId list and evaluates for a match.
/// Called from the StreamInsight query, which can't pass a string array.
/// </summary>
public static bool Matches(string value, string itemIdList, int comparisonMode)
{
    string[] itemIds = itemIdList.Split(';');
    return MatchesItem(value, itemIds, comparisonMode);
}

/// <summary>
/// Evaluates for a match
/// </summary>
/// <param name="value">Value of item to match</param>
/// <param name="itemIds">Array of ItemIds for comparison</param>
/// <param name="comparisonMode">One of the COMPARISON_* constants</param>
/// <returns>True if the value matches; false otherwise, including for an unknown comparison mode</returns>
/// <remarks>CONTAINS is an "AND" match; all others are an "OR"</remarks>
public static bool MatchesItem(string value, string[] itemIds, int comparisonMode)
{
    switch (comparisonMode)
    {
        case COMPARISON_NONE:
            return true;
        case COMPARISON_EQUALS:
            return itemIds.Any(value.Equals);
        case COMPARISON_STARTS_WITH:
            return itemIds.Any(value.StartsWith);
        case COMPARISON_ENDS_WITH:
            return itemIds.Any(value.EndsWith);
        case COMPARISON_CONTAINS:
            return itemIds.All(value.Contains);
    }
    return false;
}
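The filter class that hosts these methods isn't listed in this post, so here is a minimal sketch of what it might look like, based only on the description above. The numeric values of the COMPARISON_* constants are an assumption for illustration (the demo solution may well use different ones); the two match methods are repeated from the listing above so that the block compiles on its own, without StreamInsight.

```csharp
using System;
using System.Linq;

/// <summary>
/// Sketch of the filter definition: ItemIds to compare plus a comparison mode.
/// </summary>
public class QueryFilterDefinition
{
    // Comparison modes as plain int constants rather than an enum.
    // ASSUMPTION: the numeric values below are illustrative only.
    public const int COMPARISON_NONE = 0;
    public const int COMPARISON_EQUALS = 1;
    public const int COMPARISON_STARTS_WITH = 2;
    public const int COMPARISON_ENDS_WITH = 3;
    public const int COMPARISON_CONTAINS = 4;

    /// <summary>ItemIds (or fragments of them) to compare against.</summary>
    public string[] ItemIds { get; set; }

    /// <summary>One of the COMPARISON_* constants.</summary>
    public int ComparisonMode { get; set; }

    // Takes the ItemIds as one ';'-joined string and splits them again.
    public static bool Matches(string value, string itemIdList, int comparisonMode)
    {
        string[] itemIds = itemIdList.Split(';');
        return MatchesItem(value, itemIds, comparisonMode);
    }

    // CONTAINS requires every entry to match (AND); the rest need any one (OR).
    public static bool MatchesItem(string value, string[] itemIds, int comparisonMode)
    {
        switch (comparisonMode)
        {
            case COMPARISON_NONE:
                return true;
            case COMPARISON_EQUALS:
                return itemIds.Any(value.Equals);
            case COMPARISON_STARTS_WITH:
                return itemIds.Any(value.StartsWith);
            case COMPARISON_ENDS_WITH:
                return itemIds.Any(value.EndsWith);
            case COMPARISON_CONTAINS:
                return itemIds.All(value.Contains);
        }
        return false;
    }
}
```

With a made-up ItemId such as "Well1.Pressure", Matches("Well1.Pressure", "Well1;Well2", COMPARISON_STARTS_WITH) is true because one prefix matches. For COMPARISON_CONTAINS, Matches("12", "1;2", ...) is true but Matches("1", "1;2", ...) is false, since every entry has to be found. One thing to keep in mind with the join/split trick: an ItemId that itself contains a ';' would be split into two entries.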

Now, the code to create the filtered query from a standing query and attach an output adapter to it:

/// <summary>
/// Creates a filtered query from an existing query.
/// </summary>
/// <typeparam name="T">Type of the query's output data item</typeparam>
/// <param name="cepApplication">Hosting CEP Application</param>
/// <param name="queryName">Name of the standing query</param>
/// <param name="filterDefinition">Filter definition</param>
/// <param name="outputAdapterFactoryType">Type of the output adapter factory.</param>
/// <param name="outputAdapterSettings">Settings for the output adapter.</param>
/// <param name="eventShape">Event shape for the resulting query</param>
/// <param name="eventOrder">Event order for the resulting query.</param>
/// <returns>Newly created query.</returns>
/// <remarks>Query is not started on return.</remarks>
public static Query CreateFilterQuery<T>(
    Application cepApplication,
    string queryName, QueryFilterDefinition filterDefinition,
    Type outputAdapterFactoryType, object outputAdapterSettings,
    EventShape eventShape, StreamEventOrder eventOrder)
    where T : DataItem
{
    //Get the source query.
    Query sourceQuery = cepApplication.Queries[queryName];
    Guid newQueryId = Guid.NewGuid();

    //Convert to a stream to monkey with it.
    CepStream<T> cepStream = sourceQuery.ToStream<T>();
    string itemIdList = String.Join(";", filterDefinition.ItemIds);
    var filteredStream = from d in cepStream
                         where QueryFilterDefinition.Matches(d.ItemId, itemIdList, filterDefinition.ComparisonMode)
                         select d;

    //Create the query.
    Query filteredQuery = filteredStream.ToQuery(
        cepApplication, newQueryId.ToString(), "Filtered " + queryName,
        outputAdapterFactoryType, outputAdapterSettings,
        eventShape, eventOrder);
    return filteredQuery;
}

Note the use of the generic T in the method call as well as the generic constraint. This is why we started with the base class defining ItemId; we need it to provide StreamInsight with the schema for the query and the constraint to ensure that we have an ItemId to use for the filter. If the generic type specified when you create the filtered query doesn’t match the type of the standing query, you will get an error when you create the query. Creating the filtered query is as simple as the following:

Query filteredAggregate = QueryFilterDefinition.CreateFilterQuery<AggregateDataItem<float>>(
    cepApplication, "Aggregate",
    new QueryFilterDefinition()
    {
        ComparisonMode = QueryFilterDefinition.COMPARISON_CONTAINS,
        ItemIds = new string[] {"1", "2"}
    },
    typeof(TracerFactory), tracerConfig,
    EventShape.Interval, StreamEventOrder.FullyOrdered);
filteredAggregate.Start();
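The role of the generic constraint can be seen without a StreamInsight server by applying the same pattern to a plain IEnumerable<T>. The sketch below is only an analogy: FilterSketch and FilterByPrefix are hypothetical names that exist neither in StreamInsight nor in the demo solution, and the data classes are trimmed copies of the ones above. The point is that "where T : DataItem" is what lets the query body read d.ItemId for any concrete data class.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Trimmed copies of the data classes from earlier in the post.
public class DataItem
{
    public string ItemId { get; set; }
    public DateTimeOffset Timestamp;
}

public class ValueDataItem<T> : DataItem
{
    public T Value { get; set; }
}

public static class FilterSketch
{
    // Hypothetical helper (LINQ to Objects, not a StreamInsight query).
    // Without the constraint, d.ItemId would not compile for an arbitrary T.
    public static IEnumerable<T> FilterByPrefix<T>(IEnumerable<T> source, string[] prefixes)
        where T : DataItem
    {
        return from d in source
               where prefixes.Any(p => d.ItemId.StartsWith(p, StringComparison.Ordinal))
               select d;
    }
}
```

Given three ValueDataItem<double> items with the made-up ItemIds "Well1.Pressure", "Well2.Pressure" and "Pump7.Flow", calling FilterSketch.FilterByPrefix(items, new[] { "Well1", "Pump" }) keeps the first and the third, and the result is still typed as ValueDataItem<double>, so Value remains available downstream.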

Yes, I’ve left a LOT out … I didn’t feel that it would be necessary to show all of the set-up, etc. But you can download and review the demo solution to see the details.
