Ruminations of J.net: idle rants and ramblings of a code monkey

Query Logic and Operator Reuse in StreamInsight (DQC)


Way back in 2010, Mark Simms posted a blog entry called "StreamInsight: Understanding dynamic query composition," in which he detailed how you can use dynamic query composition (DQC) to prevent multiple instantiations of your adapters. And yes, it is important … very important … for this reason, but there is more to the story that Mark didn’t cover.

Let’s start, however, with a set of requirements for our StreamInsight queries.

First, we need a data source. For our example, we’ll use a random number generator, but the specific source doesn't really matter; it just gets things started. Next, we need to calculate a 10-second rolling average, per device, every 2 seconds. This is easy enough … that’s a hopping window. The last step is to take the results of our hopping window aggregate and calculate the difference between consecutive averages with every hop … providing a rate of change for the calculated average. Again, this is a common, well-known pattern; it's in the StreamInsight samples for LINQPad as “FoldPairs” (Alter/Clip/Shift). Each of these steps needs to be its own query because we send each result to an output adapter.
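To make the windowing requirement concrete, here is a minimal plain-C# sketch of what a 10-second window hopping every 2 seconds computes for each device. It does not use StreamInsight at all. The `Reading`, `WindowAverage`, and `HoppingWindowSketch` names are hypothetical, timestamps are assumed to be whole seconds, windows are assumed to be aligned to multiples of the hop size, and each result is stamped at the window end in the spirit of `HoppingWindowOutputPolicy.PointAlignToWindowEnd`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical payload: one point reading per device, timestamped in whole seconds.
public class Reading
{
    public int DeviceId;
    public long Time;
    public double Value;
}

// Hypothetical output row: one average per device per non-empty window,
// stamped at the window end (mirroring PointAlignToWindowEnd).
public class WindowAverage
{
    public int DeviceId;
    public long WindowEnd;
    public double Average;
    public int Count;
}

public static class HoppingWindowSketch
{
    // Windows are [start, start + size), with start assumed to fall on a multiple of hop.
    public static List<WindowAverage> Compute(IEnumerable<Reading> readings, long size, long hop)
    {
        var results = new List<WindowAverage>();
        foreach (var device in readings.GroupBy(r => r.DeviceId))
        {
            var points = device.ToList();
            long minTime = points.Min(p => p.Time);
            long maxTime = points.Max(p => p.Time);

            // Begin at (or just before) the earliest aligned window that can contain minTime.
            for (long start = FloorToMultiple(minTime - size + 1, hop); start <= maxTime; start += hop)
            {
                long end = start + size;
                var inWindow = points.Where(p => p.Time >= start && p.Time < end).ToList();
                if (inWindow.Count == 0) continue; // empty windows produce no output

                results.Add(new WindowAverage
                {
                    DeviceId = device.Key,
                    WindowEnd = end,
                    Average = inWindow.Average(p => p.Value),
                    Count = inWindow.Count
                });
            }
        }
        return results.OrderBy(r => r.WindowEnd).ThenBy(r => r.DeviceId).ToList();
    }

    // Floor to a multiple of step; also correct for negative values.
    static long FloorToMultiple(long value, long step)
    {
        long remainder = value % step;
        return remainder < 0 ? value - remainder - step : value - remainder;
    }
}
```

Because the window is five times the hop, each reading falls into five overlapping windows, which is why a single input event contributes to several consecutive averages.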

So let’s get started. We’ve read Mark’s article and taken it to heart, so we’ll use DQC with the first (source) query to make sure that there is only one instance of the input adapter.

var initialStream = CepStream<GeneratedEvent>.Create(cepApplication,
    "generatedStream", typeof(GeneratorFactory),
    config, EventShape.Point);

Query initialQuery = CreateAndStartQuery<GeneratedEvent>(
    initialStream, "initialQuery", "Initial query from generator",
    EventShape.Point, csvConfig, cepApplication);

// Taking this query and converting it to a stream allows us to build on it further.
// This is called Dynamic Query Composition (DQC).
var sourceStream = initialQuery.ToStream<GeneratedEvent>("sourceStream");

Our next step is to create the hopping window for our aggregate, using sourceStream as the basis.

var hoppingWindowStream = from s in sourceStream.AlterEventDuration(e => TimeSpan.FromTicks(1))
                          group s by s.DeviceId into aggregateGroup
                          from item in aggregateGroup.HoppingWindow(
                              TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(2),
                              HoppingWindowOutputPolicy.PointAlignToWindowEnd)
                          select new AggregateItem
                          {
                              DeviceId = aggregateGroup.Key,
                              Average = item.Avg(e => e.Value),
                              Count = item.Count(),
                              Sum = item.Sum(e => e.Value)
                          };

// Create the query and send it to the output adapter.
Query hoppingWindowQuery = CreateAndStartQuery<AggregateItem>(hoppingWindowStream,
    "hoppingWindowAggregate", "Hopping aggregate query from generator",
    EventShape.Point, csvConfig, cepApplication);

Now we need to calculate the rate-of-change for the hopping aggregate. Since we’ve already got the hoppingWindowStream, we can just use that.

var aggregateDifferenceSourceStream = hoppingWindowStream;

var aggregateDifference = from current in aggregateDifferenceSourceStream
                          from previous in aggregateDifferenceSourceStream
                              .AlterEventDuration(e => TimeSpan.MaxValue)
                              .ClipEventDuration(aggregateDifferenceSourceStream,
                                  (e1, e2) => e1.DeviceId == e2.DeviceId)
                              .ShiftEventTime(e => TimeSpan.FromTicks(1))
                          where current.DeviceId == previous.DeviceId
                          select new ItemDifference()
                          {
                              CurrentValue = current.Average,
                              DeviceId = current.DeviceId,
                              PreviousValue = previous.Average,
                              Difference = current.Average - previous.Average
                          };

Query aggregateDifferenceQuery = CreateAndStartQuery(aggregateDifference, "AggregateDifference",
    "Difference between two aggregate values",
    EventShape.Point,
    csvConfig,
    cepApplication);
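The Alter/Clip/Shift trick is easier to follow with explicit event lifetimes. The following is a rough plain-C# model of it, not StreamInsight code: the `AggregateEvent`, `DifferenceRow`, and `FoldPairsSketch` names are hypothetical, time is modeled as `long` ticks, `long.MaxValue` stands in for `TimeSpan.MaxValue`, and the temporal join is approximated as "same device and overlapping lifetimes." Each "previous" event is stretched, clipped at the next event for its device, and shifted one tick, so it overlaps exactly the following aggregate and never itself.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event with an explicit lifetime [Start, End) in ticks.
public class AggregateEvent
{
    public long Start;
    public long End;
    public int DeviceId;
    public double Average;
}

// Hypothetical result row, loosely mirroring ItemDifference.
public class DifferenceRow
{
    public int DeviceId;
    public long Time;
    public double PreviousValue;
    public double CurrentValue;
    public double Difference;
}

public static class FoldPairsSketch
{
    const long Infinity = long.MaxValue; // stands in for TimeSpan.MaxValue

    public static List<DifferenceRow> Compute(List<AggregateEvent> current)
    {
        var previous = current.Select(p =>
        {
            // Alter: stretch the lifetime so it lasts "forever".
            long end = Infinity;
            // Clip: end it at the start of the next event for the same device.
            long nextStart = current
                .Where(n => n.DeviceId == p.DeviceId && n.Start > p.Start)
                .Select(n => n.Start)
                .DefaultIfEmpty(Infinity)
                .Min();
            end = Math.Min(end, nextStart);
            // Shift: move the event one tick later so it never overlaps itself.
            return new AggregateEvent
            {
                Start = p.Start + 1,
                End = end == Infinity ? Infinity : end + 1,
                DeviceId = p.DeviceId,
                Average = p.Average
            };
        }).ToList();

        // Temporal join: pair events that share a device and whose lifetimes overlap.
        var pairs = from c in current
                    from prev in previous
                    where c.DeviceId == prev.DeviceId
                          && c.Start < prev.End && prev.Start < c.End
                    select new DifferenceRow
                    {
                        DeviceId = c.DeviceId,
                        Time = c.Start,
                        PreviousValue = prev.Average,
                        CurrentValue = c.Average,
                        Difference = c.Average - prev.Average
                    };
        return pairs.OrderBy(d => d.Time).ThenBy(d => d.DeviceId).ToList();
    }
}
```

The first aggregate for each device has no predecessor, so it produces no row; every later aggregate pairs with exactly one earlier value.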

When we run this, we get the results that we expect. Pretty straightforward, right? Well … not exactly. Let’s take a look at the queries in the event flow debugger. First, we see the hopping window stream with its grouping and aggregating operators. It’s pretty simple.

[Image: Hopping Window query in the event flow debugger]

Now, let’s take a look at the query for the rate-of-change in the aggregates. It’s a bit longer and, if you look at the area that’s highlighted in red, it has the exact same operators that we see in the aggregate query.

[Image: diffFromAvg query in the event flow debugger, without DQC]

So … the operators that did the group and aggregate are actually repeated in both queries! What’s going on here? What you need to understand is that the operator tree for a StreamInsight query isn’t built until you call ToQuery() … and then the entire operator tree is built! If your tree goes all the way back to the source, you’ll get two instances of the same input adapter, as Mark described in his blog. But every other operator in the tree is repeated as well. In our little sample, it’s really not a big deal, but in a large application this can lead to substantial overhead. Using DQC, you can reduce this overhead and reuse the results of operators instead of having them rebuilt in every tree. When you reach a result that is then reused by different streams, create a query from it, convert the query back to a stream with ToStream(), and then write your additional LINQ expressions against that stream. You don’t need an output adapter for every query, either … that is actually optional. Here’s how the code for the aggregate difference query would look using DQC:

var aggregateDifferenceSourceStream = hoppingWindowQuery.ToStream<AggregateItem>();

var aggregateDifference = from current in aggregateDifferenceSourceStream
                          from previous in aggregateDifferenceSourceStream
                              .AlterEventDuration(e => TimeSpan.MaxValue)
                              .ClipEventDuration(aggregateDifferenceSourceStream,
                                  (e1, e2) => e1.DeviceId == e2.DeviceId)
                              .ShiftEventTime(e => TimeSpan.FromTicks(1))
                          where current.DeviceId == previous.DeviceId
                          select new ItemDifference()
                          {
                              CurrentValue = current.Average,
                              DeviceId = current.DeviceId,
                              PreviousValue = previous.Average,
                              Difference = current.Average - previous.Average
                          };

Query aggregateDifferenceQuery = CreateAndStartQuery(aggregateDifference, "AggregateDifference",
    "Difference between two aggregate values",
    EventShape.Point,
    csvConfig,
    cepApplication);

Note that the only change is in the first line: we convert hoppingWindowQuery back into a stream and use that as our source. The difference in the tree of query operators, however, is telling. I’ve outlined in red where the aggregate query's source is now imported from the published query that holds the results of the aggregates.

[Image: diffFromAvg query in the event flow debugger, with DQC]

The operators that are unique to this query – those that calculate the rate of change in the rolling average – are still here. The operators that are already necessary for a previous query, however, are not. This is a very simple example, to be sure, and any benefits probably wouldn’t be noticeable except at a very high volume. However, when you have more complex queries (as you would in the real world), the difference can be huge.
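As a loose analogy only (LINQ to Objects deferred execution, not the StreamInsight engine), the same shape shows up in ordinary C#: two consumers composed on one deferred pipeline each re-run the shared step, while materializing the result once and sharing it runs that step a single time, roughly the way publishing a query and importing it with ToStream() avoids rebuilding the aggregate operators. `OperatorReuseAnalogy` and its members are hypothetical names, and the evaluation counter simply stands in for the duplicated group-and-aggregate operators.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class OperatorReuseAnalogy
{
    // Counts how often the shared "aggregate" step runs.
    public static int Evaluations;

    static double ComputeAverage(int[] window)
    {
        Evaluations++;
        return window.Average();
    }

    // Both consumers compose on the same deferred pipeline, so the shared step
    // runs once per consumer (like each query building its own operator tree).
    public static int WithoutReuse(List<int[]> windows)
    {
        Evaluations = 0;
        IEnumerable<double> averages = windows.Select(w => ComputeAverage(w)); // nothing runs yet
        _ = averages.ToList();                    // consumer 1 runs ComputeAverage per window
        _ = averages.Select(a => a * 2).ToList(); // consumer 2 runs it again per window
        return Evaluations;
    }

    // The shared result is materialized once and both consumers read it
    // (like publishing a query and importing it with ToStream()).
    public static int WithReuse(List<int[]> windows)
    {
        Evaluations = 0;
        List<double> published = windows.Select(w => ComputeAverage(w)).ToList();
        _ = published.ToList();
        _ = published.Select(a => a * 2).ToList();
        return Evaluations;
    }
}
```

With three windows, the first method evaluates the average six times and the second only three times; the gap grows with every additional consumer.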

There is, however, one pretty big limitation you need to be aware of when using DQC … you can’t use checkpoints (high availability) when your source is a published query (DQC). So if you need checkpoints, you need to plan very carefully how and where you use DQC, how you get your source data, and how you compose your queries.
