About the author

J Sawyer is a developer based in Houston, TX who absolutely loves to write code. After spending 9 years at Microsoft, he moved on to other things and is currently the Lead Developer for the RealTime Data Management team at Logica US. He spends his days building Really Cool Things around StreamInsight and having a blast doing it.

He has been involved with HDNUG, one of the oldest and largest .NET-focused user groups in the US, since its inception in 2001 and has watched it grow from 5-10 technologists meeting around a conference table to a thriving community of over 5000 with regular meeting attendance averaging 100 attendees. He currently serves as the Vice President. You can join him at HDNUG on the second Thursday of every month at the Houston Microsoft office.

He also loves to ride his Yamaha FZ1. And sometimes his Ninja 650. And also his Honda XR-400 dirt bike. But he doesn't code and ride at the same time. That would be bad.

Dual Mode Data Sinks–Part II

April 13, 2013 7:04 PM

Well, it’s been a while. Far too long, in fact. I’ve been on a project that’s … well … challenged and that’s consumed far more of my time than I would care to really dwell on. I’ll not say any more than that. So … my apologies for taking so long to continue.

With that out of the way, let’s pick up with our data sinks. There were two major things left undone in the last post. First, the API: I left it without an abstracted, cleaner API similar to the adapter model’s. That’s the first thing to add. Now, we don’t want to go “all the way” with this and produce a runnable process – that would defeat one of the coolest features of a process: the ability to contain multiple sources and sinks in one clean block that starts and stops as a whole. This also allows us to send individual queries to multiple sinks (without DQC or subjects) and makes reuse of the sources across multiple sinks simpler than in the adapter model with DQC.

And, to keep it consistent with the adapter model, we want to attach our sink to the stream directly from the IQStreamable interface – and this means an extension method. (I’ll confess that I have come to deeply love extension methods, by the way … I really like the clean API that they create.) Our extension method will encapsulate creating the observer and then binding the stream to the observer. So that we can include multiple bindings within a process, we’ll just return the streamable binding that we create. Our extension method looks like this:

ToBinding Extension Method
public static IRemoteStreamableBinding ToBinding<TPayload>(
    this IQStreamable<TPayload> stream,
    Application cepApplication,
    Type consumerFactoryType,
    object configInfo,
    EventShape eventShape)
{
    var factory = Activator.CreateInstance(consumerFactoryType) as ISinkFactory;

    if (factory == null)
    {
        throw new ArgumentException("Factory cannot be created or does not implement ISinkFactory");
    }

    switch (eventShape)
    {
        case EventShape.Interval:
            var intervalObserver = cepApplication.DefineObserver(() => factory.CreateIntervalObserverSink<TPayload>(configInfo));
            return stream.Bind(intervalObserver);

        case EventShape.Edge:
            var edgeObserver = cepApplication.DefineObserver(() => factory.CreateEdgeObserverSink<TPayload>(configInfo));
            return stream.Bind(edgeObserver);

        case EventShape.Point:
            var pointObserver = cepApplication.DefineObserver(() => factory.CreatePointObserverSink<TPayload>(configInfo));
            return stream.Bind(pointObserver);

        default:
            throw new ArgumentOutOfRangeException("eventShape");
    }
}
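One thing worth knowing about the Activator.CreateInstance call above: if the type has no public parameterless constructor, it throws before the null check ever runs, so the friendly ArgumentException only covers the "wrong interface" case. Here is a minimal sketch of checking the type up front. ISinkFactory below is an empty stand-in for the real interface, and FactoryLoader and GoodFactory are hypothetical names that are not part of the downloadable solution.

```csharp
using System;

// Stand-in for the post's ISinkFactory (the real one declares the Create*ObserverSink methods).
public interface ISinkFactory { }

public sealed class GoodFactory : ISinkFactory { }

// Hypothetical helper: validates the factory type before trying to instantiate it.
public static class FactoryLoader
{
    public static ISinkFactory Create(Type factoryType)
    {
        if (factoryType == null)
        {
            throw new ArgumentNullException("factoryType");
        }
        if (!typeof(ISinkFactory).IsAssignableFrom(factoryType))
        {
            throw new ArgumentException(factoryType.FullName + " does not implement ISinkFactory", "factoryType");
        }
        // Interfaces and abstract classes pass the assignability test but cannot be created.
        if (factoryType.IsAbstract || factoryType.GetConstructor(Type.EmptyTypes) == null)
        {
            throw new ArgumentException(factoryType.FullName + " needs a public parameterless constructor", "factoryType");
        }
        return (ISinkFactory)Activator.CreateInstance(factoryType);
    }
}

public static class FactoryLoaderDemo
{
    public static void Main()
    {
        Console.WriteLine(FactoryLoader.Create(typeof(GoodFactory)).GetType().Name);
        try
        {
            FactoryLoader.Create(typeof(string)); // not a sink factory
        }
        catch (ArgumentException ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}
```

Whether the extra checks are worth it is a judgment call; the payoff is an error message that says which of the two problems you actually have.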

We won’t call it ToQuery(); doing so, while consistent, wouldn’t be descriptive at all. A binding that then gets run in a process is an inherently different thing from a query. With the adapter model, a query is a single runnable unit of logic; a binding is not. A binding can only run within the context of a process – it is the process, not the binding, that is the closest analog to the query. But the naming and the arguments are similar enough. Now, when we want to bind a stream to a sink, it looks like this:

Binding a Stream to a Sink
var sinkConfig = new ConsoleOutputConfig()
    {
        ShowCti = true,
        CtiEventColor = ConsoleColor.Blue,
        InsertEventColor = ConsoleColor.Green
    };

var binding = data.ToBinding(cepApplication, typeof (ConsoleOutputFactory), sinkConfig, EventShape.Point);
binding.Run("Hello");

You’ll also notice, if you haven’t noticed before, that this API forces you to use the entire event, not just the payload, when sending to the sink. While you can just send the payload to the sink and ignore the event itself, that seems quite pointless and can also cause quite a bit of confusion. I’ve seen a lot of folks put timestamps in the payload; I typically don’t. In the vast majority of cases, I rely on – and use – the timestamps on the event itself. These are the timestamps that actually matter; these are the timestamps that the StreamInsight engine is using when evaluating the event. These are the timestamps of the event within the application timeline. Anything in the payload is just an attribute of the event; the start time and end time are a key part of its definition.

Also, if you only send the payload to your sink, you won’t get any CTIs at all. CTIs are kinda important for a sink, I think, for a couple of reasons. First, even if you have no events coming through but you have CTIs, you know that the engine is running and pumping data through. This is particularly useful when you aren’t getting any data at all in your sink. Without the CTIs, you won’t have a good idea if your query is actually getting processed through the engine or if you have some other error in your logic. Second, CTIs let you know when you can/should write to a durable store in a batch. Here’s the thing – Insert events don’t get released to your sink until there is a CTI. That CTI also tells you that all of the events up to that point in application time have been released to your sink. So that’s the perfect time to batch up any writes that you have to a durable store (say, SQL Server) and write them in one shot. Batched updates/inserts are going to scale far better than single writes and that’s always a very good thing in a StreamInsight application.

You always need to remember that, in many cases, the sink is going to be the biggest potential bottleneck in the application because it usually involves some sort of I/O. And that I/O is always going to be slower than the raw CPU and memory-bound performance that you can get from your queries.
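To make the "batch on CTI" idea concrete, here is a rough sketch. It doesn’t reference StreamInsight at all: SketchEvent and SketchEventKind are stand-ins for the real event types, and BatchingSink is a hypothetical class, not something from the download. The write delegate is where a bulk insert would go.

```csharp
using System;
using System.Collections.Generic;

// Stand-ins for the StreamInsight event kind and event; not the real types.
public enum SketchEventKind { Insert, Cti }

public sealed class SketchEvent
{
    public SketchEventKind Kind;
    public DateTimeOffset StartTime;
    public string Payload;
}

// Hypothetical sink: buffers inserts and writes them in one shot when a CTI arrives.
public sealed class BatchingSink
{
    private readonly List<SketchEvent> _pending = new List<SketchEvent>();
    private readonly Action<IList<SketchEvent>> _writeBatch;

    public BatchingSink(Action<IList<SketchEvent>> writeBatch)
    {
        _writeBatch = writeBatch;
    }

    public void EventReceived(SketchEvent evt)
    {
        if (evt.Kind == SketchEventKind.Insert)
        {
            _pending.Add(evt);
            return;
        }
        // CTI: hand over a copy of everything buffered so far, then start a new batch.
        if (_pending.Count > 0)
        {
            _writeBatch(_pending.ToArray());
            _pending.Clear();
        }
    }
}

public static class BatchingSinkDemo
{
    public static void Main()
    {
        var sink = new BatchingSink(batch => Console.WriteLine("Writing " + batch.Count + " event(s)"));
        sink.EventReceived(new SketchEvent { Kind = SketchEventKind.Insert, Payload = "a" });
        sink.EventReceived(new SketchEvent { Kind = SketchEventKind.Insert, Payload = "b" });
        sink.EventReceived(new SketchEvent { Kind = SketchEventKind.Cti });
    }
}
```

This sketch is not thread-safe and does nothing on completion; a real sink would want to flush whatever is left when the stream completes and guard the buffer if it can be called from more than one thread.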

The next thing on our list is a bit tougher. I mentioned it in my previous post and it’s the one thing that I really didn’t like about the entire reactive model when I first saw it. You see, I’m a big fan of untyped adapters, especially output adapters. Yes, they are pretty useful on the input side as well but on the output they are absolutely essential. You can’t always know what your payload is going to look like and you don’t want to be writing a sink for every different payload that you dream up. For input … in a lot of cases, you can do OK since you’ll have a good idea of what the schema is going to look like. This is something that also came up recently on the StreamInsight forum so I know I’m not the only one that misses it. Fortunately, the .NET Framework gives us a way to make this happen. It’s more work than a simple untyped adapter but, with reflection, we can get the same kind of flexibility that we had with untyped adapters in our sinks.

What we want to have, at the end, is something similar to what an output adapter provides for us – name/value pairs where nested classes have a [ParentProperty].[ChildProperty] type name. While we’re at it, we’ll simplify it a bit; it’ll be a simple, straightforward dictionary with the name and the value, rather than having a separate event definition and then matching the index with the name, as in an untyped adapter. At the highest level, we’ll have a method called “GetPropertyValues” that takes the object and handles all of the details for us. Of course, it’ll be an extension method.

GetPropertyValues
/// <summary>
/// Returns the values of properties as name/value pairs.
/// </summary>
/// <param name="source">The object to read properties from</param>
/// <returns></returns>
public static Dictionary<string, object> GetPropertyValues(this object source)
{
    var propertyDictionary = new Dictionary<string, object>();
    //Get all of the properties.
    AppendPropertyValues(source, string.Empty, propertyDictionary);
    return propertyDictionary;
}

The real work is in the AppendPropertyValues method. This takes the dictionary, the object and a property name prefix and appends the properties and their values to the dictionary. To do this, first we get a list of the public instance properties on the object. From there, we loop over them. If one of them is a class, we then recurse into AppendPropertyValues but adding the source property name as the prefix. After that, we also get the fields with the same binding flags; the GetProperties method won’t return the fields and there isn’t a single reflection method that I could find that would give me both in a single call.

AppendPropertyValues
private static void AppendPropertyValues(object source, string prefix, Dictionary<string, object> propertyDictionary)
{
    //A null nested object has nothing to append (and source.GetType() would throw).
    if (source == null)
    {
        return;
    }
    //_validProp (not shown in this post) holds the types that are written out as values.
    var properties = source.GetType().GetProperties(BindingFlags.Public | BindingFlags.Instance);
    foreach (var propertyInfo in properties)
    {
        if (_validProp.Contains(propertyInfo.PropertyType))
        {
            var method = propertyInfo.GetGetMethod();
            object value = method.Invoke(source, null);
            propertyDictionary.Add(prefix + propertyInfo.Name, value);
        }
        else if (propertyInfo.PropertyType.IsClass)
        {
            //Nested class: recurse, using the property name as the prefix.
            var method = propertyInfo.GetGetMethod();
            object value = method.Invoke(source, null);
            AppendPropertyValues(value, prefix + propertyInfo.Name + ".", propertyDictionary);
        }
    }
    //GetProperties doesn't return fields, so pick those up separately.
    var fields = source.GetType().GetFields(BindingFlags.Public | BindingFlags.Instance);
    foreach (var fieldInfo in fields)
    {
        if (_validProp.Contains(fieldInfo.FieldType))
        {
            object value = fieldInfo.GetValue(source);
            propertyDictionary.Add(prefix + fieldInfo.Name, value);
        }
        else if (fieldInfo.FieldType.IsClass)
        {
            var value = fieldInfo.GetValue(source);
            AppendPropertyValues(value, prefix + fieldInfo.Name + ".", propertyDictionary);
        }
    }
}
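If you want to play with the flattening outside of the solution, here is a self-contained sketch of the same idea. The LeafTypes set is my guess at the kind of thing _validProp contains (the post doesn’t show it), and PropertyFlattener, Reading and Location are hypothetical names used only for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

public static class PropertyFlattener
{
    // Assumption: types written out as values. The real _validProp may differ.
    private static readonly HashSet<Type> LeafTypes = new HashSet<Type>
    {
        typeof(string), typeof(bool), typeof(byte), typeof(short), typeof(int), typeof(long),
        typeof(float), typeof(double), typeof(decimal), typeof(Guid),
        typeof(DateTime), typeof(DateTimeOffset), typeof(TimeSpan)
    };

    public static Dictionary<string, object> Flatten(object source)
    {
        var result = new Dictionary<string, object>();
        Append(source, string.Empty, result);
        return result;
    }

    private static void Append(object source, string prefix, Dictionary<string, object> result)
    {
        if (source == null) return; // nothing to read from a null nested object
        Type type = source.GetType();
        foreach (var property in type.GetProperties(BindingFlags.Public | BindingFlags.Instance))
        {
            var getter = property.GetGetMethod();
            // Skip write-only properties and indexers.
            if (getter == null || property.GetIndexParameters().Length > 0) continue;
            AddOrRecurse(property.PropertyType, getter.Invoke(source, null), prefix + property.Name, result);
        }
        foreach (var field in type.GetFields(BindingFlags.Public | BindingFlags.Instance))
        {
            AddOrRecurse(field.FieldType, field.GetValue(source), prefix + field.Name, result);
        }
    }

    private static void AddOrRecurse(Type type, object value, string name, Dictionary<string, object> result)
    {
        if (LeafTypes.Contains(type))
        {
            result[name] = value;
        }
        else if (type.IsClass)
        {
            Append(value, name + ".", result); // nested class: Parent.Child naming
        }
    }
}

public class Location
{
    public double Latitude { get; set; }
    public double Longitude; // a public field, to show that fields are picked up too
}

public class Reading
{
    public string Sensor { get; set; }
    public int Value { get; set; }
    public Location Where { get; set; }
}

public static class PropertyFlattenerDemo
{
    public static void Main()
    {
        var reading = new Reading { Sensor = "A1", Value = 42, Where = new Location { Latitude = 29.7, Longitude = -95.4 } };
        foreach (var pair in PropertyFlattener.Flatten(reading))
        {
            Console.WriteLine(pair.Key + " = " + pair.Value);
        }
    }
}
```

Note the limits: nullable value types and enums are silently skipped because they are neither in the set nor classes, and an object graph with a cycle would recurse forever.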

With this now in place, we have the same capabilities in our sinks that we had in our output adapters. Using this, we can change our ConsoleDataConsumer to display the payload properties, rather than just some basic header info.

Code Snippet
if (outputEvent.EventKind == EventKind.Insert)
{
    Console.ForegroundColor = Configuration.InsertEventColor;

    var eventValues = outputEvent.Payload.GetPropertyValues();
    StringBuilder output = new StringBuilder(2048);
    foreach (var eventValue in eventValues)
    {
        //Append(object) tolerates null values; calling ToString() on them would throw.
        output.Append(eventValue.Key).Append(":").Append(eventValue.Value).Append("\t");
    }
    Console.WriteLine("Insert Event Received at " + outputEvent.StartTime + "\t" + output.ToString());
}
else if (Configuration.ShowCti)
{
    Console.ForegroundColor = Configuration.CtiEventColor;
    Console.WriteLine("CTI event received at " + outputEvent.StartTime);
}
Console.ResetColor();

Now … we could certainly do some optimizations on this. For example, we could cache the property names and definitions for each of the types. I’ll leave that as an exercise for later … or for you, dear blog reader.
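If you do want a head start on that exercise, one possible shape is below: cache the reflected property list per type so GetProperties only runs once for each payload type. PropertyCache and SamplePayload are hypothetical names, and this is only a sketch of the lookup, not a full rewrite of AppendPropertyValues.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reflection;

// Hypothetical cache of public instance properties, keyed by type.
public static class PropertyCache
{
    private static readonly ConcurrentDictionary<Type, PropertyInfo[]> Cache =
        new ConcurrentDictionary<Type, PropertyInfo[]>();

    public static PropertyInfo[] GetPublicProperties(Type type)
    {
        // The reflection call only runs when the type has not been seen before.
        return Cache.GetOrAdd(type, t => t.GetProperties(BindingFlags.Public | BindingFlags.Instance));
    }
}

public class SamplePayload
{
    public int Id { get; set; }
    public string Name { get; set; }
}

public static class PropertyCacheDemo
{
    public static void Main()
    {
        var first = PropertyCache.GetPublicProperties(typeof(SamplePayload));
        var second = PropertyCache.GetPublicProperties(typeof(SamplePayload));
        Console.WriteLine("Properties: " + first.Length);
        Console.WriteLine("Same cached array: " + ReferenceEquals(first, second));
    }
}
```

ConcurrentDictionary matters here because sinks can be called from any thread. The same approach would work for the field list.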

You can download the solution from my SkyDrive. As you look, you’ll also see that I’ve done some refactoring on the project names and namespaces …

Tags:

StreamInsight | Code Sample

Dual-Mode Data Sinks - Part I

February 18, 2013 12:24 PM

Now that we have the input completed, we need to start working on the output adapters. As with the input adapters/sources, we’ll create an architecture that allows you to use the same core code whether you are using the pre-2.1 adapter model or the 2.1 and later sink model.

As with our StreamInputEvent, we’ll create an abstraction that allows us to handle any event shape with the same code. This StreamOutputEvent should have properties that express all of the possible variations in event shapes as well as a generic property to hold the payload. Now, if you look at several of the StreamInsight samples, you’ll notice that they only send the payload to the sink. Certainly, that makes a couple of things easier but I don’t think that it’s really the best way to do things. A key aspect of everything StreamInsight is the temporal properties and you lose that if you don’t send the full event shape to your sink. And … the shape is really only a way of expressing the event. Internally, all events, regardless of shape, have start and end times that control their lifetime in the query engine. The shape really becomes a way of how you want to “see” the events and handle them in your output. There’s nothing that stops you from expressing the same query as an edge, a point or an interval. It will impact when the event gets “released” to the output adapters/sink by the engine but really, it’s just a matter of how you want to see the event and when you want to get it to your output. But, before I go any further, here’s our StreamOutputEvent:

StreamOutputEvent
public class StreamOutputEvent<TPayload>
{

    /// <summary>
    /// Creates an output event from a source event.
    /// </summary>
    /// <param name="sourceEvent">The source event.</param>
    /// <returns></returns>
    public static StreamOutputEvent<TPayload> Create(PointEvent<TPayload> sourceEvent)
    {
        var outputEvent = new StreamOutputEvent<TPayload>()
            {
                StartTime = sourceEvent.StartTime,
                EventKind = sourceEvent.EventKind,
                EventShape = EventShape.Point
            };

        if (sourceEvent.EventKind == EventKind.Insert)
        {
            outputEvent.Payload = sourceEvent.Payload;
        }
        return outputEvent;
    }

    /// <summary>
    /// Creates an output event from a source event.
    /// </summary>
    /// <param name="sourceEvent">The source event.</param>
    /// <returns></returns>
    public static StreamOutputEvent<TPayload> Create(IntervalEvent<TPayload> sourceEvent)
    {
        var outputEvent = new StreamOutputEvent<TPayload>()
        {
            StartTime = sourceEvent.StartTime,
            EventKind = sourceEvent.EventKind,
            EventShape = EventShape.Interval
        };

        if (sourceEvent.EventKind == EventKind.Insert)
        {
            outputEvent.EndTime = sourceEvent.EndTime;
            outputEvent.Payload = sourceEvent.Payload;
        }
        return outputEvent;
    }

    /// <summary>
    /// Creates an output event from a source event.
    /// </summary>
    /// <param name="sourceEvent">The source event.</param>
    /// <returns></returns>
    public static StreamOutputEvent<TPayload> Create(EdgeEvent<TPayload> sourceEvent)
    {
        var outputEvent = new StreamOutputEvent<TPayload>()
        {
            StartTime = sourceEvent.StartTime,
            EventKind = sourceEvent.EventKind,
            EventShape = EventShape.Edge
        };

        if (sourceEvent.EventKind == EventKind.Insert)
        {
            outputEvent.Payload = sourceEvent.Payload;
            outputEvent.EdgeType = sourceEvent.EdgeType;
            if (sourceEvent.EdgeType == Microsoft.ComplexEventProcessing.EdgeType.End)
            {
                outputEvent.EndTime = sourceEvent.EndTime;
            }
        }
        return outputEvent;
    }

    public DateTimeOffset StartTime { get; private set;  }
    public EventKind EventKind { get; private set; }
    public DateTimeOffset? EndTime { get; private set;  }
    public EventShape EventShape { get; private set;  }
    public EdgeType? EdgeType { get; private set; }
    public TPayload Payload { get; private set; }
}

With a static Create method for each different event shape, this is something that we can easily create from an event stream and then send to a single set of code that handles the outbound event.

When creating the sink and hooking it to the stream, it’s a matter of how you create your observer and the type specified for the TElement. Very simply, the key thing that dictates what StreamInsight “sends” to your sink is the type for the observer’s generic class parameter. If you specify IObserver<TPayload>, you’ll only get the payload. However, if you specify IObserver<PointEvent<TPayload>>, you’ll get a point event (and so on for intervals and edges). Since our event consumer should be able to consume events of any shape, we will actually need to implement the observer interface for each of the shapes. While it may be tempting to try to implement one interface based on TypedEvent<T>, it won’t work. Yes, I tried. But StreamInsight requires that your sinks specify the payload as the type for the observer or one of the TypedEvent<T> child classes. If you don’t specify an event shape, StreamInsight will handle the events and release them to the output adapter as though they were point events. For some very basic scenarios, this works. But when you start getting into some of the more interesting scenarios for StreamInsight, you’ll want to do a lot more with your output than to view it as a point. But … this is for the next post. Let’s get back to our implementation.
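As a quick illustration of "one class, one observer interface per shape", here is a sketch that has nothing StreamInsight-specific in it. SketchPoint and SketchInterval are stand-ins for PointEvent and IntervalEvent, and ShapeAwareObserver is a hypothetical class. The point is simply that the interface type you hand out decides which OnNext gets called.

```csharp
using System;
using System.Collections.Generic;

// Stand-ins for the StreamInsight event classes.
public sealed class SketchPoint<T> { public T Payload; }
public sealed class SketchInterval<T> { public T Payload; }

// One class implementing IObserver<> once per event shape.
public sealed class ShapeAwareObserver<T> : IObserver<SketchPoint<T>>, IObserver<SketchInterval<T>>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(SketchPoint<T> value) { Log.Add("point:" + value.Payload); }

    public void OnNext(SketchInterval<T> value) { Log.Add("interval:" + value.Payload); }

    // OnCompleted and OnError have the same signature on both interfaces, so one implementation serves both.
    public void OnCompleted() { Log.Add("completed"); }

    public void OnError(Exception error) { Log.Add("error:" + error.Message); }
}

public static class ShapeAwareObserverDemo
{
    public static void Main()
    {
        var observer = new ShapeAwareObserver<int>();
        IObserver<SketchPoint<int>> asPointSink = observer;
        IObserver<SketchInterval<int>> asIntervalSink = observer;

        asPointSink.OnNext(new SketchPoint<int> { Payload = 1 });
        asIntervalSink.OnNext(new SketchInterval<int> { Payload = 2 });
        asPointSink.OnCompleted();

        foreach (var line in observer.Log)
        {
            Console.WriteLine(line);
        }
    }
}
```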

As we did with our input producer, we’ll create a common, abstract base class for all of our event consumers. Our concrete consumers will inherit from this class and handle whatever is necessary to write the data to our target, whatever it may be. Again, as with the producer, we’ll specify a configuration class; while the reactive StreamInsight API no longer requires it, the reality is that you will want to have a configuration class for your sinks; you don’t want to hard-code things like database connection strings, web service target URIs or anything like that in your event consumers. But the key thing that our base class will do is to implement the interface for each of the event shapes, translate them to our StreamOutputEvent and then send to our actual event consumer’s code.

StreamEventConsumer
public abstract class StreamEventConsumer<TPayloadType, TConfigType> :
    IObserver<PointEvent<TPayloadType>>,
    IObserver<EdgeEvent<TPayloadType>>,
    IObserver<IntervalEvent<TPayloadType>>
{
    protected StreamEventConsumer(TConfigType configuration)
    {
        this.Configuration = configuration;
    }

    public TConfigType Configuration { get; private set; }

    public abstract void Completed();

    public abstract void Error(Exception error);

    public abstract void EventReceived(StreamOutputEvent<TPayloadType> outputEvent);
    public void OnNext(PointEvent<TPayloadType> value)
    {
        EventReceived(StreamOutputEvent<TPayloadType>.Create((PointEvent<TPayloadType>)value));
    }

    public void OnNext(IntervalEvent<TPayloadType> value)
    {
        EventReceived(StreamOutputEvent<TPayloadType>.Create((IntervalEvent<TPayloadType>)value));
    }

    public void OnNext(EdgeEvent<TPayloadType> value)
    {
        EventReceived(StreamOutputEvent<TPayloadType>.Create((EdgeEvent<TPayloadType>)value));
    }

    public void OnCompleted()
    {
        Completed();
    }

    public void OnError(Exception error)
    {
        Error(error);
    }
}

From here, we’ll continue to build the architecture much the same way that we built the event producers. For each consumer, we’ll have a factory that handles the details of creating and starting the consumer based on a common interface.

ISinkFactory
interface ISinkFactory
{
    IObserver<PointEvent<TPayload>> CreatePointObserverSink<TPayload>(object config);
    IObserver<EdgeEvent<TPayload>> CreateEdgeObserverSink<TPayload>(object config);
    IObserver<IntervalEvent<TPayload>> CreateIntervalObserverSink<TPayload>(object config);
}

Between StreamEventConsumer and ISinkFactory, it’s now a small step to create a concrete factory and consumer – as well as the adapters. For simplicity’s sake, we’ll use a console consumer.

Console Data Consumer
public class ConsoleDataConsumer<TPayloadType>:StreamEventConsumer<TPayloadType, ConsoleOutputConfig>
{
    public ConsoleDataConsumer(ConsoleOutputConfig configuration) : base(configuration)
    {
    }

    public override void Completed()
    {
        //Nothing necessary.
    }

    public override void Error(Exception error)
    {
        Console.WriteLine("Error occurred:" + error.ToString());
    }

    public override void EventReceived(StreamOutputEvent<TPayloadType> outputEvent)
    {
        if (outputEvent.EventKind == EventKind.Insert)
        {
            Console.ForegroundColor = Configuration.InsertEventColor;
            Console.WriteLine("Insert Event Received at " + outputEvent.StartTime);
        }
        else if (Configuration.ShowCti)
        {
            Console.ForegroundColor = Configuration.CtiEventColor;
            Console.WriteLine("CTI event received at " + outputEvent.StartTime);
        }
        Console.ResetColor();
    }
}

Our factory handles the dirty details of hooking up to our source streams as well as our output adapters. Forcing the factory to implement a method for each event shape does two things: it ensures that we get the interface we need when creating the sink and tying it to our streams, and it gives us the opportunity to say, in code, that a particular output sink doesn’t support specific shapes, should that be appropriate.

ConsoleOutputFactory
public class ConsoleOutputFactory:ISinkFactory , ITypedOutputAdapterFactory<ConsoleOutputConfig>  
    {

        public IObserver<PointEvent<TPayload>> CreatePointObserverSink<TPayload>(object config)
        {
            return new ConsoleDataConsumer<TPayload>((ConsoleOutputConfig)config);
        }

        public IObserver<EdgeEvent<TPayload>> CreateEdgeObserverSink<TPayload>(object config)
        {
            return new ConsoleDataConsumer<TPayload>((ConsoleOutputConfig)config);
        }

        public IObserver<IntervalEvent<TPayload>> CreateIntervalObserverSink<TPayload>(object config)
        {
            return new ConsoleDataConsumer<TPayload>((ConsoleOutputConfig)config);
        }

        public OutputAdapterBase Create<TPayload>(ConsoleOutputConfig configInfo, EventShape eventShape)
        {
            switch (eventShape)
            {
                case EventShape.Interval:
                    return new ObserverTypedIntervalOutputAdapter<TPayload>(CreateIntervalObserverSink<TPayload>(configInfo));
                case EventShape.Edge:
                    return new ObserverTypedEdgeOutputAdapter<TPayload>(CreateEdgeObserverSink<TPayload>(configInfo));
                case EventShape.Point:
                    return new ObserverTypedPointOutputAdapter<TPayload>(CreatePointObserverSink<TPayload>(configInfo));
                default:
                    throw new ArgumentOutOfRangeException("eventShape");
            }
        }

        public void Dispose()
        {
            //throw new NotImplementedException();
        }
    }
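The console factory supports every shape, so it never exercises the "this sink doesn’t support that shape" option. Here is a sketch of what saying no might look like. Everything in it is a stand-in (SketchShape, ISketchSink, SketchSink and PointAndIntervalOnlyFactory are hypothetical), and throwing NotSupportedException is my choice for the example rather than anything the StreamInsight API mandates.

```csharp
using System;

// Stand-ins so the sketch compiles without StreamInsight.
public enum SketchShape { Point, Interval, Edge }

public interface ISketchSink
{
    string Describe();
}

public sealed class SketchSink : ISketchSink
{
    private readonly SketchShape _shape;

    public SketchSink(SketchShape shape) { _shape = shape; }

    public string Describe() { return _shape + " sink"; }
}

// Hypothetical factory for a sink that cannot make sense of edge events.
public sealed class PointAndIntervalOnlyFactory
{
    public ISketchSink CreatePointSink() { return new SketchSink(SketchShape.Point); }

    public ISketchSink CreateIntervalSink() { return new SketchSink(SketchShape.Interval); }

    public ISketchSink CreateEdgeSink()
    {
        // Fail at creation time with a clear message instead of misbehaving once events arrive.
        throw new NotSupportedException("This sink does not support edge events.");
    }

    public ISketchSink Create(SketchShape shape)
    {
        switch (shape)
        {
            case SketchShape.Point:
                return CreatePointSink();
            case SketchShape.Interval:
                return CreateIntervalSink();
            case SketchShape.Edge:
                return CreateEdgeSink();
            default:
                throw new ArgumentOutOfRangeException("shape");
        }
    }
}

public static class ShapeSupportDemo
{
    public static void Main()
    {
        var factory = new PointAndIntervalOnlyFactory();
        Console.WriteLine(factory.Create(SketchShape.Point).Describe());
        try
        {
            factory.Create(SketchShape.Edge);
        }
        catch (NotSupportedException ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}
```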

We’ve not touched on the output adapters yet so now’s the time to introduce them; they are, after all, already referenced in our factory. As before, we have a single factory for our consumers that serves the 2.1+ Reactive model as well as our legacy adapter model. As with our input adapters, the output adapters are relatively thin wrappers around our event consumers that handle the details of lifetime. Unlike the input adapters, with the output adapters, we may well get some data after our “stop” event and we want to make sure that we dequeue all events before shutting down.

To control this a little better, we use Monitor.Enter and Monitor.Exit directly rather than the basic lock{} block provided by C#. The lock block, by the way, creates, behind the scenes, a Monitor.Enter/Monitor.Exit pair. However, using the Monitor directly allows us to minimize the possibility of deadlocks if we get into a scenario where we are actively dequeuing events when we get a Resume call. By using Monitor.TryEnter(), we can attempt to enter our dequeuing code from other threads without blocking. If the lock has already been acquired, we don’t need to spin up another thread to dequeue and we certainly don’t need to block waiting for a lock that we won’t actually need once we get it. Our dequeue thread will continue to dequeue 1 event at a time until nothing is left in the queue. And we need to make sure that the dequeue operation is synchronized – only 1 thread can dequeue at a time anyway. Adding multiple threads to the dequeue operation typically won’t help us and we want to make sure that we have all available threads free to process actual query results. Now … once we’ve dequeued, we may want to use techniques to multi-thread sending the results to the final target. But … our actual dequeue from each query/process should be single threaded.

Keep in mind, however, that you’ll have multiple, single-threaded output sinks in most real-world applications. You will be multi-threaded, have no worries there. And calls into our event consumers can come from any thread, which is why we need to use locks to make sure that we’re properly synchronized. This is particularly important when our output adapter is stopped. After stop is called, we’ll get one more chance to empty our queue. We use the monitor to make sure that we do empty all available events from the queue before calling Stopped(). This ensures that we’ll have a nice, clean shutdown with no hangs and no ObjectDisposedExceptions.

Point Output Adapter
public class ObserverTypedPointOutputAdapter<TPayloadType>
        : TypedPointOutputAdapter<TPayloadType>
    {
        private readonly IObserver<PointEvent<TPayloadType>> _sinkObserver;

        public ObserverTypedPointOutputAdapter(IObserver<PointEvent<TPayloadType>> sinkObserver)
        {
            _sinkObserver = sinkObserver;
        }

        public override void Stop()
        {
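            //Track whether we actually took the lock so that we never exit a lock we don't hold.
            bool lockTaken = false;
            try
            {
                Monitor.Enter(_monitorObject, ref lockTaken);
                //One last round to dequeue
                EmptyQueue();
                //Completed
                _sinkObserver.OnCompleted();
            }
            finally
            {
                if (lockTaken)
                {
                    Monitor.Exit(_monitorObject);
                }
            }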
            base.Stop();
            Stopped();
        }


        public override void Resume()
        {
            System.Threading.Thread thd = new Thread(DequeueEvents);
            thd.Start();
        }

        public override void Start()
        {
            System.Threading.Thread thd = new Thread(DequeueEvents);
            thd.Start();
        }

        private object _monitorObject = new object();
        private void DequeueEvents()
        {
            if (this.AdapterState != AdapterState.Running)
            {
                return;
            }
            //Ensures only 1 thread is dequeuing and no other threads are blocked.
            if (Monitor.TryEnter(_monitorObject))
            {
                try
                {
                    EmptyQueue();
                }
                catch (Exception ex)
                {
                    _sinkObserver.OnError(ex);
                }
                finally
                {
                    Monitor.Exit(_monitorObject);
                    this.Ready();
                }
                    
            }
        }

        private void EmptyQueue()
        {
            PointEvent<TPayloadType> dequeuedEvent;
            
            while (this.Dequeue(out dequeuedEvent) == DequeueOperationResult.Success  )
            {
                    _sinkObserver.OnNext(dequeuedEvent);
            }
        }
    }
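The locking pattern is easier to see without the adapter plumbing around it, so here is a stripped-down sketch. SingleDrainer is a hypothetical class and the ConcurrentQueue stands in for the adapter’s internal queue; it isn’t how StreamInsight stores events. TryDrain is the Resume/Start case (don’t block if someone is already draining) and DrainAndWait is the Stop case (wait your turn, then make sure the queue is empty).

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper that shows the TryEnter-versus-Enter split used by the adapter.
public sealed class SingleDrainer<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly object _gate = new object();
    private readonly Action<T> _handler;

    public SingleDrainer(Action<T> handler)
    {
        _handler = handler;
    }

    public void Enqueue(T item)
    {
        _queue.Enqueue(item);
    }

    // Non-blocking: returns false right away if another thread is already draining.
    public bool TryDrain()
    {
        if (!Monitor.TryEnter(_gate))
        {
            return false;
        }
        try
        {
            Drain();
            return true;
        }
        finally
        {
            Monitor.Exit(_gate);
        }
    }

    // Blocking: waits for any active drain to finish, then empties whatever is left.
    public void DrainAndWait()
    {
        lock (_gate)
        {
            Drain();
        }
    }

    private void Drain()
    {
        T item;
        while (_queue.TryDequeue(out item))
        {
            _handler(item);
        }
    }
}

public static class SingleDrainerDemo
{
    public static void Main()
    {
        var drainer = new SingleDrainer<int>(i => Console.WriteLine("Handled " + i));
        drainer.Enqueue(1);
        drainer.Enqueue(2);
        Console.WriteLine("Drained: " + drainer.TryDrain());
        drainer.Enqueue(3);
        drainer.DrainAndWait();
    }
}
```

One caveat with this shape: an item enqueued just as the draining thread finishes can sit in the queue until the next drain is requested. The adapter gets that next request from the engine after it calls Ready(); a standalone version would need its own signal.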

Now that we have all of the core pieces in place, let’s take a look at what we need to do to hook our sink up to the console output. It’s actually very simple.

Hooking up to a stream

private static void RunProcess(Application cepApplication)
{
    var config = new TestDataInputConfig (){
        NumberOfItems=20,
        RefreshInterval=TimeSpan.FromMilliseconds(500)
    };


    var data = RxStream<TestDataEvent>.Create(cepApplication, typeof (TestDataInputFactory), config, EventShape.Point);

    var factory = new ConsoleOutputAdapter.ConsoleOutputFactory();
    
    
    var sink = cepApplication.DefineObserver(() => factory.CreatePointObserverSink<TestDataEvent>
                                                       (new ConsoleOutputAdapter.ConsoleOutputConfig()
                                                           {
                                                               ShowCti = true,
                                                               CtiEventColor = ConsoleColor.Blue,
                                                               InsertEventColor = ConsoleColor.Green
                                                           }));

    data.Bind(sink).Run();

}

There’s one thing that you may notice … the sink needs to know all of the details about the data class. This is far from ideal; not being tied to the schema of your data classes was one of the things that I found so powerful about the untyped adapter model. There are various ways that we can handle this but that’s a topic for the next entry. Until then, you can download the code from my SkyDrive.

Dual Mode Data Sources-Part III

January 2, 2013 9:41 PM

In the two previous postings, we walked through creating the data sources and exposing them to both the new Reactive-centric API and the “legacy” adapter-centric API. While we’ve accomplished that, I also said that we’d revisit the 2.1 code to add a layer of consistency to the APIs; we’re going to “bring back” factories and integrate these into our API. Why on earth would I do that? Didn’t we just get rid of them to make things simpler? Well, yes, we did just do away with the requirement of a factory but that doesn’t mean that they aren’t a good idea. Implementing the factory pattern will hide the details of creating the actual data source – our current one is pretty simple but others may get quite complex – and provide a layer to “check” requirements before we try to start the query (we’re already doing this with the adapter factory). I also happen to like consistent APIs and keeping consistency whenever practical and possible.

We’ll start by creating an interface. We may – and likely will – also wind up creating an abstract base class that implements the interface and handles common functionality but that will be refactored later after we write some more sources and get a better feel for how best to define the base class. And … having both an interface and a base class gives us the greatest level of flexibility when implementing later on; because our API is based on interfaces, we can inherit from other, existing code and/or components and just add the interface. If we based our API exclusively on base classes, the single-inheritance rule would limit that. This interface will be pretty simple and only have a single method.

ISourceFactory
/// <summary>
/// Interface for data source factories.
/// </summary>
public interface ISourceFactory
{
    /// <summary>
    /// Creates an observable source.
    /// </summary>
    /// <typeparam name="TPayload">Type of the payload.</typeparam>
    /// <param name="config">Configuration class.</param>
    /// <param name="eventShape">Shape of the events</param>
    /// <returns></returns>
    IObservable<StreamInputEvent<TPayload>> CreateObservableSource<TPayload>(object config, EventShape eventShape);
}

You may notice a slight difference between this and the input adapter factory interface – config is not strongly typed. Yes, we could do that … define a generic interface and then use reflection to call it. But, to be honest, I really didn’t want to go through all of the contortions to make that happen. So the config is an object.

Now, when we implement this on our existing factory, we also refactor things a bit to maximize code reuse.

Adapter + Source Factory
public sealed class TestDataInputFactory : ITypedInputAdapterFactory<TestDataInputConfig>, ISourceFactory
{
    [SuppressMessage("Microsoft.Design", "CA1004:GenericMethodsShouldProvideTypeParameter", Justification = "By Design")]
    public InputAdapterBase Create<TPayload>(TestDataInputConfig configInfo, EventShape eventShape)
    {
        CheckPayloadType<TPayload>();
        return new ObservableTypedPointInputAdapter<TestDataEvent, TestDataInputConfig>(CreateProducer(configInfo, eventShape));
    }

    public IObservable<StreamInputEvent<TPayload>> CreateObservableSource<TPayload>(object config, EventShape eventShape)
    {
        //Check the payload type.
        CheckPayloadType<TPayload>();
        //Check the config class for the proper type.
        TestDataInputConfig typedConfig = config as TestDataInputConfig;
        if (typedConfig == null)
        {
            //Invalid cast
            throw new ArgumentException("Configuration class must be of type TestDataInputConfig");
        }
        return (IObservable<StreamInputEvent<TPayload>>)CreateProducer(typedConfig, eventShape);
    }

    private static void CheckPayloadType<TPayload>()
    {
        //Check the payload.
        if (typeof(TPayload) != typeof(TestDataEvent))
        {
            //this won't work.
            //throw an exception.
            throw new InvalidOperationException("Specified type must be of " + typeof(TestDataEvent).FullName);
        }
    }

    private TestDataProducer CreateProducer(TestDataInputConfig config, EventShape eventShape)
    {
        switch (eventShape)
        {
            case EventShape.Point:
                //Create the publisher.
                return new TestDataProducer(config);
            default:
                throw new ArgumentException(string.Format(
                    System.Globalization.CultureInfo.InvariantCulture,
                    "TestDataInputFactory cannot instantiate adapter with event shape {0}",
                    eventShape.ToString()));
        }
    }

    public void Dispose()
    {
    }
}

You’ll notice that both methods … source and adapter … use all of the same validation logic and code. Using this in our existing code isn’t too difficult. Our new “RunProcess()” now looks like the following:

Run Process
private static void RunProcess(Application cepApplication)
{
    var config = new TestDataInputConfig (){
        NumberOfItems=20,
        RefreshInterval=TimeSpan.FromMilliseconds(500)
    };


    var data = cepApplication.DefineObservable(
        () => new TestDataInputFactory().CreateObservableSource<TestDataEvent>(config,EventShape.Point ))
        .ToPointStreamable(e => e.GetPointEvent());

    
    var sink = cepApplication.DefineObserver(() => Observer.Create<TestDataEvent>(
        e => Console.WriteLine("TestEvent ItemId:{0} Run:{1} Value{2}", e.ItemId, e.RunNumber, e.Value)));

    data.Bind(sink).Run();

}

We can still make it better, though, and make it look more like the pre-2.1 API. We’ll start by creating an RxStream class and add methods for creating observables. We’ll also want to make sure that these methods are remotable – so we can work with either a local or a remote instance without any code changes. This was a challenge with the pre-2.1 API; you could pretty easily get yourself into a situation where your code would work on a local, in-process instance but not work at all when you were connecting to a remote instance. With DefineObservable, however, it’s actually defining a remote function that returns an observable and that’s what we call … whether in process or remote. If it works in-process then it’ll work out of process. However, you may wind up getting yourself into a situation with mysterious serialization errors … you need to make sure that whatever you pass to your methods is fully serializable. That’s why our configuration class has the DataContract attribute. With our CreateObservable method, we’ll first check to see if we have a reference to the function (and notice that it is of type Func<>). If not, we check to see if it’s been created and deployed. If not, we create and deploy it. We could also put this same code in a static constructor – it wouldn’t make much difference. The actual work is done by an InstantiateObservable private method and that’s what our defined Observable actually calls.

RxStream Observables
private static Func<Type, object, EventShape, IQbservable<StreamInputEvent<TPayload>>> _observable;

public static IQbservable<StreamInputEvent<TPayload>> CreateObservable(Application cepApplication,
    Type sourceFactoryType,
    object configInfo,
    EventShape eventShape)
{
    string entityName = "Observable:" + typeof (TPayload).FullName;
    if (_observable == null)
    {
        //Check the application.
        if (cepApplication.Entities.ContainsKey(entityName))
        {
            _observable =
                cepApplication.GetObservable<Type, object, EventShape, StreamInputEvent<TPayload>>(entityName);
        }
        else
        {
            //Define and deploy.
            _observable = cepApplication.DefineObservable(
                (Type t, object c, EventShape e) => InstantiateObservable(t, c, e));
            _observable.Deploy(entityName);
        }

    }
    return _observable.Invoke(sourceFactoryType, configInfo, eventShape);
}

private static IObservable<StreamInputEvent<TPayload>> InstantiateObservable(Type sourceFactoryType,
                                                                              object configInfo,
                                                                              EventShape eventShape)
{
    var sourceFactory = Activator.CreateInstance(sourceFactoryType) as ISourceFactory;
    if (sourceFactory == null)
    {
        throw new ArgumentException("Specified type is not a source factory.");
    }
    return sourceFactory.CreateObservableSource<TPayload> (configInfo, eventShape);
}
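
On the serialization point: one cheap sanity check is to round-trip a configuration object through DataContractSerializer before handing it to a remote function. The sketch below is only an illustration. SampleConfig and SerializationCheck are made-up names standing in for the real configuration class, and passing this check does not guarantee that StreamInsight's own serialization will succeed; it only catches the obvious cases.

```csharp
using System;
using System.IO;
using System.Runtime.Serialization;

// Hypothetical stand-in for a configuration class; the two properties mirror
// the ones set on TestDataInputConfig in this post.
[DataContract]
public class SampleConfig
{
    [DataMember]
    public int NumberOfItems { get; set; }

    [DataMember]
    public TimeSpan RefreshInterval { get; set; }
}

public static class SerializationCheck
{
    // Serializes the value to an in-memory stream and reads it back.
    // Throws if the type (or one of its members) cannot be serialized.
    public static T RoundTrip<T>(T value)
    {
        var serializer = new DataContractSerializer(typeof(T));
        using (var stream = new MemoryStream())
        {
            serializer.WriteObject(stream, value);
            stream.Position = 0;
            return (T)serializer.ReadObject(stream);
        }
    }
}
```

Calling SerializationCheck.RoundTrip(config) in a unit test and comparing the properties of the copy with the original is usually enough to flag a member that is missing its DataMember attribute.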

Now all that we have left to do is to create methods to also create the streams, rather than just the observables. This will be our typical use case but we’ll still keep the observable method public as well to give us the most flexibility when we are actually using this in anger.

RxStream Create
public static IQStreamable<TPayload> Create(
    Application cepApplication, Type sourceFactoryType, object configInfo, EventShape eventShape)
{
    return Create(cepApplication, sourceFactoryType, configInfo, eventShape, null);
}

public static IQStreamable<TPayload> Create(
    Application cepApplication, Type sourceFactoryType, object configInfo, EventShape eventShape, AdvanceTimeSettings advanceTimeSettings)
{
 
    var observable = CreateObservable(cepApplication, sourceFactoryType, configInfo, eventShape);

    switch (eventShape)
    {
        case EventShape.Interval:
            return observable.ToIntervalStreamable(e => e.GetIntervalEvent(), advanceTimeSettings);

        case EventShape.Edge:
            return observable.ToEdgeStreamable(e => e.GetEdgeEvent(), advanceTimeSettings);

        case EventShape.Point:
            return observable.ToPointStreamable(e => e.GetPointEvent(), advanceTimeSettings);

        default:
            throw new ArgumentOutOfRangeException("eventShape");
    }
}

Once this is in place, the code to create our stream looks remarkably similar to the adapter-centric code:

Creating the Stream
var config = new TestDataInputConfig (){
    NumberOfItems=20,
    RefreshInterval=TimeSpan.FromMilliseconds(500)
};


var data = RxStream<TestDataEvent>.Create(cepApplication, typeof (TestDataInputFactory), config, EventShape.Point);

There’s still more that we’ll need to do … for example, we’ll need to create overloads to import CTIs. But, for now, we’re done with the core API for our input adapters and will be moving on to output adapters/sinks in the next article.

You can download the current code from SkyDrive.

Tags:

StreamInsight | Code Sample

Using Subjects as a “Feedback Loop”

December 12, 2012 10:01 AM

OK, I’m going to switch gears for a minute here. Yes, I’ll be back to building the app but this is just too cool to not share. It’s something that I’ve thought about quite a bit off and on and a forum post got it going again so I put together a sample last night that shows how it can be done. The question … how do I take the output of a query and feed it back in to the query itself? This has a number of uses; in the case of the forum post, the developer wants to use this to update running totals and balances as orders are processed because these results impact the rest of the events. When working with sensors, this is also how you would do a “deadband” … an output isn’t produced unless the new value is more than a certain percentage change from the previous. For the deadband use case, you can’t just compare to the previous reading; if you do this, slow changes can accumulate but a new value is never reported because the change between any two values is less than the deadband threshold. So you need to compare the current reading value to the last reported value. I’ve done this with a UDO that maintains state (in a dictionary) for each of the items but it’s not as elegant as I’d like. And this is cooler anyway.
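
To make the deadband point concrete, here is a small sketch in plain C# with no StreamInsight involved. DeadbandSketch and its two methods are names made up for this illustration, and it assumes a single item with non-zero values. It only shows why the comparison has to be against the last reported value rather than the previous reading.

```csharp
using System;
using System.Collections.Generic;

public static class DeadbandSketch
{
    // Compares each reading against the last value that was actually reported.
    // Assumes non-zero readings; a zero baseline would need special handling.
    public static List<double> FilterAgainstLastReported(IEnumerable<double> readings, double threshold)
    {
        var reported = new List<double>();
        double? lastReported = null;
        foreach (var value in readings)
        {
            // The first reading always goes through; there is no baseline yet.
            if (lastReported == null ||
                Math.Abs((value - lastReported.Value) / lastReported.Value) > threshold)
            {
                reported.Add(value);
                lastReported = value;
            }
        }
        return reported;
    }

    // Naive variant: compares each reading against the one just before it,
    // so the baseline moves even when nothing is reported.
    public static List<double> FilterAgainstPrevious(IEnumerable<double> readings, double threshold)
    {
        var reported = new List<double>();
        double? previous = null;
        foreach (var value in readings)
        {
            if (previous == null ||
                Math.Abs((value - previous.Value) / previous.Value) > threshold)
            {
                reported.Add(value);
            }
            previous = value;
        }
        return reported;
    }
}
```

With the readings 100, 104, 108, 112, 116 and a 5% threshold, the naive version reports only 100 because no single step exceeds 5%, while the last-reported version reports 100, 108 and 116.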

So … what is a subject? A subject is an observer and an observable; it will republish events that it observes (via IObserver) to others that are observing it (via IObservable). For a little primer on subjects, check out this blog entry from Colin on the StreamInsight team; it gives you a hint of what you can do with these. But that’s just the tip of the iceberg; subjects have become one of my favorite little tricks in StreamInsight because there’s so much that you can do with them. As Colin mentions, observers can come and go – so you can hook up multiple sinks to a single output query (something very difficult prior to 2.1). What Colin doesn’t mention is that your sources (observables) can come and go also … you can hook up multiple source queries that then get fed to one or more sinks.
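
If the observer-plus-observable idea is new, a stripped-down, hand-rolled version may help. This is not how Rx or StreamInsight implement subjects. TinySubject and CollectingObserver are made-up names, the class is not thread-safe, and it uses nothing beyond the IObserver and IObservable interfaces from the System namespace. It only shows the republishing behavior and that observers can come and go.

```csharp
using System;
using System.Collections.Generic;

// Illustration only: receives events as an observer and republishes them
// to whoever is currently subscribed.
public sealed class TinySubject<T> : IObserver<T>, IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        // Disposing the returned token detaches the observer again.
        return new Unsubscriber(() => _observers.Remove(observer));
    }

    public void OnNext(T value)
    {
        // Copy the list so observers may unsubscribe while being notified.
        foreach (var o in _observers.ToArray()) o.OnNext(value);
    }

    public void OnError(Exception error)
    {
        foreach (var o in _observers.ToArray()) o.OnError(error);
    }

    public void OnCompleted()
    {
        foreach (var o in _observers.ToArray()) o.OnCompleted();
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action _onDispose;
        public Unsubscriber(Action onDispose) { _onDispose = onDispose; }
        public void Dispose() { _onDispose(); }
    }
}

// Simple sink that remembers everything it was sent.
public sealed class CollectingObserver<T> : IObserver<T>
{
    public readonly List<T> Received = new List<T>();
    public void OnNext(T value) { Received.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Subscribe two CollectingObserver instances, push a value, dispose one subscription and push another value: the first observer ends up with one value and the second with both.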

Subjects are created and deployed independently of their sources and sinks – you can create a subject that you are using as a source before you actually hook it up to a sink. Or vice-versa. Subjects also allow you to share query results across processes – similar to what you would do with dynamic query composition but far, far, far more powerful and flexible.

Now, because a subject is both a source and a sink, you can use them to take the output results of a query and feed them back in to the source query … creating a feedback loop. Or a time warp.

Creating a subject is pretty simple. There isn’t much code required. You just need to know what type you’ll be using. You can bind the subject to the payload type only – you only get the payload and no temporal header – or to the TypedEvent (i.e. PointEvent, IntervalEvent, etc), in which case you will get the payload and the temporal header. Let’s start there.

GetOrCreateSubject
private static object _subjectLock = new object();
private static IRemoteSubject<TSubject, TSubject> GetOrCreateSubject<TSubject>(Application cepApplication, string subjectName)
{
    lock (_subjectLock)
    {
        bool subjectExists = cepApplication.Subjects.Any(s => s.Key == subjectName);
        if (subjectExists)
        {
            return cepApplication.GetSubject<TSubject, TSubject>(subjectName);
        }

        else
        {
            return cepApplication.CreateSubject(subjectName, () => new Subject<TSubject>());
        }
    }
}

As the name says, this method will get or create a subject, depending on if it’s already created or not. If you are wondering about the _subjectLock, that’s there to ensure that only one thread is in the middle of this operation at any time. If you are multi-threading calls to this, it is entirely possible to get into a situation where you are calling the same method with the same arguments on different threads and wind up with exceptions as multiple threads try to create the same subject at the same time.
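
The same check-then-create-under-a-lock shape can be tried out without a StreamInsight server. The sketch below is a stand-in only: GetOrCreateRegistry is a made-up class that uses a dictionary where the real method uses the application's subject collection, and it simply counts how often the factory runs.

```csharp
using System;
using System.Collections.Generic;

// Illustration only: a dictionary plays the role of the server-side
// collection of named subjects.
public sealed class GetOrCreateRegistry<TValue>
{
    private readonly object _lock = new object();
    private readonly Dictionary<string, TValue> _items = new Dictionary<string, TValue>();

    // Number of times the factory was actually invoked.
    public int CreateCount { get; private set; }

    public TValue GetOrCreate(string name, Func<TValue> factory)
    {
        // The existence check and the creation happen under one lock, so two
        // threads can never both decide that the item is missing.
        lock (_lock)
        {
            TValue existing;
            if (_items.TryGetValue(name, out existing))
            {
                return existing;
            }

            var created = factory();
            _items.Add(name, created);
            CreateCount++;
            return created;
        }
    }
}
```

Calling GetOrCreate with the same name from many threads at once should leave CreateCount at 1; without the lock, two threads could both try to add the same key and one of them would fail.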

Now, let’s get data running through our test app. I’m using the same app from my recent posts so this will look familiar. We’ll also get a reference to the subject.

Getting Data
var config = new TestDataInputConfig ()
{
    NumberOfItems=20,
    RefreshInterval=TimeSpan.FromMilliseconds(500)
};

var data = cepApplication.DefineObservable(
        () => new TestDataProducer(config)).ToPointStreamable(e => e.GetPointEvent());


var lastReportedSubject = GetOrCreateSubject<PointEvent<TestDataEvent>>(cepApplication, "LastKnown");

We aren’t sending data to the subject just yet … that will be from the results later … but we do need to get a stream (IQStreamable) from it. But let’s stop for a moment and discuss some of the potential gotchas. First, remember that a stream in StreamInsight is a temporal stream. All of the events exist in time and the stream moves forward based on CTIs. We also know that we’ll be joining this feedback stream with the data stream and, when you do that, StreamInsight will synch to the slowest stream … the joined stream will move forward only when CTIs from both source streams move past the same timestamp. If our feedback loop simply publishes the CTIs that are generated from our result stream, it will never move forward. Why not? Because it’ll be waiting for CTIs to move past a timestamp from the result stream but the result stream can’t move forward because there is no CTI coming from our feedback stream. Did that make sense? It makes my brain hurt thinking about it too much. Anyway, what we need to do is to directly import the CTIs from the data stream. But that gives us another challenge. If we do that, we now have to worry about CTI violations from the data (insert) events being published from the results. You see, the events produced will have start times that are before the last-issued CTI; they must be or they wouldn’t be in the output stream. So we need to account for this when we enqueue the new events by shifting the start time forward so that it is after the last-issued CTI.

So let’s get started. To handle the CTIs, we’ll first filter the subject observable source so that it only produces Insert events. Then we will Merge these results with the CTIs from the data stream. The result will be an observable that we can then use as the source for the feedback stream. A note, however, on using the Merge technique to import CTIs … it will give you no protection at all from CTI violations. If you try to create/enqueue an event that violates the CTI, you will get an exception that you can’t really catch and it will cause your query to abort.

Feedback Source
var lastReportedObservableSource = lastReportedSubject
                                    //Get only the inserts from the subject, dropping the CTIs
                                    .Where(e => e.EventKind == EventKind.Insert)
                                    //Merge with the CTIs from the data stream.
                                    .Merge(data.ToPointObservable().Where(e => e.EventKind == EventKind.Cti));

Now that we have our observable, we need to make it a stream. While it is still an observable (IQbservable), we don’t have to follow the temporal/CTI rules because it’s not a temporal stream yet. However, when it’s a stream (IQStreamable), it is a temporal stream and we do have to follow the temporal/CTI rules … so we have to shift the timestamps of our point events so that they don’t violate the CTI. With the data source that we are using, we know that the CTIs are 1 tick past the “batch” of events so shifting the timestamp 1 tick will ensure that we’re good. In the real world, you may need to do something a bit more sophisticated … see my previous blog article on importing CTIs in 2.1 for a couple of tips; I will be revisiting this in my app-building series.

Feedback Stream
var lastReportedSourceStream = lastReportedObservableSource
                                    .ToPointStreamable(e => e.EventKind == EventKind.Cti ? e :
                                        PointEvent<TestDataEvent>.CreateInsert(e.StartTime.AddTicks(1), e.Payload));

The hard part is done. Now that we have the streams, the rest is a series of StreamInsight queries. First, we want to make sure that we always have the last-known value available in the last reported value stream so we’ll use the (very) common ToSignal pattern. Next, we need to make sure that the initial values reported always go through. If we just join the streams, we’ll never get output because the initial events won’t be in the feedback stream. So we’ll do a LeftAntiJoin so that the events in the data stream with no matches in the feedback stream go through. (Depending on your scenario, it may be perfectly appropriate to “seed” the feedback stream with initial events … in the case of the forum post, these could be current balances when the application starts. If that’s the case, you can probably skip this step.) Then we calculate the percentage change from the last reported value for the current value; in this step, I’m including both the current and the last reported value in the stream (we don’t really need to) so that it’s easier to check the results using the event flow debugger and “see” everything that’s happening. From the calculation, we then select only those items that have changed more than a specified amount. Since we’re using random, very variable data in this sample, I’ve put the “deadband” at 100% but real-world will be somewhat less; anywhere from 1% – 10%, depending on the source. Union the two queries together and we’re done! (with the queries, at least).

The Queries
//Make sure that we have the last reported value always available.
var lastReportedStream = lastReportedSourceStream.ToSignal((e1, e2) => e1.ItemId == e2.ItemId);
    
//Make sure that our initial values always get reported in the output using LeftAntiJoin.
var firstValues =
    from i in data.LeftAntiJoin(lastReportedStream, (e1, e2) => e1.ItemId == e2.ItemId)
    select i;

//Calculate the change from the previous.
var calcStream = from d in data
                 join
                     lr in lastReportedStream
                     on d.ItemId equals lr.ItemId
                 select new
                     {
                         Value = d,
                         LastReported = lr,
                         PctChange = Math.Abs((d.Value - lr.Value)/lr.Value)
                     };

//Select only those that have changed more than 100%
var changed = from c in calcStream
              where c.PctChange > 1.0
              select c.Value;

//Union first values with changed values
var final = firstValues.Union(changed);

All that’s left is to bind the output to the subject, creating the feedback loop as well as a console writer sink so that we see results. Then we run the process.

Bind and Run
var sink = cepApplication.DefineObserver(() => Observer.Create<TestDataEvent>(
    e => Console.WriteLine("TestEvent ItemId:{0} RunNumber:{1} Value{2}", e.ItemId, e.RunNumber, e.Value)));

//Bind to subject and console sink so that we see results.
//Run the process.
final.Bind(lastReportedSubject).With(final.Bind(sink)).Run();

I do have to note that when I run this, StreamInsight hangs on shutdown – not a good sign. I’m not exactly sure why this is happening but if I figure it out, I will update this post. Or … if you figure it out, you can send me a note using the “Contact” link on this blog.

You can download the code from my SkyDrive (of course!).

Dual Mode Data Sources–Part II

December 11, 2012 9:11 PM

In my previous post, I walked through creating a basic structure for creating StreamInsight data sources using the Reactive model introduced with version 2.1. There was quite a bit of plumbing work to get done but now that it’s finished, we’re ready to move on to the next step … taking what we just created and making it available to the pre-2.1 API. I didn’t mention this in the previous post but these articles are the first in what’s going to be a series of articles that walk through building a StreamInsight application. In these articles, we’re going through the process to create our data sources – or, more accurately, the architectural framework for our data sources. We’ll do the same with sinks/output adapters and then we’ll start pulling in some real(istic) data and doing something more interesting. Each article will build on the previous one so you can follow along.

Now, back to our input. Since we already have the data flowing via the IObservable interface, it’s really easy to subscribe to this interface and implement an input adapter that uses the exact same code to generate events. All we need to do is subscribe to our producer and enqueue the event when OnNext is called:

On Next
public void OnNext(StreamInputEvent<TPayloadType> publishedEvent)
{
    if (AdapterState == AdapterState.Running)
    {
        var newEvent = publishedEvent.GetPointEvent();
        this.Enqueue(ref newEvent);
    }
}

It really is that simple. Well, almost but before we get into that, let’s step back for a second. We have an event producer that creates our events. This producer handles all of the details of connecting to the data source and packaging the data for StreamInsight. Traditionally, we did all of this in the adapter itself but since we’ve already separated it, the adapter really doesn’t need to do much except enqueue. Looking at it that way, why do we need to have separate adapters for each type that we have? Well, the truth is: we don’t. With a little planning, we can have a single set of adapters that subscribe to a producer’s IObservable interface and handle enqueuing the events into StreamInsight as well as manage lifetime. These will be our adapters and they’ll wind up being very thin. So we’ll start by creating a new point typed input adapter. Like the base class, the payload type is a generic argument for the class – there’s no reason why we can’t use this for any type. Because Start() and Resume() are abstract methods on the base class, we have to implement them. We’ll also want to override Stop() and Dispose(bool disposing).

Adapter Skeleton
public class ObservableTypedPointInputAdapter<TPayloadType>
    : TypedPointInputAdapter<TPayloadType>
{
    public override void Start()
    {
        throw new NotImplementedException();
    }

    public override void Resume()
    {
        throw new NotImplementedException();
    }

    public override void Stop()
    {
        base.Stop();
    }

    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);
    }
}

So let’s get started. First, we need to get our producer. That’s simple enough; we know that we need to have an adapter factory so that’s where we’ll create it and pass it in as an argument of the constructor. We pop this into a private class field – we won’t be calling this until we subscribe. Speaking of subscribe, this is exactly what we need to do in the Start() method. For the producer’s OnCompleted action, we’ll call the adapter’s Stop() method. Yes, you can call Stop() in an adapter and not stop the queries that are getting events from the stream – this is something that may happen with a read-once reference data adapter. It’s not commonly seen in the wild but we’ll make sure that we handle it correctly. One thing that is important to understand here … if you are having StreamInsight generate your CTIs for you with AdvanceTimeSettings, one of your options is to AdvanceToInfinityOnShutdown. This is directly impacted by the adapter calling Stopped() (or the producer, in fact, calling Complete()). When the source shuts down, StreamInsight will enqueue a CTI with a timestamp of DateTimeOffset.MaxValue, which serves to “clear” the stream of any pending events. It also happens to work very nicely if you happen to have a read-once data source of reference data.

Constructor & Start
public ObservableTypedPointInputAdapter(IObservable<StreamInputEvent<TPayloadType>> eventProducer)
{
    this._eventProducer = eventProducer;
}

public override void Start()
{
    _subscription = _eventProducer.Subscribe(OnNext,
         () =>
         {
             if (AdapterState == AdapterState.Running)
             {
                 Stop();
             }
         });
}

Start vs. Resume

In a lot of the samples, you’ll see these implemented the exact same way. However, in the real world, you may want to put a little more thought into how you handle Resume(). Odds are pretty good, especially if you are using Premium Edition, that you will never need to actually implement Resume(). It gets called after your adapter is put into a suspended state because the input queue is full. Yes, it is possible to fill up the input queue – you need about 200,000 unprocessed events that StreamInsight can’t get pushed through your queries. I have made this happen – but it took some effort. I actually had to continue enqueueing events just as fast as I could on multiple threads. Even with some convoluted queries, it still took some time to fill up the input queue. Considering that I’ve seen StreamInsight process over 30,000 events/sec on a dual core i7 laptop and over 100,000 events/sec on a low-to-mid level server with CPU cycles to spare, you will have to work at it. But it can happen. Now, if you are using Standard Edition, you won’t be able to get to that level of throughput (and I haven’t done any testing on Standard’s throughput) but you can still expect to push a goodly number of events through Standard Edition. Now, when the input queue fills up, StreamInsight will return Full from the enqueue. Your adapter will need to call Ready() and then StreamInsight will call Resume() when you can start enqueuing events again. If you are using the StreamInsight 2.1 Reactive model, you won’t get any notification if the input queue fills. Instead, StreamInsight will simply ignore your events until it’s ready to receive events again. This is what I’m going to do in our exercises but it may not be appropriate, depending on your source and your needs. For example, if you are simply pushing recorded data through the engine, it would be easy enough to pause and then start back up again when Resume() is called.
If, however, it’s live data, you may need to do something else. You will want to exercise some caution in buffering up events that you miss – you have no way of knowing when the adapter will start back up again and you could eat up FAR more memory than you want to. It is a potentially difficult issue but, fortunately, one that is pretty tough to hit in the real world. Especially if you are using Premium Edition, you’ll saturate network bandwidth (for inbound data) before you fill up the queue. In our exercise, we’re going to do the same thing that StreamInsight 2.1 does … we’re just going to ignore incoming events until Resume() is called.
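
The “ignore until Resume()” behavior can be sketched without an adapter at all. In the illustration below, DroppingGate is a made-up class: a small bounded queue stands in for the StreamInsight input queue, hitting capacity stands in for an enqueue result of Full, and Resume() stands in for the engine calling Resume() on the adapter. It is a toy model of the idea, not the adapter implementation.

```csharp
using System;
using System.Collections.Generic;

// Illustration only: drops incoming items from the moment the queue reports
// full until Resume() is called, even if room frees up in between.
public sealed class DroppingGate<T>
{
    private readonly int _capacity;
    private readonly Queue<T> _queue = new Queue<T>();
    private bool _canEnqueue = true;

    public DroppingGate(int capacity)
    {
        _capacity = capacity;
    }

    // Number of items that were ignored.
    public int Dropped { get; private set; }

    public int Count
    {
        get { return _queue.Count; }
    }

    public bool TryEnqueue(T item)
    {
        if (!_canEnqueue)
        {
            // Paused: ignore the item rather than buffering it.
            Dropped++;
            return false;
        }

        if (_queue.Count >= _capacity)
        {
            // Stand-in for the engine reporting a full input queue.
            _canEnqueue = false;
            Dropped++;
            return false;
        }

        _queue.Enqueue(item);
        return true;
    }

    // Stand-in for the engine working through its input queue.
    public void Drain(int count)
    {
        for (int i = 0; i < count && _queue.Count > 0; i++)
        {
            _queue.Dequeue();
        }
    }

    // Stand-in for the engine calling Resume() on the adapter.
    public void Resume()
    {
        _canEnqueue = true;
    }
}
```

The design choice to notice is that nothing is buffered while paused. That keeps memory flat, at the cost of losing whatever arrives before Resume() is called.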

Back to the code … in the snippet above, I enqueued directly from the OnNext function. While this works – if everything goes right – it doesn’t really represent what we’ll need to do in a real app to handle things like exceptions, queue full and the like. Also, while this class is designed to be a stand-alone input adapter that can be used with any observable source, there may be special case adapters – for example, specific handling of a pause/resume scenario – where we want to override specific behaviors with an inherited class so we want to make sure that we provide the right level of points to override. So we’ll take the enqueue operation and create our own method. In here, we do all of our state checks to make sure that we can enqueue, handle enqueue exceptions and queue full as well as keep track of our last-enqueued CTI. Finally, we will also include a critical section lock around the actual enqueue to make sure that we handle lifetime correctly and don’t report Stopped() when we’re in the middle of an enqueue. It is the same method described in my previous article on output adapter lifetime but applied to input adapters.

Enqueue Event
protected virtual void EnqueueEvent(StreamInputEvent<TPayloadType> publishedEvent)
{
    if (!_canEnqueue) return;

    if (publishedEvent.Start < _lastCti)
    {
        return;
    }
    lock (_lockObject)
    {
        if (this.AdapterState != AdapterState.Running)
        {
            return;
        }
        var point = publishedEvent.GetPointEvent();
        try
        {
            var enqueueResult = this.Enqueue(ref point);
            if (enqueueResult == EnqueueOperationResult.Success && publishedEvent.EventKind == EventKind.Cti)
            {
                _lastCti = point.StartTime;
            }
            if (enqueueResult == EnqueueOperationResult.Full)
            {
                //Queue full!! Pause enqueuing.
                _canEnqueue = false;
                ReleaseEvent(ref point);
                //Let StreamInsight know we're ready to resume.
                Ready();
            }
        }
        catch
        {
            ReleaseEvent(ref point);
            throw;
        }
    }
}

Our last step is to create our adapter factory. In our Create method, we’ll do a couple of checks to make sure that our arguments are valid – the payload is the proper type and, since our random data source only supports points, that we are implementing the correct shape.

Input Adapter Factory
public sealed class TestDataInputFactory : ITypedInputAdapterFactory<TestDataInputConfig>
{

    [SuppressMessage("Microsoft.Design", "CA1004:GenericMethodsShouldProvideTypeParameter", Justification = "By Design")]
    public InputAdapterBase Create <TPayload> (TestDataInputConfig configInfo, EventShape eventShape)
    {

        if (typeof(TPayload) != typeof(TestDataEvent))
        {
            //this won't work.
            //throw an exception.
            throw new InvalidOperationException("Specified type must be of " + typeof(TestDataEvent).FullName);
        }

        switch (eventShape)
        {
            case EventShape.Point:
                //Create the publisher.
                return new ObservableTypedPointInputAdapter<TestDataEvent, TestDataInputConfig>(
                    new TestDataProducer(configInfo));
            default:
                throw new ArgumentException(string.Format(
                    System.Globalization.CultureInfo.InvariantCulture,
                    "TestDataInputFactory cannot instantiate adapter with event shape {0}",
                    eventShape.ToString()));
        }

    }

    /// <summary>
    /// Dispose method.
    /// </summary>
    public void Dispose()
    {
    }

}

Once we have the adapter factory, we’re ready to go. Because we’ve not done any output adapters yet, we’ll be using the ToPointObservable() method to write to the console. Keep in mind that this works only when hosting StreamInsight in-process; it’s not at all like the IObservable support in 2.1. But it works well enough to show that we have some data flowing through and that our adapter now supports both models from the same data producer!

Run Query
private static void RunQuery(Application cepApplication)
{
    var config = new TestDataInputConfig()
        {
            NumberOfItems = 20,
            RefreshInterval = TimeSpan.FromMilliseconds(500)
        };

    var data = CepStream<TestDataEvent>.Create(cepApplication,
        "TestData", typeof (TestDataInputFactory), config, EventShape.Point);

    //Subscribe returns an IDisposable handle for the subscription.
    var subscription = data.ToPointObservable().Subscribe(
        e =>
            {
                if (e.EventKind == EventKind.Insert)
                {
                    Console.WriteLine("TestEvent ItemId:{0} Run:{1} Value:{2}",
                                      e.Payload.ItemId, e.Payload.RunNumber, e.Payload.Value);
                }
                else
                {
                    Console.WriteLine("CTI @ {0}", e.StartTime);
                }
            });
}

Using the same pattern, we can also create the adapters for interval and edge events pretty quickly. Unfortunately, a lot of it is copy-paste-tweak. I say unfortunately because it leads to a lot of redundant and repetitive code that I, for one, would prefer to create and maintain in a single place. However, our key method – Enqueue – isn’t defined in any common base class; we have to inherit from the shape-specific adapters and there’s no way around it. While we could probably deal with this with another layer of abstraction, there isn’t, IMHO, a whole lot of value to that. Should that change, we’ll take a look at refactoring a bit then.
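For the curious, here is a rough idea of what that extra layer of abstraction might look like. This is only a sketch and not part of the downloadable project: EnqueueHelper and EnqueueResult are hypothetical names, and EnqueueResult is a stand-in for StreamInsight's EnqueueOperationResult so that the snippet compiles on its own. The assumption is that each shape-specific adapter would hand in its own Enqueue, ReleaseEvent and Ready calls as delegates, so the pause-on-full logic lives in one place.

```csharp
using System;

// Stand-in for StreamInsight's EnqueueOperationResult, so the sketch compiles on its own.
public enum EnqueueResult { Success, Full }

// Hypothetical helper (not part of StreamInsight or the project in this post).
// It holds the enqueue/pause logic once; each shape-specific adapter would
// pass in its own Enqueue, ReleaseEvent and Ready calls as delegates.
public sealed class EnqueueHelper<TEvent>
{
    private readonly object _lockObject = new object();
    private readonly Func<TEvent, EnqueueResult> _enqueue;
    private readonly Action<TEvent> _release;
    private readonly Action _ready;
    private bool _canEnqueue = true;

    public EnqueueHelper(Func<TEvent, EnqueueResult> enqueue,
                         Action<TEvent> release, Action ready)
    {
        _enqueue = enqueue;
        _release = release;
        _ready = ready;
    }

    // Intended to be called from the adapter's Resume() override.
    public void Resume()
    {
        lock (_lockObject) { _canEnqueue = true; }
    }

    // Returns true if the event was accepted, false if it was released instead.
    public bool TryEnqueue(TEvent evt)
    {
        lock (_lockObject)
        {
            if (!_canEnqueue)
            {
                // Still paused: give the event back rather than enqueue it.
                _release(evt);
                return false;
            }

            EnqueueResult result;
            try
            {
                result = _enqueue(evt);
            }
            catch
            {
                _release(evt);
                throw;
            }

            if (result == EnqueueResult.Success)
            {
                return true;
            }

            // Queue full: pause enqueuing, release the event, signal readiness.
            _canEnqueue = false;
            _release(evt);
            _ready();
            return false;
        }
    }
}
```

A point adapter would then create one EnqueueHelper in its constructor and call TryEnqueue from its OnNext handler, with the interval and edge adapters doing the same. Whether that is actually less code than the copy-paste version is debatable, which is rather the point of the paragraph above.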

Looking at the input adapter factory, you’ll notice that it does some checking to make sure things are valid before creation; factories are good for that. There’s a bunch of other stuff that could be done in the factory as well, and having a standard, generic factory interface is a good way to provide yourself with a clean, clear and decoupled way to create what may be complex underlying objects. And there’s no reason why we can’t revisit our existing 2.1-based code to add the factory in there. It also provides a level of consistency between the two different APIs that can make our lives simpler, reduce the amount of debugging that we need to do and promote code sharing. That, however, will have to wait for the next post – I am trying to do this in manageable pieces so I’m not writing a novel with each post. In the meantime, you can download the current project from my SkyDrive.
