Reactive OpenDDS [Part I]

Middleware News Brief (MNB) features news and technical information about Open Source middleware technologies.

WHAT IS REACTIVE PROGRAMMING?

Reactive programming is a "programming paradigm oriented around data flows and the propagation of change." Relationships can be established between producers, consumers, and transformers of data, with updates occurring asynchronously, rather than by request. This aids in application scalability and better utilization of hardware resources, as threads are not blocked waiting on potentially long-running operations. The result of a database query or the return value from a web service can be delivered to the application when the result is ready, rather than the application waiting for the result to arrive. That is, it is a "push" rather than "pull" model — one reacts to data as it arrives, as opposed to pulling it element by element from a data source, where efficiency is lost as the application waits for the next element to be available.

Previous paradigms, such as asynchronous I/O or event-driven programming, also provide for non-blocking servicing of data or events, but reactive programming unifies these models such that operations as different as reading a file, obtaining mouse movements, or receiving the response of a web request can be handled using the same small set of programming concepts.

Microsoft's Reactive Extensions for .NET (Rx.NET) provides a framework for reactive programming. Microsoft has released similar frameworks for other languages, including JavaScript (RxJS), C++ (RxCpp), Ruby (Rx.rb) and Python (RxPy), and other organizations have done the same for Java (RxJava), Scala (RxScala), Clojure (RxClojure) and more.

Reactive programming centers around two concepts: an observable and an observer. An observable is a data stream, such as a sequence of stock quotes, mouse positions, or file blocks. An observer watches an observable by subscribing to it, and is updated as the state of the observable changes. Three operations can be invoked on an observer, reflecting the state of the observable: OnNext(<data value>) when the watched observable produces a new value, OnCompleted()when the observable has finished producing data, and OnError(<exception>)if the observable has reached an error state.

Observables can be created in several ways, including:

  • Through the use of a subject, an entity that acts both as an observer and observable. Calling methods such as OnNext() on a subject invokes the corresponding method on its observers.
  • Combining existing observables by using operators to combine, filter, transform and otherwise manipulate element streams.
  • By using observable generators, which cause a sequence of values to be produced.
  • By using adapters provided by the framework (such as are available in Rx.NET) that convert from existing paradigms such as asynchronous file I/O, or convert existing data structures to observable ones.
settings blue

SIDEBAR

Code in this article is provided in the associated code archive.

C++ and C# projects use MPC v4.0.54 for project generation, were built with Visual Studio 2013, and use version 4.5 of the .NET Framework.

Java projects use sbt 0.13.1 for compilation and were tested against both Java 7 and 8. OpenDDS 3.5.1 was used for all examples.

The version of Rx.NET (Rx-Main and Rx-Testing packages via NuGet) used in this article is 2.2.5, RxCpp from GitHub export 7d0c86734bb41535f5e6c14684ebd2ed31d52504, and 1.0.0-rc.7 for RxJava.

REACTIVE OPENDDS

We can use the techniques above to create observables from OpenDDS data sample streams. Consider data generated by a fictional sensor as represented by the following IDL:

  1. // SensorIDL\Sensor.idl
  2. module Sensor {
  3.  
  4. #pragma DCPS_DATA_TYPE "Sensor::SensorData"
  5. #pragma DCPS_DATA_KEY "Sensor::SensorData sensorID"
  6.  
  7. struct SensorData {
  8. string sensorID;
  9. unsigned long long date;
  10. double reading;
  11. };
  12. };

A sensor has a name, a time at which a data reading is collected, and the reading itself. Sensor data readings are simulated by a C++-based publisher that generates a total of 20 readings. A value is published every two seconds with the value of the reading ranging from 0 to 9, repeating. These samples are published on the Temperature DDS topic. The time of the sample is the current time represented as seconds since midnight on January 1, 1970 UTC. This publisher will be used for all of the OpenDDS examples in this article.

  1. // SensorPublisher\SensorPublisher.cpp
  2. Sensor::SensorDataDataWriter_var sdw =
  3. Sensor::SensorDataDataWriter::_narrow(dw);
  4. ...
  5. Sensor::SensorData sample;
  6. sample.sensorID = "Temp7";
  7. time_t sampleDate;
  8. for (int i = 0; i < 20; i++) {
  9. // seconds since midnight 1/1/1970 UTC
  10. time(&sampleDate);
  11. sample.date = sampleDate;
  12. sample.reading = i%10;
  13.  
  14. if (sdw->write(sample, DDS::HANDLE_NIL) != DDS::RETCODE_OK)
  15. throw DDSException("sample write() failed");
  16.  
  17. // sleep for 2 seconds
  18. std::this_thread::sleep_for(std::chrono::milliseconds(2000));
  19. }

RX.NET - EVENT ADAPTER

