Reactive OpenDDS [Part II: Operators]

Middleware News Brief (MNB) features news and technical information about Open Source middleware technologies.

INTRODUCTION

Reactive programming is a "programming paradigm oriented around data flows and the propagation of change." Relationships can be established between producers (observables), consumers (observers), and transformers of data (operators), with updates occurring asynchronously, rather than by request. This aids in application scalability and better utilization of hardware resources, as threads are not blocked waiting on potentially long-running operations. In Part I of this article, a method was shown to convert samples as obtained by OpenDDS into observable sequences, as well as a method to simulate these sequences for test purposes. This article describes the operators that can be applied to observable sequences to filter, transform and otherwise manipulate them. We shall use the Reactive Extensions for .NET (Rx.NET), for our examples.

settings blue

Sidebar

Code in this article is provided in the associated code archive.

Projects use MPC v4.0.54 for project generation, and were built with Visual Studio 2013, and use version 4.5 of the .NET Framework.

The version of Rx.NET (Rx-Main and Rx-Testing packages via NuGet) used in this article is 2.2.5.

OPERATORS

The list of operators that operate on observables is considerable, and although a core set is available in all reactive frameworks, individual frameworks may also add operators of their own. For example, RxJava provides a parallel() operator which doesn't have a clear analogue in Rx.NET or RxCpp.

Also, the naming of a given operator can differ from framework to framework. In Part I, we had seen Select() and map()— both apply a function to elements of an observable sequence, and, in our case, can be used to convert the sequence type into a different one. We have also seen the Take() operator to limit the sequence to a given number of elements. We can use the Visual Studio Unit Testing Framework as an easy way to demonstrate the behavior of a number of other operators.

In the testing framework, individual tests are implemented as public class methods that return void, have no parameters, and are marked with the [TestMethod] attribute. The public class that they are contained within is marked with the [TestClass] attribute. Methods marked with additional attributes are used to initialize tests and clean up after them. For example, a method marked with the [TestInitialize] attribute is executed before each test method. By compiling a project containing these attributes in Visual Studio, Visual Studio will identify these methods as tests and allow them to be run from the Test Explorer pane in Visual Studio, or by MSTest.exe from the command line.

scheduler is a mechanism, part of the reactive framework, that controls when subscriptions start, notifications are published, and provides a notion of time. Although the default is a real time scheduler, unit tests for observables can use a virtual time-based scheduler, TestScheduler, as introduced in Part I of this article. The method CreateColdObservable() creates a cold observable (an observable that publishes only once an observer has subscribed, in contrast to hot observables which publish regardless) by specifying the individual notifications produced by the observable: OnNext() to produce a value, and OnCompleted() to indicate that the observable will no longer publish values. The first parameter to both notification methods is the time, in ticks (one ten-millionth of a second) to indicate when the notification is produced. TestScheduler uses these virtual times, rather than wall-clock time, to sequence the output of observables so tests run at expected unit test speeds.

