About the author

J Sawyer is a developer based in Houston, TX who absolutely loves to write code. After spending 9 years at Microsoft, he moved on to other things and is currently the Lead Developer for the RealTime Data Management team at Logica US. He spends his days building Really Cool Things around StreamInsight and having a blast doing it.

He has been involved with HDNUG, one of the oldest and largest .NET-focused user groups in the US, since its inception in 2001 and has watched it grow from 5-10 technologists meeting around a conference table to a thriving community of over 5000 with regular meeting attendance averaging 100 attendees. He currently serves as the Vice President. You can join him at HDNUG on the second Thursday of every month at the Houston Microsoft office.

He also loves to ride his Yamaha FZ1. And sometimes his Ninja 650. And also his Honda XR-400 dirt bike. But he doesn't code and ride at the same time. That would be bad.

Simple StreamInsight App

May 25, 2011 6:49 AM

In a previous post, I talked about how to change one of the StreamInsight sample apps to run using a remote StreamInsight server. After a discussion with the person that asked the original question, I’ve put together a full sample app that can run either in process or connect to a remote server. Most of the code is completely identical … input adapters, output adapters, queries … as that part of the programming model is the same. There are, however, some differences between what you need to do for this kind of app and what you typically see in the samples. I’ve tried to include most, if not all, of the server/application bells and whistles to make it easy to play with.

Let’s start with the common settings for the sample application. First, the streamInsightInstanceName should point to the active and installed instance. If starting out of process, this is used to build the url to connect to the management service. It probably would be better to have the full url there when you are running remote … but I didn’t do it. Next, you need to specify the streamInsightAppName, which determines the name of the application to create or connect to.
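For reference, here is a rough sketch of what the settings discussed in this post might look like in app.config. The key names are the ones mentioned in the post; the appSettings layout and every value shown are assumptions for illustration only, so check the downloaded sample for the real file.

```xml
<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <appSettings>
    <!-- Installed StreamInsight instance; "Default" is a placeholder value. -->
    <add key="streamInsightInstanceName" value="Default" />
    <!-- Application to create or connect to; the name is a placeholder. -->
    <add key="streamInsightAppName" value="SimpleStreamInsightApp" />
    <!-- true = host the server in process, false = connect to a remote server. -->
    <add key="runInProcess" value="true" />
    <!-- Only relevant when running in process. -->
    <add key="useMetadata" value="false" />
    <!-- Optional; when present, the in-process management service is started.
         The URL is a made-up example. -->
    <add key="managementServiceUrl" value="http://localhost:8090/MyStreamInsightServer" />
  </appSettings>
</configuration>
```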

Creating the Server – In Process

First, set the runInProcess key in the app.config to true. If you want to use the metadata service that was introduced in 1.1, set useMetadata to true. When creating the server, you need to specify the metadata service at startup. If using the metadata overload, it cannot be null. A final note, in StreamInsight 1.2, a new feature called checkpointing is being introduced. This feature allows for higher availability of StreamInsight instances; certain queries can have their entire state written out to disk at some interval and allows for long-running queries to be reconstituted on startup (instead of losing all that information). Checkpointing relies on the metadata services.

Server cepServer = AppSettings.Current.UseMetadata ?
                    Server.Create(AppSettings.Current.StreamInsightInstanceName, GetMetadataService()) :
                    Server.Create(AppSettings.Current.StreamInsightInstanceName);

When running in process, you can also fire up the management service, which allows the Event Flow Debugger (and other tools) to connect to your StreamInsight instance. If a value is present in the managementServiceUrl settings key, this will be created. At this time, only the WSHttpBinding is supported. Note: you will need to run Visual Studio as admin -or- reserve whatever url you are going to use with WCF. See this blog post for details on that. I, personally, run VS as admin since it makes my life easier in this regard.

ServiceHost managementServiceHost = new ServiceHost(cepServer.CreateManagementService());
//Adding the service host. 
//This allows remote clients to access this application, including the Query Debugger.
managementServiceHost.AddServiceEndpoint(
    typeof(IManagementService),
    new WSHttpBinding(SecurityMode.Message), AppSettings.Current.ManagementServiceUrl);
managementServiceHost.Open();

Creating the Server – Remote

Since there are fewer knobs and switches for a remote instance, connecting to a running server is much simpler. With the sample app, it will connect to the standard url for the StreamInsight instance name specified in the config file. However, if you have the management service started on a custom in-process instance of StreamInsight, you will also be able to connect to that. By the way, the model used for the recently announced Azure StreamInsight service is the remote model.

var endpointAddress = new System.ServiceModel.EndpointAddress(
        @"http://localhost/StreamInsight/" + AppSettings.Current.StreamInsightInstanceName);

Server cepServer = Server.Connect(endpointAddress);

Getting the Application

In most of the demos/samples out there, the code not only runs in process, it also runs without metadata services. In those cases, you always need to create the application. However, you don’t always need to do this. If connecting to a remote instance or if you are using the metadata service, it is very possible that the application is already there. So we need to check before we grab the application object.

private static Application GetApplication(Server cepServer)
{
            
    if (!cepServer.Applications.ContainsKey(AppSettings.Current.StreamInsightAppName))
    {
        Console.WriteLine("Creating new Cep Application");
        return cepServer.CreateApplication(AppSettings.Current.StreamInsightAppName);
    }
    else
    {
        Console.WriteLine("Connecting to existing Cep Application");
        return cepServer.Applications[AppSettings.Current.StreamInsightAppName];
    }

}

Creating the Queries

Creating the queries and attaching the adapters is no different whether you are running in process or out of process. However, we need to take into account that, as with the application object, the queries may already be there. With the remote instance, this is pretty clear. When using metadata, the queries will be created when you create the server. However, they will not be started – so you do need to start them. When starting a query, you need to check to see if it’s already running – you will get an exception if you try to start a query that is already started.

There are 3 queries, created and run using a technique called Dynamic Query Composition. This is actually pretty important to understand – but a topic for another post. One is the raw, source query, one is a hopping window aggregate and one is a snapshot window aggregate. It’s interesting to see the differences between the output of the hopping window aggregate and the snapshot window aggregate. The only input adapter used is the random data generator that is included in the StreamInsight Team Samples. Each query is attached to an Async CSV output adapter, again from the StreamInsight Team Samples (not required – you can have a standing, running query without an output adapter).

I’ve uploaded the sample to MSDN Code Samples. You can download it here.

Tags:

StreamInsight | Code Sample

A couple of helpful StreamInsight Query extension methods

May 20, 2011 6:12 PM

When you are working with StreamInsight queries, there is a certain amount of information that isn’t available from the Query object, but should be … or, that is, I think should be and, since this is my blog, that’s all that really matters, right? The biggest one is the query’s current state. You can’t tell, from the query object, if it is running, stopped, stopping or anything else. Instead, you have to get the diagnostic view and then dig into that. I’m sorry … I am really loving StreamInsight but I find that just a little on the warty side. And … since you need the DiagnosticView to get this, it’s ideal to just add that one on there while we are at it. There are a bunch of things available in the diagnostic view that can be helpful but I find the query state to be the most important.

First, an enum for the various query states. This isn’t defined in the StreamInsight assemblies; you get a string from the diagnostic view.

/// <summary>
/// Enum for the query's current state.
/// </summary>
public enum QueryState
{
    /// <summary>
    /// The query has been suspended
    /// </summary>
    Suspended,
    /// <summary>
    /// The query is in the process of stopping
    /// </summary>
    Stopping,
    /// <summary>
    /// The query is running and processing events
    /// </summary>
    Running,
    /// <summary>
    /// The query has been aborted due to an error
    /// </summary>
    Aborted,
    /// <summary>
    /// The state cannot be determined or the query has been deleted
    /// </summary>
    Unknown
}

Next, the extension methods. I’ve used Get in the names as they aren’t really properties; the value has to be fetched, so I feel that it’s best and proper to have that clearly indicated in the method name. They are both quite simple and, I feel, help make working with the query object a bit smoother.

/// <summary>
/// Class with extension methods for Queries.
/// </summary>
public static class QueryExtensionMethods
{
    /// <summary>
    /// Returns the diagnostic view for the query.
    /// </summary>
    /// <param name="query">Query object </param>
    /// <returns>The Diagnostic view for the query.</returns>
    public static DiagnosticView GetDiagnosticView (this Query query)
    {
        DiagnosticView diagnosticView =
            query.Application.Server.GetDiagnosticView(query.Name);
        return diagnosticView; 
    }

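    /// <summary>
    /// Returns the current state of the query
    /// </summary>
    /// <param name="query">Query object</param>
    /// <returns>The query's current state, or Unknown if it cannot be determined.</returns>
    public static QueryState GetQueryState(this Query query)
    {
        QueryState queryState = QueryState.Unknown;
        try
        {
            DiagnosticView diagnosticView = query.GetDiagnosticView();
            // The diagnostic view reports the state as a string; parse it case-insensitively.
            if (!Enum.TryParse<QueryState>(
                diagnosticView[DiagnosticViewProperty.QueryState].ToString(), true, out queryState))
            {
                // TryParse overwrites the out value on failure, so restore Unknown.
                queryState = QueryState.Unknown;
            }
        }
        catch (ManagementException)
        {
            // The diagnostic view could not be retrieved; fall through and report Unknown.
        }
        return queryState;
    }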
}
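One detail in GetQueryState that is easy to miss is why the state is reset to Unknown when parsing fails. The stand-alone sketch below, which does not reference StreamInsight at all, illustrates it: when Enum.TryParse fails, it sets the out value to the enum's default (the first member, Suspended here). The QueryStateParsing class and its Parse method are hypothetical names made up for this illustration, and the input strings are invented rather than taken from a real diagnostic view.

```csharp
using System;

// Local copy of the enum from the post, so the sketch compiles on its own.
public enum QueryState { Suspended, Stopping, Running, Aborted, Unknown }

// Hypothetical helper, for illustration only.
public static class QueryStateParsing
{
    // Maps a state string to the enum, falling back to Unknown
    // for anything that is not a recognized member name.
    public static QueryState Parse(string raw)
    {
        QueryState state;
        // ignoreCase: true, so "running" and "Running" both match.
        if (!Enum.TryParse<QueryState>(raw, true, out state))
        {
            // On failure the out value is default(QueryState), i.e. Suspended,
            // which would be misleading. Reset it explicitly.
            state = QueryState.Unknown;
        }
        return state;
    }

    public static void Main()
    {
        Console.WriteLine(Parse("running"));        // prints "Running"
        Console.WriteLine(Parse("NotARealState"));  // prints "Unknown"
    }
}
```

Without the reset, an unrecognized string would silently be reported as Suspended.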

And … a shout-out to Tony, one of the developers on my team, for coming up with this idea.

Tags:

StreamInsight

StreamInsight: Simple sample of adapters and Server.Connect

May 19, 2011 9:14 PM

This is in response to a question on the StreamInsight forum where a developer was asking for a sample that uses Server.Connect.

Before I get into the details, a few notes:

  • I used the PatternDetector sample from the StreamInsight Product Team Samples on CodePlex. Rather than write something from scratch, I just converted the existing project to VS 2010 and tweaked it.
  • Add a reference to System.ServiceModel to the project.
  • You need to make sure that you defined an instance for StreamInsight in the setup and installed the service. And start it. Starting it helps.
  • I had to change the creds for the StreamInsight service to use my credentials instead of Network Service because I got a funky error (that I’ve never seen before) with the Event Flow Debugger. It may have something to do with the fact that I’m running on a domain controller (don’t say it … I know … but it’s just a dev machine and I need AD for some Hyper-V VMs that I run). I’d recommend trying to do it with Network Service but if it doesn’t work, you’ve been warned. The error is below:

Security Support Provider Interface (SSPI) authentication failed. The server may not be running in an account with identity 'DEVBIKER\J Sawyer'. If the server is running in a service account (Network Service for example), specify the account's ServicePrincipalName as the identity in the EndpointAddress for the server. If the server is running in a user account, specify the account's UserPrincipalName as the identity in the EndpointAddress for the server.

  • You need to make sure that you copy the relevant DLLs to the StreamInsight service host folder. By default, this is C:\Program Files\Microsoft StreamInsight 1.1\Host. These can be found in the \bin folder for the PatternDetector project and are:
    • StreamInsight.Samples.Adapters.SimpleTextFileReader
    • StreamInsight.Samples.Adapters.SimpleTextFileWriter
    • StreamInsight.Samples.UserExtensions.Afa
  • Create the input and output folders. Actually, if you are running under your own account, this won’t be necessary. If you are running under Network Service, it will be. And make sure that you give Network Service appropriate permissions.
  • The User Defined Operator in the sample that I chose caused an issue when running remotely. This surprised me … usually things run pretty much unchanged. I didn’t have the time or energy to debug that so I just changed it so that it was no longer necessary.

So … the changes.

In void Main, I changed the startup code to:

if (inProc)
{
    using (Server server = Server.Create(streamInsightInstanceName))
    {
        RunApp(server);
    }
}
else
{
    using (Server server = Server.Connect(new System.ServiceModel.EndpointAddress("http://localhost/StreamInsight/" + streamInsightInstanceName)))
    {
        RunApp(server);
    }
}

 

As you can see, I refactored everything in the using block to a new method called “RunApp”. This made it cleaner (I thought). I suppose that I could have done it another way, but this just seemed right.

In RunApp, I changed the code that creates the application so that it checks for the existence of the application and, based on that, gets or creates the application object. This is below:

// Create application in the server. The application will serve 
// as a container for actual CEP objects and queries.
Console.WriteLine("Creating CEP Application");
Application application;
if (!server.Applications.ContainsKey(appName))
{
    application = server.CreateApplication(appName);
}
else
{
    application = server.Applications[appName];
}

And that is (typically) all that you need to do to change a StreamInsight app to run either in proc or remote. As I mentioned before, I did have to change the query and remove the reference to the UDO … which was very strange because I’ve never had issues with extensions before. I’m guessing that it had something to do with the implementation of the UDO.

You can download Program.cs (the only changed file) for the PatternDetector sample below:

 

Tags:

.NET Stuff | StreamInsight | Code Sample

StreamInsight User Defined Aggregate: Standard Deviation

May 15, 2011 5:39 PM

Here’s a quick UDA to do standard deviation of a window. I found it interesting that I had to take the IEnumerable<double> source and call ToArray(). If I didn’t, it would throw a NullReferenceException, although why is something of a mystery. It would be nice if I could pass in the Average from the query since that’s already calculated by the StreamInsight engine but no dice.

Note: I’ve not done any performance testing … it was copied from MSDN. Use at your own risk …

/// <summary>
/// Static class with UDA extensions for standard deviation
/// </summary>
public static class StandardDeviation
{
    /// <summary>
    /// Extension method for the UDA.
    /// </summary>
    /// <typeparam name="T">Payload type of the stream.</typeparam>
    /// <param name="window">Window to be passed to the UDA</param>
    /// <param name="map">Mapping from the payload to a double field in the payload.</param>
    /// <returns>Aggregation result.</returns>
    [CepUserDefinedAggregate(typeof(StdDevDouble))]
    public static double StdDev<T>(this CepWindow<T> window, Expression<Func<T, double>> map)
    {
        throw CepUtility.DoNotCall();
    }
}

/// <summary>
/// A UDA to calculate the standard deviation of a window
/// </summary>
public class StdDevDouble : CepAggregate<double, double>
{
    /// <summary>
    /// Computes the aggregation over a window.
    /// </summary>
    /// <param name="source">Set of events contained in the window.</param>
    /// <returns>Aggregation result.</returns>
    public override double GenerateOutput(IEnumerable<double> source)
    {
        double[] values = source.ToArray(); 

        double mean = values.AsParallel().Average();

        double standardDev = values.AsParallel().Aggregate(
            0.0,
            // do this on each thread
             (subtotal, item) => subtotal + Math.Pow((item - mean), 2),
            // aggregate results after all threads are done.
             (total, thisThread) => total + thisThread,
            // perform standard deviation calc on the aggregated result.
            // (uses the materialized array rather than enumerating source again)
            (finalSum) => Math.Sqrt((finalSum / (values.Length - 1)))
        );
        return standardDev;
    }

}

Using it in a query is simple – just make sure that you add a using statement for the namespace containing the aggregate’s definition.

var stdDeviation = from e in sourceQueryStream
             group e by e.ItemId into eachGroup
             from window in eachGroup.HoppingWindow(
                TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(1), HoppingWindowOutputPolicy.ClipToWindowEnd)
             select new {
                 ItemId = eachGroup.Key,
                 StdDev = window.StdDev(e => e.Value), 
                 Avg = window.Avg(e => e.Value), 
                 Min = window.Min(e => e.Value), 
                 Max = window.Max(e => e.Value), 
                 Count = window.Count()
             };
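If you want to sanity-check the numbers coming out of the UDA, a small stand-alone program that applies the same formula (the sample standard deviation, dividing by n - 1) to a known data set can help. This is only a sketch: the StdDevCheck class and its SampleStdDev method are hypothetical names, it runs sequentially rather than with PLINQ, and it does not touch StreamInsight at all.

```csharp
using System;
using System.Globalization;
using System.Linq;

// Hypothetical helper, for checking results by hand only.
public static class StdDevCheck
{
    // Sample standard deviation: sqrt( sum((x - mean)^2) / (n - 1) ).
    public static double SampleStdDev(double[] values)
    {
        if (values.Length < 2)
        {
            // The sample standard deviation is undefined for fewer than two values.
            return double.NaN;
        }

        double mean = values.Average();
        double sumOfSquares = values.Sum(v => (v - mean) * (v - mean));
        return Math.Sqrt(sumOfSquares / (values.Length - 1));
    }

    public static void Main()
    {
        double[] values = { 2, 4, 4, 4, 5, 5, 7, 9 };
        // mean = 5, sum of squared differences = 32, so the result is sqrt(32 / 7).
        Console.WriteLine(
            SampleStdDev(values).ToString("F3", CultureInfo.InvariantCulture)); // prints "2.138"
    }
}
```

Note the explicit guard for windows with fewer than two events; with a single event the n - 1 divisor is zero and the result is NaN.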

Tags:

.NET Stuff | StreamInsight | Code Sample

How StreamInsight Data Is Different – Part I

May 8, 2011 3:07 PM

One of the key challenges that developers (from what I’ve seen) have when getting started with StreamInsight is in dealing with and understanding how data is handled. The concept of input and output adapters is actually relatively simple and familiar to developers everywhere. The language used by SI to handle the data – LINQ – is also pretty familiar to developers; in this case, deceptively familiar. But once they get into writing the queries, things become more difficult.

Let’s start first with what developers are used to – 3-dimensional data. Data is identified and “found” using 3 pieces of information: source, row and column. This tells you where it is. We’re used to seeing things in 3 dimensions; it’s how our world is shaped. Given 3 spatial values, you can determine either size or location. Yes, these things may change – new buildings are constructed, existing buildings are reconfigured or renovated, wrecking balls take buildings out. But finding something now only requires those 3 pieces of information. And using this information, you can go to a restaurant. Or you can go on walkabout and explore to see what’s there right now. It is information that is stored and then retrieved on demand. From an experiential perspective, it’s memory and recall.

StreamInsight is different. There is an additional dimension added to things – the dimension of time. If, say, you are meeting a friend to have lunch, you need to know more than just where to meet him. You need to know when to meet him. You don’t need to know the “when” dimension to find the restaurant. And this is exactly the dimension that StreamInsight adds to our data. But it’s actually more than that. Saying when to meet our friend is still using it as an attribute, not as a dimension. Instead, it is like how we actually experience the lunch as it is occurring, how our brain processes the events and happenings around us. Like our own daily experience of events in time, data comes to us and passes by. Once a moment has passed, that moment is gone and lost to history. We may remember things, write them down, take a picture and so on … but then we are back to stored data that is a snapshot of time, but is not happening in time. As things (events) are happening, our brain processes them, correlates them, stores some away in memory and tosses some of it out. StreamInsight, as a CEP engine, does much the same thing that our brains do approaching the world in time. It wouldn’t, IMHO, be unreasonable to say that our brains are the most complex CEP engine that there is.

Let’s take this a little further and use it to explain some of the core StreamInsight concepts. Like the real world, SI has events and these events have different temporal characteristics.

Let’s go back to meeting our friend for lunch. We go to the restaurant at a specific time to meet our friend. When we go into the restaurant, we open the door and walk in. It is an event in time but it’s not something that we’d typically remember later nor would we care to. This is a point event; there and then gone. If there is something unusual about this event (say, for example, the door falls off of its hinges when we open it), that changes things … but we’ll touch on that a little later.

This restaurant that we’re going to isn’t open 24 hours; they are open from 10:00 AM to 11:00 PM on some days and from 10:00 AM to 2:00 AM on some other days. But we know what those hours are going to be and that the restaurant will have a status of “open” during this time period. This is an interval event: it has a start, an end and associated data (the “open” status) that is known up front. While the restaurant is open, other things happen … customers come in, place orders and pay their bill. At the end of the day, we have a total number of customers, total number of orders and total amount of money that passed through the register. This, too, is an interval with associated data, one that is just like an aggregate database query. We can have both of these types of intervals in StreamInsight – one where the data associated with it is known in the beginning and one that has data that is calculated based on a time window. Either way, there is a known and definite start and end.

Now, back to our lunch. We know we are at lunch and we know who our friend is when it starts but we don’t know, at the time we start, when it’s going to end. We know it will but there are a lot of factors that come into play – how quickly the kitchen is running, how busy our waitress is, a lengthy discussion with our friend about the kids and wives or yesterday’s baseball game. Once our lunch is done, we can mark down the end time and move on with our day. This is the edge event. You have a start and sometime later have an end but you don’t know when that end will be when the event itself starts.

If you want to get technical, everything happens over a time span, even if it’s a very short one. Likewise, StreamInsight handles all of these events internally as interval events. Point events have a time span of a tick and edge events start out with a time span of infinity and, at the end, have the end time set. Interval events are, I think, self-explanatory.
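To make that a little more concrete, here is a toy model of the three event shapes as plain start/end intervals. To be clear, this is not the StreamInsight API: TimedEvent and all of its members are hypothetical names invented for this sketch, and DateTimeOffset.MaxValue simply stands in for "infinity".

```csharp
using System;

// Hypothetical illustration only; these are not StreamInsight types.
public sealed class TimedEvent
{
    public DateTimeOffset Start { get; private set; }
    public DateTimeOffset End { get; private set; }

    private TimedEvent(DateTimeOffset start, DateTimeOffset end)
    {
        Start = start;
        End = end;
    }

    // Point event: an interval that lasts exactly one tick.
    public static TimedEvent Point(DateTimeOffset at)
    {
        return new TimedEvent(at, at.AddTicks(1));
    }

    // Interval event: start and end are both known up front.
    public static TimedEvent Interval(DateTimeOffset start, DateTimeOffset end)
    {
        return new TimedEvent(start, end);
    }

    // Edge event: starts out open-ended ("infinity").
    public static TimedEvent EdgeStart(DateTimeOffset start)
    {
        return new TimedEvent(start, DateTimeOffset.MaxValue);
    }

    public bool IsOpen
    {
        get { return End == DateTimeOffset.MaxValue; }
    }

    // Sets the end time of an edge event once it is known.
    public void Close(DateTimeOffset end)
    {
        if (end <= Start)
        {
            throw new ArgumentException("End must be after start.", "end");
        }
        End = end;
    }

    public static void Main()
    {
        var noon = new DateTimeOffset(2011, 5, 8, 12, 0, 0, TimeSpan.Zero);

        TimedEvent doorOpened = TimedEvent.Point(noon);
        TimedEvent lunch = TimedEvent.EdgeStart(noon);

        Console.WriteLine((doorOpened.End - doorOpened.Start).Ticks); // prints "1"
        Console.WriteLine(lunch.IsOpen);                              // prints "True"

        lunch.Close(noon.AddMinutes(75));
        Console.WriteLine(lunch.IsOpen);                              // prints "False"
    }
}
```

In this model, opening the door is the point event, the restaurant's opening hours would be created with Interval, and lunch is the edge event that only gets its end time once we are done.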

Where do all of these events come from? In our daily experiences, we learn about things that are happening through our 5 senses. Or, perhaps more accurately, we can get it through our 5 senses. We could close our eyes or plug our ears. Our senses are our input adapters. Most of these events come and go with little or even no memory of their occurring (it’s a MASSIVE amount of data that our brains routinely handle) but unusual or extraordinary events are remembered, written down, photographed or video recorded. We can then review these events later, whether it’s to relive them or analyze them deeper. And this is exactly what our output adapters are for. But, just like the events that occur around us, not all events are sent to output adapters, only those events that meet certain conditions. In StreamInsight, we use LINQ queries to do this determination. You won’t, in many cases, want to store these events and, besides, StreamInsight can handle and analyze far more events than we could reasonably send to a disk. A traditional database is always a replay of the past … you select things from tables that have already happened, you analyze them in cubes, create reports, etc. StreamInsight can help you get the appropriate, important information into these storage-based data sources.

In the next article, I’ll go further on this and talk about how events are joined and unioned in time.

Tags:

StreamInsight