To create a Rx.NET observable from an OpenDDS data stream, it is convenient to first make the OpenDDS data samples accessible to .NET before creating the observable. We do this in a manner similar to what has been shown in previous Middleware News Briefs by publishing a .NET event whenever a data sample arrives.

As OpenDDS is written in unmanaged C++, we must create a .NET adapter, class SensorEventPublisher, that can be used from a .NET language such as C#. It is declared as a managed .NET class as follows, with its implementation in the unmanaged, standard C++ class SensorEventPublisherImpl.

  1. // SensorSubscriberLibEvent\SensorSubscriberLibEvent.cpp
  2. public ref class SensorEventPublisher {
  3. SensorEventPublisherImpl *_impl;
  4. public:
  5. event System::EventHandler<SensorEventLib::SensorEventArgs^>
  6. ^SensorEvent;
  7.  
  8. SensorEventPublisher(array<System::String^>^ args);
  9. ...
  10. void Publish(System::String^ sensorID,
  11. System::DateTime date, double reading);
  12. };

SensorEvent is declared as a .NET event which publishes events of type SensorEventArgs via the Publish() method defined as:

  1. // SensorSubscriberLibEvent\SensorSubscriberLibEvent.cpp
  2. void SensorEventPublisher::Publish(System::String^ sensorID,
  3. System::DateTime date, double reading) {
  4. SensorEvent(this,
  5. gcnew SensorEventLib::SensorEventArgs(sensorID, date,
  6. reading));
  7. }

The sensor event argument structure is defined in its own library, so it can be accessed both by SensorSubscriberLib and any .NET projects that wish to reference it. It inherits from EventArgs, the .NET base class for classes which contain event data.

  1. // SensorEventLib\SensorEventLib.cs
  2. public class SensorEventArgs : EventArgs {
  3. public readonly string SensorID;
  4. public readonly DateTime Date;
  5. public readonly double Reading;
  6.  
  7. public SensorEventArgs(string sensorID, DateTime date,
  8. double reading) {
  9. SensorID = sensorID;
  10. Date = date;
  11. Reading = reading;
  12. }
  13.  
  14. public override string ToString()
  15. {
  16. // Date is in UTC, so convert to local time for display
  17. return "[" + SensorID + ", " + Date.ToLocalTime() + ", " +
  18. Reading + "]";
  19. }
  20. }

For convenience, a ToString() method is provided for pretty printing, converting the time of the sample from UTC to the local time.

In the OpenDDS subscriber, a listener is associated with a data reader for the "Temperature" topic, and is instantiated with a reference to a SensorEventPublisher.

  1. // SensorSubscriberLibEvent\SensorSubscriberLibEvent.cpp
  2. DDS::DataReaderListener_var listener(
  3. new DataReaderListenerImpl(sensorEventPublisher));
  4. DDS::DataReader_var dr =
  5. sub->create_datareader(topic, dr_qos, listener,
  6. OpenDDS::DCPS::DEFAULT_STATUS_MASK);

Data samples received within the on_data_available() method of the listener are converted to .NET types and passed as parameters to Publish().

  1. // SensorSubscriberLibEvent\SensorSubscriberLibEvent.cpp
  2. DataReaderListenerImpl(
  3. SensorEventPublisher ^sensorEventPublisher) :
  4. _sensorEventPublisher(sensorEventPublisher) {}
  5. ...
  6. void DataReaderListenerImpl::on_data_available(
  7. DDS::DataReader_ptr reader) {
  8. try {
  9. DDS::SampleInfo info;
  10.  
  11. Sensor::SensorDataDataReader_var dr =
  12. Sensor::SensorDataDataReader::_narrow(reader);
  13.  
  14. if (dr) {
  15. Sensor::SensorData sample;
  16. DDS::ReturnCode_t error =
  17. dr->take_next_sample(sample, info);
  18. if ((error == DDS::RETCODE_OK) && info.valid_data) {
  19. _sensorEventPublisher->Publish(
  20. gcnew System::String(sample.sensorID.in()),
  21. System::DateTime(1970, 1, 1, 0, 0, 0,
  22. System::DateTimeKind::Utc) +
  23. System::TimeSpan::FromSeconds(sample.date),
  24. sample.reading
  25. );
  26. }
  27. }
  28. }
  29. catch (System::Exception ^e) {
  30. System::Console::WriteLine(e);
  31. }
  32. }

The time of the data sample is converted into a TimeSpan, and that is added to the epoch date to recover the absolute time of the data sample.

Now that the data sample is published as .NET event data, one of the built-in adapters of Rx.NET can be used to convert the event to an observable. One such pattern, Observable.FromEventPattern<>, converts the elements produced by an event to an observable, where, in our case, the SensorEvent created by SensorEventPublisher is used as the event source.

