About the author

J Sawyer is a developer based in Houston, TX who absolutely loves to write code. After spending 9 years at Microsoft, he moved on to other things and is currently the Lead Developer for the RealTime Data Management team at Logica US. He spends his days building Really Cool Things around StreamInsight and having a blast doing it.

He has been involved with HDNUG, one of the oldest and largest .NET-focused user groups in the US, since its inception in 2001 and has watched it grow from 5-10 technologists meeting around a conference table to a thriving community of over 5000 with regular meeting attendance averaging 100 attendees. He currently serves as the Vice President. You can join him at HDNUG on the second Thursday of every month at the Houston Microsoft office.

He also loves to ride his Yamaha FZ1. And sometimes his Ninja 650. And also his Honda XR-400 dirt bike. But he doesn't code and ride at the same time. That would be bad.

Specifying CTIs with LinqPad

January 11, 2012 1:44 AM

I’ve said it before and I’ll say it again … LinqPad is an essential tool for anyone doing StreamInsight applications. And don’t just settle for the free version but get the Pro version (at least) since it has Intellisense. I’m not ashamed to admit that I am completely, totally addicted to Intellisense (which I find somewhat amusing at times because it annoyed me to no end when it first came out in VB 5.0 – but then I got used to it and descended into my current addiction).

With that said (again), one thing that I’ve found a little … oh … less-than-perfect doesn’t have to do with LinqPad but with the way that all of the StreamInsight samples create the streams, which also happened to be how I was creating my streams. Until recently, that is. You see, AdvanceTimeSettings.IncreasingStartTime doesn’t always mirror how we are going to see data in the real world. It also doesn’t allow you to show how CTIs can be used to handle little issues like latency from the source data. To do that, you really need to specify your own CTIs so that you can control – and others can see – exactly where the CTI is issued in relation to the enqueued Insert events. Without that control, you also can’t test/prototype query scenarios where you have multiple events with the same identifier in a single CTI span – or no events within a CTI span. Both of these scenarios can – and do – happen in the real world. And … this depends on the adapter … you may want to handle CTIs in your input adapter itself rather than relying on AdvanceTimeSettings. It turns out, however, that it’s really not that difficult.

Let’s start with how we typically do it. First, we have some source data as an array and a function to create our event timestamp. Then we create the point stream from the array using ToPointStream (or ToIntervalStream or ToEdgeStream). Here’s a code example and the results from LinqPad:

void Main()
{
    Func<int, DateTimeOffset> t = 
        (s) => new DateTimeOffset(2011, 1, 11, 8, 0, 0, TimeSpan.Zero).AddSeconds(s);
        
    var values = new []
    {
        new {Item="Variable1", Value=92, Timestamp=0},
        new {Item="Variable2", Value=60, Timestamp=0},
        new {Item="Variable1", Value=93, Timestamp=2},
        new {Item="Variable2", Value=75, Timestamp=2},
        new {Item="Variable1", Value=88, Timestamp=3},
        new {Item="Variable2", Value=81, Timestamp=3},
        new {Item="Variable1", Value=93, Timestamp=5},
        new {Item="Variable2", Value=82, Timestamp=5}
    };
    
    var valueStream = values.ToPointStream(Application, 
        e => PointEvent.CreateInsert(t(e.Timestamp), 
            new {Item = e.Item, Value = e.Value}),
            AdvanceTimeSettings.IncreasingStartTime); 
    
    valueStream.ToPointEnumerable().Dump("Results"); 
        
}

 

Results

IEnumerable<PointEvent<>> (13 items)

EventKind  StartTime                 Payload
Insert     1/11/2011 8:00:00 AM      { Item = Variable1, Value = 92 }
Cti        1/11/2011 8:00:00 AM      null
Insert     1/11/2011 8:00:00 AM      { Item = Variable2, Value = 60 }
Insert     1/11/2011 8:00:02 AM      { Item = Variable1, Value = 93 }
Cti        1/11/2011 8:00:02 AM      null
Insert     1/11/2011 8:00:02 AM      { Item = Variable2, Value = 75 }
Insert     1/11/2011 8:00:03 AM      { Item = Variable1, Value = 88 }
Cti        1/11/2011 8:00:03 AM      null
Insert     1/11/2011 8:00:03 AM      { Item = Variable2, Value = 81 }
Insert     1/11/2011 8:00:05 AM      { Item = Variable1, Value = 93 }
Cti        1/11/2011 8:00:05 AM      null
Insert     1/11/2011 8:00:05 AM      { Item = Variable2, Value = 82 }
Cti        12/31/9999 11:59:59 PM    null

Most of the samples filter the CTIs out from the dump but I like to see them (always). Of course, since this post is about CTIs, we definitely need to see them. If you take a look at the results, you’ll see that the CTIs aren’t exactly where you might expect them to be. When you use IncreasingStartTime, the engine “watches” for a new start time to be enqueued with an event. It then enqueues a CTI with that new event’s start time. The next event – with the same start time – is in the next CTI span. So each CTI span has events with two different start times!
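If it helps to see that rule spelled out, here is a minimal sketch in plain C# (no StreamInsight involved) that mimics the ordering in the dump above. The CtiSimulation class and its string output format are made up for illustration; it models what the results show and makes no claim about how the engine is implemented internally.

```csharp
using System;
using System.Collections.Generic;

public static class CtiSimulation
{
    // Hypothetical model of the ordering seen in the dump: every Insert is
    // emitted, and whenever an Insert carries a start time that has not been
    // seen before, a CTI with that same start time follows it.
    public static List<string> Simulate(IEnumerable<int> startTimes)
    {
        var output = new List<string>();
        int? lastStart = null;

        foreach (int start in startTimes)
        {
            output.Add("Insert@" + start);

            if (lastStart == null || start > lastStart.Value)
            {
                output.Add("Cti@" + start);
                lastStart = start;
            }
        }

        // The dump ends with a closing CTI at the maximum time value.
        output.Add("Cti@Infinity");
        return output;
    }
}
```

Feeding it the start times from the sample (0, 0, 2, 2, 3, 3, 5, 5) gives 13 entries in the same Insert/Cti order as the LinqPad results, with the second event of each pair landing after the CTI.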

Let’s change it around a bit. Instead of the predefined IncreasingStartTime, you can pass ToPointStream an AdvanceTimeSettings that you build yourself, which gives you more control over your CTIs. Changing the code around a bit, we certainly get different results:

AdvanceTimeSettings ats = new AdvanceTimeSettings(
    new AdvanceTimeGenerationSettings(
        TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(0)), null, AdvanceTimePolicy.Drop);

var valueStream = values.ToPointStream(Application, 
    e => PointEvent.CreateInsert(t(e.Timestamp), 
        new {Item = e.Item, Value = e.Value}),
    ats, "Values");

 

Values

IEnumerable<PointEvent<>> (11 items)

EventKind  StartTime                 Payload
Insert     1/11/2011 8:00:00 AM      { Item = Variable1, Value = 92 }
Cti        1/11/2011 8:00:00 AM      null
Insert     1/11/2011 8:00:00 AM      { Item = Variable2, Value = 60 }
Insert     1/11/2011 8:00:02 AM      { Item = Variable1, Value = 93 }
Cti        1/11/2011 8:00:02 AM      null
Insert     1/11/2011 8:00:02 AM      { Item = Variable2, Value = 75 }
Insert     1/11/2011 8:00:03 AM      { Item = Variable2, Value = 81 }
Insert     1/11/2011 8:00:03 AM      { Item = Variable1, Value = 88 }
Insert     1/11/2011 8:00:05 AM      { Item = Variable1, Value = 93 }
Cti        1/11/2011 8:00:05 AM      null
Insert     1/11/2011 8:00:05 AM      { Item = Variable2, Value = 82 }

It is different and it’s also closer to what I want. But there are still events in there that have mixed start times within the same CTI window. Now, the example above isn’t quite fair … if I add a touch of a delay into the AdvanceTimeSettings, the CTIs begin to look more like what I expect. But … if you look above, we still have a patch of events with different start times. And – notice – the CTIs don’t come every 2 seconds like clockwork. Instead, the CTIs are enqueued only after an event’s start time has changed since the last CTI. The only way to resolve it is to take complete control over the CTIs … so we add them into the source data. We don’t have to specify any AdvanceTimeGenerationSettings since, of course, we are enqueuing manually. Which gives us the following code and output:

void Main()
{
    Func<int, DateTimeOffset> t = 
        (s) => new DateTimeOffset(2011, 1, 11, 8, 0, 0, TimeSpan.Zero).AddSeconds(s);
    var values = new []
    {
        new {Item="Variable2", Value=60, Timestamp=0},
        new {Item="CTI", Value=60, Timestamp=0},
        new {Item="Variable1", Value=93, Timestamp=2},
        new {Item="Variable2", Value=75, Timestamp=2},
        new {Item="Variable1", Value=88, Timestamp=3},
        new {Item="Variable2", Value=81, Timestamp=3},
        new {Item="CTI", Value=60, Timestamp=3},
        new {Item="Variable1", Value=93, Timestamp=5},
        new {Item="Variable2", Value=82, Timestamp=5},
        new {Item="CTI", Value=60, Timestamp=5}
    };

    var sourceData = values.ToPointStream(Application,
        e => e.Item != "CTI"
            ? PointEvent.CreateInsert(t(e.Timestamp),
                new Payload(){Item=e.Item, Value=e.Value})
            : PointEvent<Payload>.CreateCti(t(e.Timestamp).AddTicks(1)));

    sourceData.ToPointEnumerable().Dump("Results");
}

// Define other methods and classes here
public struct Payload
{
    public string Item;
    public int Value;
}

Note that we aren’t using an anonymous type for the stream – we can’t. You’ll get a compile error if you do. Also, the method that we’re using isn’t very reusable and we’ll wind up writing the same thing over and over again, tweaked for whatever we’re doing at the time. Finally, I’m not really thrilled about the clarity. But we can kick this up a notch and use a fully reusable method that handles creating the events and can use anonymous types, thanks to the wonderful goodness that is lambdas. Check it out:

public static PointEvent<TPayload> GetPointEvent<TPayload, TSource>(
    TSource source, 
    Func<TSource, bool> ctiSelectExpression, 
    Func<TSource, TPayload> payloadSelectExpression, 
    Func<TSource, DateTimeOffset> eventTimeExpression)
    {
        bool isCti = ctiSelectExpression.Invoke(source); 
        if(isCti)
        {
            return PointEvent<TPayload>.CreateCti(eventTimeExpression.Invoke(source)); 
        }
        return PointEvent<TPayload>.CreateInsert(eventTimeExpression.Invoke(source), 
            payloadSelectExpression.Invoke(source)); 
    }

We can then use this in ToPointStream:

var sourceData = values.ToPointStream(Application, 
    e => GetPointEvent(e, 
        i=> i.Item == "CTI", 
        i=> new {Item=i.Item, Value = i.Value}, 
        i => t(i.Timestamp)));

The output is the same as the first method but, in this case, it is more reusable and I find it a touch simpler.

Tags:

StreamInsight | Code Sample

Cool StreamInsight query–Point input to Edge Output

November 8, 2011 5:30 PM

It’s easy to get individual events as points in StreamInsight. We can then take those events and write them directly out to our output adapter. There’s nothing to this. But … and this is a common scenario … what do we do if the values don’t change very often? Do we really want to write an hour’s worth of data that hasn’t changed, especially if we are reading the value every second or even several times a second? Probably not. And in many cases, we don’t need to. In many cases, in fact, we’re only interested in saving the values when they have changed. Of course, this can be done in StreamInsight through a little bit of query majick (or I wouldn’t be writing this blog entry, now would I?). So … what we are going to do here is to take a point stream, evaluate it, select only the items that have changed since the previous value and then send these to output as edge events. Each edge will have a start and end when the value changes. Simple enough, right?

As with the previous example, this was done using LinqPad. In this case, I borrowed (or stole, as the case may be) a Linq macro from one of the samples – the FoldPairs macro. I also borrowed/stole the ToSignal extension from the StreamInsight blog and turned that into a Linq macro. Both of these are going to be darn handy in this example.

public static CepStream<TResult> FoldPairs<TStream, TResult>(
    CepStream<TStream> input,
    Expression<Func<TStream, TStream, bool>> predicate,
    TimeSpan timeout,
    Expression<Func<TStream, TStream, TResult>> resultSelector)
{
    var signal = input
                    .AlterEventDuration(e => timeout)
                    .ClipEventDuration(input, (f, s) => predicate.Compile()(f, s));
                    
    return from l in signal.ShiftEventTime(e => TimeSpan.FromTicks(1))
           from r in input
           where predicate.Compile()(l, r)
           select resultSelector.Compile()(l, r);
}

public static CepStream<T> ToSignal<T, K>(CepStream<T> inputstream, Expression<Func<T, K>> keySelector)
{
    return inputstream
        .AlterEventDuration(e => TimeSpan.MaxValue)
        .ClipEventDuration(inputstream, (e1, e2) => (keySelector.Compile()(e1)).Equals(keySelector.Compile()(e2)));
}

And now for our data. I’ve added comments to the data to show where we have value changes. We’ll take this and convert it to a point stream for evaluation.

Func<int, int, DateTimeOffset> t = (h, m) => new DateTimeOffset(2011, 1, 25, 0, 0, 0, TimeSpan.Zero).AddHours(h).AddMinutes(m);

    var sourceData = new []
    {
        //Initial event @ 4:12
        new { SourceId = "A", Value = 22, TimeStamp = t(4, 12) },
        new { SourceId = "A", Value = 22, TimeStamp = t(4, 13) },
        new { SourceId = "A", Value = 22, TimeStamp = t(4, 14) },
        //A: New event @ 4:15
        new { SourceId = "A", Value = 67, TimeStamp = t(4, 15) },
        //A: New event @ 4:16
        new { SourceId = "A", Value = 54, TimeStamp = t(4, 16) },
        new { SourceId = "A", Value = 54, TimeStamp = t(4, 17) },
        new { SourceId = "A", Value = 54, TimeStamp = t(4, 18) },
        new { SourceId = "A", Value = 54, TimeStamp = t(4, 19) },
        new { SourceId = "A", Value = 54, TimeStamp = t(4, 20) },
        new { SourceId = "A", Value = 54, TimeStamp = t(4, 21) },
        //A: New event @ 4:22
        new { SourceId = "A", Value = 87, TimeStamp = t(4, 22) },
        //B: Initial Event @ 4:12
        new { SourceId = "B", Value = 24, TimeStamp = t(4, 12) },
        new { SourceId = "B", Value = 24, TimeStamp = t(4, 13) },
        //B: New Event @ 4:14
        new { SourceId = "B", Value = 31, TimeStamp = t(4, 14) },
        new { SourceId = "B", Value = 31, TimeStamp = t(4, 15) },
        new { SourceId = "B", Value = 31, TimeStamp = t(4, 16) },
        new { SourceId = "B", Value = 31, TimeStamp = t(4, 17) },
        new { SourceId = "B", Value = 31, TimeStamp = t(4, 18) },
        //B: New Event @ 4:19
        new { SourceId = "B", Value = 50, TimeStamp = t(4, 19) },
        new { SourceId = "B", Value = 50, TimeStamp = t(4, 20) },
        new { SourceId = "B", Value = 50, TimeStamp = t(4, 21) },
        new { SourceId = "B", Value = 50, TimeStamp = t(4, 22) }
    };
    
    var source = sourceData.OrderBy(e => e.TimeStamp).ToPointStream(
        Application,
        ev => PointEvent.CreateInsert(ev.TimeStamp.ToLocalTime(), new { ev.SourceId, ev.Value }),
        AdvanceTimeSettings.IncreasingStartTime);

So far, pretty straightforward and nothing special. Next we need to calculate the change between two consecutive values. We do this using the borrowed FoldPairs macro. This will provide us with an anonymous type with our item identifier (SourceId) and the difference (delta) between two consecutive values.

var delta = FoldPairs(source,
                           (a, b) => a.SourceId == b.SourceId,
                           TimeSpan.MaxValue,
                           (a, b) => new { a.SourceId, diff = b.Value - a.Value });

Now that we have the deltas, it’s easy enough to join this back to the original source query, selecting only those source items where the delta is not equal to 0.

var changesOnly = from r in delta join s in source 
                    on r.SourceId equals s.SourceId 
                    where r.diff != 0
                    select s; 

So far so good. If you run what we have so far and write this to the output, you’ll see that you only get those point events that change since the previous value. But … you’ll also see that we’re missing something – our very first, initial event. That’s because no delta is calculated for this event as it has no previous event. Well, if we are going to be a real application, we can’t have the first one disappearing on us all the time. So now we need to get the first event – the event that has nothing preceding it. To do this, we use the ToSignal macro and convert our initial source stream to a signal stream … each individual reading (whether changed or not) becomes an interval. We then shift the event time to create an overlap between one event and the next one. Where we don’t have an overlap, we have the very first point event – a left anti semi-join. We can then take this and union it with the stream of changed events, providing a stream with the changed events AND our first event.

var initialEvent = from s in source 
                    where (
                        from s2 in 
                        ToSignal(source, e=> e.SourceId)
                        .ShiftEventTime(e => TimeSpan.FromTicks(1))
                        where s.SourceId == s2.SourceId
                        select s2
                        ).IsEmpty()
                    select s; 

var final = changesOnly.Union(initialEvent); 
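
When I’m prototyping something like this, I like to have a dumb, non-streaming version around to check which readings should come out the other end. Here is a minimal sketch with plain LINQ to Objects; the ChangeFilter class and the Reading type (with a simple Minute field standing in for the timestamp) are hypothetical helpers for this check and are not part of the StreamInsight query above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ChangeFilter
{
    // Hypothetical stand-in for the anonymous payload type used in the post.
    public sealed class Reading
    {
        public string SourceId;
        public int Value;
        public int Minute; // minutes past the hour, a simplification
    }

    // Keeps the first reading per source plus every reading whose value
    // differs from the reading immediately before it for the same source.
    public static List<Reading> ChangesOnly(IEnumerable<Reading> readings)
    {
        var result = new List<Reading>();
        var lastValueBySource = new Dictionary<string, int>();

        foreach (var r in readings.OrderBy(x => x.Minute))
        {
            int previous;
            bool seen = lastValueBySource.TryGetValue(r.SourceId, out previous);

            if (!seen || previous != r.Value)
            {
                result.Add(r);
            }

            lastValueBySource[r.SourceId] = r.Value;
        }

        return result;
    }
}
```

Run against the first few "A" readings from the sample data (22, 22, 22, 67, 54 at 4:12 through 4:16), this keeps the readings at 4:12, 4:15 and 4:16, which is the set of points the streaming query is meant to produce for that source.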

This, in itself, is actually useful … you may not want to take the next step and convert them into edge events as this query will give you a point event for each value change. But … we’re not quite done with the scenario that we want to accomplish. To create edge events with a start and end time that represent value changes, we simply use the ToSignal() Linq macro on our final point stream. If you want these as intervals, you’ll get, in effect, the same data except that you won’t “see” the interval until the end time. If they are edge events, you’ll get a start as soon as the value changes and the end before the next change.

var signalEdges = ToSignal(final, e=> e.SourceId);

And … it really is just that simple … it also helps that the Linq macros really reduce the amount of Linq query statements that we have to write. You could, if you wanted, also add dead zones … where you take this up a notch and only produce an event when the value changes by a certain amount or percentage. But I’ll leave that as an exercise for the reader. Can’t take all the fun out of it, can I?

 

Tags:

StreamInsight | Code Sample

Cool StreamInsight query–turning alarms into Edge events

November 4, 2011 5:04 PM

One of the use cases for StreamInsight is to detect certain alarm conditions on streaming real-time data. This, in itself, isn’t all that special … many of the process control systems out there already do something like that. What makes StreamInsight different is that it can be a lot smarter about determining these alarm conditions. For example … one of the things that apparently happened (as I understand it) with Macondo (aka Deepwater Horizon) is that the PCS alarms went off constantly … they would trigger every time a single value was out of range. So … there were so many false alerts that they simply turned the system off. It really isn’t all that unreasonable … many of these sensors will temporarily go out of range and it’s not indicative of a problem. In fact, it could just be a random bad reading or a transient condition that isn’t really a cause for alarm at all. However, if you start to have multiple values from the same sensor/device out of range within a particular time period, then you may really have an issue. You also don’t want to issue an alarm with every bad reading … but, instead, issue one at the start and one at the finish. Because, with an alarm, you have a definite start of the event … but at the start, you have no idea when it will end. You also have a definite end to the event … but you know that only when things come back to normal. It’s a perfect fit for an edge event.

Part of this can be found in the StreamInsight samples for LinqPad – a most righteously awesome tool that every StreamInsight query developer should have and use. It’s in the “Alarm Floods and Transients” sample under 101 StreamInsight Queries, section 2_QueryPatterns. I will start with that and describe the queries step-by-step.

First … our scenario. We have a data stream with events that have a “Status” field. If this field is a “0”, it’s good, if “1” it’s bad. Now … in the real world, you’d get to this value somehow … through previous queries that do analytics or perhaps even from your data source. For our purposes, that is irrelevant. We’re interested in what we do with it. Now, in our case, we can actually get false alerts (of course) so we want to trigger an alarm only when we get multiple alerts within a specific time frame. We then want the alarm expressed as an edge event. Finally, if we have an alarm that crosses a specific amount of time, we want to repeat the alarm. We’ll have several steps to do this. First, here’s our source data; I added a little to what was there. (Notes: These are all set up to run in LinqPad. Also, we don’t differentiate the events by any sort of ID … you’ll need to do this in a real app).

var sourceData = new []
{
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:10:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:11:00 PM") },
    //False alert @ 4:12
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:12:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:13:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:14:00 PM") },
    //Real alert @ 4:15
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:15:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:16:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:18:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:19:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:20:00 PM") },
    //Real alert @ 4:21 - Longer alert that repeats
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:21:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:22:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:23:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:24:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:25:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:26:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:27:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:28:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:29:00 PM") },
    //False alert @ 4:30
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:30:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:31:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:32:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:33:00 PM") },
};

var source = sourceData.ToPointStream(Application, ev => 
    PointEvent.CreateInsert(ev.TimeStamp.ToLocalTime(), ev),
    AdvanceTimeSettings.StrictlyIncreasingStartTime);

var timeout = TimeSpan.FromMinutes(2);
var alarmTimeout = TimeSpan.FromMinutes(5); 

Now that we have our source data and variables, let’s create two streams, one with valid items and one with all alerts:

var validData = source.Where(e => e.Status == 0); 
var alarmEvents = source.Where(e => e.Status == 1);

Next, we need to remove the false alerts … alerts that aren’t followed by another alert. We do this by taking the valid items, moving the start time back by the timeout and then extending the event duration by the timeout. If there is a successful join then there is a “good” event within the timeout. In that case, we filter the alarm out using a Left Anti-Semi Join.

// take all alarm events that are not followed by a non-alarm event
// within the timeout
var nonTransientAlarms = from alarm in alarmEvents
             where (from nextevent in source
                        .AlterEventLifetime(
                            e => e.StartTime.Subtract(timeout),
                            e => timeout)
                    where nextevent.Status == 0
                    select nextevent).IsEmpty()
             select alarm;

//Show the Non-transient alarms
(from p in nonTransientAlarms.ToIntervalEnumerable()
 where p.EventKind == EventKind.Insert
 select p).Dump("Non-transient alarms");

The output shows all of the alarm events that do not have a “good” event within the timeout. You’ll notice that there is a flood of events … we’ll need to filter this out so that we have the initial alarm event.
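
To double-check which alarms should survive that filter, here is a minimal sketch of the same rule in plain LINQ to Objects. The TransientFilter class and the Reading struct are hypothetical helpers for this check, and the timestamps are reduced to whole minutes past 4:00 PM to keep it short. The sketch assumes the shifted-interval join above boils down to "a good reading strictly after the alarm and no later than alarm + timeout".

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class TransientFilter
{
    // Hypothetical stand-in for the anonymous type in the sample data.
    public struct Reading
    {
        public int Minute;  // minutes past 4:00 PM, a simplification
        public int Status;  // 0 = good, 1 = alert

        public Reading(int minute, int status)
        {
            Minute = minute;
            Status = status;
        }
    }

    // Returns the minutes of all alerts that are NOT followed by a good
    // reading within the timeout (the plain-LINQ analogue of the anti-join).
    public static List<int> NonTransientAlarms(
        IEnumerable<Reading> readings, int timeoutMinutes)
    {
        var all = readings.ToList();

        return all
            .Where(a => a.Status == 1)
            .Where(a => !all.Any(g => g.Status == 0
                                   && g.Minute > a.Minute
                                   && g.Minute <= a.Minute + timeoutMinutes))
            .Select(a => a.Minute)
            .ToList();
    }
}
```

With the sample data and a 2 minute timeout, this sketch keeps the alerts at 4:15, 4:16 and 4:21 through 4:25; the lone alerts at 4:12 and 4:30, and the ones right before things return to normal, drop out.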

// Expand all alarm events to the timeout and count over snapshots
var counts = from win in nonTransientAlarms
                 .Where(e => e.Status == 1)
                 .AlterEventDuration(e => timeout)
                 .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
             select new { count = win.Count() };

// Those snapshots with a count of 1 belong to the initial alarms.
// reduce to points and join with original stream.
var initialAlarm = from c in counts
                 .Where(e => e.count == 1)
                 .ToPointEventStream()
             from e in source
             select e;

So far, we’ve not done much that’s not in the original sample. Now we go off that path and get these changed into Edges that represent the start and end of the actual alarm. We start by turning the valid data into a signal. We will also extend our initial alarms to the alarm timeout. From there, doing a LASJ again with the valid data signal will provide us with edge events that start and end with the alarm. Because we have 1 alarm that extends past the alarm timeout, this will generate 2 edge events. Together, these edge events will cover the entire alarm timeframe:

var validDataSignal = from v in validData
                                .AlterEventDuration(e=>TimeSpan.MaxValue)
                                .ClipEventDuration(initialAlarm,(e1, e2) => true)
                      select v; 
//Since we have the initial alarms, how long does the alarm last? 
var alarmTimeline = from i in initialAlarm.AlterEventDuration(e=> alarmTimeout)
                    where ( from v in validDataSignal select v).IsEmpty()
                    select i; 
                    

(from p in alarmTimeline.ToEdgeEnumerable()
 where p.EventKind == EventKind.Insert
 select p).Dump("Alarm timeframe");

And that should do it …

Tags:

Code Sample | StreamInsight

Baton Rouge Sql Saturday Content

August 7, 2011 5:46 PM

I’ve just posted this to the SQL Saturday web site. All of the content that was presented is there, including the PowerPoint and the code. There’s quite a bit of stuff in the code that we simply didn’t have time to show. There are two text files in there with several different query patterns that you can copy and paste into place. This sample does require StreamInsight 1.2, so make sure that you have at least the evaluation version installed.

Tags:

Code Sample | Community | Events | StreamInsight

StreamInsight 1.2: Extension Method for Perf Counters

August 5, 2011 12:01 PM

One of the new features in StreamInsight 1.2 is performance counters. These give you a very robust and easy way to test and monitor performance of your StreamInsight application and were much needed. The counters are enabled at the StreamInsight Server and Application level all the time but you need to specify and enable the individual queries that you want monitored as well. Enabling a query for performance counters will also hook up both the input and the output adapters, which helps you understand if you have bottlenecks or poor performance in them as well – though there is a (relatively minimal) performance hit for this. Very cool stuff. But … like the query state … it’s a bit warty (IMHO) in some of the details. Fortunately, it’s nothing that we can’t make at least a little better through the beauty that is extension methods. I’ll go into more detail on the perf counters in a future post but I wanted to share this little piece of code to help y’all get to using these a bit quicker.

/// <summary>
/// Enables and disables performance counters for a query. 
/// </summary>
/// <param name="query">The query to enable or disable performance counters for.</param>
/// <param name="enabled">true to enable, false to disable.</param>
public static void SetPerformanceCounters(this Query query, bool enabled)
{
    
    Uri queryUri = query.Name;

    DiagnosticSettings settings = query.Application.Server.GetDiagnosticSettings(
                        queryUri);
    if (enabled)
    {
        settings.Aspects |= DiagnosticAspect.PerformanceCounters;
    }
    else
    {
        settings.Aspects &= ~DiagnosticAspect.PerformanceCounters;
    }

    query.Application.Server.SetDiagnosticSettings(queryUri, settings);

}
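
One detail worth calling out when toggling a flags value like this: turning a flag off means AND-ing with the complement (~) of the flag, because AND-ing with the flag itself throws away every other flag instead. Here is a minimal, self-contained sketch of that pattern; DemoAspect and FlagDemo are made-up names for illustration and are not the StreamInsight types.

```csharp
using System;

// Hypothetical stand-in for a flags enum; not the StreamInsight enum.
[Flags]
public enum DemoAspect
{
    None = 0,
    Tracing = 1,
    PerformanceCounters = 2
}

public static class FlagDemo
{
    // Sets or clears a single flag while leaving all other flags untouched.
    public static DemoAspect Set(DemoAspect current, DemoAspect flag, bool enabled)
    {
        return enabled
            ? (current | flag)    // OR turns the flag on
            : (current & ~flag);  // AND with the complement turns it off
    }
}
```

Clearing PerformanceCounters from a value that has both flags set leaves just Tracing, and setting it again brings both back.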

Tags:

StreamInsight | Code Sample