About the author

J Sawyer is a developer based in Houston, TX who absolutely loves to write code. After spending 9 years at Microsoft, he moved on to other things and is currently the Lead Developer for the RealTime Data Management team at Logica US. He spends his days building Really Cool Things around StreamInsight and having a blast doing it.

He has been involved with HDNUG, one of the oldest and largest .NET-focused user groups in the US, since its inception in 2001 and has watched it grow from 5-10 technologists meeting around a conference table to a thriving community of over 5000 with regular meeting attendance averaging 100 attendees. He currently serves as the Vice President. You can join him at HDNUG on the second Thursday of every month at the Houston Microsoft office.

He also loves to ride his Yamaha FZ1. And sometimes his Ninja 650. And also his Honda XR-400 dirt bike. But he doesn't code and ride at the same time. That would be bad.

Specifying CTIs with LinqPad

January 11, 2012 1:44 AM

I’ve said it before and I’ll say it again … LinqPad is an essential tool for anyone doing StreamInsight applications. And don’t just settle for the free version but get the Pro version (at least) since it has Intellisense. I’m not ashamed to admit that I am completely, totally addicted to Intellisense (which I find somewhat amusing at times because it annoyed me to no end when it first came out in VB 5.0 – but then I got used to it and descended into my current addiction).

With that said (again), one thing that I’ve found a little … oh … less-than-perfect doesn’t have to do with LinqPad but with the way that all of the StreamInsight samples create the streams, which also happened to be how I was creating my streams. Until recently, that is. You see, AdvanceTimeSettings.IncreasingStartTime doesn’t always mirror how we are going to see data in the real world. It also doesn’t allow you to show how CTIs can be used to handle little issues like latency from the source data. To do that, you really need to specify your own CTIs so that you can control – and others can see – exactly where the CTI is issued in relation to the enqueued Insert events. With IncreasingStartTime, you also can’t test/prototype query scenarios where you have multiple events with the same identifier in a single CTI span – or no events within a CTI span. Both of these scenarios can – and do – happen in the real world. But … and this depends on the adapter … you may want to handle CTIs in your input adapter itself rather than relying on AdvanceTimeSettings. It turns out, however, that specifying your own CTIs is really not that difficult.

Let’s start with how we typically do it. First, we have some source data as an array and a function to create our event timestamp. Then we create the point stream from the array using ToPointStream (or ToIntervalStream or ToEdgeStream). Here’s a code example and the results from LinqPad:

void Main()
{
    Func<int, DateTimeOffset> t = 
        (s) => new DateTimeOffset(2011, 1, 11, 8, 0, 0, TimeSpan.Zero).AddSeconds(s);
        
    var values = new []
    {
        new {Item="Variable1", Value=92, Timestamp=0},
        new {Item="Variable2", Value=60, Timestamp=0},
        new {Item="Variable1", Value=93, Timestamp=2},
        new {Item="Variable2", Value=75, Timestamp=2},
        new {Item="Variable1", Value=88, Timestamp=3},
        new {Item="Variable2", Value=81, Timestamp=3},
        new {Item="Variable1", Value=93, Timestamp=5},
        new {Item="Variable2", Value=82, Timestamp=5}
    };
    
    var valueStream = values.ToPointStream(Application, 
        e => PointEvent.CreateInsert(t(e.Timestamp), 
            new {Item = e.Item, Value = e.Value}),
            AdvanceTimeSettings.IncreasingStartTime); 
    
    valueStream.ToPointEnumerable().Dump("Results"); 
        
}

 

Results

IEnumerable<PointEvent<>> (13 items)

EventKind | StartTime | Payload
Insert | 1/11/2011 8:00:00 AM | { Item = Variable1, Value = 92 }
Cti | 1/11/2011 8:00:00 AM | null
Insert | 1/11/2011 8:00:00 AM | { Item = Variable2, Value = 60 }
Insert | 1/11/2011 8:00:02 AM | { Item = Variable1, Value = 93 }
Cti | 1/11/2011 8:00:02 AM | null
Insert | 1/11/2011 8:00:02 AM | { Item = Variable2, Value = 75 }
Insert | 1/11/2011 8:00:03 AM | { Item = Variable1, Value = 88 }
Cti | 1/11/2011 8:00:03 AM | null
Insert | 1/11/2011 8:00:03 AM | { Item = Variable2, Value = 81 }
Insert | 1/11/2011 8:00:05 AM | { Item = Variable1, Value = 93 }
Cti | 1/11/2011 8:00:05 AM | null
Insert | 1/11/2011 8:00:05 AM | { Item = Variable2, Value = 82 }
Cti | 12/31/9999 11:59:59 PM | null

Most of the samples filter the CTIs out from the dump but I like to see them (always). Of course, since this post is about CTIs, we definitely need to see them. If you take a look at the results, you’ll see that the CTIs aren’t exactly where you might expect them to be. When you use IncreasingStartTime, the engine “watches” for a new start time to be enqueued with an event. It then enqueues a CTI with that new event’s start time. The next event – with the same start time – is in the next CTI span. So each CTI span has events with two different start times!
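To make that placement concrete, here is a rough plain-C# model of the behavior. It does not touch the StreamInsight API at all: CtiModel, Simulate and the string output format are made-up names for illustration, and the rule it encodes (a CTI follows any event whose start time is later than the last CTI) is inferred from the dump above, not taken from the engine's documentation.

```csharp
using System;
using System.Collections.Generic;

public static class CtiModel
{
    // Hypothetical helper: models where IncreasingStartTime appears to place
    // CTIs, based only on the LinqPad dump above. Not part of StreamInsight.
    public static List<string> Simulate(IEnumerable<int> startSeconds)
    {
        var output = new List<string>();
        int? lastCti = null;

        foreach (int start in startSeconds)
        {
            output.Add("Insert@" + start);

            // A CTI follows the first event that carries a new, later start time.
            if (lastCti == null || start > lastCti.Value)
            {
                output.Add("Cti@" + start);
                lastCti = start;
            }
        }

        // The dump ends with a CTI at the end of time.
        output.Add("Cti@Infinity");
        return output;
    }
}
```

Feeding it the start offsets from the sample (0, 0, 2, 2, 3, 3, 5, 5) gives 13 entries in the same Insert/Cti order as the dump, with the second event of each timestamp landing after the CTI.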

Let’s change it around a bit. There is an overload of ToPointStream that takes an AdvanceTimeSettings, which gives you more control over your CTIs. Changing the code around a bit, we certainly get different results:

AdvanceTimeSettings ats = new AdvanceTimeSettings(
    new AdvanceTimeGenerationSettings(
        TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(0)), null, AdvanceTimePolicy.Drop);

var valueStream = values.ToPointStream(Application, 
    e => PointEvent.CreateInsert(t(e.Timestamp), 
        new {Item = e.Item, Value = e.Value}),
    ats, "Values");

 

Values

IEnumerable<PointEvent<>> (11 items)

EventKind | StartTime | Payload
Insert | 1/11/2011 8:00:00 AM | { Item = Variable1, Value = 92 }
Cti | 1/11/2011 8:00:00 AM | null
Insert | 1/11/2011 8:00:00 AM | { Item = Variable2, Value = 60 }
Insert | 1/11/2011 8:00:02 AM | { Item = Variable1, Value = 93 }
Cti | 1/11/2011 8:00:02 AM | null
Insert | 1/11/2011 8:00:02 AM | { Item = Variable2, Value = 75 }
Insert | 1/11/2011 8:00:03 AM | { Item = Variable2, Value = 81 }
Insert | 1/11/2011 8:00:03 AM | { Item = Variable1, Value = 88 }
Insert | 1/11/2011 8:00:05 AM | { Item = Variable1, Value = 93 }
Cti | 1/11/2011 8:00:05 AM | null
Insert | 1/11/2011 8:00:05 AM | { Item = Variable2, Value = 82 }

 

It is different and it’s also close to what I want. But there are still events in there that have mixed start times within the same CTI window. The example above isn’t quite fair, though … if I add a touch of a delay into the AdvanceTimeSettings, the results begin to look more like what I expect. But … if you look above, we aren’t getting CTIs every 2 seconds. We still have a patch of events with different start times. And – notice – the CTIs don’t come every 2 seconds like clockwork. Instead, a CTI is enqueued only when an incoming event’s start time has advanced far enough past the previous CTI. The only way to resolve it is to take complete control over the CTIs … so we add them into the source data. We don’t have to specify any AdvanceTimeGenerationSettings since, of course, we are enqueuing manually. Which gives us the following code and output:

void Main()
{
    Func<int, DateTimeOffset> t = 
        (s) => new DateTimeOffset(2011, 1, 11, 8, 0, 0, TimeSpan.Zero).AddSeconds(s);
    var values = new []
    {
        new {Item="Variable2", Value=60, Timestamp=0},
        new {Item="CTI", Value=60, Timestamp=0},
        new {Item="Variable1", Value=93, Timestamp=2},
        new {Item="Variable2", Value=75, Timestamp=2},
        new {Item="Variable1", Value=88, Timestamp=3},
        new {Item="Variable2", Value=81, Timestamp=3},
        new {Item="CTI", Value=60, Timestamp=3},
        new {Item="Variable1", Value=93, Timestamp=5},
        new {Item="Variable2", Value=82, Timestamp=5},
        new {Item="CTI", Value=60, Timestamp=5}
    };

    var sourceData = values.ToPointStream(Application, 
        e => e.Item != "CTI" 
            ? PointEvent.CreateInsert(t(e.Timestamp), 
                new Payload(){Item=e.Item, Value=e.Value})
            : PointEvent<Payload>.CreateCti(t(e.Timestamp).AddTicks(1)));

    sourceData.ToPointEnumerable().Dump("Results");
}

// Define other methods and classes here
public struct Payload
{
    public string Item;
    public int Value;
}

Note that we aren’t using an anonymous type for the stream – we can’t. You’ll get a compile error if you do. Also, the method that we’re using isn’t very reusable; we’ll wind up writing the same thing over and over again, tweaked for each case. Finally, I’m not really thrilled about the clarity. But we can kick this up a notch and use a fully reusable method that handles creating the events and can use anonymous types, thanks to the wonderful goodness that is lambdas. Check it out:

public static PointEvent<TPayload> GetPointEvent<TPayload, TSource>(
    TSource source, 
    Func<TSource, bool> ctiSelectExpression, 
    Func<TSource, TPayload> payloadSelectExpression, 
    Func<TSource, DateTimeOffset> eventTimeExpression)
    {
        bool isCti = ctiSelectExpression.Invoke(source); 
        if(isCti)
        {
            return PointEvent<TPayload>.CreateCti(eventTimeExpression.Invoke(source)); 
        }
        return PointEvent<TPayload>.CreateInsert(eventTimeExpression.Invoke(source), 
            payloadSelectExpression.Invoke(source)); 
    }

We can then use this in ToPointStream:

var sourceData = values.ToPointStream(Application, 
    e => GetPointEvent(e, 
        i=> i.Item == "CTI", 
        i=> new {Item=i.Item, Value = i.Value}, 
        i => t(i.Timestamp)));
The output is the same as the first method but, in this case, it is more reusable and I find it a touch simpler.

Tags:

StreamInsight | Code Sample

Output Adapter –> Input Adapter Communications : Follow up

December 22, 2011 12:41 PM

Just a quick note to follow up on my previous post on Output Adapter –> Input Adapter Communications : Event Shapes – specifically about Edge output to Edge input scenarios. While this scenario works just fine in an ideal world, we all know that we don’t live in an ideal world. Instead, there are potential communication breakdowns between the StreamInsight servers from things like reboots (for whatever reason … like Windows Updates), network outages or – if you are using an unreliable protocol like UDP – dropped packets and messages. In an edge-to-edge scenario, it is possible for the hub StreamInsight server to get a start edge … but never an end. In this case, you have an event that is in the engine and participating in analysis, joins, unions, aggregates, etc. that is no longer valid. But … since the end never “came in”, you have no way of knowing that the event is no longer valid and its end date is, essentially, the end of time. Over a long-running process, these can build up if you have several starts without a corresponding end. On the other end of the spectrum, you could get an end event without a corresponding start. StreamInsight won’t let you enqueue such a beastie – it will raise an exception – but the problem is deeper than that. As with the never-ending start, you’ll have data consistency issues. In this case, rather than having an event that shouldn’t be part of your analysis, you are missing an event that should be a part of your analysis. Again, the result is that you have inconsistent output.

Some of this … particularly issues with reboots … can be handled, to some extent, with checkpointing and adapters that understand and properly handle high water marks. But there’s nothing that you can do about communications outages or dropped/undelivered packets.

Translating these incoming events to points simplifies these issues but doesn’t completely resolve all of them. If you enqueue a point on start, you can use the ToSignal() macro that’s in the LinqPad samples with a timeout of TimeSpan.MaxValue to get the same effect (Edge Start/End) in your output. And, while you can still have events living longer than they should, they will only live until you get an updated value for the item rather than living forever, which minimizes the impact and prevents orphaned starts from building up. Whether you are enqueuing only starts or only ends, you still may miss some events but that is a potential problem regardless of your event shapes.
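The point-to-signal idea can be illustrated without the engine. The sketch below is plain C# over an in-memory list, not StreamInsight code; Reading, Interval and SignalSketch.ToSignal are hypothetical names used only here. It assumes readings arrive as (key, time, value) points and stretches each point until the next point with the same key, which is the effect the ToSignal() macro is described as producing.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical point reading: one value for one item at one instant.
public class Reading
{
    public string Key;
    public DateTime Time;
    public int Value;
}

// Hypothetical signal segment: the value is considered current from Start to End.
public class Interval
{
    public string Key;
    public DateTime Start;
    public DateTime End;
    public int Value;
}

public static class SignalSketch
{
    public static List<Interval> ToSignal(IEnumerable<Reading> points)
    {
        var result = new List<Interval>();

        foreach (var group in points.GroupBy(p => p.Key))
        {
            var ordered = group.OrderBy(p => p.Time).ToList();
            for (int i = 0; i < ordered.Count; i++)
            {
                // Each value lives until the next reading for the same key;
                // the latest one stays open-ended.
                DateTime end = i + 1 < ordered.Count
                    ? ordered[i + 1].Time
                    : DateTime.MaxValue;

                result.Add(new Interval
                {
                    Key = ordered[i].Key,
                    Start = ordered[i].Time,
                    End = end,
                    Value = ordered[i].Value
                });
            }
        }
        return result;
    }
}
```

If a reading is lost in transit, the previous value simply stays current until the next reading for that key shows up, so nothing is orphaned the way an unmatched start edge would be.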

So … the edge output to edge input scenario isn’t quite as simple as it appeared at first blush. In a test/lab scenario, it will usually work just fine, especially when following the “happy path”. However, there are other challenges that come into play in a real-world scenario where things go wrong and, with these challenges in mind, the edge-to-edge scenario is more challenging. At the end of it, point inputs, regardless of the source event shape, provide the simplest use case and present only those challenges that are due to the very nature of a distributed system. Using something like MSMQ for the transport would resolve a lot of this as well … but it comes at a (pretty significant) cost of throughput and latency.

Tags:

StreamInsight

Output Adapter –> Input Adapter Communications : Event Shapes

November 23, 2011 1:56 PM

One use case for StreamInsight in an enterprise environment is to have a hub and spoke architecture. In such an architecture, you would have multiple downstream StreamInsight instances (the spokes) that sit close to the data source to do event detection and processing on very high speed data that, very simply, can’t be sent to a centralized server due to network latency and limited bandwidth. This server would also downsample and (likely) filter data that is of interest to a centralized operations center where there is an aggregating StreamInsight server (the hub). In the case of an “interesting” event or set of events, additional information can be “turned on” at the downstream server to add to the existing feed, thereby optimizing the use of limited bandwidth while preserving the ability to view and collect data that is of critical interest. The aggregating StreamInsight server can then provide end-user interfaces with data as well as do additional aggregated analysis across all of the spokes.

Two example use cases for this:

Oil & Gas Production: In O&G, you would have a downstream StreamInsight server that sits at the drill site, whether that be an offshore rig or an onshore platform. In both of these scenarios, it is entirely likely that there are very limited pipes back to a central operations center. Furthermore, since these installations can have thousands or tens of thousands of sensors, not all of this information will be useful – or even desirable – in the onshore operations center but still needs to be processed and analyzed onsite. Aggregated, filtered data and even calculated data based on the raw sensor feeds would be forwarded to a central StreamInsight server back “on the beach” for analysis across platforms and monitoring. Cross-platform aggregation would be interesting and useful when multiple platforms are using the same pipelines to make sure that there is capacity in the pipeline for current production as well as to optimize capacity usage.

Utilities – SmartGrid: This scenario provides an even better use case for this kind of architecture. A single utility company will have millions of smart meters installed across their service area. Utility companies are also installing “smart transformers” that provide data related to transformer performance. StreamInsight may be fast (it is) and may be able to handle a lot of data at high frequency (it can) but it can only do so much. Having a single StreamInsight server processing the data from all of a utility company’s smart meters and transformers simply isn’t realistic. Like the O&G scenario above, downstream StreamInsight servers would collect information from individual meters and transformers, downsample and aggregate and then send to a centralized server. In fact, with utilities, there may be a couple of layers of this, depending on the size of the service area and utility provider. An initial aggregate (at the source) by substation that is then fed to the hub server would be useful and interesting. From there, aggregation across substations can provide the information required to ensure that there is enough capacity on the grid for current usage as well as optimize the grid’s current capacity so there isn’t too much capacity that isn’t required. Aggregation by substation – or, for example, zip code – can help utility providers optimize and target certain areas for rolling brown/blackouts when necessary with the minimum impact required to keep the grid balanced.

My team has developed some adapters that are specifically designed for and intended to be used in these scenarios. As we’ve been doing this, we’ve also had a lot of discussion around how these should work, especially in cases where the inbound stream has a different shape than the target input adapter (e.g. Edge output –> Point Input). How are these translated from one to the other? What kind of event shapes are actually valid in these scenarios? Here’s what we’ve come up with:

Output (Source) Adapter Shape | Input (Target) Adapter Shape | Valid Use Case | Comments
Edge | Edge | Yes | Events should be enqueued as they arrive. No translation should be done unless the Start Edge time is before the last issued CTI.
Edge | Interval | No | Typically, this is invalid. With an inbound interval from a downstream StreamInsight server, the start time is just about guaranteed to be before the last issued CTI. Because interval events aren’t released to the output adapter until the end time, it is also possible for there to be different start times in one inbound package.
Edge | Point | Yes | The point input adapter should only enqueue the End Edge with an event timestamp equal to the EndTime of the end edge. If the end time is before the last issued CTI, … Alternatively, you could enqueue a point at both the start and the end or just the start, using the corresponding timestamp. If these alternatives are required use cases, it should be configurable.
Interval | Edge | No | The only time this would be possible is to enqueue on the end edge event as it is only then that the end time (and total interval) is known. Since the start time is virtually guaranteed to be before the last issued CTI and since it is likely that different edge events that arrive together also have completely different start times, it is impossible to enqueue them correctly in the application timeline. We have determined, therefore, that this is an invalid use case.
Interval | Interval | No | While it seems that there would be no translation required, that is not the case. As with the edge target above, the start time of the interval is virtually guaranteed to be before the last issued CTI. We have determined, therefore, that this is an invalid use case.
Interval | Point | Yes | Again, the start time is virtually guaranteed to be before the last issued CTI and events in the same “group” will likely have different start times. Therefore, the point should be enqueued with the End Time of the inbound interval event.
Point | Point | Yes | No translation necessary. Enqueue the point with the original timestamp.
Point | Interval | No | While possible, it doesn’t really make much sense. You could enqueue the interval with an end time 1 tick past the start time but what would be the point?
Point | Edge | No | Just like with the interval target above, there isn’t a very good way to handle this that makes sense logically. Having a start edge and then an end edge with a 1 tick difference between start and end time defeats the purpose of an edge event.

As you can see above, it’s not as straightforward as one would initially think. Because of their very nature, interval events are particularly problematic when coming in from a downstream StreamInsight server because their start times are typically in the past according to application time. Point events as a destination are universally valid use cases.
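If you want an adapter factory to reject the invalid combinations up front, the table boils down to a very small rule. This is only a sketch of that rule in plain C#; the Shape enum and ShapeRules.IsValid are hypothetical names for illustration (deliberately not StreamInsight's own types), and the logic is nothing more than a restatement of the table above.

```csharp
using System;

// Hypothetical stand-in for an event shape; not a StreamInsight type.
public enum Shape { Point, Interval, Edge }

public static class ShapeRules
{
    // Returns true when the table above marks the combination as a valid use case.
    public static bool IsValid(Shape output, Shape input)
    {
        // Point targets are valid for every source shape.
        if (input == Shape.Point)
        {
            return true;
        }

        // The only other valid pairing is edge output into edge input.
        return output == Shape.Edge && input == Shape.Edge;
    }
}
```

For example, ShapeRules.IsValid(Shape.Interval, Shape.Point) is true while ShapeRules.IsValid(Shape.Point, Shape.Edge) is false, matching the corresponding rows.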

What about CTIs? As I’m sure you are aware, CTIs advance application time and are not necessarily based on the system clock – though they certainly can be if that is desired. In the use cases above, they would be issued at the source, downstream servers and the destination, upstream server would advance application time based on the source servers. Due to latency, this would likely be a touch behind the system clock. Depending on the protocol used, it is possible that events may arrive out-of-order so, in some cases, this CTI should have a configurable time span to account for this. You may need to issue a CTI for, say, 10:00:00 at 10:00:02 – but using the 10:00:00 timestamp. This will also ensure that any queries that span multiple downstream, input servers are synchronized. One thing to also note with all of the timestamps – it may be necessary, in some cases, to account for differences between the source server application clock and the target server application clock. This can happen in cases where the source application clock is advanced according to the source system clock and the application clocks across the source servers aren’t fully synchronized. In both the O&G and Utilities scenarios above, this is an entirely possible use case. If that is the case, the target adapter will need to do any translation necessary between the systems – the target, upstream server will need to do this to fully and accurately coordinate the clocks between the multiple downstream servers.
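The arithmetic behind a delayed CTI and a clock correction is simple enough to sketch in plain C#. Nothing here is StreamInsight API: CtiClock, CtiTimestamp and ToTargetTime are hypothetical names, and the sketch assumes you already know (or have configured) both the delay you want and how far a given source clock runs ahead of the target.

```csharp
using System;

public static class CtiClock
{
    // Timestamp to put on a CTI that is issued at wall-clock time 'now'
    // when the adapter holds CTIs back by 'delay' to allow for late arrivals.
    public static DateTimeOffset CtiTimestamp(DateTimeOffset now, TimeSpan delay)
    {
        return now - delay;
    }

    // Shifts a timestamp from a source server's timeline onto the target's.
    // 'sourceAhead' is how far the source clock runs ahead of the target
    // (negative if it runs behind). How you measure it is up to you.
    public static DateTimeOffset ToTargetTime(DateTimeOffset sourceTime, TimeSpan sourceAhead)
    {
        return sourceTime - sourceAhead;
    }
}
```

With a 2 second delay, a CTI issued at 10:00:02 carries the 10:00:00 timestamp, which is the case described above.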

Tags:

StreamInsight

Cool StreamInsight query–Point input to Edge Output

November 8, 2011 5:30 PM

It’s easy to get individual events as points in StreamInsight. We can then take those events and write them directly out to our output adapter. There’s nothing to this. But … and this is a common scenario … what do we do if the values don’t change very often? Do we really want to write an hour’s worth of data that hasn’t changed, especially if we are reading the value every second or even several times a second? Probably not. And in many cases, we don’t need to. In many cases, in fact, we’re only interested in saving the values when they have changed. Of course, this can be done in StreamInsight through a little bit of query majick (or I wouldn’t be writing this blog entry, now would I?). So … what we are going to do here is to take a point stream, evaluate it, select only the items that have changed since the previous value and then send these to output as edge events. Each edge will have a start and end when the value changes. Simple enough, right?

As with the previous example, this was done using LinqPad. In this case, I borrowed (or stole, as the case may be) a Linq macro from one of the samples – the FoldPairs macro. I also borrowed/stole the ToSignal extension from the StreamInsight blog and turned that into a Linq macro. Both of these are going to be darn handy in this example.

public static CepStream<TResult> FoldPairs<TStream, TResult>(
    CepStream<TStream> input,
    Expression<Func<TStream, TStream, bool>> predicate,
    TimeSpan timeout,
    Expression<Func<TStream, TStream, TResult>> resultSelector)
{
    var signal = input
                    .AlterEventDuration(e => timeout)
                    .ClipEventDuration(input, (f, s) => predicate.Compile()(f, s));
                    
    return from l in signal.ShiftEventTime(e => TimeSpan.FromTicks(1))
           from r in input
           where predicate.Compile()(l, r)
           select resultSelector.Compile()(l, r);
}

public static CepStream<T> ToSignal<T, K>(CepStream<T> inputstream, Expression<Func<T, K>> keySelector)
{
    return inputstream
        .AlterEventDuration(e => TimeSpan.MaxValue)
        .ClipEventDuration(inputstream, (e1, e2) => (keySelector.Compile()(e1)).Equals(keySelector.Compile()(e2)));
}

And now for our data. I’ve added comments to the data to show where we have value changes. We’ll take this and convert it to a point stream for evaluation.

Func<int, int, DateTimeOffset> t = (h, m) => new DateTimeOffset(2011, 1, 25, 0, 0, 0, TimeSpan.Zero).AddHours(h).AddMinutes(m);

    var sourceData = new []
    {
        //Initial event @ 4:12
        new { SourceId = "A", Value = 22, TimeStamp = t(4, 12) },
        new { SourceId = "A", Value = 22, TimeStamp = t(4, 13) },
        new { SourceId = "A", Value = 22, TimeStamp = t(4, 14) },
        //A: New event @ 4:15
        new { SourceId = "A", Value = 67, TimeStamp = t(4, 15) },
        //A: New event @ 4:16
        new { SourceId = "A", Value = 54, TimeStamp = t(4, 16) },
        new { SourceId = "A", Value = 54, TimeStamp = t(4, 17) },
        new { SourceId = "A", Value = 54, TimeStamp = t(4, 18) },
        new { SourceId = "A", Value = 54, TimeStamp = t(4, 19) },
        new { SourceId = "A", Value = 54, TimeStamp = t(4, 20) },
        new { SourceId = "A", Value = 54, TimeStamp = t(4, 21) },
        //A: New event @ 4:22
        new { SourceId = "A", Value = 87, TimeStamp = t(4, 22) },
        //B: Initial Event @ 4:12
        new { SourceId = "B", Value = 24, TimeStamp = t(4, 12) },
        new { SourceId = "B", Value = 24, TimeStamp = t(4, 13) },
        //B: New Event @ 4:14
        new { SourceId = "B", Value = 31, TimeStamp = t(4, 14) },
        new { SourceId = "B", Value = 31, TimeStamp = t(4, 15) },
        new { SourceId = "B", Value = 31, TimeStamp = t(4, 16) },
        new { SourceId = "B", Value = 31, TimeStamp = t(4, 17) },
        new { SourceId = "B", Value = 31, TimeStamp = t(4, 18) },
        //B: New Event @ 4:19
        new { SourceId = "B", Value = 50, TimeStamp = t(4, 19) },
        new { SourceId = "B", Value = 50, TimeStamp = t(4, 20) },
        new { SourceId = "B", Value = 50, TimeStamp = t(4, 21) },
        new { SourceId = "B", Value = 50, TimeStamp = t(4, 22) }
    };
    
    var source = sourceData.OrderBy(e => e.TimeStamp).ToPointStream(
        Application,
        ev => PointEvent.CreateInsert(ev.TimeStamp.ToLocalTime(), new { ev.SourceId, ev.Value }),
        AdvanceTimeSettings.IncreasingStartTime);

So far, pretty straightforward and nothing special. Next we need to calculate the change between two consecutive values. We do this using the borrowed FoldPairs macro. This will provide us with an anonymous type with our item identifier (SourceId) and the difference (delta) between two consecutive values.

var delta = FoldPairs(source,
                           (a, b) => a.SourceId == b.SourceId,
                           TimeSpan.MaxValue,
                           (a, b) => new { a.SourceId, diff = b.Value - a.Value });

Now that we have the deltas, it’s easy enough to join this back to the original source query, selecting only those source items where the delta is not equal to 0.

var changesOnly = from r in delta join s in source 
                    on r.SourceId equals s.SourceId 
                    where r.diff != 0
                    select s; 

So far so good. If you run what we have so far and write this to the output, you’ll see that you only get those point events that change since the previous value. But … you’ll also see that we’re missing something – our very first, initial event. That’s because no delta is calculated for this event as it has no previous event. Well, if we are going to be a real application, we can’t have the first one disappearing on us all the time. So now we need to get the first event – the event that has nothing preceding it. To do this, we use the ToSignal macro and convert our initial source stream to a signal stream … each individual reading (whether changed or not) becomes an interval. We then shift the event time to create an overlap between one event and the next one. Where we don’t have an overlap, we have the very first point event – a left anti semi-join. We can then take this and union it with the stream of changed events, providing a stream with the changed events AND our first event.

var initialEvent = from s in source 
                    where (
                        from s2 in 
                        ToSignal(source, e=> e.SourceId)
                        .ShiftEventTime(e => TimeSpan.FromTicks(1))
                        where s.SourceId == s2.SourceId
                        select s2
                        ).IsEmpty()
                    select s; 

var final = changesOnly.Union(initialEvent); 

This, in itself, is actually useful … you may not want to take the next step and convert them into edge events as this query will give you a point event for each value change. But … we’re not quite done with the scenario that we want to accomplish. To create edge events with a start and end time that represent value changes, we simply use the ToSignal() Linq macro on our final point stream. If you want these as intervals, you’ll get, in effect, the same data except that you won’t “see” the interval until the end time. If they are edge events, you’ll get a start as soon as the value changes and the end before the next change.

var signalEdges = ToSignal(final, e=> e.SourceId);

And … it really is just that simple … it also helps that the Linq macros really reduce the amount of Linq query statements that we have to write. You could, if you wanted, also add dead zones … where you take this up a notch and only produce an event when the value changes by a certain amount or percentage. But I’ll leave that as an exercise for the reader. Can’t take all the fun out of it, can I?
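As a starting point for that exercise, here is the dead zone logic on its own, in plain C# over an in-memory list and not as a StreamInsight query. DeadZone.Filter and minChange are hypothetical names, and it handles a single source; turning it into a per-SourceId query is still the fun part.

```csharp
using System;
using System.Collections.Generic;

public static class DeadZone
{
    // Keeps the first value, then only values that differ from the last
    // kept value by at least minChange.
    public static List<int> Filter(IEnumerable<int> values, int minChange)
    {
        var kept = new List<int>();
        foreach (int v in values)
        {
            // Compare against the last value we kept, not the previous raw
            // reading, so that slow drift is eventually reported.
            if (kept.Count == 0 || Math.Abs(v - kept[kept.Count - 1]) >= minChange)
            {
                kept.Add(v);
            }
        }
        return kept;
    }
}
```

Run over the "A" values from the sample data with a minChange of 15, this keeps 22, 67 and 87 and drops the move from 67 to 54, since that change is only 13.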

 

Tags:

StreamInsight | Code Sample

Cool StreamInsight query–turning alarms into Edge events

November 4, 2011 5:04 PM

One of the use cases for StreamInsight is to detect certain alarm conditions on streaming real time data. This, in itself, isn’t all that special … many of the process control systems out there already do something like that. What makes StreamInsight different is that it can be a lot smarter about determining these alarm conditions. For example … one of the things that apparently happened (as I understand it) with Macondo (aka Deepwater Horizon) is that the PCS alarms went off constantly … they would trigger when 1 value was out of range every time it was out of range. So … there were so many false alerts that they simply turned the system off. It really isn’t all that unreasonable … many of these sensors will temporarily go out of range and it’s not indicative of a problem. In fact, it could just be a random bad reading or a transient condition that isn’t really a cause for alarm at all. However, if you start to have multiple values from the same sensor/device out of range within a particular time period, then you may really have an issue. You also don’t want to issue an alarm with every bad reading … but, instead, issue one at start and at finish. Because, with an alarm, you have a definite start of the event … but at the start, you have no idea when it will end. You also have a definite end to the event … but you know that only when things come back to normal. It’s a perfect fit for an edge event.

Part of this can be found in the StreamInsight samples for LinqPad – a most righteously awesome tool that every StreamInsight query developer should have and use. It’s in the “Alarm Floods and Transients” sample under 101 StreamInsight Queries, section 2_QueryPatterns. I will start with that and describe the queries step-by-step.

First … our scenario. We have a data stream with events that have a “Status” field. If this field is a “0”, it’s good, if “1” it’s bad. Now … in the real world, you’d get to this value somehow … through previous queries that do analytics or perhaps even from your data source. For our purposes, that is irrelevant. We’re interested in what we do with it. Now, in our case, we can actually get false alerts (of course) so we want to trigger an alarm only when we get multiple alerts within a specific time frame. We then want the alarm expressed as an edge event. Finally, if we have an alarm that crosses a specific amount of time, we want to repeat the alarm. We’ll have several steps to do this. First, here’s our source data; I added a little to what was there. (Notes: These are all set up to run in LinqPad. Also, we don’t differentiate the events by any sort of ID … you’ll need to do this in a real app).

var sourceData = new []
{
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:10:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:11:00 PM") },
    //False alert @ 4:12
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:12:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:13:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:14:00 PM") },
    //Real alert @ 4:15
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:15:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:16:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:18:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:19:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:20:00 PM") },
    //Real alert @ 4:21 - Longer alert that repeats
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:21:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:22:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:23:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:24:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:25:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:26:00 PM") },
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:27:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:28:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:29:00 PM") },
    //False alert @ 4:30
    new { Status = 1, TimeStamp = DateTime.Parse("10/23/2009 4:30:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:31:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:32:00 PM") },
    new { Status = 0, TimeStamp = DateTime.Parse("10/23/2009 4:33:00 PM") },
};

var source = sourceData.ToPointStream(Application, ev => 
    PointEvent.CreateInsert(ev.TimeStamp.ToLocalTime(), ev),
    AdvanceTimeSettings.StrictlyIncreasingStartTime);

var timeout = TimeSpan.FromMinutes(2);
var alarmTimeout = TimeSpan.FromMinutes(5); 

Now that we have our source data and variables, let’s create two streams, one with valid items and one with all alerts:

var validData = source.Where(e => e.Status == 0); 
var alarmEvents = source.Where(e => e.Status == 1);

Next, we need to remove the false alerts … alerts that are followed by a “good” event within the timeout rather than by another alert. We do this by taking the valid items, moving each start time back by the timeout and setting the event duration to the timeout, so that each “good” event covers the window just before it. If an alarm joins successfully with one of these, then there is a “good” event within the timeout after it. In that case, we filter the alarm out using a Left Anti-Semi Join.

// take all alarm events that are not followed by a non-alarm event
// within the timeout
var nonTransientAlarms = from alarm in alarmEvents
             where (from nextevent in source
                        .AlterEventLifetime(
                            e => e.StartTime.Subtract(timeout),
                            e => timeout)
                    where nextevent.Status == 0
                    select nextevent).IsEmpty()
             select alarm;

//Show the Non-transient alarms
(from p in nonTransientAlarms.ToIntervalEnumerable()
 where p.EventKind == EventKind.Insert
 select p).Dump("Non-transient alarms");

The output shows all of the alarm events that do not have a “good” event within the timeout. You’ll notice that there is still a flood of events … we’ll need to filter this down so that we keep only the initial alarm event.
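To make the timing rule concrete, here’s a minimal sketch in plain LINQ-to-Objects (no StreamInsight involved). It assumes the rule boils down to “drop an alarm at time a if a good reading falls in (a, a + timeout]”, which is my reading of the shifted lifetimes above. The class and method names are made up for illustration, and the readings are the same sample data written as minutes past 4:00 PM.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Same readings as sourceData, written as (minute past 4:00 PM, Status).
var readings = new (int Minute, int Status)[]
{
    (10, 0), (11, 0), (12, 1), (13, 0), (14, 0),
    (15, 1), (16, 1), (18, 1), (19, 0), (20, 0),
    (21, 1), (22, 1), (23, 1), (24, 1), (25, 1), (26, 1), (27, 1),
    (28, 0), (29, 0), (30, 1), (31, 0), (32, 0), (33, 0),
};

var kept = TransientFilterSketch.NonTransient(readings, timeoutMinutes: 2);
Console.WriteLine(string.Join(", ", kept)); // prints: 15, 16, 21, 22, 23, 24, 25

// Hypothetical helper, not part of StreamInsight.
static class TransientFilterSketch
{
    // Keeps an alarm at minute a unless a good reading (Status == 0) falls in
    // (a, a + timeout]. This approximates shifting each good reading back by
    // the timeout and anti-joining the alarms against the result.
    public static List<int> NonTransient(
        IEnumerable<(int Minute, int Status)> readings, int timeoutMinutes)
    {
        var all = readings.ToList();
        return all
            .Where(r => r.Status == 1)
            .Where(a => !all.Any(g => g.Status == 0
                                   && g.Minute > a.Minute
                                   && g.Minute <= a.Minute + timeoutMinutes))
            .Select(a => a.Minute)
            .ToList();
    }
}
```

In this sketch the lone alerts at 4:12 and 4:30 drop out, and so do the last alerts of each run (4:18, 4:26 and 4:27), because a good reading follows them within two minutes. The seven alarms that remain are the flood that the next query trims down.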

// Expand all alarm events to the timeout and count over snapshots
var counts = from win in nonTransientAlarms
                 .Where(e => e.Status == 1)
                 .AlterEventDuration(e => timeout)
                 .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
             select new { count = win.Count() };

// Those snapshots with a count of 1 belong to the initial alarms.
// reduce to points and join with original stream.
var initialAlarm = from c in counts
                 .Where(e => e.count == 1)
                 .ToPointEventStream()
             from e in source
             select e;
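The snapshot counting is easier to follow with plain numbers, so here’s a rough LINQ-to-Objects sketch of it (again, not StreamInsight itself). It assumes the non-transient alarms for the sample data sit at minutes 15, 16 and 21 through 25 past 4:00 PM, that a snapshot starts at every interval start or end, and that the join with the source only keeps snapshot starts where the source has a reading. The class and method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Assumed input: non-transient alarm minutes, and the minutes at which the
// sample source has any reading at all (4:10 to 4:33, with 4:17 missing).
var alarms = new[] { 15, 16, 21, 22, 23, 24, 25 };
var sourceMinutes = Enumerable.Range(10, 24).Where(m => m != 17).ToArray();

var initial = SnapshotSketch.InitialAlarms(alarms, sourceMinutes, timeoutMinutes: 2);
Console.WriteLine(string.Join(", ", initial)); // prints: 15, 21, 26

// Hypothetical helper, not part of StreamInsight.
static class SnapshotSketch
{
    public static List<int> InitialAlarms(
        int[] alarms, int[] sourceMinutes, int timeoutMinutes)
    {
        // Each alarm becomes the interval [start, start + timeout).
        var intervals = alarms
            .Select(a => (Start: a, End: a + timeoutMinutes))
            .ToList();

        // A new snapshot begins wherever any interval starts or ends.
        var edges = intervals
            .SelectMany(i => new[] { i.Start, i.End })
            .Distinct()
            .OrderBy(t => t)
            .ToList();

        return edges
            .Take(edges.Count - 1)                       // snapshot start times
            .Where(s => intervals.Count(i => i.Start <= s && s < i.End) == 1)
            .Where(s => sourceMinutes.Contains(s))       // stand-in for the join
            .ToList();
    }
}
```

Note that in this sketch the tail of each run also produces a snapshot with a count of 1 (at 17 and 26). The one at 17 drops out because the source has no reading at 4:17, while the one at 26 survives because the source does have a reading at 4:26.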

So far, we’ve not done much that’s not in the original sample. Now we go off that path and turn these into edge events that represent the start and end of the actual alarm. We start by turning the valid data into a signal: each valid event is stretched out until the next initial alarm. We also extend our initial alarms to the alarm timeout. From there, doing a Left Anti-Semi Join (LASJ) again with the valid data signal gives us edge events that start and end with the alarm. Because we have one alarm that extends past the alarm timeout, it generates two edge events. Together, these edge events cover the entire alarm timeframe:

var validDataSignal = from v in validData
                                .AlterEventDuration(e=>TimeSpan.MaxValue)
                                .ClipEventDuration(initialAlarm,(e1, e2) => true)
                      select v; 
//Since we have the initial alarms, how long does the alarm last?
var alarmTimeline = from i in initialAlarm.AlterEventDuration(e=> alarmTimeout)
                    where ( from v in validDataSignal select v).IsEmpty()
                    select i; 
                    

(from p in alarmTimeline.ToEdgeEnumerable()
 where p.EventKind == EventKind.Insert
 select p).Dump("Alarm timeframe");
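If you want to sanity-check the timeline by hand, here’s a simplified LINQ-to-Objects sketch of what I’d expect each edge to span. It is only an approximation: it assumes the initial alarms for the sample data land at 4:15, 4:21 and 4:26, and that each alarm ends at the first good reading after it or at the alarm timeout, whichever comes first. It ignores any extra fragments the real join could produce on other data, and the class and method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Assumed inputs, in minutes past 4:00 PM.
var initialAlarms = new[] { 15, 21, 26 };
var goodReadings = new[] { 10, 11, 13, 14, 19, 20, 28, 29, 31, 32, 33 };

foreach (var (start, end) in
         TimelineSketch.AlarmSpans(initialAlarms, goodReadings, alarmTimeoutMinutes: 5))
{
    Console.WriteLine($"4:{start} -> 4:{end}");
}
// prints:
// 4:15 -> 4:19
// 4:21 -> 4:26
// 4:26 -> 4:28

// Hypothetical helper, not part of StreamInsight.
static class TimelineSketch
{
    public static List<(int Start, int End)> AlarmSpans(
        int[] initialAlarms, int[] goodReadings, int alarmTimeoutMinutes)
    {
        return initialAlarms.Select(a =>
        {
            // The alarm can never run longer than the alarm timeout.
            int cap = a + alarmTimeoutMinutes;

            // First good reading after the alarm starts, if there is one.
            int firstGood = goodReadings.Where(g => g > a)
                                        .DefaultIfEmpty(cap)
                                        .Min();

            return (Start: a, End: Math.Min(firstGood, cap));
        }).ToList();
    }
}
```

Under those assumptions, the long alarm that starts at 4:21 is cut off by the five-minute timeout at 4:26 and then picked up again by a second span that runs until the good reading at 4:28.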

And that should do it …