We use the Select() projection operator to transform the observable sequence by applying a function to each element that is produced by an observable. Here, we use it to transform the element from what is returned by FromEventPattern<> into the event argument structure itself.

  1. // ReactiveDDSCSEvent\ReactiveDDSCSEvent.cs
  2. var sevp = new SensorEventPublisher(args);
  3.  
  4. var obs = Observable.FromEventPattern<SensorEventArgs>(
  5. eh => sevp.SensorEvent += eh,
  6. eh => sevp.SensorEvent -= eh)
  7. .Select(e => e.EventArgs);

The variable obs, of type IObservable, can now be subscribed to. One variant of the Subscribe() method provides for only an OnNext() notification handler to be specified — here, we only wish to see the samples printed on the screen.

  1. using (obs.Subscribe(Console.WriteLine)) {
  2. sevp.Run();
  3. Console.WriteLine("Press ENTER to stop");
  4. Console.ReadLine();
  5. sevp.Stop();
  6. }

If the OnCompleted() or OnError() notifications also need to be handled, handler functions can be specified as the second and third parameters to Subscribe(), respectively. Subscribe() returns an object representing the subscription, and this object implements the IDisposable interface. Disposing of the subscription unsubscribes the observer from the observable, which is done automatically by the using clause when the code block exits.

The test script Test\testReactiveDDSCSEvent.pl starts the DCPSInfoRepo, the publisher (SensorPublisher), the C#-based subscriber (ReactiveDDSCSEvent), and waits for them to terminate. The output of the test looks like the following:

  1. [Temp7, 10/28/2014 10:29:20 AM, 1]
  2. [Temp7, 10/28/2014 10:29:22 AM, 2]
  3. [Temp7, 10/28/2014 10:29:24 AM, 3]
  4. ...

RX.NET - SUBJECT

If one is interested only in the samples produced by an observable, the method above will work well. Unfortunately, a single Windows event doesn't have the means to represent completion or error states that the adapter can directly map. For this, we must publish through a subject. To take advantage of DDS features, we will use a subject whose elements are subjects that the data samples are pushed through.

We first rename the SensorEventPublisher class to SensorEventPublisherSubject for clarity, and change it to take a subject of subjects as an argument:

Subject<Subject<SensorEventArgs^>^>^

Two namespaces are used to improve code readability as well.

  1. // SensorSubscriberLibSubject\SensorSubscriberLibSubject.cpp
  2. using namespace System::Reactive::Subjects;
  3. using namespace SensorEventLib;
  4.  
  5. public ref class SensorEventPublisherSubject {
  6. SensorEventPublisherSubjectImpl *_impl;
  7. Subject<Subject<SensorEventArgs^>^>^ _obsGroup;
  8. public:
  9. SensorEventPublisherSubject(array<System::String^>^ args,
  10. Subject<Subject<SensorEventArgs^>^>^ obsGroup);
  11. ...
  12. };

The subject of subjects — an observable group — is passed to the data reader listener.

  1. // SensorSubscriberLibSubject\SensorSubscriberLibSubject.cpp
  2. DDS::DataReaderListener_var listener(
  3. new DataReaderListenerImpl(obsGroup));
  4. DDS::DataReader_var dr =
  5. sub->create_datareader(topic, dr_qos, listener,
  6. OpenDDS::DCPS::DEFAULT_STATUS_MASK);

