About the author

J Sawyer is a developer based in Houston, TX who absolutely loves to write code. After spending 9 years at Microsoft, he moved on to other things and is currently the Lead Developer for the RealTime Data Management team at Logica US. He spends his days building Really Cool Things around StreamInsight and having a blast doing it.

He has been involved with HDNUG, one of the oldest and largest .NET-focused user groups in the US, since its inception in 2001 and has watched it grow from 5-10 technologists meeting around a conference table to a thriving community of over 5000 with regular meeting attendance averaging 100 attendees. He currently serves as the Vice President. You can join him at HDNUG on the second Thursday of every month at the Houston Microsoft office.

He also loves to ride his Yamaha FZ1. And sometimes his Ninja 650. And also his Honda XR-400 dirt bike. But he doesn't code and ride at the same time. That would be bad.

Dual Mode Data Sinks–Part II

April 13, 2013 7:04 PM

Well, it’s been a while. Far too long, in fact. I’ve been on a project that’s … well … challenged and that’s consumed far more of my time than I would care to really dwell on. I’ll not say any more than that. So … my apologies for taking so long to continue.

With that out of the way, let’s pick up with our data sinks. There were two major things left undone in the last post. First, the API: I left it without an abstracted, cleaner API similar to the adapter model, so that’s the first thing to add. Now, we don’t want to go “all the way” with this and produce a runnable process – that would defeat one of the coolest features of a process: the ability to contain multiple sources and sinks in one clean block that starts and stops as a whole. This also allows us to send individual queries to multiple sinks (without DQC or subjects) and makes reuse of the sources across multiple sinks simpler than in the adapter model with DQC. And, to keep it consistent with the adapter model, we want to attach our sink to the stream directly from the IQStreamable interface – and this means an extension method. (I’ll confess that I have come to deeply love extension methods, by the way … I really like the clean API that they create.) Our extension method will encapsulate creating the observer and then binding the stream to the observer. So that we can include multiple bindings within a process, we’ll just return the streamable binding that we create. Our extension method looks like this:

ToBinding Extension Method
public static IRemoteStreamableBinding ToBinding<TPayload>(
    this IQStreamable<TPayload> stream,
    Application cepApplication,
    Type consumerFactoryType,
    object configInfo,
    EventShape eventShape)
{
    var factory = Activator.CreateInstance(consumerFactoryType) as ISinkFactory;

    if (factory == null)
    {
        throw new ArgumentException("Factory cannot be created or does not implement ISinkFactory");
    }

    switch (eventShape)
    {
        case EventShape.Interval:
            var intervalObserver = cepApplication.DefineObserver(() => factory.CreateIntervalObserverSink<TPayload>(configInfo));
            return stream.Bind(intervalObserver);

        case EventShape.Edge:
            var edgeObserver = cepApplication.DefineObserver(() => factory.CreateEdgeObserverSink<TPayload>(configInfo));
            return stream.Bind(edgeObserver);

        case EventShape.Point:
            var pointObserver = cepApplication.DefineObserver(() => factory.CreatePointObserverSink<TPayload>(configInfo));
            return stream.Bind(pointObserver);

        default:
            throw new ArgumentOutOfRangeException("eventShape");
    }
}
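One thing worth knowing about the Activator.CreateInstance(...) as ISinkFactory pattern: the null check only catches types that don't implement the interface. A type without a public parameterless constructor makes Activator.CreateInstance throw before the cast is ever reached. Here is a minimal sketch of a helper that checks both conditions up front. FactoryActivator is a name I made up for illustration; it isn't part of the downloadable solution and it uses nothing from StreamInsight.

```csharp
using System;

// Hypothetical helper (not from the original solution): validates a factory
// type before instantiating it so that failures carry a clear message.
public static class FactoryActivator
{
    public static TFactory Create<TFactory>(Type factoryType) where TFactory : class
    {
        if (factoryType == null)
        {
            throw new ArgumentNullException("factoryType");
        }

        // Does the type implement (or derive from) the requested factory contract?
        if (!typeof(TFactory).IsAssignableFrom(factoryType))
        {
            throw new ArgumentException(
                factoryType.FullName + " does not implement " + typeof(TFactory).FullName,
                "factoryType");
        }

        // Activator.CreateInstance(Type) needs a concrete type with a public
        // parameterless constructor.
        if (factoryType.IsAbstract || factoryType.GetConstructor(Type.EmptyTypes) == null)
        {
            throw new ArgumentException(
                factoryType.FullName + " needs a public parameterless constructor",
                "factoryType");
        }

        return (TFactory)Activator.CreateInstance(factoryType);
    }
}
```

With something like this in place, the first lines of ToBinding could become var factory = FactoryActivator.Create<ISinkFactory>(consumerFactoryType); and the explicit null check goes away.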

We won’t call it ToQuery(); doing so, while consistent, wouldn’t be descriptive at all. A binding that then gets run in a process is an inherently different thing from a query. With the adapter model, a query is a single runnable unit of logic; a binding is not. A binding can only run within the context of a process – it is the process, not the binding, that is the closest analog to the query. But the naming and the arguments are similar enough. Now, when we want to bind a stream to a sink, it looks like this:

Binding a Stream to a Sink
var sinkConfig = new ConsoleOutputConfig()
    {
        ShowCti = true,
        CtiEventColor = ConsoleColor.Blue,
        InsertEventColor = ConsoleColor.Green
    };

var binding = data.ToBinding(cepApplication, typeof (ConsoleOutputFactory), sinkConfig, EventShape.Point);
binding.Run("Hello");

You’ll also notice, if you haven’t noticed before, that this API forces you to use the entire event, not just the payload, when sending to the sink. While you can just send the payload to the sink and ignore the event itself, that seems quite pointless and can also cause quite a bit of confusion. I’ve seen a lot of folks put timestamps in the payload; I typically don’t. In the vast majority of cases, I rely on – and use – the timestamps on the event itself. These are the timestamps that actually matter; these are the timestamps that the StreamInsight engine is using when evaluating the event. These are the timestamps of the event within the application timeline. Anything in the payload is just an attribute of the event; the start time and end time are a key part of its definition.

Also, if you only send the payload to your sink, you won’t get any CTIs at all. CTIs are kinda important, I think, for a sink for a couple of reasons. First, even if you have no events coming through but you have CTIs, you know that the engine is running and pumping data through. This is particularly useful when you aren’t getting any data at all in your sink. Without the CTIs, you won’t have a good idea whether your query is actually getting processed through the engine or you have some other error in your logic. Second … CTIs let you know when you can/should write to a durable store in a batch. Here’s the thing – Insert events don’t get released to your sink until there is a CTI. That CTI also tells you that all of the events up to that point in application time have been released to your sink. So that’s the perfect time to batch up any writes that you have to a durable store (say, SQL Server) and write them in one shot. Batched updates/inserts are going to scale far better than single writes and that’s always a very good thing in a StreamInsight application.

You always need to remember that, in many cases, the sink is going to be the biggest potential bottleneck in the application because it usually involves some sort of I/O. And that I/O is always going to be slower than the raw CPU and memory-bound performance that you can get from your queries.
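To make the batch-on-CTI idea a little more concrete, here is a minimal sketch of a sink that buffers inserts and flushes them when a CTI arrives. It deliberately avoids the StreamInsight types so that it stands on its own: SinkEvent<TPayload>, BatchingSink<TPayload> and the flush delegate are stand-ins I invented for illustration. In a real sink you would check EventKind on the actual event, and the flush action would be your bulk write to the durable store.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for an output event; not a StreamInsight type.
public sealed class SinkEvent<TPayload>
{
    public bool IsCti { get; set; }
    public DateTimeOffset StartTime { get; set; }
    public TPayload Payload { get; set; }
}

// Buffers insert payloads and hands them to a flush action on each CTI.
public sealed class BatchingSink<TPayload> : IObserver<SinkEvent<TPayload>>
{
    private readonly List<TPayload> _pending = new List<TPayload>();
    private readonly Action<IList<TPayload>, DateTimeOffset> _flush;

    public BatchingSink(Action<IList<TPayload>, DateTimeOffset> flush)
    {
        if (flush == null) throw new ArgumentNullException("flush");
        _flush = flush;
    }

    public void OnNext(SinkEvent<TPayload> value)
    {
        if (value.IsCti)
        {
            // Everything up to this point in application time has arrived.
            Flush(value.StartTime);
        }
        else
        {
            _pending.Add(value.Payload);
        }
    }

    public void OnCompleted()
    {
        // Write out whatever is still buffered when the stream ends.
        Flush(DateTimeOffset.MaxValue);
    }

    public void OnError(Exception error)
    {
        // What to do here is application-specific; this sketch simply
        // drops the partial batch.
        _pending.Clear();
    }

    private void Flush(DateTimeOffset ctiTime)
    {
        if (_pending.Count == 0) return;
        TPayload[] batch = _pending.ToArray();
        _pending.Clear();
        _flush(batch, ctiTime);
    }
}
```

A CTI that arrives with nothing buffered doesn't call the flush action at all, but it is still useful as a heartbeat: you could log it, or use it to notice that the engine is alive even when no data is flowing.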

The next thing on our list is a bit tougher. I mentioned it in my previous post and it’s the one thing that I really didn’t like about the entire reactive model when I first saw it. You see, I’m a big fan of untyped adapters, especially output adapters. Yes, they are pretty useful on the input side as well but on the output they are absolutely essential. You can’t always know what your payload is going to look like and you don’t want to be writing a sink for every different payload that you dream up. For input … in a lot of cases, you can do OK since you’ll have a good idea of what the schema is going to look like. This is something that also came up recently on the StreamInsight forum so I know I’m not the only one that misses it. Fortunately, the .NET Framework gives us a way to make this happen. It’s more work than a simple untyped adapter but, with reflection, we can get the same kind of flexibility that we had with untyped adapters in our sinks.

What we want to do is to have, at the end, something similar to what an output adapter provides for us – name/value pairs where nested classes have a [ParentProperty].[ChildProperty] type name. While we’re at it, we’ll simplify it a bit; it’ll be a simple, straightforward dictionary with the name and the value, rather than having a separate event definition and then matching the index with the name, as in an untyped adapter. At the highest level, we’ll have a method called “GetPropertyValues” that takes the object and handles all of the details for us. Of course, it’ll be an extension method.

GetPropertyValues
/// <summary>
/// Returns the values of properties as name/value pairs.
/// </summary>
/// <param name="source">The object to read properties from</param>
/// <returns></returns>
public static Dictionary<string, object> GetPropertyValues(this object source)
{
    var propertyDictionary = new Dictionary<string, object>();
    //Get all of the properties.
    AppendPropertyValues(source, string.Empty, propertyDictionary);
    return propertyDictionary;
}

The real work is in the AppendPropertyValues method. This takes the dictionary, the object and a property name prefix and appends the properties and their values to the dictionary. To do this, first we get a list of the public instance properties on the object. From there, we loop over them. If one of them is a class, we recurse into AppendPropertyValues, adding the source property name as the prefix. After that, we also get the fields with the same binding flags; the GetProperties method won’t return the fields and there isn’t a single reflection method that I could find that would give me both in a single call.

AppendPropertyValues
private static void AppendPropertyValues(object source, string prefix, Dictionary<string, object> propertyDictionary)
{
    //A null nested object has nothing to append; without this check
    //source.GetType() would throw a NullReferenceException.
    if (source == null)
    {
        return;
    }

    var properties = source.GetType().GetProperties(BindingFlags.Public | BindingFlags.Instance);
    foreach (var propertyInfo in properties)
    {
        if (_validProp.Contains(propertyInfo.PropertyType))
        {
            //Simple type: store the value directly.
            var method = propertyInfo.GetGetMethod();
            object value = method.Invoke(source, null);
            propertyDictionary.Add(prefix + propertyInfo.Name, value);
        }
        else if (propertyInfo.PropertyType.IsClass)
        {
            //Nested class: recurse with "[ParentProperty]." as the prefix.
            var method = propertyInfo.GetGetMethod();
            object value = method.Invoke(source, null);
            AppendPropertyValues(value, prefix + propertyInfo.Name + ".", propertyDictionary);
        }
    }

    //GetProperties doesn't return fields, so handle them the same way.
    var fields = source.GetType().GetFields(BindingFlags.Public | BindingFlags.Instance);
    foreach (var fieldInfo in fields)
    {
        if (_validProp.Contains(fieldInfo.FieldType))
        {
            object value = fieldInfo.GetValue(source);
            propertyDictionary.Add(prefix + fieldInfo.Name, value);
        }
        else if (fieldInfo.FieldType.IsClass)
        {
            var value = fieldInfo.GetValue(source);
            AppendPropertyValues(value, prefix + fieldInfo.Name + ".", propertyDictionary);
        }
    }
}
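The _validProp collection isn't shown in the listing above. A plausible definition, and this is my guess at its shape rather than a copy of the downloadable code, is a set of “leaf” types whose values get stored directly instead of being recursed into. Note that string has to be in that set: it is a class, so without it the IsClass branch would try to recurse into it. The LeafTypes class name, the IsLeaf helper and its handling of enums and Nullable<T> are all hypothetical additions for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of the "leaf type" check; the actual contents of
// _validProp in the original solution are not shown in the post.
public static class LeafTypes
{
    // Assumed contents; adjust to the types your payloads actually use.
    private static readonly HashSet<Type> _validProp = new HashSet<Type>
    {
        typeof(bool), typeof(byte), typeof(short), typeof(int), typeof(long),
        typeof(float), typeof(double), typeof(decimal),
        typeof(char), typeof(string), typeof(Guid),
        typeof(DateTime), typeof(DateTimeOffset), typeof(TimeSpan)
    };

    public static bool IsLeaf(Type type)
    {
        if (type == null) throw new ArgumentNullException("type");

        // Treat Nullable<T> like T, and treat enums as simple values.
        Type underlying = Nullable.GetUnderlyingType(type) ?? type;
        return underlying.IsEnum || _validProp.Contains(underlying);
    }
}
```

One consequence of the two-branch check in AppendPropertyValues is that any value type that isn't in the set falls through both branches and is silently skipped, so it's worth deciding up front which types you want to see in the output.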

With this now in place, we have the same capabilities in our sinks that we had in our output adapters. Using this, we can change our ConsoleDataConsumer to display the payload properties, rather than just some basic header info.

Code Snippet
if (outputEvent.EventKind == EventKind.Insert)
{
    Console.ForegroundColor = Configuration.InsertEventColor;

    var eventValues = outputEvent.Payload.GetPropertyValues();
    StringBuilder output = new StringBuilder(2048);
    foreach (var eventValue in eventValues)
    {
        //Append(object) tolerates null values; calling ToString() on a null value would throw.
        output.Append(eventValue.Key).Append(":").Append(eventValue.Value).Append("\t");
    }
    Console.WriteLine("Insert Event Received at " + outputEvent.StartTime + "\t" + output.ToString());
}
else if (Configuration.ShowCti)
{
    Console.ForegroundColor = Configuration.CtiEventColor;
    Console.WriteLine("CTI event received at " + outputEvent.StartTime);
}
Console.ResetColor();

Now … we could certainly do some optimizations on this. For example, we could cache the property names and definitions for each of the types. I’ll leave that as an exercise for later … or for you, dear blog reader.
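For what it's worth, here is one way the caching could go. This is a minimal sketch and not the code from the download: it caches the PropertyInfo array for each type in a ConcurrentDictionary so that GetProperties only runs once per type. To keep it short it handles properties only (no fields), and CachedPropertyReader, GetPropertyValuesCached and the list of leaf types are names and choices I made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reflection;

// Hypothetical sketch: property flattening with a per-type reflection cache.
public static class CachedPropertyReader
{
    // One reflection lookup per type; safe to share across threads.
    private static readonly ConcurrentDictionary<Type, PropertyInfo[]> _propertyCache =
        new ConcurrentDictionary<Type, PropertyInfo[]>();

    // Assumed set of types that are stored directly rather than recursed into.
    private static readonly HashSet<Type> _leafTypes = new HashSet<Type>
    {
        typeof(bool), typeof(int), typeof(long), typeof(double), typeof(decimal),
        typeof(string), typeof(DateTime), typeof(DateTimeOffset), typeof(TimeSpan)
    };

    public static Dictionary<string, object> GetPropertyValuesCached(object source)
    {
        var values = new Dictionary<string, object>();
        Append(source, string.Empty, values);
        return values;
    }

    private static PropertyInfo[] GetReadableProperties(Type type)
    {
        // Skip write-only properties and indexers; neither can be read
        // with a parameterless getter call.
        return _propertyCache.GetOrAdd(type, t =>
            Array.FindAll(
                t.GetProperties(BindingFlags.Public | BindingFlags.Instance),
                p => p.CanRead && p.GetIndexParameters().Length == 0));
    }

    private static void Append(object source, string prefix, Dictionary<string, object> values)
    {
        if (source == null) return;

        foreach (PropertyInfo property in GetReadableProperties(source.GetType()))
        {
            if (_leafTypes.Contains(property.PropertyType))
            {
                values.Add(prefix + property.Name, property.GetValue(source, null));
            }
            else if (property.PropertyType.IsClass)
            {
                // Nested class: recurse with "[ParentProperty]." as the prefix.
                Append(property.GetValue(source, null), prefix + property.Name + ".", values);
            }
        }
    }
}
```

The cache only saves the metadata lookup; each value is still read through reflection. If that turns out to matter, compiling the getters into delegates would be the next step, but I'd measure before going there.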

You can download the solution from my SkyDrive. As you look, you’ll also see that I’ve done some refactoring on the project names and namespaces …

Tags:

StreamInsight | Code Sample

Dual Mode Data Sources-Part III

January 2, 2013 9:41 PM

In the two previous postings, we walked through creating the data sources and exposing them to both the new Reactive-centric API and the “legacy” adapter-centric API. While we’ve accomplished that, I also said that we’d revisit the 2.1 code to add a layer of consistency to the APIs; we’re going to “bring back” factories and integrate these into our API. Why on earth would I do that? Didn’t we just get rid of them to make things simpler? Well, yes, we did just do away with the requirement of a factory but that doesn’t mean that they aren’t a good idea. Implementing the factory pattern will hide the details of creating the actual data source – our current one is pretty simple but others may get quite complex – and provide a layer to “check” requirements before we try to start the query (we’re already doing this with the adapter factory). I also happen to like consistent APIs and keeping consistency whenever practical and possible.

We’ll start by creating an interface. We may – and likely will – also wind up creating an abstract base class that implements the interface and handles common functionality but that will be refactored later after we write some more sources and get a better feel of how to best define the base class. And … having both an interface and a base class gives us the greatest level of flexibility when implementing later on; because our API is based on interfaces, we can inherit from other, existing code and/or components and just add the interface. If we based our API exclusively on base classes, the single-inheritance rule would limit that. This interface will be pretty simple and only have a single method.

ISourceFactory
/// <summary>
/// Interface for data source factories.
/// </summary>
public interface ISourceFactory
{
    /// <summary>
    /// Creates an observable source.
    /// </summary>
    /// <typeparam name="TPayload">Type of the payload.</typeparam>
    /// <param name="config">Configuration class.</param>
    /// <param name="eventShape">Shape of the events</param>
    /// <returns></returns>
    IObservable<StreamInputEvent<TPayload>> CreateObservableSource<TPayload>(object config, EventShape eventShape);
}

You may notice a slight difference between this and the input adapter factory interface – config is not strongly typed. Yes, we could do that … define a generic interface and then use reflection to call it. But, to be honest, I really didn’t want to go through all of the contortions to make that happen. So the config is an object.

Now, when we implement this on our existing factory, we also refactor things a bit to maximize code reuse.

Adapter + Source Factory
public sealed class TestDataInputFactory : ITypedInputAdapterFactory<TestDataInputConfig>, ISourceFactory
    {

        [SuppressMessage("Microsoft.Design", "CA1004:GenericMethodsShouldProvideTypeParameter", Justification = "By Design")]
        public InputAdapterBase Create<TPayload>(TestDataInputConfig configInfo, EventShape eventShape)
        {
            CheckPayloadType<TPayload>();
            return new ObservableTypedPointInputAdapter<TestDataEvent, TestDataInputConfig>(CreateProducer(configInfo, eventShape));
        }

        public IObservable<StreamInputEvent<TPayload>> CreateObservableSource<TPayload>(object config, EventShape eventShape)
        {
            //Check the payload type.
            CheckPayloadType<TPayload>();
            //Check the config class for the proper type.
            TestDataInputConfig typedConfig = config as TestDataInputConfig;
            if (typedConfig == null)
            {
                //Invalid cast
                throw new ArgumentException("Configuration class must be of type TestDataInputConfig");
            }
            return (IObservable<StreamInputEvent<TPayload>>)CreateProducer(typedConfig, eventShape);
        }

        private static void CheckPayloadType<TPayload>()
        {
            //Check the payload.
            if (typeof(TPayload) != typeof(TestDataEvent))
            {
                //this won't work.
                //throw an exception.
                throw new InvalidOperationException("Specified type must be of " + typeof(TestDataEvent).FullName);
            }
        }


        private TestDataProducer CreateProducer(TestDataInputConfig config, EventShape eventShape)
        {
            switch (eventShape)
            {
                case EventShape.Point:
                    //Create the publisher.
                    return new TestDataProducer(config);
                default:
                    throw new ArgumentException(string.Format(
                        System.Globalization.CultureInfo.InvariantCulture,
                        "TestDataInputFactory cannot instantiate adapter with event shape {0}",
                        eventShape.ToString()));
            }
        }

        public void Dispose()
        {
        }
    }

You’ll notice that both methods … source and adapter … use all of the same validation logic and code. Using this in our existing code isn’t too difficult. Our new “RunProcess()” now looks like the following:

Run Process
private static void RunProcess(Application cepApplication)
{
    var config = new TestDataInputConfig (){
        NumberOfItems=20,
        RefreshInterval=TimeSpan.FromMilliseconds(500)
    };


    var data = cepApplication.DefineObservable(
        () => new TestDataInputFactory().CreateObservableSource<TestDataEvent>(config,EventShape.Point ))
        .ToPointStreamable(e => e.GetPointEvent());

    
    var sink = cepApplication.DefineObserver(() => Observer.Create<TestDataEvent>(
        e => Console.WriteLine("TestEvent ItemId:{0} Run:{1} Value:{2}", e.ItemId, e.RunNumber, e.Value)));

    data.Bind(sink).Run();

}

We can still make it better, though, and make it look more like the pre-2.1 API. We’ll start by creating an RxStream class and add methods for creating observables. We’ll also want to make sure that these methods are remotable – so we can work with either a local or a remote instance without any code changes. This was a challenge with the pre-2.1 API; you could pretty easily get yourself into a situation where your code would work on a local, in-process instance but not work at all when you were connecting to a remote instance. With DefineObservable, however, it’s actually defining a remote function that returns an observable and that’s what we call … whether in process or remote. If it works in-process then it’ll work out of process. However, you may wind up getting yourself into a situation with mysterious serialization errors … you need to make sure that whatever you pass to your methods is fully serializable. That’s why our configuration class has the DataContract attribute. With our CreateObservable method, we’ll first check to see if we have a reference to the function (and notice that it is of type Func<>). If not, we check to see if it’s been created and deployed. If not, we create and deploy it. We could also put this same code in a static constructor – it wouldn’t make much difference. The actual work is done by an InstantiateObservable private method and that’s what our defined Observable actually calls.

RxStream Observables
private static Func<Type, object, EventShape, IQbservable<StreamInputEvent<TPayload>>> _observable;

public static IQbservable<StreamInputEvent<TPayload>> CreateObservable(Application cepApplication,
    Type sourceFactoryType,
    object configInfo,
    EventShape eventShape)
{
    string entityName = "Observable:" + typeof (TPayload).FullName;
    if (_observable == null)
    {
        //Check the application.
        if (cepApplication.Entities.ContainsKey(entityName))
        {
            _observable =
                cepApplication.GetObservable<Type, object, EventShape, StreamInputEvent<TPayload>>(entityName);
        }
        else
        {
            //Define and deploy.
            _observable = cepApplication.DefineObservable(
                (Type t, object c, EventShape e) => InstantiateObservable(t, c, e));
            _observable.Deploy(entityName);
        }

    }
    return _observable.Invoke(sourceFactoryType, configInfo, eventShape);
}

private static IObservable<StreamInputEvent<TPayload>> InstantiateObservable(Type sourceFactoryType,
                                                                              object configInfo,
                                                                              EventShape eventShape)
{
    var sourceFactory = Activator.CreateInstance(sourceFactoryType) as ISourceFactory;
    if (sourceFactory == null)
    {
        throw new ArgumentException("Specified type is not a source factory.");
    }
    return sourceFactory.CreateObservableSource<TPayload> (configInfo, eventShape);
}
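Since serialization problems tend to show up late and cryptically, it can be handy to check a configuration class on its own before handing it to the engine. The sketch below round-trips a config object through DataContractSerializer. Two assumptions to flag: the class body is my guess at what TestDataInputConfig might look like (only the two property names come from the code in this post), and I'm assuming a DataContractSerializer round trip is a reasonable proxy for what happens when the arguments cross the process boundary. SerializationCheck is a made-up helper name.

```csharp
using System;
using System.IO;
using System.Runtime.Serialization;

// Assumed shape of the configuration class; only the property names
// NumberOfItems and RefreshInterval appear in the post itself.
[DataContract]
public class TestDataInputConfig
{
    [DataMember]
    public int NumberOfItems { get; set; }

    [DataMember]
    public TimeSpan RefreshInterval { get; set; }
}

// Hypothetical helper: serializes a value and reads it back. It throws if
// the type (or one of its members) can't be handled by the serializer.
public static class SerializationCheck
{
    public static T RoundTrip<T>(T value)
    {
        var serializer = new DataContractSerializer(typeof(T));
        using (var stream = new MemoryStream())
        {
            serializer.WriteObject(stream, value);
            stream.Position = 0;
            return (T)serializer.ReadObject(stream);
        }
    }
}
```

Calling SerializationCheck.RoundTrip(config) in a unit test and comparing the properties of the copy against the original is a cheap way to catch a forgotten DataMember attribute before it turns into a runtime surprise.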

Now all that we have left to do is to create methods to also create the streams, rather than just the observables. This will be our typical use case but we’ll still keep the observable method public as well to give us the most flexibility when we are actually using this in anger.

RxStream Create
public static IQStreamable<TPayload> Create(
    Application cepApplication, Type sourceFactoryType, object configInfo, EventShape eventShape)
{
    return Create(cepApplication, sourceFactoryType, configInfo, eventShape, null);
}

public static IQStreamable<TPayload> Create(
    Application cepApplication, Type sourceFactoryType, object configInfo, EventShape eventShape, AdvanceTimeSettings advanceTimeSettings)
{
 
    var observable = CreateObservable(cepApplication, sourceFactoryType, configInfo, eventShape);

    switch (eventShape)
    {
        case EventShape.Interval:
            return observable.ToIntervalStreamable(e => e.GetIntervalEvent(), advanceTimeSettings);

        case EventShape.Edge:
            return observable.ToEdgeStreamable(e => e.GetEdgeEvent(), advanceTimeSettings);

        case EventShape.Point:
            return observable.ToPointStreamable(e => e.GetPointEvent(), advanceTimeSettings);

        default:
            throw new ArgumentOutOfRangeException("eventShape");
    }
}

Once this is in place, the code to create our stream looks remarkably similar to the adapter-centric code:

Creating the Stream
var config = new TestDataInputConfig (){
    NumberOfItems=20,
    RefreshInterval=TimeSpan.FromMilliseconds(500)
};


var data = RxStream<TestDataEvent>.Create(cepApplication, typeof (TestDataInputFactory), config, EventShape.Point);

There’s still more that we’ll need to do … for example, we’ll need to create overloads to import CTIs. But, for now, we’re done with the core API for our input adapters and will be moving on to output adapters/sinks in the next article.

You can download the current solution from SkyDrive.

Tags:

StreamInsight | Code Sample

Dual Mode Data Sources–Part II

December 11, 2012 9:11 PM

In my previous post, I walked through creating a basic structure for creating StreamInsight data sources using the Reactive model introduced with version 2.1. There was quite a bit of plumbing work to get done but now that it’s finished, we’re ready to move on to the next step … taking what we just created and making it available to the pre-2.1 API. I didn’t mention this in the previous post but these articles are the first in what’s going to be a series of articles that walk through building a StreamInsight application. In these articles, we’re going through the process to create our data sources – or, more accurately, the architectural framework for our data sources. We’ll do the same with sinks/output adapters and then we’ll start pulling in some real(istic) data and doing something more interesting. Each article will build on the previous one so you can follow along.

Now, back to our input. Since we already have the data flowing via the IObservable interface, it’s really easy to subscribe to this interface and implement an input adapter that uses the exact same code to generate events. All we need to do is subscribe to our producer and enqueue the event when OnNext is called:

On Next
public void OnNext(StreamInputEvent<TPayloadType> publishedEvent)
{
    if (AdapterState == AdapterState.Running)
    {
        var newEvent = publishedEvent.GetPointEvent();
        this.Enqueue(ref newEvent);
    }
}

It really is that simple. Well, almost but before we get into that, let’s step back for a second. We have an event producer that creates our events. This producer handles all of the details of connecting to the data source and packaging the data for StreamInsight. Traditionally, we did all of this in the adapter itself but since we’ve already separated it, the adapter really doesn’t need to do much except enqueue. Looking at it that way, why do we need to have separate adapters for each type that we have? Well, the truth is: we don’t. With a little planning, we can have a single set of adapters that subscribe to a producer’s IObservable interface and handle enqueuing the events into StreamInsight as well as manage lifetime. These will be our adapters and they’ll wind up being very thin. So we’ll start by creating a new point typed input adapter. Like the base class, the payload type is a generic argument for the class – there’s no reason why we can’t use this for any type. Because Start() and Resume() are abstract methods on the base class, we have to implement them. We’ll also want to override Stop() and Dispose(bool disposing).

Adapter Skeleton
public class ObservableTypedPointInputAdapter<TPayloadType>
    : TypedPointInputAdapter<TPayloadType>
{
    public override void Start()
    {
        throw new NotImplementedException();
    }

    public override void Resume()
    {
        throw new NotImplementedException();
    }

    public override void Stop()
    {
        base.Stop();
    }

    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);
    }
}

So let’s get started. First, we need to get our producer. That’s simple enough; we know that we need to have an adapter factory so that’s where we’ll create it and pass it in as an argument of the constructor. We pop this into a private class field – we won’t be calling this until we subscribe. Speaking of subscribe, this is exactly what we need to do in the Start() method. For the producer’s OnCompleted action, we’ll call the adapter’s Stop() method. Yes, you can call Stop() in an adapter and not stop the queries that are getting events from the stream – this is something that may happen with a read-once reference data adapter. It’s not commonly seen in the wild but we’ll make sure that we handle it correctly. One thing that is important to understand here … if you are having StreamInsight generate your CTIs for you with AdvanceTimeSettings, one of your options is to AdvanceToInfinityOnShutdown. This is directly impacted by the adapter calling Stopped() (or the producer, in fact, calling Complete()). When the source shuts down, StreamInsight will enqueue a CTI with a timestamp of DateTimeOffset.MaxValue, which serves to “clear” the stream of any pending events. It also happens to work very nicely if you happen to have a read-once data source of reference data.

Constructor & Start
public ObservableTypedPointInputAdapter(IObservable<StreamInputEvent<TPayloadType>> eventProducer)
{
    this._eventProducer = eventProducer;
}

public override void Start()
{
    _subscription = _eventProducer.Subscribe(OnNext,
         () =>
         {
             if (AdapterState == AdapterState.Running)
             {
                 Stop();
             }
         });
}

Start vs. Resume

In a lot of the samples, you’ll see these implemented the exact same way. However, in the real world, you may want to put a little more thought into how you handle Resume(). Odds are pretty good, especially if you are using Premium Edition, that you will never need to actually implement Resume(). It gets called after your adapter is put into a suspended state because the input queue is full. Yes, it is possible to fill up the input queue – you need about 200,000 unprocessed events that StreamInsight can’t get pushed through your queries. I have made this happen – but it took some effort. I actually had to continue enqueueing events just as fast as I could on multiple threads. Even with some convoluted queries, it still took some time to fill up the input queue. Considering that I’ve seen StreamInsight process over 30,000 events/sec on a dual core i7 laptop and over 100,000 events/sec on a low-to-mid level server with CPU cycles to spare, you will have to work at it. But it can happen. Now, if you are using Standard Edition, you won’t be able to get to that level of throughput (and I haven’t done any testing on Standard’s throughput) but you can still expect to push a goodly number of events through Standard Edition. Now, when the input queue fills up, StreamInsight will return Full from the enqueue. Your adapter will need to call Ready() and then StreamInsight will call Resume() when you can start enqueuing events again. If you are using the StreamInsight 2.1 Reactive model, you won’t get any notification if the input queue fills. Instead, StreamInsight will simply ignore your events until it’s ready to receive events again. This is what I’m going to do in our exercises but it may not be appropriate, depending on your source and your needs. For example, if you are simply pushing recorded data through the engine, it would be easy enough to pause and then start back up again when Resume() is called. 
If, however, it’s live data, you may need to do something else. You will want to exercise some caution in buffering up events that you miss – you have no way of knowing when the adapter will start back up again and you could eat up FAR more memory than you want to. It is a potentially difficult issue but, fortunately, one that is pretty tough to hit in the real world. Especially if you are using Premium Edition, you’ll saturate network bandwidth (for inbound data) before you fill up the queue. In our exercise, we’re going to do the same thing that StreamInsight 2.1 does … we’re just going to ignore incoming events until Resume() is called.

Back to the code … in the snippet above, I enqueued directly from the OnNext function. While this works – if everything goes right – it doesn’t really represent what we’ll need to do in a real app to handle things like exceptions, queue full and the like. Also, while this class is designed to be a stand-alone input adapter that can be used with any observable source, there may be special case adapters – for example, specific handling of a pause/resume scenario – where we want to override specific behaviors with an inherited class so we want to make sure that we provide the right level of points to override. So we’ll take the enqueue operation and create our own method. In here, we do all of our state checks to make sure that we can enqueue, handle enqueue exceptions and queue full as well as keep track of our last-enqueued CTI. Finally, we will also include a critical section lock around the actual enqueue to make sure that we handle lifetime correctly and don’t report Stopped() when we’re in the middle of an enqueue. It is the same method described in my previous article on output adapter lifetime but applied to input adapters.

Enqueue Event
    protected virtual void EnqueueEvent(StreamInputEvent<TPayloadType> publishedEvent)
    {
        // Ignore events while we are paused (queue full) until Resume() is called.
        if (!_canEnqueue) return;

        // Drop anything that falls before the last CTI we enqueued.
        if (publishedEvent.Start < _lastCti)
        {
            return;
        }
        // Hold the lock for the whole enqueue so shutdown can't complete mid-enqueue.
        lock (_lockObject)
        {
            if (this.AdapterState != AdapterState.Running)
            {
                return;
            }
            var point = publishedEvent.GetPointEvent();
            try
            {
                var enqueueResult = this.Enqueue(ref point);
                if (enqueueResult == EnqueueOperationResult.Success && publishedEvent.EventKind == EventKind.Cti)
                {
                    _lastCti = point.StartTime;
                }
                if (enqueueResult == EnqueueOperationResult.Full)
                {
                    //Queue full!! Pause enqueuing.
                    _canEnqueue = false;
                    ReleaseEvent(ref point);
                    //Let StreamInsight know we're ready to resume.
                    Ready();
                }
            }
            catch
            {
                // Release the event before rethrowing so it isn't leaked.
                ReleaseEvent(ref point);
                throw;
            }
        }
    }
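To see why the lock matters for lifetime, here is a minimal model of the idea in plain C#. It is a sketch under assumptions: GuardedLifetime, TryEnqueue and StoppedReported are hypothetical names, and setting StoppedReported stands in for the adapter calling Stopped(). The point is only that the stop path takes the same lock as the enqueue path, so an in-flight enqueue always finishes before shutdown is reported.

```csharp
using System;

// Hypothetical model of an adapter's lifetime handling; not a StreamInsight type.
public class GuardedLifetime
{
    private readonly object _lockObject = new object();
    private bool _running = true;

    public int Enqueued { get; private set; }
    public bool StoppedReported { get; private set; }

    // Mirrors the enqueue path: the state check and the enqueue happen under the lock.
    public bool TryEnqueue()
    {
        lock (_lockObject)
        {
            if (!_running)
            {
                return false;
            }
            Enqueued++;
            return true;
        }
    }

    // Mirrors the stop path: taking the same lock means any enqueue
    // already in progress completes before shutdown is reported.
    public void Stop()
    {
        lock (_lockObject)
        {
            _running = false;
            StoppedReported = true; // stands in for calling Stopped()
        }
    }
}
```

Once Stop() has run, every later TryEnqueue() sees the state change and returns without touching the queue.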

Our last step is to create our adapter factory. In our Create method, we’ll do a couple of checks to make sure that our arguments are valid – the payload is the proper type and, since our random data source only supports points, that we are implementing the correct shape.

Input Adapter Factory
    public sealed class TestDataInputFactory : ITypedInputAdapterFactory<TestDataInputConfig>
    {

        [SuppressMessage("Microsoft.Design", "CA1004:GenericMethodsShouldProvideTypeParameter", Justification = "By Design")]
        public InputAdapterBase Create<TPayload>(TestDataInputConfig configInfo, EventShape eventShape)
        {

            if (typeof(TPayload) != typeof(TestDataEvent))
            {
                //this won't work.
                //throw an exception.
                throw new InvalidOperationException("Specified type must be of " + typeof(TestDataEvent).FullName);
            }

            switch (eventShape)
            {
                case EventShape.Point:
                    //Create the publisher.
                    return new ObservableTypedPointInputAdapter<TestDataEvent, TestDataInputConfig>(
                        new TestDataProducer(configInfo));
                default:
                    throw new ArgumentException(string.Format(
                        System.Globalization.CultureInfo.InvariantCulture,
                        "TestDataInputFactory cannot instantiate adapter with event shape {0}",
                        eventShape.ToString()));
            }

        }

        /// <summary>
        /// Dispose method.
        /// </summary>
        public void Dispose()
        {
        }

    }

Once we have the adapter factory, we’re ready to go. Because we’ve not done any output adapters yet, we’ll be using the ToPointObservable() method to write to the console. Keep in mind that this works only when hosting StreamInsight in-process; it’s not at all like the IObservable support in 2.1. But it works well enough to show that we have some data flowing through and that our adapter now supports both models from the same data producer!

Run Query
    private static void RunQuery(Application cepApplication)
    {
        var config = new TestDataInputConfig()
            {
                NumberOfItems = 20,
                RefreshInterval = TimeSpan.FromMilliseconds(500)
            };

        var data = CepStream<TestDataEvent>.Create(cepApplication,
            "TestData", typeof (TestDataInputFactory), config, EventShape.Point);

        // Subscribe() returns the subscription handle, not the observable itself.
        var subscription = data.ToPointObservable().Subscribe(
            e =>
                {
                    if (e.EventKind == EventKind.Insert)
                    {
                        Console.WriteLine("TestEvent ItemId:{0} Run:{1} Value:{2}",
                                          e.Payload.ItemId, e.Payload.RunNumber, e.Payload.Value);
                    }
                    else
                    {
                        Console.WriteLine("CTI @ {0}", e.StartTime);
                    }
                });
    }

Using the same pattern, we can also create the adapters for interval and edge events pretty quickly. Unfortunately, a lot of it is copy-paste-tweak. I say unfortunately because it leads to a lot of redundant and repetitive code that I, for one, would prefer to create and maintain in a single place. However, our key method – enqueue – isn’t defined in any common base class; we have to inherit from the shape-specific adapters and there’s no way around it. While we could probably deal with this with another layer of abstraction, there isn’t, IMHO, a whole lot of value to that. Should that change, we’ll take a look at refactoring a bit then.

Looking at the input adapter factory, you’ll notice that it does some checking to make sure things are valid before creation; factories are good for that. There’s a bunch of other stuff that could be done in the factory as well and having a standard, generic factory interface is a good way to provide yourself with a clean, clear and decoupled way to create what may be complex underlying objects. And there’s no reason why we can’t revisit our existing 2.1-based code to add the factory in there. It also provides a level of consistency between the two different APIs that can make our life simpler, reduce the amount of debugging that we need to do and promote code sharing. That, however, will have to wait for the next post – I am trying to do this in manageable pieces so I’m not writing a novel with each post. In the meantime, you can download the current project from my SkyDrive.

Tags:

Code Sample | StreamInsight

Dual Mode Data Sources–Part I

December 7, 2012 6:47 PM

When StreamInsight 2.1 came out, the Big New Thing for the release was the new model for data ingress and egress – sources and sinks. Based on standard interfaces like IObservable and IObserver and the Reactive Extensions, it makes getting data into and out of StreamInsight a whole lot easier. The previous model works (or worked) well enough but, honestly, it was hard … and harder than it really needed to be. Getting the adapter lifetime just right was not a trivial undertaking and getting it wrong meant either hanging on shutdown or an ObjectDisposedException. It took me … and my team … some time to get all of this nailed down to a science. The new model handles all of this for you … you just subscribe and publish. The rest is invisible. Very nice.

But, as when any new API replaces an old API, there is a time of transition from one to the other. There’s a time when developers or solution providers – like me – need to be able to support both APIs. For us, we actually still (sadly) need to support StreamInsight 1.2 and 2.0 in some cases so we can’t go over to the new model. But we also don’t want to rewrite all of our adapters or maintain two codebases of adapters. Instead, we want to support both from a single code base. Fortunately, this is actually pretty easy to accomplish with a little planning and good architectural practices. I’ll walk you through this – in its simplest form – and this code will also be the base for a line of additional articles that I’ve been working on and have planned. In fact, I plan to walk through creating a StreamInsight application step by step and discussing the technical details as we go so, by the end, you’ll have walked with me to build a simple StreamInsight application.

Oh … and let me make one thing clear though. I’ve heard it said (from Microsoft field technical sales-type folks) that “you don’t have to write adapters anymore.” That’s not entirely accurate. You don’t have to use the “legacy” adapter API anymore. But you will have to write code to get data into and out of StreamInsight. You may use a different API now (IObserver/IObservable) but the necessity of writing code for this task is still there. Call it what you want ‘cuz now we’re just arguing semantics.

Sources/Input Adapters

Source is the new name for input adapter. With StreamInsight 2.1, you’ll want to implement IObservable. I won’t go over the basics; you’ll find them in the documentation. But there are some things that you need to consider that aren’t in the documentation. First … CTIs (and I’ll be talking more about these in a future post or two). While you can use AdvanceTimeSettings to have StreamInsight generate your CTIs for you, this isn’t always appropriate to do. So we need to have a way to have CTIs created and enqueued by the adapters … and it needs to be optional. So we’ll start with a wrapper class that allows a data source to create both events (with payloads) and CTIs. This will be our StreamInputEvent class. It will have all of the appropriate temporal properties and event shape metadata as well as the payload.

StreamInputEvent
    public class StreamInputEvent<TPayloadType>
    {
        public TPayloadType Payload { get; set; }
        public DateTimeOffset Start { get; set; }
        public DateTimeOffset End { get; set; }
        public EdgeType EdgeType { get; set; }
        public EventKind EventKind { get; set; }
    }

We’ll also add some helper methods to create the different event shapes as well as a constructor or two:

Helpers & Constructors
    public PointEvent<TPayloadType> GetPointEvent()
    {
        if (this.EventKind == EventKind.Insert)
        {
            return PointEvent<TPayloadType>.CreateInsert(this.Start, Payload);
        }
        return PointEvent<TPayloadType>.CreateCti(Start);
    }

    //More helpers for Edge and Interval

    public StreamInputEvent(DateTimeOffset ctiDateTime)
    {
        EventKind = EventKind.Cti;
        Start = ctiDateTime;
        EdgeType = EdgeType.Start;
        End = DateTimeOffset.MaxValue;
        Payload = default(TPayloadType);
    }

    public StreamInputEvent(TPayloadType payload)
    {
        Start = DateTimeOffset.MinValue;
        End = DateTimeOffset.MaxValue;
        EdgeType = default(EdgeType);
        Payload = payload;
        EventKind = EventKind.Insert;
    }

    public StreamInputEvent(TPayloadType payload, DateTimeOffset startTime, DateTimeOffset endTime, EdgeType edgeType)
    {
        Start = startTime;
        End = endTime;
        EdgeType = edgeType;
        Payload = payload;
        EventKind = EventKind.Insert;
    }

Now, you’ll note that there isn’t an EventShape specified. The shape can be determined from the event times and we’ve found it useful to have data sources that can adapt to whatever shape we deem fit for a particular use case. But we’ll still have a factory (regardless of how we do it) that allows us to limit a particular data source to specific shapes. And the helpers make it really, really easy when we create the IQStreamable for the temporal processing.

Now that we have that done, let’s take a step back and look at this. We have a data source of some type and we need to get the data into StreamInsight. The wonderful thing about StreamInsight 2.1’s focus on IObservable is that it lets us really step back for a second and look at how we produce data. In fact, it allows us to “hide” our producers behind a standard interface … and that’s what we need to do. At the lowest level, the code closest to the data source, we’ll create a “data producer” that takes the data and publishes it via IObservable – so there’s our ‘native’ 2.1 interface. To support the legacy adapter model, we will just need to subscribe to this with the concrete Input Adapter class.

So let’s get started by creating a base EventProducer class to handle subscribers and disposal. We’ll also need to make sure that we can specify the configuration for the event producer – just like we were forced to do with the legacy adapter model. The reality is simple … unless you are doing a really, incredibly simple demo, you need to have a way to specify configuration. Having a configuration class that is passed to the event producer from the application just makes sense.

StreamEventProducer
    public abstract class StreamEventProducer<TPayloadType, TConfigType>
        : IObservable<StreamInputEvent<TPayloadType>>, IDisposable
    {
        public TConfigType Configuration { get; protected set; }

        protected StreamEventProducer(TConfigType configuration)
        {
            this.Configuration = configuration;
        }

        // Called when an observer subscribes; concrete producers begin publishing here.
        protected abstract void Start();

        private IObserver<StreamInputEvent<TPayloadType>> _observer;

        public virtual IDisposable Subscribe(IObserver<StreamInputEvent<TPayloadType>> observer)
        {
            this._observer = observer;
            Start();
            // The producer itself is the subscription handle.
            return this;
        }

        protected void PublishException(Exception ex)
        {
            _observer.OnError(ex);
        }

        protected void PublishEvent(StreamInputEvent<TPayloadType> newEvent)
        {
            _observer.OnNext(newEvent);
        }

        // IDisposable implementation; routes to the overridable Dispose(bool).
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool disposing)
        {
            // Guard against disposal before anyone has subscribed.
            if (disposing && _observer != null)
            {
                _observer.OnCompleted();
            }
        }
    }

If you look at this, you’ll see that it’s pretty simple. You can certainly get more sophisticated with this – by supporting multiple observers to share event producers between multiple sources and/or adapters – but I want to keep it simple and clear. Our concrete event producer will inherit from this class, implement Start() (called when a subscriber actually subscribes) and call PublishException and PublishEvent as appropriate. Our event producer will also want to override Dispose(bool disposing) to get notification when it’s time to shut down.
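For the curious, here is roughly what the multiple-observer variation could look like. This is a sketch in plain .NET, not the code used in this series: MultiObserverProducer, ManualProducer and CollectingObserver are hypothetical names, the payload is a bare T rather than StreamInputEvent, and starting the producer on the first subscription is left out to keep it short.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical multi-subscriber variant of a producer base class.
public abstract class MultiObserverProducer<T> : IObservable<T>
{
    private readonly object _sync = new object();
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_sync) { _observers.Add(observer); }
        // Each subscriber gets its own handle, so disposing one leaves the others alone.
        return new Unsubscriber(this, observer);
    }

    protected void PublishEvent(T newEvent)
    {
        foreach (var observer in Snapshot()) observer.OnNext(newEvent);
    }

    protected void PublishException(Exception ex)
    {
        foreach (var observer in Snapshot()) observer.OnError(ex);
    }

    // Copy under the lock so observers can unsubscribe while we are publishing.
    private IObserver<T>[] Snapshot()
    {
        lock (_sync) { return _observers.ToArray(); }
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly MultiObserverProducer<T> _owner;
        private readonly IObserver<T> _observer;

        public Unsubscriber(MultiObserverProducer<T> owner, IObserver<T> observer)
        {
            _owner = owner;
            _observer = observer;
        }

        public void Dispose()
        {
            lock (_owner._sync) { _owner._observers.Remove(_observer); }
        }
    }
}

// Demo helper: a producer that publishes whatever it is handed.
public sealed class ManualProducer<T> : MultiObserverProducer<T>
{
    public void Emit(T value) { PublishEvent(value); }
}

// Demo helper: an observer that records what it receives.
public sealed class CollectingObserver<T> : IObserver<T>
{
    public readonly List<T> Seen = new List<T>();
    public void OnNext(T value) { Seen.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

With this shape, two CollectingObserver instances subscribed to the same ManualProducer both receive each Emit() call, and disposing one subscription handle stops delivery to that observer only.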

So let’s go ahead now and create our first event producer. For our sample here, it will be a very simple random data generator because, of course, everyone needs to analyze completely random data in StreamInsight. Well, no of course not, but it’s easy and simple. We’ll start with the configuration class.

TestDataInputConfig
    [DataContract()]
    public class TestDataInputConfig
    {

        public TestDataInputConfig()
        {
            RefreshInterval = TimeSpan.FromMilliseconds(500);
            NumberOfItems = 10;
            TimestampIncrement = TimeSpan.FromMinutes(5);
            StartDateTime = DateTimeOffset.Now.AddMonths(-5);
        }

        [DataMember]
        public DateTimeOffset StartDateTime { get; set; }

        [DataMember]
        public TimeSpan RefreshInterval { get; set; }

        [DataMember]
        public TimeSpan TimestampIncrement { get; set; }

        [DataMember]
        public int NumberOfItems { get; set; }

    }
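A class decorated this way can be round-tripped through the standard DataContractSerializer. The sketch below uses a trimmed-down, hypothetical SampleConfig (not the real TestDataInputConfig) and a made-up ConfigRoundTrip helper, and it serializes to an in-memory stream purely for illustration; it is not a claim about how StreamInsight itself serializes configuration.

```csharp
using System;
using System.IO;
using System.Runtime.Serialization;

// Hypothetical, trimmed-down config used only for this illustration.
[DataContract]
public class SampleConfig
{
    [DataMember]
    public TimeSpan RefreshInterval { get; set; }

    [DataMember]
    public int NumberOfItems { get; set; }
}

public static class ConfigRoundTrip
{
    // Serializes the config to XML in memory and reads it back as a new instance.
    public static SampleConfig Clone(SampleConfig config)
    {
        var serializer = new DataContractSerializer(typeof(SampleConfig));
        using (var stream = new MemoryStream())
        {
            serializer.WriteObject(stream, config);
            stream.Position = 0;
            return (SampleConfig)serializer.ReadObject(stream);
        }
    }
}
```

A quick round trip like this in a unit test is a cheap way to catch a property that was added without its [DataMember] attribute, since its value will not survive the trip.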

Note that it is marked as DataContract and each property is marked with the DataMember attribute … this is very important. The configuration classes may – and probably will – end up being serialized. This was also true in the adapter model; all of your configuration classes need to be able to be serialized. Now you can also use [Serializable] but I’ve found that, at times, the C# compiler goes somewhat insane when you do that and you get exceptions when serializing the configuration classes.

Now for our payload class. We have an ItemId and a Value of type double and, since this class is for testing and we want to be able to “see” what’s going on (and for other reasons), we also have some properties on here to help with that – RunNumber (which generation “run” created it) and EnqueueTimestamp (the time it was actually created, which may be different from the event’s start time). Finally, we also have a static helper method that’ll create a bunch of these test events for enqueuing for the “run”.

TestDataEvent
    public class TestDataEvent
    {
        private static Random rdm = new Random();
        public static List<TestDataEvent> CreateNext(TestDataInputConfig config, int runNumber)
        {
            List<TestDataEvent> newReferenceData =
                new List<TestDataEvent>(config.NumberOfItems);

            for (int i = 0; i < config.NumberOfItems; i++)
            {
                newReferenceData.Add(new TestDataEvent()
                        {
                            ItemId = "Item" + i.ToString(),
                            RunNumber = runNumber,
                            EnqueueTimestamp = DateTime.Now,
                            Value = rdm.NextDouble() * 10
                        });
            }

            return newReferenceData;
        }

        public string ItemId { get; set; }
        public int RunNumber { get; set; }
        public DateTime EnqueueTimestamp { get; set; }
        public double Value { get; set; }
    }

Now that we have all of the foundational building blocks completed, we are finally ready to create our producer. For now, we’ll have our producer enqueue the CTIs after each “batch” of events; this keeps all of the events in the same “run” within the same CTI span.

TestDataProducer
    public class TestDataProducer : StreamEventProducer<TestDataEvent, TestDataInputConfig>
    {

        private DateTimeOffset _nextStartTime;
        private Timer _enqueueTimer; // System.Threading.Timer
        private int _runNumber = 0;

        public TestDataProducer(TestDataInputConfig config): base(config)
        {
            // Create the timer disabled; Start() arms it.
            _enqueueTimer = new Timer(ProduceEvents, null, Timeout.Infinite, Timeout.Infinite);
            this._nextStartTime = config.StartDateTime;
        }

        protected override void Start()
        {
            // Change the timer to start it.
            _enqueueTimer.Change(TimeSpan.Zero, Configuration.RefreshInterval);
        }

        /// <summary>
        /// Main driver to read events from source and enqueue them.
        /// </summary>
        private void ProduceEvents(object state)
        {
            _runNumber++;

            var newEvents =
                TestDataEvent.CreateNext(Configuration, _runNumber);

            // Every event in this run shares the same start time.
            var eventTimestamp = _nextStartTime;

            var publishEvents = (from e in newEvents
                                 select new StreamInputEvent<TestDataEvent>(e)
                                            {
                                                Start = eventTimestamp
                                            }).ToList();

            foreach (var publishedEvent in publishEvents)
            {
                this.PublishEvent(publishedEvent);
            }

            //Enqueue our CTI
            this.PublishEvent(new StreamInputEvent<TestDataEvent>(eventTimestamp.AddTicks(1)));
            _nextStartTime += Configuration.TimestampIncrement;

        }

        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                if (_enqueueTimer != null)
                {
                    _enqueueTimer.Dispose();
                    _enqueueTimer = null;
                }
            }
            base.Dispose(disposing);
        }

    }

At this point, we are actually ready to start using the StreamInsight 2.1 APIs to get data from our new producer! Of course, we aren’t done but let’s see how we could do this. It’s here that the GetPointEvent() helper method is really, really handy. It makes the code very readable and it happens to be highly reusable as well, both of which are Very Good Things™.

Running the Process ...
    var config = new TestDataInputConfig()
    {
        NumberOfItems = 20,
        RefreshInterval = TimeSpan.FromMilliseconds(500)
    };

    // Source: wrap the producer and convert each published event to a point event.
    var data = cepApplication.DefineObservable(
        () => new TestDataProducer(config)).ToPointStreamable(e => e.GetPointEvent());

    // Sink: write each payload to the console.
    var sink = cepApplication.DefineObserver(() => Observer.Create<TestDataEvent>(
        e => Console.WriteLine("TestEvent ItemId:{0} Run:{1} Value:{2}", e.ItemId, e.RunNumber, e.Value)));

    data.Bind(sink).Run();

And when we do that, we get our data flowing. You can download the code from my SkyDrive.

Tags:

Code Sample | StreamInsight

Houston TechFest Sessions

September 7, 2012 11:36 AM

Houston TechFest is tomorrow at the Reliant Center. And, of course, I will be speaking. This year, I am doing two sessions:

Back to Basics: .NET Essentials You May Have Forgotten (3:40 PM – 4:40 PM)

Now that the .NET Framework has been out for 10 years and so many tools are available to make things easier, many developers seem to have forgotten some of the core principles of .NET. In this presentation, I'll review the fundamentals of the .NET Framework and what makes all of that magic happen. From the CLR to IL to memory management and garbage collection, we'll touch on these core concepts that every developer should know.

I originally did this presentation at HDNUG and it was very well received – better than I had thought it would be. However, because the timeframe is a bit more limited at TechFest vs. HDNUG, I’ll need to pare this down a little bit so I don’t go over.


Introducing StreamInsight 2.1 (4:50 PM – 5:50 PM)

StreamInsight provides real-time analytics on large volumes of data with a response time measured in milliseconds, bringing a new level of capabilities to the Sql Server platform. This session will provide an overview of StreamInsight, its capabilities and use cases. It will also provide details on StreamInsight 2.1, what's been added and how it makes your real-time applications easier and more robust.

I originally delivered this at Sql Saturday in Baton Rouge. I’ve tweaked and honed it a bit. Some will be familiar to my other StreamInsight intro presentations – how it works, etc. There are some updates on more use cases that we’ve seen “out in the wild” in the past year or so. StreamInsight is coming into its own now too … we are seeing more and more interest and adoption as folks get a better understanding of where it can add to their business.

I hope to see you there!

Tags:

Community | StreamInsight | .NET Stuff | User Groups