For the examples below, we shall define three cold observables, xsys and zs, that are recreated before each test, as follows. These particular observables demonstrate sequences with notifications at irregular intervals as well as duplicated values.

  1. // ReactiveCS\ReactiveCS.cs
  2. [TestClass]
  3. public class Tests : ReactiveTest
  4. {
  5. TestScheduler scheduler;
  6. ITestableObservable<int> xs, ys, zs;
  7.  
  8. [TestInitialize]
  9. public void TestInit() {
  10. scheduler = new TestScheduler();
  11. xs = scheduler.CreateColdObservable(
  12. OnNext(10, 1),
  13. OnNext(20, 2),
  14. OnNext(40, 3),
  15. OnNext(41, 3),
  16. OnNext(60, 4),
  17. OnCompleted<int>(70)
  18. );
  19. ys = scheduler.CreateColdObservable(
  20. OnNext(5, 10),
  21. OnNext(15, 20),
  22. OnNext(45, 30),
  23. OnCompleted<int>(50)
  24. );
  25. zs = scheduler.CreateColdObservable(
  26. OnNext(10, 1),
  27. OnNext(20, 2),
  28. OnNext(30, 3),
  29. OnNext(40, 3),
  30. OnNext(50, 1),
  31. OnNext(60, 2),
  32. OnNext(70, 3),
  33. OnCompleted<int>(80)
  34. );
  35. }

For each of the operators described in this article, a demonstration of their output when applied to xsys and/or zs, as appropriate, will be presented, as well as a suggestion as how the operator would be useful to apply to an OpenDDS observable.

AMB()

The Amb() operator, given multiple observables, chooses the observable that is first to produce any items. From then on, Amb() selects items from that observable. One use of this operator is to select the quickest responding from a set of redundant observables. While the original inspiration for Amb(), John McCarthy's Ambiguous operator, would arbitrarily choose one of the provided values (or even roll back computation to select an alternative value if the first led to an error), the Amb() operator always selects the first-responding observable.

OpenDDS: Consider multiple stock ticker feeds, or rendundant sensors. The Amb() operator can be used to obtain the quickest-responding feed for increased application response. While similar to the OWNERSHIP DDS quality of service policy, Amb(), once an observable is selected, will use that observable from then on. In contrast, in DDS, the OWNERSHIP_STRENGTH can change dynamically, potentially leading to samples from a different data writer than initially used to be selected.

Unit tests for observables based on the TestScheduler can look like the following. First, create an observer results which is of the type produced by the sequence (int, in our case). Next, apply the operator, and subscribe results to the obervable that was created by the application of the operator. Start() the scheduler, and when it completes, results.Messages contains the sequence that was produced by the observable that was subscribed to. By using AssertEqual(), the generated sequence can be compared against an expected sequence to determine the pass/fail criteria of the test. In the case above, xs.Amb(ys) selects the sequence produced by ys because the first sample of ys is at time 5, which is earlier than the first sample of xs at time 10.

  1. // ReactiveCS\ReactiveCS.cs
  2. [TestMethod]
  3. public void Amb()
  4. {
  5. var results = scheduler.CreateObserver<int>();
  6.  
  7. xs.Amb(ys)
  8. .Subscribe(results);
  9.  
  10. scheduler.Start();
  11.  
  12. results.Messages.AssertEqual(
  13. OnNext(5, 10),
  14. OnNext(15, 20),
  15. OnNext(45, 30),
  16. OnCompleted<int>(50));
  17. }

As a side note, AssertEqual() compares elements in the sequence using the default comparator of the underlying element — in this case, the equality operator of int. If the sequence produces a structure, it may be necessary to implement a custom equality comparison. In particular, if the sequence element contains floating point values, AssertEqual() will fail unless a custom equality comparison is implemented. One such comparison is described here which handles special cases for infinity and near-zero values, but otherwise compares against a supplied tolerance.

  1. public bool NearlyEqual(double a, double b, double epsilon)
  2. {
  3. double absA = Math.Abs(a);
  4. double absB = Math.Abs(b);
  5. double diff = Math.Abs(a - b);
  6.  
  7. if (a == b)
  8. {
  9. // shortcut, handles infinities
  10. return true;
  11. }
  12. else if (a == 0 || b == 0 || diff < Double.MinValue)
  13. {
  14. // a or b is zero or both are extremely close to it
  15. // relative error is less meaningful here
  16. return diff < (epsilon * Double.MinValue);
  17. }
  18. else
  19. {
  20. // use relative error
  21. return diff / (absA + absB) < epsilon;
  22. }
  23. }

MERGE()

The Merge() operator merges the sequences from multiple observers into a single stream, ordered by time. That is, xs.Merge(ys) produces OnNext(5, 10)OnNext(10, 1)OnNext(15, 20)OnNext(20, 2)OnNext(40, 3)OnNext(41, 3)OnNext(45, 30)OnNext(60, 4), and OnCompleted(70). The sequence terminates when the last of the merged sequences terminates. Here, xs completes at 70 and ys at 50, so the sequence produced by Merge() completes at 70.

OpenDDS: Suppose sensors that publish on different topics have similar data — say topics "Temperature1" through "Temperature20" — which are to be processed at one time, as they are all of the same data type. The Merge() operator can combine the data streams into a single stream for analysis.

WHERE()

The Where() operator, also called filter() in some of the reactive frameworks for other languages, takes a function that accepts an element of the sequence type as a parameter, and returns true if it should be present in the resulting sequence, or false if it should not be. For instance, xs.Where(x => x > 2) yields OnNext(40, 3)OnNext(41, 3)OnNext(60, 4), and OnCompleted(70), as these are the only values produced by xs greater than two. The sequence completes at the same moment that the filtered sequence completes.

OpenDDS: OpenDDS provides the PARTITION and content filtering methods to filter which data samples are received, and Where() provides similar behavior. If the same OpenDDS data stream, though, is to be filtered in multiple ways, it may be more efficient to receive a single stream from OpenDDS, subscribe to the OpenDDS observer multiple times and filter each subscription differently, rather than having multiple OpenDDS subscribers with differing PARTITION and content filtering expressions. Then again, OpenDDS quality of service and content filtering can be applied at the publisher, so while subscriber development complexity may be reduced, network performance will still be impacted.

DISTINCT()

The Distinct() operator ensures that no duplicated values are produced by the sequence. zs.Distinct() yields OnNext(10, 1)OnNext(20, 2)OnNext(30, 3), and OnCompleted(80), as the values of 1, 2 and 3 are emitted only the first time they are seen. The emitted sequence terminates when the original sequence does, at time 80.

OpenDDS: If an OpenDDS sample stream should be, say, monotonically increasing but is subject to jitter, the Distinct() operator can ensure that only unique values are processed.

DISTINCTUNTILCHANGED()

The DistinctUntilChanged() operator differs from Distinct() in that it only drops duplicated values if they appear next to each other in the original sequence. That is, zs.DistinctUntilChanged() yields OnNext(10, 1)OnNext(20, 2)OnNext(30, 3),OnNext(50, 1)OnNext(60, 2)OnNext(70, 3), and OnCompleted(80), and only the value 3 produced at time 40 is dropped because the previous sample at time 30 was also 3.

OpenDDS: Since DistinctUntilChanged() will only remove duplicates if they arrive consecutively, it allows data samples to be processed only if they have changed. For example, a sequence of stock values may only need attention if the price has moved, but can otherwise be ignored if the value remains stable.

CONCAT()

The Concat() operator concatenates sequences together — the second sequence begins when the first completes. xs.Concat(ys) yields OnNext(10, 1)OnNext(20, 2)OnNext(40, 3),OnNext(41, 3)OnNext(60, 4)OnNext(75, 10)OnNext(85, 20)OnNext(115, 30), and OnCompleted(120), where the elements from ys immediately follow those of xs. The sequence produced by xs ends at time 70, and the first element of ys is produced at time 5, so the result of the concatenation has the first element of ys emitted at time 70+5 = 75.

OpenDDS: Suppose two OpenDDS data streams exist that represent work items to service, where one stream is high priority and the other low priority. Suppose also that work items must be serviced at the relative time intervals when they arrive — say if a robot on a factory floor needs time to move its manipulator arm to a starting position, the assembly of the part that the work item represents cannot begin until the arm is ready. By concatenating the low priority work observable on to the end of the high priority work observable, it will be ensured that all high priority work is completed first, but all work, regardless of when it arrives, is still executed with the appropriate time intervals between them.

ZIP()

The Zip() operator combines two sequences into one, using a supplied function, and the number of elements produced by the combination is equal to the shorter of the sequences being combined — elements are taken pairwise, so both original sequences must have an element available to combine into one that can be emitted in the resulting sequence. The observer created by xs.Zip(ys, (x, y) => x + y) yields OnNext(10, 11)OnNext(20, 22)OnNext(45, 33), and OnCompleted(60), pairing the 1, 2 and 3 of xs with the 10, 20, and 30 of ys. The times of each element emitted is the time at which an element from each of the zipped sequences was able to be used. That is, while an element of ys is available at time 5, it isn't until time 10 that an element of xs is available to pair with it, so the time of the result of the zip of the two is time 10. The completion time of the emitted sequence is documented to be the end of the shorter sequence, but is not the case in the version of Rx.NET used in this article. In this example, it is the time of the sample that does not have a match in the paired sequence.

OpenDDS: Consider a calculation that can only be performed when a data value arrives from each of three different OpenDDS topics. By using the Zip() operator, the resulting observable wouldn't contain an item to process unless values from all three topics had already arrived. The Zip() operator is related to OpenDDS's implementation of the MultiTopic content subscription feature, as it can be used to unify samples produced by disparate observables.

SAMPLE()

The Sample() operator, given a sampling interval, returns the most recent data sample received within that interval. In order for the sampling interval to apply to the virtual time scheduler, unlike the time-independent operators above, the scheduler must be specified as an argument to Sample(). If the scheduler is allowed to be its default value, the sampling interval would be interpreted as real time, producing incorrect test results.

Sampling xs every 25 ticks can be done by:

  1. // ReactiveCS\ReactiveCS.cs
  2. var results = scheduler.CreateObserver<int>();
  3. xs.Sample(TimeSpan.FromTicks(25), scheduler)
  4. .Subscribe(results);

The sequence produced is OnNext(25, 2)OnNext(50, 3)OnNext(75, 4)OnCompleted(75). That is, one notification generated at the end of each sample interval, containing the most recent value of the observable. For example, at the 25 tick mark, the most recent sequence value from xs has the value 2, produced at time 20. The sequence completes at the end of the last sample interval, not at the point at which the sampled sequence completes.

OpenDDS: The Sample() operator can be used to reduce the data rate of an OpenDDS data stream. For instance, a data sample stream containing time updates may be arriving much more quickly than a clock that needs updates only once per second. The DDS TIME_BASED_FILTER quality of service policy behaves in a similar way as does the Sample() operator, although the sample yielded by TIME_BASED_FILTER will be the first in the sampling interval window, while the sample yielded by the Sample() operator will be the last sample in the window.

THROTTLE()

From its name, one may think that the Throttle() operator reduces the sequence rate below a threshold. Instead, it allows elements to be produced only if a throttling interval has passed without any elements being generated. An example used in this presentation uses Throttle() to limit requests made to a web service that returns words that complete the text that the user is typing. Rather than querying the web service on each character typed, the web service is queried only when the user has stopped typing for a period of time.

As it is also based on a time interval, the scheduler must be supplied. The observable created by xs.Throttle(TimeSpan.FromTicks(15), scheduler) produces the sequence OnNext(35, 2)OnNext(56, 3)OnNext(70, 4), and OnCompleted(70). The sample times are explained as follows. The first sample of xs is at time 10, but as that is within the 15 tick interval (starting from 0), it is skipped. The next value of 2 is produced at time 20, but as the value following it, 3, isn't produced until time 40, the 20 tick gap between 2 and 3 is greater than the throttle interval, so the value of 2 produced at time 20 is emitted from the throttled sequence at time 35 (20 plus the throttle interval). Similarly, the first 3 from xs is dropped, but the second 3 is emitted, as the interval between the second 3 and 4, times of 41 and 60 respectively, is greater than the 15 tick throttle interval. There are no samples following 4, so it can be safely emitted, and the emitted sequence terminates at the same moment that the throttled sequence does.

OpenDDS: Suppose an OpenDDS data stream normally produces values continually, but occasionally stops and restarts, perhaps due to a mechanical fault. The Throttle() operator can be used to signal the application that a restart has occurred and that the device producing the data stream requires maintenance.

GROUPBY()

Unlike the previous operators which produce an observable sequence of elements, the GroupBy() operator yields an observable sequence of observable sequences. The orignal sequence that GroupBy() is applied to is divided into separate observable sequences based on a supplied function. As these new observables are created, they are produced as notifications in the sequence returned by GroupBy(), and, as they appear, they can be treated as any other observable — operators may be applied to them, or they may be subscribed to.

Testing the result of GroupBy is a bit convoluted, but is described here. As new grouped observables are produced, they are added to a list for later examination. As an example, consider the division of the elements of an observable into two groups, one group containing values less than or equal to 2, and the other group containing values 2 or greater. We can set up the test as follows:

  1. // ReactiveCS\ReactiveCS.cs
  2. [TestMethod]
  3. public void GroupBy()
  4. {
  5. // as each group appears, add it to the groups list
  6. var groups =
  7. new List<Tuple<long, bool, ITestableObserver<int>>>();
  8.  
  9. xs.GroupBy(x => x > 2)
  10. .Subscribe(g =>
  11. {
  12. var observer = scheduler.CreateObserver<int>();
  13. g.Subscribe(observer);
  14. groups.Add(Tuple.Create(
  15. scheduler.Clock, g.Key, observer));
  16. });
  17.  
  18. scheduler.Start();

The variable groups contains a list of tuples, where a tuple stores three pieces of information: the time at which the group was created, the group key (here, just true or false — the return value of the grouping function — to identify the two groups), and an observer of the group represented by the tuple. That is, each time a new group is created by the grouping operation, a new observer is built, it subscribes to the new group, and an item is added to the groups list. We then start the scheduler as before.

Next, we create a helper function that validates the contents of a tuple in the groups list — it compares the values in the tuple to ones supplied as arguments to it.

  1. var assertGroup = new Action<int, long, bool,
  2. Recorded<Notification<int>>[]>(
  3. (index, clock, key, messages) =>
  4. {
  5. var g = groups[index];
  6. Assert.AreEqual(clock, g.Item1);
  7. Assert.AreEqual(key, g.Item2);
  8. g.Item3.Messages.AssertEqual(messages);
  9. });

To test that GroupBy() operated as expected, we first confirm that two groups were created. Only two groups should exist as the grouping function can only return true or false.

  1. Assert.AreEqual(2, groups.Count);

Next, we check the first group (group 0). The first element in xs has the value 1 produced at time 10. It is not greater than 2, so the value 1 is added to the "false" group. As the "false" group doesn't yet exist, it is created at time 10. The only other element in the sequence that fails the "greater than 2" test is 2 itself at time 20, so the "false" group should only contain two elements, and terminate when the grouped sequence terminates.

  1. // at time 10, the "false" group appears
  2. assertGroup(0, 10, false, new[] {
  3. OnNext(10, 1),
  4. OnNext(20, 2),
  5. OnCompleted<int>(70)
  6. });

We then check the second group (group 1). The "true" group is created when the first element that is greater than 2 is seen in the sequence (time 40), contains all elements greater than 2 from the original sequence, and also completes at the same time that the original sequence does.

  1. // at time 40 the "true" group appears
  2. assertGroup(1, 40, true, new[] {
  3. OnNext(40, 3),
  4. OnNext(41, 3),
  5. OnNext(60, 4),
  6. OnCompleted<int>(70)
  7. });
  8. }

OpenDDS: Grouping can be used to not only arrange a sequence of OpenDDS data samples into groups by the data values themselves, but can also be used to group based on other OpenDDS properties, such as instance or sample state, transforming a topic-based observable sequence into an instance-based one.

WINDOW()

The Window() operator breaks a sequence into time slices, creating a new observable (as GroupBy() did) for each time slice (window). Splitting xs into 25 tick windows is done by xs.Window(TimeSpan.FromTicks(25), scheduler), and three windows are created. The first window, starting at time 0, contains OnNext(10, 1)OnNext(20, 2), and OnCompleted(25). That is, only two samples from xs are produced within the first 25 ticks, and the window closes at tick 25. The second window ranges from ticks 25 to 50, and contains OnNext(40, 3)OnNext(41, 3), and OnCompleted(50), and the third window starts at tick 50 and ends when the original sequence ends at tick 70. It contains OnNext(60, 4), and OnCompleted(70).

OpenDDS: As with Sample()Window() can be used as another form of rate limiting. Suppose a process can only operate at the rate of 10 samples a second — dividing the incoming data into one-second windows and processing only the first, at most, 10 samples that arrive in each window (by using the Take() operator on each window), will ensure that the process is never overloaded if the OpenDDS data sample rate increases.

BUFFER()

The Buffer() operator is similar to the Window() operator in that it divides the buffered sequence into time slices, but the slices themselves are different. Slices can be created either by a time interval (as with Window()) or by a count of elements, and a buffer is a single instant in time. The creation time, completion time, and element times of the buffer are the same.

A 25 tick-sized buffer can be created with xs.Buffer(TimeSpan.FromTicks(25), scheduler), and, as with Window(), three buffers are created. The first, at time 25 (the end of the interval, unlike Window() which created the window at the start of the interval), contains OnNext(25, 1)OnNext(25, 2), and OnCompleted(25). It contains the same values as does the first window, but both values, and the completion, are at the buffer creation time — tick 25. The second buffer, at time 50, contains OnNext(50, 3)OnNext(50, 3), and OnCompleted(50). Again, the same values as in the second window, but all at the start/end time of the buffer. Lastly, in the same pattern, the third buffer contains OnNext(70, 4), and OnCompleted(70).

A 3 count-sized buffer can be created with xs.Buffer(3). Here, a buffer is built every time three elements have arrived, or the original sequence has completed. So, for xs, two buffers are created. The first one contains OnNext(40, 1)OnNext(40, 2)OnNext(40, 3), and OnCompleted(40), and is at time 40 because that was the time at which the third element arrived. The second buffer contains OnNext(70, 3)OnNext(70, 4), and OnCompleted(70), and is at time 70 because the sequence completed before a third element was received.

OpenDDS: A use of the Buffer() operator is for data smoothing. Consider a noisy analog-to-digital converter that is sending its raw readings over OpenDDS. The application could use the Buffer() operator to produce a buffer after, say, 7 samples have arrived. Out of the 7 samples in the buffer, the highest and lowest sample could be discarded, the remaining 5 averaged, and then the result used for calculation.

GROUPJOIN()

The GroupJoin() is an operator used for combining two observables. A window of time is provided for each observable, and, for elements from each observable that fall into the overlap of the windows, a joining function is applied, producing a new element that is a combination of the matching ones.

A demonstration of GroupJoin() is shown below, as provided in the collection of 101 Rx Samples here. The code is identical to the Rx sample, save a small modification to convert it into a unit test.

Previously, we created observables based on an adapted event, a subject, or a time-based generated sequence. Another way to create an observable is to convert a standard data structure, such as a list, into an observable, via a call to ToObservable().

  1. // ReactiveCS\ReactiveCS.cs
  2. [TestMethod]
  3. public void GroupJoin()
  4. {
  5. var leftList = new List<string[]>();
  6. leftList.Add(
  7. new string[] { "2013-01-01 02:00:00", "Batch1" });
  8. leftList.Add(
  9. new string[] { "2013-01-01 03:00:00", "Batch2" });
  10. leftList.Add(
  11. new string[] { "2013-01-01 04:00:00", "Batch3" });
  12.  
  13. var rightList = new List<string[]>();
  14. rightList.Add(
  15. new string[] { "2013-01-01 01:00:00", "Production=2" });
  16. rightList.Add(
  17. new string[] { "2013-01-01 02:00:00", "Production=0" });
  18. rightList.Add(
  19. new string[] { "2013-01-01 03:00:00", "Production=3" });
  20.  
  21. var l = leftList.ToObservable();
  22. var r = rightList.ToObservable();

A number of predefined special observables are also available. In this example, the windows for each observable are defined by Observable.Never<>, a special observable that never produces any elements, but also never terminates. The joining function pairs an element from the left observable with an observable that generates elements that fall into the window of time of the right observable.

  1. var q = l.GroupJoin(r,
  2. // windows from each left event going on forever
  3. _ => Observable.Never<Unit>(),
  4. // windows from each right event going on forever
  5. _ => Observable.Never<Unit>(),
  6. // create tuple of left event with observable of right events
  7. (left, obsOfRight) => Tuple.Create(left, obsOfRight));

The right observable can then be subscribed to, and for each element pushed to it, a comparison is made against a given element from the left observable. If the element matches specified criteria (here, a time index), then a message is generated.

  1. var messages = new List<string>();
  2.  
  3. // e is a tuple with two items, left and obsOfRight
  4. using (q.Subscribe(e =>
  5. {
  6. var xs = e.Item2;
  7. xs.Where(
  8. // filter only when datetime matches
  9. x => x[0] == e.Item1[0])
  10. .Subscribe(v =>
  11. {
  12. messages.Add(string.Format(
  13. string.Format(
  14. "{0},{1} and {2},{3} occur at the same time",
  15. e.Item1[0],
  16. e.Item1[1],
  17. v[0],
  18. v[1]
  19. )));
  20. });
  21. }))
  22. {
  23. Assert.AreEqual(2, messages.Count);
  24. Assert.AreEqual(
  25. "2013-01-01 02:00:00,Batch1 and " +
  26. "2013-01-01 02:00:00,Production=0 " +
  27. "occur at the same time", messages[0]);
  28. Assert.AreEqual(
  29. "2013-01-01 03:00:00,Batch2 and " +
  30. "2013-01-01 03:00:00,Production=3 " +
  31. "occur at the same time", messages[1]);
  32. }
  33. }

OpenDDS: Similar to OpenDDS's implementation of MultiTopic, GroupJoin() can be used to combine multiple OpenDDS data streams based on common characteristics, such as the data sample reading time, sensor ID, or other criteria.

COUNT(), SUM(), MIN(), MAX(), AVERAGE() AND AGGREGATE()

A number of operators are availble that provide for collecting basic statistics from an observable. When an observable completes, a single value is pushed from each of these operators, yelding the statistical result across all values produced by the observable. As such, these operators are not suitable for observers that produce infinite sequences, as since the observable never terminates, no values will ever be emitted by these operators.

The Count()Sum()Min()Max(), and Average() operators do as their names suggest. The Aggregate() operator provides a means to accumulate values into a single result, and is supplied with an accumulator function and an optional seed value as parameters. We use ToObservable() as before to convert a list into an observable sequence in order to demonstrate these operators.

The test below also shows that a single observable can be subscribed to multiple times. This can be advantageous when used with OpenDDS as, rather than creating multiple subscribers for a given topic that filter the data in various ways, a single subscriber can be used and the resulting observable can be subscribed to as many times as needed to create modified observables for use by the application.

  1. // ReactiveCS\ReactiveCS.cs
  2. [TestMethod]
  3. public void CountSumMinMaxAverageAggregate()
  4. {
  5. var o = new List<int>() { 3, 10, 8 }.ToObservable();
  6. o.Count().Subscribe(e => Assert.AreEqual(3, e));
  7. o.Sum().Subscribe(e => Assert.AreEqual(21, e));
  8. o.Min().Subscribe(e => Assert.AreEqual(3, e));
  9. o.Max().Subscribe(e => Assert.AreEqual(10, e));
  10. o.Average().Subscribe(e => Assert.AreEqual(7, e));
  11. o.Aggregate(6, (acc, i) => acc + i*2)
  12. .Subscribe(e => Assert.AreEqual(48, e));
  13. }

OpenDDS: Provided that an OpenDDS observable terminates, these operators can be used to obtain statistical information from OpenDDS sample data.

SCAN(), SKIP() AND TIMESTAMP()

While Aggregate() applies an accumulator function over the elements of a sequence and generates a single value when the sequence completes, the Scan() operator does the same, but produces a value corresponding to each value of the observed sequence.

Scott Weinstein uses the Scan() operator to create a new operator, ToCommonAggregates(), to collect statistics on an observable that are pushed out as each element in the source observable arrives. As such, unlike Count() and the other standard operators, ToCommonAggregates() is suitable to be used with infinite sequences.

Weinstein's code is shown here. As we did above, Weinstein created a class which will be used as the element type of a new observable that will be created.

  1. // ReactiveCS\ReactiveCS.cs
  2. public class StatInfoItem<T>
  3. {
  4. public T Item { get; set; }
  5. public double Sum { get; set; }
  6. public int Count { get; set; }
  7. public double Mean { get; set; }
  8. public double M2 { get; set; }
  9. public double StdDev { get; set; }
  10. public double Min { get; set; }
  11. public double Max { get; set; }
  12.  
  13. public override string ToString()
  14. {
  15. return "[" + Item + ": Sum=" + Sum + ", Count=" + Count +
  16. ", Mean=" + Mean + ", StdDev=" + StdDev + ", Min=" + Min +
  17. ", Max=" + Max + "]";
  18. }
  19. }

The .NET Framework allows methods to appear to be added to an existing class, without actually modifying that class. As these methods extend an existing class, they are called extension methods. To create an extension method, first create a static class to hold them. In it, create public static methods that take this T as the first parameter, where T is the type that is being extended. As shown in this James Michael Hare blog post, class int can be extended to have a Half() method by code like:

  1. static class Extensions
  2. {
  3. public static int Half(this int source) {
  4. return source / 2;
  5. }
  6. }

which now allows the syntax 4.Half() to be legal, and return the value 2. Weinstein creates an extension method that extends IObservable<T>, the interface of observables in Rx.NET. Two functions must be provided as parameters to ToCommonAggregates(). The first function identifies what component of the observable should be used for statistical analysis (such as a sensor data reading). The second function provides a name to associate with the accumulated statistics (such as a sensor ID).

  1. // ReactiveCS\ReactiveCS.cs
  2. static class Extensions
  3. {
  4. public static IObservable<StatInfoItem<T>>
  5. ToCommonAggregates<T, TSrc>(
  6. this IObservable<TSrc> source,
  7. Func<TSrc, double> dataSelector,
  8. Func<TSrc, T> itemSelector)
  9. {

ToCommonAggregates() uses the Scan() operator to accumulate its values. The Scan() operator is provided a starting value and a function to accumulate values, and each time a value is added to the accumulator, it is also pushed as a sample to subscribers. Here, the accumulator calculates various statistical values, and each time a new element appears in the sequence to which ToCommonAggregates() is applied, a statistical sample is generated. The Skip() operator is also used to drop the initial sample, as it is a seed value that is not fully initialized.

  1. return source.Scan(new StatInfoItem<T>(), (cur, next) =>
  2. {
  3. double data = dataSelector(next);
  4. T itemp = itemSelector(next);
  5. var n = cur.Count + 1;
  6. var delta = data - cur.Mean;
  7. var meanp = cur.Mean + delta / n;
  8. var m2 = cur.M2 + delta * (data - meanp);
  9. var stdDevp = Math.Sqrt(m2 / n);
  10. return new StatInfoItem<T>()
  11. {
  12. Item = itemp,
  13. Sum = data + cur.Sum,
  14. Count = n,
  15. Mean = meanp,
  16. M2 = m2,
  17. StdDev = stdDevp,
  18. Min = Math.Min(data, cur.Min),
  19. Max = Math.Max(data, cur.Max),
  20. };
  21. })
  22. // need a seed, but don't want to include seed value
  23. // in the output
  24. .Skip(1);
  25. }
  26. }

We can test it as follows. We create an observable by transforming ys. The observable ys is a sequence of integers. We apply the Timestamp() operator which associates a timestamp with each element in the sequence, resulting in a type of Timestamped. The Timestamped type is a structure with two fields, Timestamp, giving the time of the sample, and Value, which stores the value that is timestamped — in our case, the original int. We now transform that using the Select() operator to generate a SensorEventArgs type from the timestamp and value. So, at this point, we've now created another simulated OpenDDS sample. We then apply the ToCommonAggregates() operator, identifying the OpenDDS data reading as the data value on which to perform statistical analysis, and the name of the sensor to label the data aggregate.

  1. // ReactiveCS\ReactiveCS.cs
  2. [TestMethod]
  3. public void ToCommonAggregates()
  4. {
  5. var results =
  6. scheduler.CreateObserver<StatInfoItem<string>>();
  7. var obs =
  8. // start with IObservable<int>
  9. ys
  10. // change to IObservable<Timestamped<int>>
  11. .Timestamp(scheduler)
  12. // change to IObservable<SensorEventArgs>
  13. .Select(i => new SensorEventArgs("Temp7",
  14. i.Timestamp.DateTime, i.Value))
  15. // change to IObservable<StatInfoItem<string>>
  16. .ToCommonAggregates(i => i.Reading, i => i.SensorID);

The test looks as before — subscribe, run the scheduler, and examine the results. The observable contains three data samples, causing three statistical aggregates to be generated. The first aggregate is dropped due to the call to Skip(), leaving two. The time of each aggregate corresponds to the original time of each corresponding element in ys, and the aggregate observable completes when ys does.

  1. obs.Subscribe(results);
  2.  
  3. scheduler.Start();
  4.  
  5. results.Messages.AssertEqual(
  6. OnNext(15, new StatInfoItem<string>()
  7. {
  8. Item = "Temp7",
  9. Sum = 30.0,
  10. Count = 2,
  11. Mean = 15.0,
  12. M2 = 50.0,
  13. StdDev = 5.0,
  14. Min = 0.0,
  15. Max = 20.0,
  16. }),
  17. OnNext(45, new StatInfoItem<string>()
  18. {
  19. Item = "Temp7",
  20. Sum = 60.0,
  21. Count = 3,
  22. Mean = 20.0,
  23. M2 = 200.0,
  24. StdDev = 8.16496580927726,
  25. Min = 0.0,
  26. Max = 30.0,
  27. }),
  28. OnCompleted<StatInfoItem<string>>(50)
  29. );
  30. }

OpenDDS: As OpenDDS sample streams are potentially infinite (that is, a sensor generates values essentially forever), the use of Scan() for element-wise statistics gathering is likely more useful than operators such as Count()Average() and such that only produce a value when the stream has terminated.

AND MORE...

A number of other operators exist, and sites such as Introduction to Rx101 Rx Samples and the list of RxJava operators will provide greater detail, but the above gives a flavor of what operators are available in reactive frameworks.

CONCLUSION

Part I of this article showed how to convert an OpenDDS stream of data samples into an observable, but this article showed that the real power of reactive programming is to use operator composition in order to manipulate observable sequences. Reactive programming can lead to more responsive applications, and better use of hardware resources. With these articles, we've seen how to integrate OpenDDS into a reactive application, and operatators that are particularly useful for manipulating OpenDDS sample data.

REFERENCES