The DataReaderListenerImpl stores both the observable group, as well as creates a map (a .NET Dictionary<>) of observables, which will be described below. As they are .NET types that are stored in an unmanaged C++ class, they must be wrapped with gcroot<>.

  1. // SensorSubscriberLibSubject\SensorSubscriberLibSubject.cpp
  2. class DataReaderListenerImpl : public virtual
  3. OpenDDS::DCPS::LocalObject<DDS::DataReaderListener> {
  4. gcroot<Subject<Subject<SensorEventArgs^>^>^> _obsGroup;
  5. gcroot<Dictionary<long, Subject<SensorEventArgs^>^>^> _obsMap;
  6. public:
  7. DataReaderListenerImpl(
  8. Subject<Subject<SensorEventArgs^>^>^ obsGroup) :
  9. _obsGroup(obsGroup),
  10. _obsMap(gcnew Dictionary<long,
  11. Subject<SensorEventArgs^>^>()) {}

The on_data_available() method begins in a similar manner as before, plus the observable group and map are set to local variables to "unwrap" them from their gcroot<> holders, which simplifies their use syntactically as it removes a layer of indirection.

  1. // SensorSubscriberLibSubject\SensorSubscriberLibSubject.cpp
  2. void DataReaderListenerImpl::on_data_available(
  3. DDS::DataReader_ptr reader) {
  4. try {
  5. DDS::SampleInfo info;
  6. Subject<Subject<SensorEventArgs^>^>^ obsGroup = _obsGroup;
  7. Dictionary<long, Subject<SensorEventArgs^>^>^
  8. obsMap = _obsMap;
  9. Sensor::SensorDataDataReader_var dr =
  10. Sensor::SensorDataDataReader::_narrow(reader);
  11.  
  12. if (dr) {
  13. Sensor::SensorData sample;
  14. DDS::ReturnCode_t error =
  15. dr->take_next_sample(sample, info);
  16. if (error == DDS::RETCODE_OK) {

In the IDL that defined the OpenDDS sample type SensorData, a key was established to divide samples into instances by the sensor that generated them:

  1. // SensorIDL\Sensor.idl
  2. #pragma DCPS_DATA_KEY "Sensor::SensorData sensorID"

Our data happens to publish only with a sample ID of Temp7, so only one instance is present in the sample data, but in the general case, instances will appear arbitrarily in the sample stream, and must be handled as they arise. As each new instance is detected, a new subject is created which will be used to push its samples. This subject is added to the observable group, so subscribers can become aware that a new instance has been created. It is also stored in an observable map, keyed by the instance handle, so it can be easily retrieved when additional samples on the instance are to be pushed.

  1. // if instance is not found in the map,
  2. // create a new observable for it and push it out
  3. Subject<SensorEventArgs^> ^obs = nullptr;
  4. if (!obsMap->ContainsKey(info.instance_handle)) {
  5. obs = gcnew Subject<SensorEventArgs^>();
  6. obsGroup->OnNext(obs);
  7. obsMap->Add(info.instance_handle, obs);
  8. }
  9. else
  10. obs = obsMap[info.instance_handle];

Once the observable corresponding to the instance has been obtained (either newly created, or retrieved from the map), it is updated with the state of the data. If the data is valid, then it is pushed via a call to OnNext(), but if the data isn't valid, and the instance has been disposed and will no longer publish again, then OnCompleted() will be invoked on the observable.

  1. // update the observable
  2. if (info.valid_data) {
  3. // data is valid - call OnNext with the data
  4. obs->OnNext(gcnew SensorEventLib::SensorEventArgs(
  5. gcnew System::String(sample.sensorID),
  6. System::DateTime(1970, 1, 1, 0, 0, 0,
  7. System::DateTimeKind::Utc) +
  8. System::TimeSpan::FromSeconds(sample.date),
  9. sample.reading));
  10. }
  11. else if (info.instance_state ==
  12. DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE) {
  13. // no more data - call OnCompleted on the observable
  14. obs->OnCompleted();
  15. }

Lastly, if a sample could not be taken, we can consider the listener to have failed, and OnError() is called on all observables in the map, as all observables from this point forward are considered to have failed as the listener would no longer be able to update them.

  1. }
  2. else {
  3. // can't take the sample - assume we're dead and
  4. // set an error state on all observables
  5. System::Exception ^ex = gcnew
  6. System::InvalidOperationException(
  7. "take_next_sample() failed");
  8. for each(auto k in obsMap->Keys)
  9. obsMap[k]->OnError(ex);
  10. }
  11. }
  12. }
  13. catch (System::Exception ^e) {
  14. System::Console::WriteLine(e);
  15. }
  16. }

The C# subscriber is somewhat different than before, due to the grouping of the observables. A subject of subjects is created as an observable group, and passed into the instantiated SensorEventPublisherSubject. The group is subscribed to, and each subject in the group (corresponding to each DDS instance) is also subscribed to. Two handlers are supplied, rather than just one, as arguments to Subscribe(). The first argument to Subscribe() is the OnNext() handler, and, as before, the sample is displayed via the call to Console.WriteLine(). The second parameter is an OnCompleted() handler, which, when invoked, displays a completion message, and signals an EventWaitHandle to indicate that the application should terminate as the observable has completed. In the general case, the application shouldn't terminate when just one observable does, but here, as the data only has one instance, only one observable to push data samples is created, so the application can safely terminate when it reaches the completed state.

  1. // ReactiveDDSCSSubject\ReactiveDDSCSSubject.cs
  2. var obsGroup = new Subject<Subject<SensorEventArgs>>();
  3. var sevp = new SensorEventPublisherSubject(args, obsGroup);
  4.  
  5. var waitHandle = new EventWaitHandle(false,
  6. EventResetMode.AutoReset);
  7.  
  8. using (obsGroup.Subscribe(obs => obs.Subscribe(
  9. Console.WriteLine,
  10. () =>
  11. {
  12. System.Console.WriteLine("Completed");
  13. waitHandle.Set();
  14. })))
  15. {
  16. sevp.Run();
  17.  
  18. bool signaled = false;
  19. do
  20. {
  21. signaled = waitHandle.WaitOne(TimeSpan.FromSeconds(1));
  22. } while (!signaled);
  23.  
  24. sevp.Stop();
  25. }

The output of the subject-based example, via the invocation of the Test\testReactiveDDSCSSubject.pl script, is the same as the event-based example, save for the additional Completed message following the data samples.

RXCPP

We can do the same with RxCpp, by updating a group of observables. The on_data_available() method of the data reader, given the observable group and map, first obtains the observable for the instance from the map, creating the observable if it is not found:

  1. // ReactiveDDSCPP\ReactiveDDSCPP.cpp
  2. namespace rxsub = rxcpp::subjects;
  3. ...
  4. if (dr) {
  5. Sensor::SensorData sample;
  6. DDS::ReturnCode_t error = dr->take_next_sample(sample, info);
  7. if (error == DDS::RETCODE_OK) {
  8.  
  9. // if instance is not found in the map, create a new
  10. // observable for it and push it out
  11. auto obs = rxsub::subject<Sensor::SensorData>();
  12. auto obsIt = _obsMap.find(info.instance_handle);
  13. if (obsIt == _obsMap.end()) {
  14. _obsGroup.get_subscriber().on_next(obs);
  15. _obsMap[info.instance_handle] = obs;
  16. }
  17. else
  18. obs = obsIt->second;

Then, if the data is valid, the observable is updated with the data via on_next(), otherwise, if the instance has been disposed, mark the observable as completed with on_completed().

  1. // update the observable
  2. if (info.valid_data) {
  3. // data is valid - call OnNext with the data
  4. obs.get_subscriber().on_next(sample);
  5. }
  6. else if (info.instance_state ==
  7. DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE) {
  8. // no more data - call OnCompleted on the observable
  9. obs.get_subscriber().on_completed();
  10. }

Lastly, if the sample could not be taken, set an error state in all observables viaon_error().

  1. }
  2. else {
  3. // can't take the sample - assume we're dead and set
  4. // an error state on all observables
  5. std::exception_ptr ex = std::make_exception_ptr(
  6. std::runtime_error("take_next_sample() failed"));
  7. std::for_each(_obsMap.begin(), _obsMap.end(),
  8. [ex](std::pair<const DDS::InstanceHandle_t,
  9. rxsub::subject<Sensor::SensorData>> &obs){
  10. obs.second.get_subscriber().on_error(ex);
  11. });
  12. }
  13. }

Subscribing to the observable group is done in a similar way, as well. Each instance is subscribed to also by providing two functions, one to handle OnNext() and the other to handle OnCompleted(), with default behavior for the error state.

  1. // ReactiveDDSCPP\ReactiveDDSCPP.cpp
  2. rxsub::subject<rxsub::subject<Sensor::SensorData>> obsGroup;
  3. DDS::DataReaderListener_var listener(
  4. new DataReaderListenerImpl(obsGroup));
  5. ...
  6. obsGroup.get_observable()
  7. .subscribe(
  8. [](rxsub::subject<Sensor::SensorData> &obs) {
  9. obs.get_observable().subscribe(
  10. [](Sensor::SensorData &sd) {
  11. time_t date = sd.date;
  12. std::string cdate(ctime(&date));
  13. cdate.erase(std::remove_if(cdate.begin(), cdate.end(),
  14. [](char c) { return c == 0x0A; }), cdate.end());
  15. std::cout << "[" << sd.sensorID << " " << cdate << " " <<
  16. sd.reading << "]" << std::endl;
  17. },
  18. []() { std::cout << "Completed" << std::endl; }
  19. );
  20. });

The lambda function that pretty prints the data sample uses ctime() to format the date, but strips the extraneous line feed character from the string it returns for nicer display.

The test script Test\testReactiveDDSCPP.pl starts the DCPSInfoRepo, the publisher (SensorPublisher), the C++-based subscriber (ReactiveDDSCPP), and waits for them to terminate. The output of the test looks very similar to the Rx.NET example:

  1. [Temp7 Tue Oct 28 10:30:33 2014 1]
  2. [Temp7 Tue Oct 28 10:30:35 2014 2]
  3. [Temp7 Tue Oct 28 10:30:37 2014 3]
  4. [Temp7 Tue Oct 28 10:30:39 2014 4]
  5. ...
  6. Completed

RXJAVA

We can do the same once again with RxJava. As before, the on_data_available() method of the data reader, given the observable group and map, first obtains the observable for the instance from the map, creating the observable if it is not found. Several types of subjects are available in RxJava, but we use a PublishSubject as it imposes no other constraints on the elements it pushes, such as adding default values, replays, or such.

  1. // ReactiveDDSJ\src\main\java\com\ociweb\DataReaderListenerImpl.java
  2. if (dr != null) {
  3. SensorDataHolder mh = new SensorDataHolder(new SensorData());
  4. SampleInfoHolder sih = new SampleInfoHolder(
  5. new SampleInfo(0, 0, 0,
  6. new DDS.Time_t(), 0, 0, 0, 0, 0, 0, 0, false));
  7. int status = dr.take_next_sample(mh, sih);
  8. if (status == RETCODE_OK.value) {
  9.  
  10. // if instance is not found in the map, create a new
  11. // observable for it and push it out
  12. PublishSubject<SensorData> obs = null;
  13. if (!_obsMap.containsKey(sih.value.instance_handle)) {
  14. obs = PublishSubject.create();
  15. _obsGroup.onNext(obs);
  16. _obsMap.put(sih.value.instance_handle, obs);
  17. } else
  18. obs = _obsMap.get(sih.value.instance_handle);

The lambda function that pretty prints the data sample uses ctime() to format the date, but strips the extraneous line feed character from the string it returns for nicer display.

The test script Test\testReactiveDDSCPP.pl starts the DCPSInfoRepo, the publisher (SensorPublisher), the C++-based subscriber (ReactiveDDSCPP), and waits for them to terminate. The output of the test looks very similar to the Rx.NET example:

  1. [Temp7 Tue Oct 28 10:30:33 2014 1]
  2. [Temp7 Tue Oct 28 10:30:35 2014 2]
  3. [Temp7 Tue Oct 28 10:30:37 2014 3]
  4. [Temp7 Tue Oct 28 10:30:39 2014 4]
  5. ...
  6. Completed

RXJAVA

We can do the same once again with RxJava. As before, the on_data_available() method of the data reader, given the observable group and map, first obtains the observable for the instance from the map, creating the observable if it is not found. Several types of subjects are available in RxJava, but we use a PublishSubject as it imposes no other constraints on the elements it pushes, such as adding default values, replays, or such.

  1. // ReactiveDDSJ\src\main\java\com\ociweb\DataReaderListenerImpl.java
  2. if (dr != null) {
  3. SensorDataHolder mh = new SensorDataHolder(new SensorData());
  4. SampleInfoHolder sih = new SampleInfoHolder(
  5. new SampleInfo(0, 0, 0,
  6. new DDS.Time_t(), 0, 0, 0, 0, 0, 0, 0, false));
  7. int status = dr.take_next_sample(mh, sih);
  8. if (status == RETCODE_OK.value) {
  9.  
  10. // if instance is not found in the map, create a new
  11. // observable for it and push it out
  12. PublishSubject<SensorData> obs = null;
  13. if (!_obsMap.containsKey(sih.value.instance_handle)) {
  14. obs = PublishSubject.create();
  15. _obsGroup.onNext(obs);
  16. _obsMap.put(sih.value.instance_handle, obs);
  17. } else
  18. obs = _obsMap.get(sih.value.instance_handle);

Again, as before, the state of the observable corresponding to the instance is updated with calls to onNext()onCompleted(), and onError() as appropriate.

  1. // update the observable
  2. if (sih.value.valid_data) {
  3. // data is valid - call OnNext with the data
  4. obs.onNext(mh.value);
  5. } else if (sih.value.instance_state ==
  6. NOT_ALIVE_DISPOSED_INSTANCE_STATE.value) {
  7. // no more data - call OnCompleted on the observable
  8. obs.onCompleted();
  9. }
  10.  
  11. } else {
  12. // can't take the sample - assume we're dead and set an
  13. // error state on all observables
  14. Exception e = new IllegalStateException();
  15. for (Integer k : _obsMap.keySet()) {
  16. _obsMap.get(k).onError(e);
  17. }
  18. }
  19. }

Once again, we do the same as above by first subscribing to the observable group, and then to each instance observable. Three handlers must be provided this time, however, due to the parameters of subscribe(). One version of subscribe() requires only an OnNext() handler, but the other needs OnNext()OnError() and OnCompleted() handlers provided in that order. Therefore in order to supply an OnCompleted() handler, all three must be provided. In addition, a handler cannot be null, so an action with an empty body must be given as the OnError() handler.

Also, to format the sample time correctly, a java.util.Date() is created by passing the number of milliseconds since midnight 1/1/1970 UTC as an argument to its constructor.

  1. // ReactiveDDSJ\src\main\java\com\ociweb\ReactiveDDSJ.java
  2. PublishSubject<PublishSubject<SensorData>> obsGroup =
  3. PublishSubject.create();
  4. DataReaderListenerImpl listener =
  5. new DataReaderListenerImpl(obsGroup);
  6. ...
  7. obsGroup.subscribe(new Action1<PublishSubject<SensorData>>() {
  8. @Override
  9. public void call(PublishSubject<SensorData> obs) {
  10. obs.subscribe(
  11. new Action1<SensorData>() {
  12. @Override
  13. public void call(SensorData sd) {
  14. System.out.println("[" + sd.sensorID + " " +
  15. new java.util.Date(sd.date*1000) + " " +
  16. sd.reading + "]");
  17. }
  18. }, new Action1<Throwable>() {
  19. @Override
  20. public void call(Throwable error) {
  21. // no action, but handler can't be null
  22. }
  23. }, new Action0() {
  24. @Override
  25. public void call() {
  26. System.out.println("Completed");
  27. }
  28. });
  29. }
  30. });

If Java 8 lambdas are available, the code becomes much simpler.

  1. // ReactiveDDSJ\src\main\java\com\ociweb\ReactiveDDSJ.java
  2. obsGroup.subscribe((obs) -> {
  3. obs.subscribe(
  4. (sd) -> {
  5. System.out.println("[" + sd.sensorID + " " +
  6. new java.util.Date(sd.date*1000) + " " +
  7. sd.reading + "]");
  8. },
  9. (ex) -> {},
  10. () -> { System.out.println("Completed"); }
  11. );
  12. });

The test script Test\testReactiveDDSJ.pl starts the DCPSInfoRepo, the publisher (SensorPublisher), the Java-based subscriber (ReactiveDDSJ), and waits for them to terminate. The output of the test looks very similar to the previous examples:

  1. [Temp7 Tue Oct 28 10:30:58 CDT 2014 1.0]
  2. [Temp7 Tue Oct 28 10:31:00 CDT 2014 2.0]
  3. [Temp7 Tue Oct 28 10:31:02 CDT 2014 3.0]
  4. [Temp7 Tue Oct 28 10:31:04 CDT 2014 4.0]
  5. [Temp7 Tue Oct 28 10:31:06 CDT 2014 5.0]
  6. ...
  7. Completed

SIMULATING OPENDDS — ABSOLUTE TIME

Although the OpenDDS publisher above simulated a physical sensor, we can simulate incoming data at even a higher level. The result of the above process was to produce an observable that produced 20 values, each two seconds apart. We can take advantage of the generation capability of observers as provided by the various frameworks to mock OpenDDS itself, which can be useful for testing. As described in talks such as this one, this is a general technique. In that talk, Bart de Smet described an observable of points obtained from mouse movements, but as it does not matter to the code where the points themselves originated, as any observable of points, such as one provided in a unit test, would suffice.

RX.NET

Rx.NET provides various overloaded Generate() methods on Observable. The version below takes 5 parameters: A starting value for a numeric index (0), a function that returns true as long as the generation should continue (stop after generating 20 samples), a function that modifies the index (increase it by 1), a function that returns the type produced by the generation (the same SensorEventArgs used previously, though could also be, say, an anonymous class), and lastly, the interval between items generated (2 seconds).

  1. // ReactiveCS\ReactiveCS.cs
  2. var o = Observable.Generate(
  3. 0,
  4. i => i < 20,
  5. i => i + 1,
  6. i => new SensorEventLib.SensorEventArgs(
  7. "Temp7", DateTime.Now, i%10),
  8. i => TimeSpan.FromSeconds(2)
  9. );

RXCPP

For RxCpp, we could use the SensorData struct as generated from the IDL, though, for this example, we'll create a similar structure.

  1. // ReactiveCPP\ReactiveCPP.cpp
  2. typedef std::chrono::system_clock clk;
  3. typedef std::chrono::time_point<clk> tp;
  4.  
  5. struct SensorData {
  6. std::string _sensorID;
  7. tp _date;
  8. double _reading;
  9.  
  10. SensorData(const std::string &sensorID, const tp &date,
  11. const double reading) :
  12. _sensorID(sensorID), _date(date), _reading(reading) {}
  13. };

RxCpp provides an interval() method of observable<> which can be used for time-based generation. It produces an increasing count, as Observable.Generate() did above, but without end. We previously have seen the projection operator (via Select() as used in the Rx.NET OpenDDS example), but an additional operator is needed here. The take() operator limits the number of samples from an observable to the specified number, and map(), the equivalent of Select(), applies a function to each element of the generated sequence and is used here to convert the sequence into a different type. Here, map() is used to transform the observable from producing a sequence of int to a sequence of SensorData.

  1. // ReactiveCPP\ReactiveCPP.cpp
  2. auto sc = rxcpp::schedulers::make_current_thread();
  3. auto so = rxcpp::synchronize_in_one_worker(sc);
  4. auto o =
  5. rxcpp::observable<>::interval(sc.now(),
  6. std::chrono::seconds(2), so)
  7. .take(20)
  8. .map([](int i) {
  9. return SensorData("Temp7", clk::now(), i%10);
  10. });

RXJAVA

Simulating the OpenDDS sample generation in RxJava is very similar to that of RxCpp. As before, we could have used the SensorData class generated from the IDL, but we'll use a custom one for convenience.

  1. // ReactiveJ\src\main\java\com\oci\SensorData.java
  2. public class SensorData {
  3. public final String sensorID;
  4. public final Date date;
  5. public final double reading;
  6.  
  7. public SensorData(String sensorID, Date date,
  8. double reading) {
  9. this.sensorID = sensorID;
  10. this.date = date;
  11. this.reading = reading;
  12. }
  13. }

Creating a time-based observable follows the same pattern as in RxCpp. The interval() method of Observable generates an observable that generates a sequence of integers, with each integer at two second intervals. The take() operator limits the sequence to 20 elements, and the map() operator generates an instance of SensorData corresponding to each integer.

  1. // ReactiveJ\src\main\java\com\oci\ReactiveJ.java
  2. Observable<SensorData> obs =
  3. Observable.interval(2, TimeUnit.SECONDS)
  4. .take(20)
  5. .map(new Func1<Long, SensorData>() {
  6. @Override
  7. public SensorData call(Long i) {
  8. return new SensorData("Temp7", new Date(), i%10);
  9. }
  10. });
  11.  
  12. obs.subscribe(new Action1<SensorData>() {
  13. @Override
  14. public void call(SensorData o) {
  15. System.out.println("[" + o.sensorID + " " + o.date +
  16. " " + o.reading + "]");
  17. }
  18. });

SIMULATING OPENDDS — VIRTUAL TIME

Implicit in the code above is the use of a scheduler, a mechanism that controls when subscriptions start, notifications are published, and provides a notion of time. By default, a scheduler is used that is appropriate for the context (such as by using the current thread of the process), and provides a real time clock. Rx.NET also provides a virtual time scheduler in which "time" can pass at a rate managed by a test environment. A clock time, specified in ticks (one ten-millionth of a second), is supplied as an argument to OnNext()OnCompleted(), or OnError() to indicate when the notification is produced. These times are used to only sequence the operations, rather than, as with the timed sequence generation above, to execute at a specific real-world clock time. Long running observables, by the use of a virtual time scheduler, can now be tested at expected unit test speeds.

In Rx.NET, a useful virtual time scheduler is the TestScheduler, with methods to create observables and observers that "tick" at the virtual time rate. Two types of observables can be created from a TestScheduler, a cold observable and a hot observable. A cold observable publishes items only when an observer has subscribed, while a hot observable publishes regardless. While a hot observable may more closely model a real-world device, such as a temperature sensor that publishes a data reading every minute no matter whether anyone is listening, the use of a cold observable in a unit test eliminates any timing issues in the test setup, to ensure no samples are lost before processing can be performed.

Creating an observable from a TestScheduler is done by providing a list of observer notifications in the call to CreateHotObservable() or CreateColdObservable(). For instance, an observable that produces the integer value 42 and then terminates can be modeled as:

  1. // ReactiveCS\ReactiveCS.cs
  2. var scheduler = new TestScheduler();
  3. var xs = scheduler.CreateColdObservable(
  4. new Recorded<Notification<int>>(100,
  5. Notification.CreateOnNext(42)),
  6. new Recorded<Notification<int>>(200,
  7. Notification.CreateOnCompleted<int>())
  8. );

The syntax can be simplified by executing the code from within a class derived from ReactiveTest.

  1. // ReactiveCS\ReactiveCS.cs
  2. public class Tests : ReactiveTest {
  3. ...
  4. public void Produce42() {
  5. var scheduler = new TestScheduler();
  6. var xs = scheduler.CreateColdObservable(
  7. OnNext(100, 42),
  8. OnCompleted<int>(200)
  9. );
  10. ...

So, to simulate OpenDDS, we can do the above, but, rather than a sequence of integers, produce a sequence of SensorData or SensorEventArgs structures instead.

  1. // ReactiveCS\ReactiveCS.cs
  2. var scheduler = new TestScheduler();
  3. var xs = scheduler.CreateColdObservable(
  4. OnNext(100, new SensorEventArgs("Temp7", new DateTime(100),
  5. 42.0)),
  6. OnCompleted<SensorEventArgs>(200)
  7. );

In addition, to simulate real-world times, tick values corresponding to meaningful dates should be used. Small numbers such as 100 and 200 are handy for clarity in simple tests, but an expression such as new DateTime(2014, 1, 1, 0, 0, 0).Ticks can be used instead to obtain the tick count corresponding to a point in time that would match the OpenDDS sample data being simulated. In this case, using this expression to obtain the number of ticks corresponding to midnight on January 1, 2014 is clearer than hardcoding the value 635241312000000000.

CONCLUSION

Once an OpenDDS data sample stream is transformed into an observable, it can be treated in the same manner as observables created from other sources of data in the application: mouse movements, file reads, database query result returns, and the like. That is, once these disparate types are all converted into observables, they can be subscribed to, transformed, filtered, and otherwise manipulated in a uniform way. This can lead to an application which is more streamlined, simpler to understand, and can provide the user with greater responsiveness.

While this article has shown how to transform OpenDDS data streams into observables, Part II will describe various operators that manipulate reactive streams, demonstrating the full power of reactive programming.

REFERENCES

secret